# Source code for GUIBRUSHR.scripts.download_test_data

# GUIBRUSHR/scripts/download_test_data.py
"""
Test data downloader for GUIBRUSHR.
Downloads petitRADTRANS opacity data and WASP-77Ab target dataset.
"""

import zipfile
from pathlib import Path

import gdown

# ── Google Drive FILE IDs ─────────────────────────────────────────────────────
# Passed as the ``id`` argument of ``gdown.download`` by download_test_data_cli.
PETITRADTRANS_FILE_ID = "1pXstm1Rr9KCzT09Jw-pEdT9eq2wFwqa6"
WASP77AB_FILE_ID      = "1x_S-ufPZj5mnOxfhRPy8x0Cg8Gl19EOE"

# Approximate archive sizes. They are only interpolated into the messages shown
# to the user; nothing in this module uses them to validate a download.
PETITRADTRANS_SIZE_GB = 12
WASP77AB_SIZE_MB      = 200


# ── Helpers ───────────────────────────────────────────────────────────────────

def _get_config_path() -> Path:
    """Return the absolute path of the package's ``configuration.csv``.

    The path is derived from this module's own location and is not checked
    for existence; callers decide what to do when the file is missing.
    """
    # This module sits in <package>/scripts/, so two levels up is the
    # package root (e.g. site-packages/GUIBRUSHR).
    package_root = Path(__file__).parent.parent.resolve()
    return package_root.joinpath("Files", "Configuration_Path", "configuration.csv")


def _read_config(config_path: Path) -> dict:
    """Parse the two-column configuration CSV into a ``{key: value}`` dict.

    The first line is treated as a header and discarded. Every following
    line is split at its first comma only, so values may themselves contain
    commas. Lines without a comma are ignored; keys and values are stripped
    of surrounding whitespace; a repeated key keeps its last value.
    """
    settings = {}
    with open(config_path, "r") as handle:
        next(handle, None)  # drop the header row (no-op on an empty file)
        for raw in handle:
            name, comma, rest = raw.strip().partition(",")
            if comma:
                settings[name.strip()] = rest.strip()
    return settings


def _extract_zip_no_overwrite(zip_path: Path, dest: Path) -> None:
    """Unpack ``zip_path`` under ``dest``, leaving pre-existing paths untouched.

    An archive member is extracted only when ``dest / member.filename`` does
    not exist yet. A single-line progress counter is rewritten on stdout
    after every member (skipped or not) and terminated with a newline.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        entries = archive.infolist()
        count = len(entries)
        for position, entry in enumerate(entries, start=1):
            already_present = (dest / entry.filename).exists()
            if not already_present:
                archive.extract(entry, dest)
            print(f"\r  Extracting: {position}/{count} files", end="", flush=True)
    # Finish the carriage-return progress line.
    print()


def _extract_zip_overwrite(zip_path: Path, dest: Path) -> None:
    """Unpack every member of ``zip_path`` under ``dest``, replacing existing files.

    A single-line progress counter is rewritten on stdout after each member
    and terminated with a newline once the archive has been processed.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        entries = archive.infolist()
        count = len(entries)
        position = 0
        for entry in entries:
            position += 1
            archive.extract(entry, dest)
            print(f"\r  Extracting: {position}/{count} files", end="", flush=True)
    # Finish the carriage-return progress line.
    print()


# ── Main ──────────────────────────────────────────────────────────────────────

def _download_and_extract(file_id: str, zip_path: Path, dest: Path,
                          extract, extract_message: str) -> bool:
    """Download one Google Drive archive, unpack it, and always delete the zip.

    Args:
        file_id: Google Drive file ID handed to ``gdown.download``.
        zip_path: Temporary location the archive is downloaded to.
        dest: Directory the archive is extracted into.
        extract: Callable ``(zip_path, dest)`` that performs the extraction
            (one of the ``_extract_zip_*`` helpers).
        extract_message: Line printed right before extraction starts.

    Returns:
        True when the archive was downloaded and extracted, False when either
        step failed (an error message has already been printed).
    """
    try:
        try:
            downloaded = gdown.download(id=file_id, output=str(zip_path), quiet=False)
        except Exception as e:
            # CLI boundary: gdown's exception types vary between versions, so
            # report whatever it raised instead of showing a traceback.
            print(f"\n ✗ Download of {zip_path.name} failed: {e}\n")
            return False

        # NOTE(review): some gdown versions report failure by returning None
        # rather than raising — confirm against the pinned gdown version.
        if downloaded is None or not zip_path.exists():
            print(f"\n ✗ Download of {zip_path.name} failed.\n")
            return False

        print(extract_message)
        try:
            extract(zip_path, dest)
        except (zipfile.BadZipFile, OSError) as e:
            # BadZipFile typically means the download was not a zip at all
            # (e.g. an HTML error page saved under the .zip name).
            print(f"\n ✗ Extraction of {zip_path.name} failed: {e}\n")
            return False
        return True
    finally:
        # The archive can be many GB: never leave it behind, even on failure.
        if zip_path.exists():
            zip_path.unlink()


