# Source code for GUIBRUSHR.scripts.download_test_data
# GUIBRUSHR/scripts/download_test_data.py
"""
Test data downloader for GUIBRUSHR.
Downloads petitRADTRANS opacity data and WASP-77Ab target dataset.
"""
import zipfile
from pathlib import Path
import gdown
# ── Google Drive FILE IDs ─────────────────────────────────────────────────────
# Google Drive file IDs of the two archives; download_test_data_cli() passes
# them to gdown.download(id=...).
PETITRADTRANS_FILE_ID = "1pXstm1Rr9KCzT09Jw-pEdT9eq2wFwqa6"
WASP77AB_FILE_ID = "1x_S-ufPZj5mnOxfhRPy8x0Cg8Gl19EOE"
# Approximate archive sizes. Within this module they are only interpolated
# into the messages shown to the user; they are never used to validate a
# download.
PETITRADTRANS_SIZE_GB = 12
WASP77AB_SIZE_MB = 200
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_config_path() -> Path:
    """Return the absolute path of the package's ``configuration.csv``.

    The location is derived from this module's own file: two directory
    levels up from the file, then ``Files/Configuration_Path``. For an
    installed package that means
    ``site-packages/GUIBRUSHR/Files/Configuration_Path/configuration.csv``.
    The file itself is not checked for existence here.
    """
    package_root = Path(__file__).parent.parent.resolve()
    return package_root.joinpath(
        "Files", "Configuration_Path", "configuration.csv"
    )
def _read_config(config_path: Path) -> dict:
    """Load the ``key,value`` configuration file into a dictionary.

    The first line is treated as a header and ignored. Each remaining line
    is split on its first comma only, so a value may itself contain commas.
    Keys and values are stripped of surrounding whitespace; lines without
    a comma are skipped. If a key appears more than once, the last
    occurrence wins.

    Args:
        config_path: Location of the configuration file.

    Returns:
        Mapping of configuration keys to their (string) values.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    settings = {}
    with open(config_path, "r") as handle:
        rows = handle.readlines()
    for raw in rows[1:]:  # rows[0] is the header
        name, comma, rest = raw.strip().partition(",")
        if comma:
            settings[name.strip()] = rest.strip()
    return settings
def _extract_zip_no_overwrite(zip_path: Path, dest: Path) -> None:
    """Unpack ``zip_path`` under ``dest`` without touching existing paths.

    Every archive entry whose target path (``dest / entry name``) already
    exists is left alone; only missing entries are extracted. A progress
    counter is rewritten on a single console line and finished with a
    newline.

    Args:
        zip_path: Archive to read.
        dest: Directory the archive entries are extracted into.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        entries = archive.infolist()
        count = len(entries)
        for done, entry in enumerate(entries, start=1):
            already_present = (dest / entry.filename).exists()
            if not already_present:
                archive.extract(entry, dest)
            # "\r" returns to the start of the line so the counter updates in place.
            print(f"\r Extracting: {done}/{count} files", end="", flush=True)
    print()
