Source code for GUIBRUSHR.scripts.edit_db

# GUIBRUSHR/scripts/edit_db.py
"""
Interactive database manager for GUIBRUSHR.
Allows viewing and deleting entries from the database tables.
"""

from pathlib import Path


def _get_db_path() -> Path:
    return Path(__file__).parent.parent.resolve() / "Files" / "DB" / "atmo.db"


# ── Fancy print helpers ───────────────────────────────────────────────────────

def _print_table(headers: list, rows: list) -> None:
    if not rows:
        print("\n  (no entries found)\n")
        return

    col_widths = [max(len(str(h)), max(len(str(r[i])) for r in rows))
                  for i, h in enumerate(headers)]

    sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+"
    fmt = "| " + " | ".join(f"{{:<{w}}}" for w in col_widths) + " |"

    print("\n" + sep)
    print(fmt.format(*headers))
    print(sep)
    for row in rows:
        print(fmt.format(*[str(v) if v is not None else "" for v in row]))
    print(sep + "\n")


# ── Table handlers ────────────────────────────────────────────────────────────

def _handle_instruments(cursor, conn) -> None:
    while True:
        print("\n  1. View all instruments")
        print("  2. Delete instrument")
        print("  0. Back")
        choice = input("\n  Select: ").strip()

        if choice == "1":
            cursor.execute("SELECT * FROM instruments ORDER BY instrument;")
            rows = cursor.fetchall()
            _print_table(
                ["Instrument", "Resolution", "HWHM (km/s)", "WL min (μm)", "WL max (μm)"],
                rows
            )

        elif choice == "2":
            cursor.execute("SELECT instrument FROM instruments ORDER BY instrument;")
            keys = [r[0] for r in cursor.fetchall()]
            if not keys:
                print("\n  No instruments to delete.\n")
                continue
            print("\n  Available instruments:")
            for i, k in enumerate(keys, 1):
                print(f"    {i}. {k}")
            val = input("\n  Enter instrument name to delete: ").strip()
            cursor.execute("DELETE FROM instruments WHERE instrument = ?", (val,))
            conn.commit()
            if cursor.rowcount > 0:
                print(f"\n  ✓ Instrument '{val}' deleted.")
            else:
                print(f"\n  ✗ No instrument found with name '{val}'.")

        elif choice == "0":
            break


def _handle_observatories(cursor, conn) -> None:
    while True:
        print("\n  1. View all observatories")
        print("  2. Delete observatory")
        print("  0. Back")
        choice = input("\n  Select: ").strip()

        if choice == "1":
            cursor.execute("SELECT * FROM observatories ORDER BY name;")
            rows = cursor.fetchall()
            _print_table(
                ["Name", "Latitude", "Longitude", "Altitude (m)"],
                rows
            )

        elif choice == "2":
            cursor.execute("SELECT name FROM observatories ORDER BY name;")
            keys = [r[0] for r in cursor.fetchall()]
            if not keys:
                print("\n  No observatories to delete.\n")
                continue
            print("\n  Available observatories:")
            for i, k in enumerate(keys, 1):
                print(f"    {i}. {k}")
            val = input("\n  Enter observatory name to delete: ").strip()
            cursor.execute("DELETE FROM observatories WHERE name = ?", (val,))
            conn.commit()
            if cursor.rowcount > 0:
                print(f"\n  ✓ Observatory '{val}' deleted.")
            else:
                print(f"\n  ✗ No observatory found with name '{val}'.")

        elif choice == "0":
            break


def _handle_retrieval_runs(cursor, conn) -> None:
    while True:
        print("\n  1. View all retrieval runs")
        print("  2. Delete retrieval run")
        print("  0. Back")
        choice = input("\n  Select: ").strip()

        if choice == "1":
            cursor.execute("SELECT retrieval_date, target_id, chemistry, t_p_profile, "
                           "resolution, retrieval_model, user_id "
                           "FROM retrieval_run ORDER BY retrieval_date DESC;")
            rows = cursor.fetchall()
            _print_table(
                ["Date", "Target", "Chemistry", "T-P Profile",
                 "Resolution", "Model", "User"],
                rows
            )

        elif choice == "2":
            cursor.execute("SELECT retrieval_date, target_id, user_id "
                           "FROM retrieval_run ORDER BY retrieval_date DESC;")
            runs = cursor.fetchall()
            if not runs:
                print("\n  No retrieval runs to delete.\n")
                continue
            print("\n  Available retrieval runs:")
            for i, (date, target, user) in enumerate(runs, 1):
                print(f"    {i}. {date}  |  target: {target}  |  user: {user}")
            val = input("\n  Enter retrieval date to delete: ").strip()
            cursor.execute("DELETE FROM retrieval_run WHERE retrieval_date = ?", (val,))
            conn.commit()
            if cursor.rowcount > 0:
                print(f"\n  ✓ Retrieval run '{val}' deleted.")
            else:
                print(f"\n  ✗ No retrieval run found with date '{val}'.")

        elif choice == "0":
            break


# ── Main entry point ──────────────────────────────────────────────────────────

[docs] def edit_db_cli(): import sqlite3 db_path = _get_db_path() if not db_path.exists(): print(f"\n ✗ Database not found at {db_path}") print(" Run 'guibrushr' once to initialize it.\n") return conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() print("\n" + "=" * 50) print("GUIBRUSHR - Database Manager") print(f" DB: {db_path}") print("=" * 50) try: while True: print("\n 1. Instruments") print(" 2. Observatories") print(" 3. Retrieval runs") print(" 0. Exit") choice = input("\n Select table: ").strip() if choice == "1": _handle_instruments(cursor, conn) elif choice == "2": _handle_observatories(cursor, conn) elif choice == "3": _handle_retrieval_runs(cursor, conn) elif choice == "0": break else: print(" Invalid choice.") finally: cursor.close() conn.close()
if __name__ == "__main__": edit_db_cli()