# GUIBRUSHR/scripts/edit_db.py
"""
Interactive database manager for GUIBRUSHR.
Allows viewing and deleting entries from the database tables.
"""
from pathlib import Path
def _get_db_path() -> Path:
return Path(__file__).parent.parent.resolve() / "Files" / "DB" / "atmo.db"
# ── Fancy print helpers ───────────────────────────────────────────────────────
def _print_table(headers: list, rows: list) -> None:
if not rows:
print("\n (no entries found)\n")
return
col_widths = [max(len(str(h)), max(len(str(r[i])) for r in rows))
for i, h in enumerate(headers)]
sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+"
fmt = "| " + " | ".join(f"{{:<{w}}}" for w in col_widths) + " |"
print("\n" + sep)
print(fmt.format(*headers))
print(sep)
for row in rows:
print(fmt.format(*[str(v) if v is not None else "" for v in row]))
print(sep + "\n")
# ── Table handlers ────────────────────────────────────────────────────────────
def _handle_instruments(cursor, conn) -> None:
while True:
print("\n 1. View all instruments")
print(" 2. Delete instrument")
print(" 0. Back")
choice = input("\n Select: ").strip()
if choice == "1":
cursor.execute("SELECT * FROM instruments ORDER BY instrument;")
rows = cursor.fetchall()
_print_table(
["Instrument", "Resolution", "HWHM (km/s)", "WL min (μm)", "WL max (μm)"],
rows
)
elif choice == "2":
cursor.execute("SELECT instrument FROM instruments ORDER BY instrument;")
keys = [r[0] for r in cursor.fetchall()]
if not keys:
print("\n No instruments to delete.\n")
continue
print("\n Available instruments:")
for i, k in enumerate(keys, 1):
print(f" {i}. {k}")
val = input("\n Enter instrument name to delete: ").strip()
cursor.execute("DELETE FROM instruments WHERE instrument = ?", (val,))
conn.commit()
if cursor.rowcount > 0:
print(f"\n ✓ Instrument '{val}' deleted.")
else:
print(f"\n ✗ No instrument found with name '{val}'.")
elif choice == "0":
break
def _handle_observatories(cursor, conn) -> None:
while True:
print("\n 1. View all observatories")
print(" 2. Delete observatory")
print(" 0. Back")
choice = input("\n Select: ").strip()
if choice == "1":
cursor.execute("SELECT * FROM observatories ORDER BY name;")
rows = cursor.fetchall()
_print_table(
["Name", "Latitude", "Longitude", "Altitude (m)"],
rows
)
elif choice == "2":
cursor.execute("SELECT name FROM observatories ORDER BY name;")
keys = [r[0] for r in cursor.fetchall()]
if not keys:
print("\n No observatories to delete.\n")
continue
print("\n Available observatories:")
for i, k in enumerate(keys, 1):
print(f" {i}. {k}")
val = input("\n Enter observatory name to delete: ").strip()
cursor.execute("DELETE FROM observatories WHERE name = ?", (val,))
conn.commit()
if cursor.rowcount > 0:
print(f"\n ✓ Observatory '{val}' deleted.")
else:
print(f"\n ✗ No observatory found with name '{val}'.")
elif choice == "0":
break
def _handle_retrieval_runs(cursor, conn) -> None:
while True:
print("\n 1. View all retrieval runs")
print(" 2. Delete retrieval run")
print(" 0. Back")
choice = input("\n Select: ").strip()
if choice == "1":
cursor.execute("SELECT retrieval_date, target_id, chemistry, t_p_profile, "
"resolution, retrieval_model, user_id "
"FROM retrieval_run ORDER BY retrieval_date DESC;")
rows = cursor.fetchall()
_print_table(
["Date", "Target", "Chemistry", "T-P Profile",
"Resolution", "Model", "User"],
rows
)
elif choice == "2":
cursor.execute("SELECT retrieval_date, target_id, user_id "
"FROM retrieval_run ORDER BY retrieval_date DESC;")
runs = cursor.fetchall()
if not runs:
print("\n No retrieval runs to delete.\n")
continue
print("\n Available retrieval runs:")
for i, (date, target, user) in enumerate(runs, 1):
print(f" {i}. {date} | target: {target} | user: {user}")
val = input("\n Enter retrieval date to delete: ").strip()
cursor.execute("DELETE FROM retrieval_run WHERE retrieval_date = ?", (val,))
conn.commit()
if cursor.rowcount > 0:
print(f"\n ✓ Retrieval run '{val}' deleted.")
else:
print(f"\n ✗ No retrieval run found with date '{val}'.")
elif choice == "0":
break
# ── Main entry point ──────────────────────────────────────────────────────────
[docs]
def edit_db_cli():
import sqlite3
db_path = _get_db_path()
if not db_path.exists():
print(f"\n ✗ Database not found at {db_path}")
print(" Run 'guibrushr' once to initialize it.\n")
return
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
print("\n" + "=" * 50)
print("GUIBRUSHR - Database Manager")
print(f" DB: {db_path}")
print("=" * 50)
try:
while True:
print("\n 1. Instruments")
print(" 2. Observatories")
print(" 3. Retrieval runs")
print(" 0. Exit")
choice = input("\n Select table: ").strip()
if choice == "1":
_handle_instruments(cursor, conn)
elif choice == "2":
_handle_observatories(cursor, conn)
elif choice == "3":
_handle_retrieval_runs(cursor, conn)
elif choice == "0":
break
else:
print(" Invalid choice.")
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
edit_db_cli()