def _extract_zip_overwrite(zip_path: Path, dest: Path) -> None:
    """Unpack every entry of ``zip_path`` under ``dest``, replacing files.

    Unlike ``_extract_zip_no_overwrite`` no existence check is made: each
    entry is extracted unconditionally. A progress counter is rewritten on
    a single console line and finished with a newline.

    Args:
        zip_path: Archive to read.
        dest: Directory the archive entries are extracted into.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        entries = archive.infolist()
        count = len(entries)
        done = 0
        for entry in entries:
            archive.extract(entry, dest)
            done += 1
            # "\r" returns to the start of the line so the counter updates in place.
            print(f"\r Extracting: {done}/{count} files", end="", flush=True)
    print()
# ── Main ──────────────────────────────────────────────────────────────────────
# [docs]  (link marker left over from the rendered documentation page; not code)
def _download_and_extract(file_id, zip_path, dest, extract, extract_msg):
    """Download one Google Drive archive, extract it, and always delete the zip.

    Args:
        file_id: Google Drive file ID handed to ``gdown.download``.
        zip_path: Where the temporary archive is written.
        dest: Directory the archive is extracted into.
        extract: Callable ``(zip_path, dest)`` performing the extraction.
        extract_msg: Message printed right before extraction starts.

    Returns:
        True when the archive was downloaded and extracted; False when the
        download or the extraction failed (an error is printed in that case).
    """
    try:
        try:
            downloaded = gdown.download(id=file_id, output=str(zip_path), quiet=False)
        except Exception as exc:  # CLI boundary: report any gdown/network failure
            print(f"\n ✗ Download of {zip_path.name} failed: {exc}\n")
            return False
        # NOTE(review): depending on the gdown version a failed download may
        # return None instead of raising - confirm against the pinned version.
        # The is_file() check covers both cases.
        if downloaded is None or not zip_path.is_file():
            print(f"\n ✗ Download of {zip_path.name} failed (no file was written).\n")
            return False
        print(extract_msg)
        try:
            extract(zip_path, dest)
        except (zipfile.BadZipFile, OSError) as exc:
            print(f"\n ✗ Could not extract {zip_path.name}: {exc}\n")
            return False
        return True
    finally:
        # Never leave a (possibly multi-GB) archive behind, even on failure
        # or Ctrl-C.
        if zip_path.exists():
            zip_path.unlink()


def download_test_data_cli() -> None:
    """Interactive command that downloads and installs the GUIBRUSHR test data.

    Steps:
      1. Locate and read ``configuration.csv``.
      2. Validate the ``petitRadTrans_path`` and ``path_target_folders``
         entries (both must be non-empty and exist on disk).
      3. Ask the user for confirmation.
      4. Download ``wasp77Ab.zip`` and extract it into the target folder,
         overwriting existing files.
      5. Download ``petitRADTRANS.zip`` and extract it two levels above
         ``petitRadTrans_path``, adding only files that are missing.

    Every failure is reported on stdout and the function returns ``None``
    without raising; temporary archives are removed whether or not a step
    succeeds.
    """
    print("\n" + "=" * 60)
    print("GUIBRUSHR - Test Data Downloader")
    print("=" * 60)

    # 1. Check that configuration.csv exists
    config_path = _get_config_path()
    if not config_path.exists():
        print(f"\n ✗ Configuration file not found at:\n {config_path}")
        print(" Run 'guibrushr-config' to create it first.\n")
        return
    print(f"\n ✓ Configuration file found: {config_path}")

    # 2. Read the configured paths
    try:
        config = _read_config(config_path)
    except Exception as e:
        print(f"\n ✗ Failed to read configuration file: {e}\n")
        return
    prt_path = config.get("petitRadTrans_path", "").strip()
    target_path = config.get("path_target_folders", "").strip()

    # 3. Check that the configured paths exist
    errors = []
    if not prt_path:
        errors.append(" ✗ 'petitRadTrans_path' is empty in configuration.csv")
    elif not Path(prt_path).exists():
        errors.append(f" ✗ petitRADTRANS path does not exist:\n {prt_path}")
    if not target_path:
        errors.append(" ✗ 'path_target_folders' is empty in configuration.csv")
    elif not Path(target_path).exists():
        errors.append(f" ✗ Target folders path does not exist:\n {target_path}")
    if errors:
        print()
        for message in errors:
            print(message)
        print("\n Fix the paths with 'guibrushr-config' and try again.\n")
        return
    prt_path = Path(prt_path)
    target_path = Path(target_path)

    # 4. Work out the extraction destinations
    # prt_path = .../petitRADTRANS/input_data/
    # the zip contains petitRADTRANS/ -> extract two levels above
    prt_extract_dest = prt_path.parent.parent
    # target_path = .../Target_GUIBRUSHR/
    # the zip contains wasp77Ab/ -> extract directly into target_path
    wasp_extract_dest = target_path
    print(f"\n ✓ petitRADTRANS path : {prt_path}")
    print(f" ✓ Target path : {target_path}")

    # 5. Ask for confirmation
    print(f"""
Files that will be downloaded:
• petitRADTRANS.zip ~{PETITRADTRANS_SIZE_GB} GB → {prt_extract_dest}
• wasp77Ab.zip ~{WASP77AB_SIZE_MB} MB → {wasp_extract_dest}
⚠ petitRADTRANS: existing files will NOT be overwritten,
only missing files will be added.
⚠ wasp77Ab: will be extracted as wasp77Ab/ inside the target folder.
""")
    try:
        confirm = input(" Proceed? [y/N]: ").strip().lower()
    except EOFError:
        # No interactive stdin (e.g. piped/closed): treat as the default "N".
        confirm = ""
    if confirm != "y":
        print("\n Aborted.\n")
        return

    tmp_dir = Path(__file__).parent.parent.resolve() / "Files" / "_tmp_download"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    try:
        # 6. Download and extract wasp77Ab (overwrite)
        print("\n── wasp77Ab dataset ─────────────────────────────────────────")
        wasp_zip = tmp_dir / "wasp77Ab.zip"
        print(f" Downloading wasp77Ab.zip (~{WASP77AB_SIZE_MB} MB)...")
        if not _download_and_extract(
            WASP77AB_FILE_ID,
            wasp_zip,
            wasp_extract_dest,
            _extract_zip_overwrite,
            " Extracting into target folder...",
        ):
            return
        print(f" ✓ wasp77Ab extracted to {wasp_extract_dest / 'wasp77Ab'}")

        # 7. Download and extract petitRADTRANS (no overwrite)
        print("\n── petitRADTRANS opacity data ───────────────────────────────")
        prt_zip = tmp_dir / "petitRADTRANS.zip"
        print(f" Downloading petitRADTRANS.zip (~{PETITRADTRANS_SIZE_GB} GB)...")
        if not _download_and_extract(
            PETITRADTRANS_FILE_ID,
            prt_zip,
            prt_extract_dest,
            _extract_zip_no_overwrite,
            " Extracting (skipping existing files)...",
        ):
            return
        print(f" ✓ petitRADTRANS data updated in {prt_path}")
    finally:
        # 8. Clean up the temp folder (best effort: rmdir fails if the
        # directory is not empty, which is deliberately ignored).
        try:
            tmp_dir.rmdir()
        except OSError:
            pass
    print("\n ✓ All done!\n")
# Run the downloader when this module is executed directly as a script.
if __name__ == "__main__":
    download_test_data_cli()