def download_test_data_cli() -> None:
    """Interactively download and install the GUIBRUSHR test data.

    Reads the petitRADTRANS and target-folder paths from the package's
    ``configuration.csv``, validates them, asks for confirmation, then
    downloads and extracts the WASP-77Ab dataset (overwriting existing files)
    and the petitRADTRANS opacity data (adding only missing files).

    All problems are reported on stdout and end the run early; the function
    always returns None.
    """
    print("\n" + "=" * 60)
    print("GUIBRUSHR - Test Data Downloader")
    print("=" * 60)

    # 1. Check that configuration.csv exists
    config_path = _get_config_path()
    if not config_path.exists():
        print(f"\n ✗ Configuration file not found at:\n {config_path}")
        print(" Run 'guibrushr-config' to create it first.\n")
        return
    print(f"\n ✓ Configuration file found: {config_path}")

    # 2. Read the configured paths
    try:
        config = _read_config(config_path)
    except Exception as e:
        print(f"\n ✗ Failed to read configuration file: {e}\n")
        return

    prt_path = config.get("petitRadTrans_path", "").strip()
    target_path = config.get("path_target_folders", "").strip()

    # 3. Check that both paths are set and exist
    errors = []
    if not prt_path:
        errors.append(" ✗ 'petitRadTrans_path' is empty in configuration.csv")
    elif not Path(prt_path).exists():
        errors.append(f" ✗ petitRADTRANS path does not exist:\n {prt_path}")
    if not target_path:
        errors.append(" ✗ 'path_target_folders' is empty in configuration.csv")
    elif not Path(target_path).exists():
        errors.append(f" ✗ Target folders path does not exist:\n {target_path}")

    if errors:
        print()
        for e in errors:
            print(e)
        print("\n Fix the paths with 'guibrushr-config' and try again.\n")
        return

    prt_path = Path(prt_path)
    target_path = Path(target_path)

    # 4. Work out the extraction destinations
    # prt_path = .../petitRADTRANS/input_data/
    # the zip contains petitRADTRANS/ → extract two levels above prt_path
    prt_extract_dest = prt_path.parent.parent
    # target_path = .../Target_GUIBRUSHR/
    # the zip contains wasp77Ab/ → extract directly into target_path
    wasp_extract_dest = target_path

    print(f"\n ✓ petitRADTRANS path : {prt_path}")
    print(f" ✓ Target path : {target_path}")

    # 5. Ask for confirmation
    print(
        "\n Files that will be downloaded:\n"
        f" • petitRADTRANS.zip ~{PETITRADTRANS_SIZE_GB} GB → {prt_extract_dest}\n"
        f" • wasp77Ab.zip ~{WASP77AB_SIZE_MB} MB → {wasp_extract_dest}\n"
        "\n"
        " ⚠ petitRADTRANS: existing files will NOT be overwritten, only missing files will be added.\n"
        " ⚠ wasp77Ab: will be extracted as wasp77Ab/ inside the target folder.\n"
    )
    try:
        confirm = input(" Proceed? [y/N]: ").strip().lower()
    except EOFError:
        # No interactive stdin (piped / automated run): treat as "no".
        confirm = ""
    if confirm != "y":
        print("\n Aborted.\n")
        return

    tmp_dir = Path(__file__).parent.parent.resolve() / "Files" / "_tmp_download"
    try:
        tmp_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        # e.g. the package is installed in a read-only location
        print(f"\n ✗ Cannot create temporary download folder:\n {tmp_dir}\n {e}\n")
        return

    try:
        # 6. Download and extract wasp77Ab (overwrites existing files)
        print("\n── wasp77Ab dataset ─────────────────────────────────────────")
        print(f" Downloading wasp77Ab.zip (~{WASP77AB_SIZE_MB} MB)...")
        if not _download_and_extract(
            WASP77AB_FILE_ID,
            tmp_dir / "wasp77Ab.zip",
            wasp_extract_dest,
            _extract_zip_overwrite,
            " Extracting into target folder...",
        ):
            return
        print(f" ✓ wasp77Ab extracted to {wasp_extract_dest / 'wasp77Ab'}")

        # 7. Download and extract petitRADTRANS (no overwrite)
        print("\n── petitRADTRANS opacity data ───────────────────────────────")
        print(f" Downloading petitRADTRANS.zip (~{PETITRADTRANS_SIZE_GB} GB)...")
        if not _download_and_extract(
            PETITRADTRANS_FILE_ID,
            tmp_dir / "petitRADTRANS.zip",
            prt_extract_dest,
            _extract_zip_no_overwrite,
            " Extracting (skipping existing files)...",
        ):
            return
        print(f" ✓ petitRADTRANS data updated in {prt_path}")
    finally:
        # 8. Clean up the temporary folder. Deliberately best-effort: rmdir
        # fails if the folder is not empty, and that must not mask the result.
        try:
            tmp_dir.rmdir()
        except OSError:
            pass

    print("\n ✓ All done!\n")
# Run the downloader when this module is executed directly as a script.
if __name__ == "__main__":
    download_test_data_cli()