Source code for GUIBRUSHR.General_Constants.FunctionsAndConstants.general_functions

"""
General utility functions for GUIBRUSHR atmospheric retrieval analysis.

This module contains a comprehensive collection of utility functions used throughout
the GUIBRUSHR application for various tasks including:

- Molecular data processing and opacity calculations
- Spectroscopic data analysis and convolution operations
- FITS file I/O operations
- Cross-correlation analysis
- Principal Component Analysis (PCA) for systematic noise removal
- Temperature profile calculations
- Plotting and visualization utilities
- Mathematical operations for atmospheric modeling

The functions in this module support both high-resolution and low-resolution
spectroscopic data processing, and are designed to work with the petitRADTRANS
and PyratBay radiative transfer codes.

Dependencies:
    - numpy: Numerical computations
    - astropy: FITS file handling
    - matplotlib: Plotting
    - pandas: Data structure handling
    - scipy: Scientific computing
    - sklearn: Machine learning algorithms (PCA)
    - petitRADTRANS: Radiative transfer calculations
    - pyratbay: Alternative radiative transfer code
    - chemcat: Chemical abundance calculations

Author: GUIBRUSHR Development Team
"""

import copy
import os
import pickle
import re
import traceback
from pathlib import Path
from re import findall

from astropy.io import fits
import numpy as np
import chemcat
import pyratbay.io as io
from corner import corner
from matplotlib import pyplot as plt
import matplotlib.colors as mcolors
from pandas import read_csv, DataFrame
from petitRADTRANS.config import petitradtrans_config_parser
from petitRADTRANS import physical_constants as pc
from petitRADTRANS.spectral_model import SpectralModel
from re import sub

from scipy.stats import skewnorm
from scipy.optimize import curve_fit
from scipy import stats
import chemcat.utils as u
from scipy.signal import convolve
from scipy.interpolate import splrep, splev
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Optional, Tuple, Union, Any
from petitRADTRANS.plotlib import get_species_color
from petitRADTRANS.plotlib.style import default_color
from GUIBRUSHR.GUI.Input_Output_Panels.Input_Panels.TabPanels.FrameGenerationHRData.StellarDialog import StellarDialog
# from petitRADTRANS._input_data_loader import (
#     get_cia_aliases, get_species_basename, get_species_scientific_name, split_species_all_info
# )

from GUIBRUSHR.GUI.WIDGET.MyFigure import MyFigure
from GUIBRUSHR.GUI.LAYOUT.GraphicsConfig import GraphicsConfig
from GUIBRUSHR.General_Constants.Classes.BestFitParametersAndResults import BestFitParametersAndResults
from GUIBRUSHR.General_Constants.Classes.UserTemperatureProfile import UserTemperatureProfile
from GUIBRUSHR.General_Constants.FunctionsAndConstants.Constant_Variables import ConstantVariables
from GUIBRUSHR.General_Constants.FunctionsAndConstants.PTR_Modules import split_species_all_info, \
    get_species_scientific_name, get_cia_aliases, get_species_basename
from GUIBRUSHR.General_Constants.FunctionsAndConstants.radvel_g import rv_drive, timetrans_to_timeperi
from GUIBRUSHR.Retrieval.ExofastMCMC import exofast_gelmanrubin


[docs] def make_smooth_mono_cmap(color: str, name: str = 'custom') -> mcolors.LinearSegmentedColormap: """ Create a smooth monochromatic colormap with 5 control points. The colormap transitions from black through dark shade, saturated color, light shade, to white, creating a smooth gradient suitable for visualization. Parameters: color (str): Hex color code or named color for the saturated color point name (str, optional): Name for the colormap. Defaults to 'custom'. Returns: LinearSegmentedColormap: Matplotlib colormap object with 256 color levels """ coeff = 0.4 rgb = mcolors.to_rgb(color) dark = tuple(c * coeff for c in rgb) light = tuple(1 - (1 - c) * coeff for c in rgb) return mcolors.LinearSegmentedColormap.from_list( name, ['black', dark, color, light, 'white'], N=256 )
[docs] def make_pyratbay_config( output_folder: str, target: Any, pressure: Any, species: Any, wavelength: Any, resolution: Any, params ) -> str: """ Create a pyratbay configuration file based on guibrush configuration. Parameters: output_folder (str): Folder where to store the outputs. target: Target object containing stellar and planetary parameters. pressure: Pressure configuration object. species: Species configuration object. wavelength: Wavelength configuration object. resolution: Resolution configuration object. params: List of input parameters. Returns: str: Path to the created configuration file. """ try: param_values = { param.name: param.get_starting_value() for param in params if param is not None } log_file = f'{output_folder}/giano_b_retrieval.log' cfg_file = f'{output_folder}/giano_b_retrieval.cfg' # Build configuration text cfg_text = ( "[pyrat]\n\nrunmode = spectrum\n" f"logfile = {log_file}\n" "rt_path = transit\n\n" ) # System parameters logp_ref = target.p0 cfg_text += ( "# System parameters\n" f"rstar = {target.stellar_radius} rsun\n" f"mstar = {target.stellar_mass} msun\n" f"rplanet = {target.radius} rjup\n" f"mplanet = {target.mass} mjup\n" f"refpressure = {logp_ref} bar\n\n" ) # Wavelength sampling cfg_text += ( "# Spectral sampling\n" f"wllow = {wavelength.min_wl_hr} um\n" f"wlhigh = {wavelength.max_wl_hr} um\n" f"wn_thinning = {resolution.lbl_high_res}\n\n" ) # Atmospheric model p_max = 10 ** pressure.range_max_pressures p_min = 10 ** pressure.range_min_pressures cfg_text += ( "# Atmospheric model\n" f"nlayers = {pressure.n_layers}\n" f"ptop = {p_min} bar\n" f"pbottom = {p_max} bar\n\n" ) # Chemistry configuration atm_species = ' '.join(list(species.composition)) atm_vmr = [] for mol, vmr in species.composition.items(): if mol in ['H2', 'He']: atm_vmr.append(str(vmr)) else: atm_vmr.append('1e-10') mass_fraction = ' '.join(atm_vmr) cfg_text += ( "chemistry = uniform\n" f"species = {atm_species}\n" f"uniform = {mass_fraction}\n" "radmodel = hydro_m\n\n" ) # Temperature model cfg_text += ( "tmodel = isothermal\n" "tpars = 1000.0\n\n" ) # Opacities configuration cfg_text += "# Opacities\n" lbl_models = '' prt_path = petitradtrans_config_parser.get_input_data_path() for i, mol in enumerate(species.line_species): opacity_file = '\n ' + str( Path(prt_path, "opacities", "lines", "line_by_line", mol, species.line_species_isotope[i], species.line_species_complete_name_hr[i] + ".xsec.petitRADTRANS.h5")) lbl_models += opacity_file if len(species.line_species) > 0: cfg_text += ( f'extfile ={lbl_models}\n\n' 'tmin = 300\n' 'tmax = 3000\n' 'tstep = 150\n\n' ) # CIA configuration root = '{ROOT}/pyratbay/data/CIA/' cia_dict = { 'H2-H2': root + 'CIA_Borysow_H2H2_0060-7000K_0.6-500um.dat', 'H2-He': root + 'CIA_Borysow_H2He_0050-7000K_0.5-031um.dat', } cia_models = '' for pair in species.continum_opacity: cia_models += f'\n {cia_dict[pair]}' if len(species.continum_opacity) > 0: cfg_text += f'csfile ={cia_models}\n\n' # Rayleigh scattering configuration rayleigh = None if len(species.rayleigh_species) > 0: rayleigh = [ f'\n dalgarno_{spec}' for spec in species.rayleigh_species ] ray_params = '' if "k0" in param_values: rayleigh.append('\n lecavelier') ray_params = f"\nrpars = {param_values['k0']} {param_values['gamma']}" if len(rayleigh) > 0: rayleigh_models = ''.join(rayleigh) cfg_text += f'rayleigh ={rayleigh_models}{ray_params}\n\n' # Cloud configuration if 'Pc' in param_values: cfg_text += f'clouds = deck\ncpars = {param_values["Pc"]}\n\n' # Write configuration to file with open(cfg_file, 'w') as f: f.write(cfg_text) return cfg_file except IOError as e: print(f"Failed to write configuration file: {str(e)}")
[docs] def create_fits(data: np.ndarray, pathf: Union[str, Path]) -> None: """ Create and save a FITS file from the provided data. This function creates a primary HDU (Header/Data Unit) using the input data, encapsulates it in an HDUList, and writes the FITS file to the specified path. If a file with the same name exists, it will be overwritten. Parameters: data (np.ndarray): The data array to be stored in the FITS file. pathf (Union[str, Path]): The destination file path for the FITS file. """ try: if not isinstance(data, np.ndarray): print("Input data must be a numpy array") # Create a primary HDU from the input data hdu = fits.PrimaryHDU(data) # Create an HDUList to hold the primary HDU hdul = fits.HDUList([hdu]) # Write the HDUList to the file, overwriting any existing file hdul.writeto(pathf, overwrite=True) except IOError as e: print(f"Failed to write FITS file: {str(e)}") except Exception as e: print(f"Invalid data format: {str(e)}")
[docs] def kernel_solid_body_rotation( omegad: float, f_rot_arr: np.ndarray, Teq: float, mu: float, gs: float, Rpj: float, rad_mode: str, index_nights_total: int, rv_sampling: float = 0.1 ) -> np.ndarray: """ Compute a rotational broadening kernel for spectroscopic observations. A planet rotating as a rigid body with angular frequency omega broadens spectral lines because different surface elements of the visible disk move at different line-of-sight velocities: an element at projected distance x from the rotation axis contributes flux at a radial velocity offset v = omega * x. The rotational kernel K(v) is the disk-integrated weight of all surface elements emitting at velocity v, and convolving a model spectrum with K(v) reproduces the line broadening induced by planetary rotation. The overall width of the kernel is set by the equatorial rotational velocity v_eq = omega * Rp. The rotation rate omega is a free parameter of the retrieval and is not constrained to any particular value. For a tidally locked planet it equals 2*pi / P_orb, but the kernel is valid for any rotation rate. The kernel geometry differs between emission and transmission spectroscopy: - In emission spectroscopy, only the planetary disk itself contributes. Each surface strip at projected distance x has a chord length 2*sqrt(Rp^2 - x^2), so K(v) is proportional to sqrt(1 - (v/v_eq)^2) for \|v\| < v_eq, and zero outside. This is the classical solid-body rotational kernel for a uniformly emitting disk: it peaks at v=0 (sub-observer point, longest chord) and drops to zero at v=±v_eq (planetary limb, chord=0). - In transmission spectroscopy, the relevant region is the atmospheric annulus between Rp and Rp + z, where z = 5*H is the effective atmospheric thickness and H = k_B * T_eq / (mu * m_H * g) is the scale height. Defining v_eq_atm = omega * (Rp + z), two velocity regions contribute differently: - v_eq <= \|v\| < v_eq_atm (annulus only): the line of sight passes only through the annulus, with weight sqrt((Rp+z)^2 - (v/omega)^2) / z. - \|v\| < v_eq (annulus above opaque disk): the line of sight passes through the full annulus minus the opaque planetary disk, with weight (sqrt((Rp+z)^2 - (v/omega)^2) - sqrt(Rp^2 - (v/omega)^2)) / z. An asymmetry parameter f_rot (stored as log10 in f_rot_arr) is applied to the receding hemisphere (v > 0) to allow for day/night asymmetries in the transmission signal. It is a free parameter of the retrieval. In both modes the kernel is normalized to unit sum before being returned, so that convolution preserves the total flux of the spectrum. Parameters ---------- omegad : float Rotational angular frequency of the planet, in inverse days (d^-1). The equatorial rotational velocity is v_eq = omega * Rp, where omega is omegad converted to s^-1. This is a free parameter of the retrieval and is not assumed to equal any particular value. f_rot_arr : np.ndarray Array of base-10 logarithms of the rotational asymmetry scaling factors, one per observing night. Used only in transmission mode: f_rot = 10^f_rot_arr[index_nights_total] is multiplied onto the kernel for v > 0 to model day/night hemisphere asymmetries. It is a free parameter of the retrieval. Teq : float Equilibrium temperature of the planet, in Kelvin. Used to compute the atmospheric scale height H = k_B * T_eq / (mu * m_H * g), which defines the vertical extent z = 5*H of the transmission annulus. Only used in transmission mode. mu : float Mean molecular weight of the atmosphere, in units of the hydrogen mass m_H. Used to compute the scale height. Only used in transmission mode. gs : float Surface gravity of the planet, in cgs units (cm s^-2). Used to compute the scale height. Only used in transmission mode. Rpj : float Planetary radius, in centimeters. Defines v_eq = omega * Rp (emission) and the inner boundary of the transmission annulus (transmission). rad_mode : str Observing mode. Must be one of ConstantVariables.LIST_RAD_MODE: - LIST_RAD_MODE[0]: transmission spectroscopy (annulus kernel). - any other value: emission spectroscopy (solid disk kernel). index_nights_total : int Index into f_rot_arr selecting the asymmetry factor for the current observing night. Only used in transmission mode. rv_sampling : float, optional Velocity step of the radial velocity grid on which the kernel is computed, in km/s. Default is 0.1 km/s, fine enough to well-sample the kernel profile for any realistic v_eq. Returns ------- np.ndarray Normalized rotational broadening kernel, sampled on a velocity grid spanning -30 to +30 km/s with step rv_sampling. The kernel sums to unity by construction. """ # Physical constants kb = pc.kB # Boltzmann constant (erg/K) mH = 1.6738e-24 # Hydrogen mass (g) # --- Atmospheric scale height --- # H = k_B * T_eq / (mu * m_H * g), in cm. # Used only in transmission mode to define the vertical extent of the # absorbing annulus: z = 5*H is a standard approximation for the # effective atmospheric thickness probed in transmission. H = kb * Teq / (mu * mH * gs) z = 5 * H # --- Radial velocity grid --- # The kernel is computed on a grid from -30 to +30 km/s with step # rv_sampling (default 0.1 km/s), giving 601 points. This range is # sufficient for any realistic v_eq: for WASP-77Ab (omega=0.665 d^-1, # Rpj=1.21 Rj), v_eq ~ 0.67 km/s, well within ±30 km/s. rv_array = np.arange(-30, 30 + rv_sampling, rv_sampling) rv_array_cm = rv_array * 1e5 # convert km/s to cm/s # --- Convert omega from d^-1 to s^-1 --- # 1 d^-1 = 1/86400 s^-1 = 1.1574e-5 s^-1 omega = omegad * 1.157407407407407e-05 # s^-1 # --- Projected distance x on the disk --- # For a surface element moving at line-of-sight velocity v = omega * x, # the projected distance from the rotation axis is x = v / omega. # Here rv_array_cm plays the role of v (in cm/s), so x is in cm. # x = rv_array_cm / omega ker_rot = np.zeros(len(rv_array)) # Equatorial rotational velocity: v_eq = omega * Rp (cm/s) # Defines the velocity at the planetary limb (x = ±Rp). # In transmission, also define v_eq_atm = omega * (Rp + z), # the velocity at the outer edge of the atmospheric annulus. v_eq = omega * Rpj # cm/s v_eq_atm = omega * (Rpj + z) # cm/s if rad_mode == ConstantVariables.LIST_RAD_MODE[0]: # Transmission mode f_rot = 10 ** f_rot_arr[index_nights_total] # Region 1 (annulus only): v_eq <= |v| < v_eq_atm # The line of sight passes only through the atmospheric annulus. mask1 = (np.abs(rv_array_cm) >= v_eq) & (np.abs(rv_array_cm) < v_eq_atm) # Region 2 (annulus above opaque disk): |v| < v_eq # The line of sight passes through the full annulus minus the opaque disk. mask2 = np.abs(rv_array_cm) < v_eq ker_rot[mask1] = np.sqrt((Rpj + z) ** 2 - (rv_array_cm[mask1] / omega) ** 2) / z ker_rot[mask2] = ( np.sqrt((Rpj + z) ** 2 - (rv_array_cm[mask2] / omega) ** 2) - np.sqrt(Rpj ** 2 - (rv_array_cm[mask2] / omega) ** 2) ) / z ker_rot[rv_array_cm > 0] *= f_rot else: # Emission mode # K(v) ∝ sqrt(1 - (v/v_eq)^2), zero for |v| > v_eq mask = np.abs(rv_array_cm) < v_eq ker_rot[mask] = np.sqrt(1 - (rv_array_cm[mask] / v_eq) ** 2) # Normalize to unit sum so that convolution preserves total flux. return ker_rot / np.sum(ker_rot)
[docs] def convolve_solid_body_rotation( wl: np.ndarray, model: np.ndarray, ker_rot: np.ndarray, rv_sampling: float = 0.1, n_el: int = 321 ) -> Tuple[np.ndarray, np.ndarray]: """ Convolve a model spectrum with a rotational broadening kernel. This function is the second convolution step in the model preparation pipeline, applied after convolve_resolution. It takes the rotational kernel produced by kernel_solid_body_rotation (defined on a fine velocity grid with step rv_sampling) and convolves it with the model spectrum. The procedure mirrors that of convolve_resolution: 1. The pixel velocity step of the model wavelength grid is computed as dv = c * dlambda / lambda, constant by construction for a log-uniform grid (constant resolving power R_model). 2. The kernel is resampled from its fine velocity grid (step rv_sampling, default 0.1 km/s) onto a coarser grid whose spacing matches the model pixel velocity step, using spline interpolation. This step is mandatory because discrete convolution requires kernel and signal to share the same grid spacing. 3. The resampled kernel is convolved with the model in 'valid' mode (scipy.signal.convolve), discarding the n_el//2 edge pixels on each side where the kernel does not fully overlap the signal, and the result is normalized by the kernel sum to ensure flux conservation. Note: unlike convolve_resolution, this function does not apply a radial velocity shift. The shift is handled entirely by convolve_resolution via kernel translation. This function is responsible only for the rotational line broadening. Parameters ---------- wl : np.ndarray 1D array of wavelength values for the model spectrum. Must be log-uniform (constant resolving power R_model), so that the pixel velocity step dv = c / R_model is constant across the grid. model : np.ndarray 1D array of spectral flux values corresponding to wl. This is the spectrum already convolved with the instrumental LSF by convolve_resolution. ker_rot : np.ndarray Rotational broadening kernel computed by kernel_solid_body_rotation, sampled on a velocity grid from -30 to +30 km/s with step rv_sampling. Must be normalized to unit sum. rv_sampling : float, optional Velocity step of the input kernel grid, in km/s. Must match the rv_sampling used in kernel_solid_body_rotation. Default is 0.1 km/s. n_el : int, optional Number of elements in the resampled kernel. Must be odd to ensure symmetry around zero and avoid introducing a spurious velocity shift. The half-extent of the resampled grid is (n_el - 1) / 2 pixels. Default is 321, giving a half-extent of 160 pixels. Returns ------- wavelength : np.ndarray Wavelength array of the convolved spectrum, shorter than the input by n_el // 2 = 160 points on each side due to 'valid' mode convolution. rconv : np.ndarray Rotationally broadened flux array, flux-conserving by construction. """ # --- Radial velocity grid for the input kernel --- # This must match the grid used in kernel_solid_body_rotation. rv_array = np.arange(-30, 30 + rv_sampling, rv_sampling) # --- Pixel velocity step of the model wavelength grid --- # dv = c * dlambda / lambda, constant by construction for a log-uniform # grid. Computed on each adjacent pixel pair and averaged as a numerical # verification (see convolve_resolution Step 2 for a detailed discussion). dv = ConstantVariables.CLIGHT / 1e5 * (wl[1:] - wl[:-1]) / wl[:-1] rv_pix = np.mean(dv) # --- Resampled velocity grid matching the model pixel step --- # A grid of n_el points (odd, default 321) with spacing rv_pix, spanning # ±(n_el-1)/2 * rv_pix = ±160 * rv_pix km/s. The odd number of points # ensures symmetry around zero, preventing spurious velocity offsets. # (See convolve_resolution Step 3 for a detailed discussion.) max_val = ((n_el - 1) / 2) * rv_pix rv_array_mod = np.linspace(-max_val, max_val, n_el) n_rv = len(rv_array_mod) # --- Resample the kernel onto the model grid via spline interpolation --- # splrep builds a B-spline representation of ker_rot on rv_array, and # splev evaluates it on rv_array_mod. After this step the kernel has # the same velocity spacing as the model, making discrete convolution # well-defined. (See convolve_resolution Step 4b for a detailed discussion.) csscaled = splrep(rv_array, ker_rot) ker_rot_pix = splev(rv_array_mod, csscaled, der=0) # --- Convolve in 'valid' mode and normalize --- # scipy.signal.convolve in 'valid' mode returns only the n - K + 1 central # output points where the kernel fully overlaps the signal, discarding the # n_rv // 2 = 160 edge pixels on each side. The wavelength array is trimmed # by the same amount. A final normalization by np.sum(ker_rot_pix) corrects # for any residual deviation from unity introduced by the spline resampling, # ensuring strict flux conservation. (See convolve_resolution Step 5 for a # detailed discussion.) ini = int(n_rv / 2) wavelength = wl[ini:-ini] rconv = convolve(model, ker_rot_pix, mode="valid") / np.sum(ker_rot_pix) return wavelength, rconv
[docs] def convolve_resolution( wavelength_model: np.ndarray, flux_model: np.ndarray, lsf_hwhm: float, radial_velocity_shift: float, lsf_grid_step: float = 0.1, kernel_size: int = 321 ) -> Tuple[np.ndarray, np.ndarray]: """ Degrade a high-resolution model spectrum to instrumental resolution and apply a Doppler shift. A real spectrograph cannot reproduce an infinitely narrow spectral line: even a perfectly monochromatic source produces a broadened profile on the detector, due to diffraction, optical aberrations, slit width, and pixel sampling. This broadening is described by the Line Spread Function (LSF). To compare a synthetic model with observed data, the model must be degraded to the same instrumental resolution, i.e. convolved with the LSF. The LSF is approximated as a Gaussian whose width is expressed in velocity units. Working in velocity space is convenient because the LSF width is approximately constant in velocity across the full spectral range, whereas in wavelength units it varies with lambda. The Half Width at Half Maximum is HWHM = c / (2R), where R is the nominal resolving power of the instrument, and the corresponding Gaussian standard deviation is sigma = HWHM / sqrt(2*ln(2)). The factor sqrt(2*ln(2)) ~ 1.177 comes from setting the Gaussian equal to half its peak value and solving for v: G(v_1/2) = exp(-v_1/2^2 / 2*sigma^2) = 1/2 gives v_1/2 = sigma * sqrt(2*ln(2)), which is by definition the HWHM. The Doppler shift of the planet is applied by translating the kernel by +v_rad rather than shifting the spectrum itself. By the translation property of convolution these two operations are mathematically equivalent, but acting on the smooth 321-point kernel avoids interpolating the full model flux array, concentrating any numerical error on the kernel alone. Parameters ---------- wavelength_model : np.ndarray 1D array of wavelength values for the model spectrum. Must be computed at constant resolving power R_model, so that the ratio dlambda/lambda = 1/R_model is constant by construction (log-uniform grid), yielding a constant velocity step dv = c / R_model per pixel. flux_model : np.ndarray 1D array of flux values corresponding to wavelength_model. lsf_hwhm : float Half Width at Half Maximum of the instrumental LSF, in km/s. Related to spectral resolution by: HWHM = c / (2R). Example: IGRINS (R ~ 45000) has HWHM ~ 3.3 km/s. radial_velocity_shift : float Radial velocity shift to apply to the model, in km/s. Positive values correspond to redshift (planet receding), negative to blueshift. This is typically a free parameter of the MCMC retrieval, representing the velocity offset between model and observed data. lsf_grid_step : float, optional Velocity step for the initial fine LSF kernel grid, in km/s. Must be small enough to well-sample the Gaussian peak. Default is 0.1 km/s. kernel_size : int, optional Number of elements in the resampled convolution kernel. Must be odd to ensure the grid is symmetric around zero, which avoids introducing a spurious velocity shift during convolution. The half-extent of the resampled grid is (kernel_size - 1) / 2 pixels. Default is 321, giving a half-extent of 160 pixels. Returns ------- wavelength_out : np.ndarray Wavelength array of the convolved spectrum. Shorter than the input by kernel_size // 2 = 160 points on each side, because scipy.signal.convolve in 'valid' mode discards the edge pixels where the kernel does not fully overlap the signal (see Step 5). flux_convolved : np.ndarray Convolved and velocity-shifted flux array, flux-conserving by construction. """ # --- Step 1: Build the Gaussian LSF kernel on a fine velocity grid --- # The grid spans ±30 km/s with a step of lsf_grid_step (default 0.1 km/s), # yielding 601 points. This range is intentionally much wider than the # Gaussian itself: for IGRINS (HWHM ~ 3.3 km/s) the kernel drops to # negligible values already beyond ±10 km/s, so nothing is lost at the edges. # # HWHM_TO_SIGMA = sqrt(2 * ln(2)) ~ 1.177 converts HWHM to Gaussian sigma. # It comes from solving G(v_1/2) = 1/2: # exp(-v_1/2^2 / 2*sigma^2) = 1/2 => v_1/2 = sigma * sqrt(2*ln(2)) = HWHM # therefore: sigma = HWHM / sqrt(2*ln(2)) # # The kernel is normalized to unit sum so that the convolution preserves # the total flux of the spectrum. lsf_velocity_grid = np.arange(-30, 30 + lsf_grid_step, lsf_grid_step) HWHM_TO_SIGMA = 1.177 # = sqrt(2 * ln(2)) lsf_sigma = lsf_hwhm / HWHM_TO_SIGMA lsf_kernel = np.exp(-0.5 * (lsf_velocity_grid / lsf_sigma) ** 2) lsf_kernel = lsf_kernel / np.sum(lsf_kernel) # --- Step 2: Compute the velocity step of the model wavelength grid --- # The model is computed at constant resolving power R_model, so its # wavelength grid has dlambda/lambda = 1/R_model = constant by construction # (log-uniform grid). The corresponding pixel velocity step is therefore # dv = c * dlambda/lambda = c / R_model, constant across the full grid. # We compute it explicitly on each pair of adjacent pixels and take the # mean as a numerical verification. This value dv defines the spacing of # the resampled kernel in the next step. # Example: for R_model = 2.5e5 (petitRADTRANS at R=1e6, downsampled by 4), # dv = 3e5 / 2.5e5 = 1.2 km/s per pixel. velocity_step_per_pixel = ConstantVariables.CLIGHT * (wavelength_model[1:] - wavelength_model[:-1]) / wavelength_model[:-1] mean_velocity_step = np.mean(velocity_step_per_pixel) # --- Step 3: Build the resampled velocity grid matching the model --- # The discrete convolution requires that the kernel and the model spectrum # share the same velocity grid spacing. We therefore create a new grid of # kernel_size points (odd, default 321) with spacing mean_velocity_step, # so that each kernel element corresponds to exactly one model pixel. # The half-extent is (kernel_size - 1) / 2 = 160 pixels, corresponding to # ±160 * 1.2 = ±192 km/s for R_model = 2.5e5. This is sufficient to # accommodate both the LSF width (a few km/s) and any realistic planetary # radial velocity shift. # The odd kernel_size ensures the grid is symmetric around zero, preventing # any spurious velocity offset from being introduced during convolution. half_kernel_extent = ((kernel_size - 1) / 2) * mean_velocity_step resampled_velocity_grid = np.linspace(-half_kernel_extent, half_kernel_extent, kernel_size) # --- Step 4: Shift and resample the LSF kernel --- # Two operations are performed simultaneously: # # (a) SHIFT: adding radial_velocity_shift to the fine grid coordinates # translates the Gaussian peak from 0 to +v_rad km/s. By the # translation property of convolution, this is mathematically equivalent # to shifting the model spectrum by +v_rad, but acts only on the smooth # 321-point kernel instead of the full model array, concentrating any # numerical error on the kernel alone. # # (b) RESAMPLE: splrep/splev interpolates the shifted kernel from the fine # grid (step = 0.1 km/s, 601 points) onto the coarser grid built in # Step 3 (step = mean_velocity_step, 321 points). After this step, # kernel and model share the same grid spacing and discrete convolution # is well-defined. shifted_kernel_spline = splrep(lsf_velocity_grid + radial_velocity_shift, lsf_kernel) resampled_lsf_kernel = splev(resampled_velocity_grid, shifted_kernel_spline, der=0) # --- Step 5: Convolve and return --- # scipy.signal.convolve is called in 'valid' mode. In a discrete convolution # between a signal of N points and a kernel of K points, the output has # N + K - 1 points in 'full' mode. However, the first and last K//2 = 160 # output points correspond to positions where the kernel extends beyond the # signal boundary, requiring the signal to be padded with zeros — which # introduces boundary artefacts. 'valid' mode discards these edge points # and returns only the N - K + 1 central values where the kernel overlaps # the signal completely. The wavelength array is trimmed by the same amount. # # A final normalization by np.sum(resampled_lsf_kernel) corrects for any # residual deviation from unity introduced by the spline resampling in # Step 4, ensuring strict flux conservation in the output. half_kernel_size = kernel_size // 2 wavelength_out = wavelength_model[half_kernel_size:-half_kernel_size] flux_convolved = convolve(flux_model, resampled_lsf_kernel, mode="valid") / np.sum(resampled_lsf_kernel) return wavelength_out, flux_convolved
[docs] def create_path_night( pathfolder: Union[str, Path], target: str, rad_mode: str, instrument: str ) -> str: """ Construct and return the file path for a specific night's observation data. The path is built by concatenating the following components: - `pathfolder`: The base directory containing observation data. - `target`: The target object or observation name. - A constant subdirectory defined by `ConstantVariables.HR_INSTRUMENTS`, representing high-resolution instruments. - `rad_mode`: The radiative mode, which specifies the type of observation. This can be either 'emission' or 'transmission'. - `instrument`: The instrument identifier. Parameters: pathfolder (Union[str, Path]): The base directory path. target (str): The target object or observation name. rad_mode (str): The radiative mode; valid options are 'emission' or 'transmission'. instrument (str): The instrument identifier. Returns: str: The fully constructed file path as a string. """ # Validate rad_mode if rad_mode not in ConstantVariables.LIST_RAD_MODE: print(f"Invalid rad_mode: {rad_mode}. Must be 'Emission' or 'Transmission'") # Construct the full path using pathlib.Path full_path = Path(pathfolder, target, ConstantVariables.HR_INSTRUMENTS, rad_mode, instrument) # Convert the Path object to a string and return it return str(full_path)
[docs] def get_csv_value( df: DataFrame, search_value: Any, search_column: str = "Variable", target_column: str = "Value" ) -> Optional[Any]: """ Retrieve a value from a DataFrame based on a search condition. This function searches for rows in the provided DataFrame where the value in `search_column` matches `search_value`. If a matching row is found, the function returns the value in the corresponding `target_column` from the first match. If no match is found, the function returns None. Parameters: df (pd.DataFrame): The DataFrame to search. search_value (Any): The value to look for in the `search_column`. search_column (str, optional): The column name to search within. Defaults to "Variable". target_column (str, optional): The column name from which to retrieve the value. Defaults to "Value". Returns: Optional[Any]: The value from the `target_column` of the first matching row, or None if no match is found. """ # Validate column names if search_column not in df.columns: print(f"Search column '{search_column}' not found in DataFrame") if target_column not in df.columns: print(f"Target column '{target_column}' not found in DataFrame") # Filter the DataFrame to find rows where the search_column matches search_value matching_rows = df[df[search_column] == search_value] # Return None if no match is found, otherwise return the value from target_column return None if matching_rows.empty else matching_rows[target_column].values[0]
[docs] def plot_tp_profile( range_min_pressures: float, range_max_pressures: float, n_layers: int, parameters_tp_profile: Any, gravity: float, format_temperature: str, path_imm: Union[str, Path], err: Any, rng: Any ) -> None: """ Plot the pressure-temperature (PT) profile for an atmosphere and save the figure. Parameters: range_min_pressures (float): Log10 of the minimum pressure (bar). range_max_pressures (float): Log10 of the maximum pressure (bar). n_layers (int): Number of pressure layers. parameters_tp_profile (Any): Parameters for the temperature profile. gravity (float): Surface gravity. format_temperature (str): Name of the temperature profile method to use. path_imm (Union[str, Path]): Path to save the output image. err (Any): Error parameter for the temperature profile. rng (Any): Random number generator or seed for the temperature profile. Returns: None """ pressures = np.logspace( range_min_pressures, range_max_pressures, n_layers ) root = MyFigure("Pressure-Temperature", type_data="Plot") ax = root.return_ax(["PT profile"], ["Temperature [K]"], ["Pressure [bar]"]) ax.set_xlabel("Temperature [K]", fontsize=GraphicsConfig.PLOT_FONT_SIZE_LABEL) ax.set_ylabel("Pressure [bar]", fontsize=GraphicsConfig.PLOT_FONT_SIZE_LABEL) ax.set_title("PT profile", fontsize=GraphicsConfig.PLOT_FONT_SIZE_TITLE) ax.tick_params(axis='both', labelsize=GraphicsConfig.PLOT_FONT_SIZE_TICK) ax.set_yscale("log") ax.set_ylim([max(pressures), min(pressures)]) tp_profile_obj = UserTemperatureProfile( pressures, parameters_tp_profile, gravity, err, rng ) function_tp_profile = getattr(tp_profile_obj, format_temperature) temperature, temp_layer_i = function_tp_profile() ax.plot(temperature, pressures, alpha=0.5, color="red", label="PT profile") if temp_layer_i is not None: pressures_sample = np.logspace( range_min_pressures, range_max_pressures, 1000 ) n_profiles = len(temp_layer_i[0, :]) n_samples = 1000 temperature_sample = np.zeros([n_profiles + 1, n_samples]) for index in range(n_profiles): temperature_sample[index, :] = splev(pressures_sample, splrep(pressures, temp_layer_i[:, index])) temperature_sample[-1, :] = splev(pressures_sample, splrep(pressures, temperature)) sigmas = np.std(temperature_sample, ddof=1, axis=0) ax.fill_betweenx(pressures_sample, temperature_sample[-1, :] - sigmas, temperature_sample[-1, :] + sigmas, alpha=0.3, color="purple", label=r"$\sigma$") ax.fill_betweenx(pressures_sample, temperature_sample[-1, :] - 2 * sigmas, temperature_sample[-1, :] + 2 * sigmas, alpha=0.3, color="blue", label=r"2$\sigma$") ax.legend(fontsize=GraphicsConfig.PLOT_FONT_SIZE_LEGEND) root.create_figure(str(Path(path_imm, "tp_profile.jpg")), 600) # Save plot data to pkl for later reconstruction pkl_data = { "range_min_pressures": range_min_pressures, "range_max_pressures": range_max_pressures, "n_layers": n_layers, "parameters_tp_profile": parameters_tp_profile, "gravity": gravity, "format_temperature": format_temperature, "pressures": pressures, "temperature": temperature, "temp_layer_i": temp_layer_i, } with open(str(Path(path_imm, "tp_profile.pkl")), "wb") as _f: pickle.dump(pkl_data, _f)
[docs] def plot_corner_metallicity(array_molecules, names_molecules_corner, folder_retrieval, filename, title_figure): """ Generate and save a corner plot for molecular abundance posteriors. Args: array_molecules (np.ndarray): 2D array of posterior samples (n_samples x n_params). names_molecules_corner (list): Labels for each parameter axis. folder_retrieval (str): Path to the retrieval output folder. filename (str): Base filename for the saved plot. title_figure (str): Title displayed on the plot window. """ contour_kwargs = {"colors": "black"} contourf_kwargs = {"colors": ["black"]} hist_kwargs = {"color": "black"} fill_countours = False # --- DEBUG: check for NaN/Inf --- for i, name in enumerate(names_molecules_corner): col = array_molecules[:, i] n_nan = np.sum(np.isnan(col)) n_inf = np.sum(np.isinf(col)) if n_nan > 0 or n_inf > 0: print(f"ATTENTION: {name} (col {i}): {n_nan} NaN, {n_inf} Inf") figure_output = corner( array_molecules, labels=names_molecules_corner, quantiles=[0.16, 0.5, 0.84], show_titles=True, smooth=True, title_kwargs={"fontsize": 11}, fill_countours=fill_countours, contour_kwargs=contour_kwargs, contourf_kwargs=contourf_kwargs, hist_kwargs=hist_kwargs, labelpad=0.1, use_math_text=True, plot_density=False, plot_datapoints=True, plot_contours=True ) os.system("mkdir -p " + str(Path(folder_retrieval, "pmcmcg"))) filepdf = str(Path(folder_retrieval, "pmcmcg", f"{filename}.pdf")) filepng = str(Path(folder_retrieval, "pmcmcg", f"{filename}.png")) plt.tight_layout() plt.savefig(fname=filepdf, bbox_inches="tight", pad_inches=0.1) plt.savefig(fname=filepng, bbox_inches="tight", pad_inches=0.1) plt.close(figure_output) # Save plot data to pkl for later reconstruction pkl_data = { "array_molecules": array_molecules, "names_molecules_corner": names_molecules_corner, "filename": filename, "title_figure": title_figure, } with open(str(Path(folder_retrieval, "pmcmcg", f"{filename}.pkl")), "wb") as _f: pickle.dump(pkl_data, _f) MyFigure(title_figure, type_data="Image", image_path=filepng)
[docs] def opacities_contribution_plot( path_results: Union[str, Path], opacities_contribution: Dict[str, Any], resolution: str, function: str = "plot", wl_scale: str = "log", invert_y_axis: int = 0, instruments_LR=None, offsetLR_arr=None, rp=None, rs=None, rad_mode: str = "Transmission", ) -> None: """ Plot and save the contribution of different opacity sources to the atmospheric model. This function creates a visualization showing how different opacity sources (line species, gas continua, clouds, etc.) contribute to the total atmospheric opacity. The plot is saved as a high-resolution image in the specified output directory. Parameters: path_results (Union[str, Path]): Directory path where the output plot will be saved. opacities_contribution (Dict[str, Any]): Dictionary containing opacity contributions from different sources. Keys are source types (e.g., 'line_species', 'cloud_species'), values are dictionaries mapping species names to their contribution data. resolution (str): Resolution identifier used in the plot title and filename. function (str, optional): Plotting function to use ('plot', 'scatter', etc.). Defaults to "plot". wl_scale (str, optional): Scale for the wavelength axis ('log' or 'linear'). Defaults to "log". invert_y_axis (int, optional): Whether to invert the y-axis. Defaults to False. instruments_LR: Dictionary containing the instruments and their LR values. offsetLR_arr: Array containing the offset LR values. rp: Radius of the planet. rs: Radius of the star. rad_mode: Radiative mode. Returns: None """ # TODO print("TODO opacities contribution for Emission") if rad_mode == "Emission": return # if rad_mode == "Transmission": ylabel = r"Transit radius [$R_{{Jup}}$]" else: ylabel = r'1 + Emission lines' # Validate input parameters if wl_scale not in ['log', 'linear']: print(f"Invalid wl_scale: {wl_scale}. Must be 'log' or 'linear'") # Initialize color and style mappings for different opacity sources opacity_sources_colors = { 'line_species': None, 'gas_continuum_contributors': None, 'cloud_species': None, 'rayleigh_species': None, 'opaque_cloud_top_pressure': 'darkgray', 'power_law': 'red', 'gray_opacity': 'gray', 'cloud_photosphere_median_optical_depth': 'gold', 'additional_absorption_opacities_function': 'darkviolet', 'additional_scattering_opacities_function': 'magenta', 'stellar_intensities': 'yellow', 'Total': 'k' } opacity_sources_linestyles = { 'line_species': '-', 'gas_continuum_contributors': '--', 'cloud_species': '-.', 'rayleigh_species': (0, (1, 0.66)), # densely dotted 'opaque_cloud_top_pressure': '-.', 'power_law': ':', 'gray_opacity': ':', 'cloud_photosphere_median_optical_depth': '-.', 'additional_absorption_opacities_function': ':', 'additional_scattering_opacities_function': ':', 'stellar_intensities': '-', 'Total': '-' } # Initialize color generator for species without predefined colors default_species_color_index = (i for i in range(1001)) # Process species colors for each opacity source for opacity_source, species_list in opacities_contribution.items(): if not isinstance(species_list, dict): continue if opacity_sources_colors[opacity_source] is None: opacity_sources_colors[opacity_source] = {} for species in species_list: # Handle special case for gas continuum contributors if opacity_source == 'gas_continuum_contributors': _species = split_species_all_info(get_cia_aliases(species))[0] _species = _species.split('--', 1) if _species[0] == _species[1]: _species = _species[0] elif 'H2' in _species: _species = _species[1] if _species[0] == 'H2' else _species[0] else: _species = _species[0] else: _species = species # Assign color if not already assigned if species not in opacity_sources_colors[opacity_source]: species_color = get_species_color(get_species_basename(_species), implemented_only=False) if species_color == default_color: i = next(default_species_color_index) if i == 1000: print("Cannot support more than 1000 species") species_color = f"C{i}" opacity_sources_colors[opacity_source][species] = species_color # Create figure and axis root = MyFigure( resolution + " Opacity Contribution", type_data="Plot", personal_size=[0.9, 0.6] ) axs = root.return_ax( [resolution + " Opacity Contribution"], ["Wavelength [nm]"], [ylabel], subplots=(1, 1), figsize=(12, 6) ) # Initialize tracking variables for y-axis limits min_y = np.inf max_y = 0 # Plot each opacity source for key, value in opacities_contribution.items(): if value is None: continue if key == "Total": wavelengths = value[0] contribution = value[1] # Plot total contribution axs.plot( wavelengths, contribution, label=key, color=opacity_sources_colors[key], linestyle=opacity_sources_linestyles[key] ) # max_y = np.nanmax([max_y, np.nanmax(contribution)]) else: if isinstance(value, dict): # Plot individual species contributions for species_name, species_data in value.items(): if species_data is None: continue # Format species label _species = get_species_scientific_name(species_name) if key == 'rayleigh_species': _species += ' (Rayleigh)' __species = species_name + ' (Rayleigh)' elif key == 'gas_continuum_contributors': __species = species_name.rsplit('-NatAbund', 1)[0] else: __species = species_name if species_data is not None: wavelengths = species_data[0] contribution = species_data[1] # Plot species contribution getattr(axs, function)( wavelengths, contribution, label=_species, color=opacity_sources_colors[key][species_name], linestyle=opacity_sources_linestyles[key], alpha=0.5 ) # min_y = np.nanmin([min_y, np.nanmin(contribution)]) else: # Plot non-species opacity sources key_name = key.replace('_', ' ') wavelengths = value[0] contribution = value[1] getattr(axs, function)( wavelengths, contribution, label=key_name, color=opacity_sources_colors[key], linestyle=opacity_sources_linestyles[key], alpha=0.5 ) # min_y = np.nanmin([min_y, np.nanmin(contribution)]) min_y = np.nanmin([min_y, np.nanmin(contribution)]) max_y = np.nanmax([max_y, np.nanmax(contribution)]) if instruments_LR: for index_LR, instrument in enumerate(instruments_LR): # Calculate offset for current instrument (first instrument has no offset) offsetLR = 0 if (index_LR == 0 or offsetLR_arr is None) else offsetLR_arr[index_LR - 1] # Extract instrument data ldata = instrument.lr_data.ldata_LR rdata = instrument.lr_data.rdata_LR - offsetLR # Shift data to model level errdata = instrument.lr_data.errdata_LR step = instrument.lr_data.step_LR if rad_mode == "Transmission": rdata_temp = 1 - rdata rdata = np.sqrt(rdata_temp) * rs * ConstantVariables.RATIO_RSUN_RJUP_MEAN errdata = rs * ConstantVariables.RATIO_RSUN_RJUP_MEAN * errdata / (2 * np.sqrt(rdata_temp)) # error propagation # Plot observational data with error bars axs.errorbar( ldata, rdata, yerr=errdata, xerr=step, ls='none', label=f"{instrument.name} + {-offsetLR:.5f}", alpha=0.5, color="Red" ) # Configure plot appearance axs.legend(loc='center left', bbox_to_anchor=(1, 0.5)) delta_y = max_y - min_y axs.set_ylim([min_y - delta_y * 0.05, max_y + delta_y * 0.05]) if wl_scale == 'log': axs.set_xscale("log") if invert_y_axis: axs.invert_yaxis() plt.subplots_adjust(right=0.5) # Create output directory and save plot path_imm = Path(path_results, "opacity_contribution") path_imm.mkdir(parents=True, exist_ok=True) try: root.create_figure(str(Path(path_imm, resolution + "_img.png")), 600, False) except IOError as e: print(f"Failed to save opacity contribution plot: {str(e)}")
[docs] def calculate_mass_molecule(elements, periodic_table): """ Calculate the total molecular mass of a molecule from its elemental composition. This function parses element strings that may contain mass numbers, atom counts, and ionization states, then calculates the total molecular mass using atomic masses from the periodic table. Args: elements: List of element strings with format like 'H2', '16O', 'C1+', etc. periodic_table: DataFrame containing periodic table data with columns 'Symbol', 'NumberofProtons', 'NumberofNeutrons' Returns: float: Total molecular mass in atomic mass units (u) """ total_mass = 0 # Define atomic masses (in atomic mass units, u) proton_mass = 1.007276 # u neutron_mass = 1.008665 # u electron_mass = 0.000548579909 # u # for element in elements: # Count the number of removed electrons indicated by '+' symbols electrons_removed = element.count("+") # Remove numerical parts and extra characters to obtain the elemental symbol symbol = sub(r'[0-9]+', '', element).replace("+", "").replace("_", "") if len(symbol) > 4: # Skip elements with an unexpectedly long symbol continue # Extract all numeric parts from the element string (e.g., mass numbers and counts) amounts = findall(r'\d+', element) if len(amounts) > 1: # If two numbers are present: the first is the mass number, the second is the count proton_plus_neutron = amounts[0] atom_in_molecule = amounts[1] elif len(amounts) == 1: # If one number is present, determine its role by checking its position relative to the symbol if symbol in element[:len(symbol)]: # Use periodic table values if the number is part of standard element notation table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]] proton_plus_neutron = int(table_row["NumberofNeutrons"].iloc[0]) + int(table_row["NumberofProtons"].iloc[0]) atom_in_molecule = amounts[0] else: proton_plus_neutron = amounts[0] atom_in_molecule = "1" else: # If no numeric value is found, default to using the periodic table (assume count = 1) table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]] proton_plus_neutron = int(table_row["NumberofNeutrons"].iloc[0]) + int(table_row["NumberofProtons"].iloc[0]) atom_in_molecule = "1" # Retrieve the number of protons (and by extension, electrons) for the element number_protons = number_electrons = int( periodic_table[periodic_table["Symbol"] == symbol]["NumberofProtons"].iloc[0] ) # Calculate the number of neutrons from the total mass number minus the number of protons number_neutrons = int(proton_plus_neutron) - number_protons # Add the mass contribution of this element to the total mass of the isotope total_mass += ((number_protons * proton_mass + number_neutrons * neutron_mass + (number_electrons - electrons_removed) * electron_mass) * int(atom_in_molecule)) return total_mass
[docs] def get_condensed_line_list(): """ Retrieve condensed molecule information from petitRADTRANS opacity data. This function scans the condensed molecule opacity files in the petitRADTRANS installation and extracts information about available condensed species including their chemical formulas, names, masses, and visualization names. Returns: tuple: A tuple containing: - np.ndarray: Array of condensed molecule chemical formulas - np.ndarray: Array of condensed molecule file names - np.ndarray: Array of calculated molecular masses (in u) - np.ndarray: Array of visualization names for display Note: If no condensed molecules are found, returns arrays with ["None"] and mass [-999] as placeholders. """ # Load required paths from ConstantVariables petitradtrans = ConstantVariables.path_petitradtrans path_default = ConstantVariables.path_default # Read the periodic table CSV file (assumed to return a pandas DataFrame) periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv") periodic_table = read_csv(str(periodic_table_path)) # Define the path containing line-by-line opacity data for molecules clouds_path = Path(petitradtrans, "opacities", "continuum", "clouds") # Initialize containers for molecules, isotopes, opacity_line_lists_HR, and computed masses all_condensed_molec_from_yaml = ConstantVariables.ALL_CONDENSED_MOLEC condensed_molecs = [] condensed_names = [] condensed_masses = [] condensed_visualization_name = [] # try: # Retrieve a sorted list of molecule directories condensed_names = [cloud.name.removesuffix(".cotable.petitRADTRANS.h5") for cloud in sorted(clouds_path.glob('**/*.h5'))] for i, condensed_name in enumerate(condensed_names): name_split = condensed_name.split("-") chemical_formula = [] for nn in name_split: if "NatAbund" in nn: break else: chemical_formula.append(nn) condensed_molecs.append("".join(chemical_formula)) condensed_masses.append(calculate_mass_molecule(chemical_formula, periodic_table)) if len(condensed_names) == 0: condensed_visualization_name = ["None"] condensed_molecs = ["None"] condensed_names = ["None"] condensed_masses = [-999] else: condensed_visualization_name = [all_condensed_molec_from_yaml[key] for key in condensed_names] except Exception as e: print(e, "\nNo condensed molecules present in opacities") # Return the sorted molecules array and the dictionaries for isotopes, opacity_line_lists_HR, and masses return np.array(condensed_molecs), np.array(condensed_names), np.array(condensed_masses), np.array(condensed_visualization_name)
# def get_line_lists() -> Tuple[np.ndarray, Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, float]]: # """ # Retrieve line lists, isotopes, opacity_line_list identifiers, and compute isotopic masses # from molecular opacity data used by petitRADTRANS. # # This function performs the following steps: # 1. Reads the periodic table data from a CSV file to obtain atomic properties # 2. Scans the 'line_by_line' directory for molecule directories # 3. For each molecule, processes its isotopes and computes their masses # 4. Extracts opacity_line_list information from the opacity files # # Returns: # Tuple containing: # - np.ndarray: Sorted array of molecule names # - Dict[str, np.ndarray]: Dictionary mapping molecules to their isotope names # - Dict[str, np.ndarray]: Dictionary mapping isotopes to their opacity_line_list identifiers # - Dict[str, float]: Dictionary mapping isotopes to their computed masses (in atomic mass units) # # Raises: # FileNotFoundError: If the periodic table CSV file is not found # IOError: If there are issues reading the files # ValueError: If the periodic table data is malformed # """ # # Load required paths from ConstantVariables # petitradtrans = ConstantVariables.path_petitradtrans # path_default = ConstantVariables.path_default # # # Define atomic masses (in atomic mass units, u) # proton_mass = 1.007276 # u # neutron_mass = 1.008665 # u # electron_mass = 0.000548579909 # u # # # Read the periodic table CSV file # periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv") # try: # periodic_table = read_csv(str(periodic_table_path)) # except FileNotFoundError: # print(f"Periodic table file not found at {periodic_table_path}") # except Exception as e: # print(f"Error reading periodic table file: {str(e)}") # # # Define the path containing line-by-line opacity data # line_lists_path = Path(petitradtrans, "opacities", "lines", "line_by_line") # # # Initialize containers for molecules, isotopes, opacity_line_lists_HR, and masses # isotopes = {} # opacity_line_lists_HR = {} # masses_isotopes = {} # # try: # # Get sorted list of molecule directories # molecs = sorted([d.name for d in line_lists_path.iterdir() if d.is_dir()]) # # # Process each molecule directory # for molec in molecs: # path_isotope = line_lists_path / molec # isotopes[molec] = np.array([d.name for d in path_isotope.iterdir() if d.is_dir()]) # # # Process each isotope for the current molecule # for isotope in isotopes[molec]: # elements = isotope.split("-") # total_mass = 0.0 # # # Calculate mass for each element in the isotope # for element in elements: # electrons_removed = element.count("+") # symbol = sub(r'[0-9]+', '', element).replace("+", "").replace("_", "") # # if len(symbol) > 4: # continue # # # Extract numeric parts from element string # amounts = findall(r'\d+', element) # # # Determine mass number and atom count # if len(amounts) > 1: # proton_plus_neutron = amounts[0] # atom_in_molecule = amounts[1] # elif len(amounts) == 1: # if symbol in element[:len(symbol)]: # table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]] # proton_plus_neutron = int(table_row["NumberofNeutrons"]) + int(table_row["NumberofProtons"]) # atom_in_molecule = amounts[0] # else: # proton_plus_neutron = amounts[0] # atom_in_molecule = "1" # else: # table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]] # proton_plus_neutron = int(table_row["NumberofNeutrons"]) + int(table_row["NumberofProtons"]) # atom_in_molecule = "1" # # # Get number of protons and calculate neutrons # number_protons = number_electrons = int( # periodic_table[periodic_table["Symbol"] == symbol]["NumberofProtons"] # ) # number_neutrons = int(proton_plus_neutron) - number_protons # # # Add element's mass contribution to total # total_mass += ( # (number_protons * proton_mass + # number_neutrons * neutron_mass + # (number_electrons - electrons_removed) * electron_mass) * # int(atom_in_molecule) # ) # # # Store computed mass and get opacity_line_list information # masses_isotopes[isotope] = total_mass # path_opacity_line_list = path_isotope / isotope # opacity_line_lists_HR[isotope] = np.array([ # d.name.removesuffix(".xsec.petitRADTRANS.h5") # for d in path_opacity_line_list.iterdir() if not d.is_dir() # ]) # # except Exception as e: # print(f"Error processing molecules: {str(e)}\nNo molecules present in opacities") # return np.array([]), {}, {}, {} # # return np.array(molecs), isotopes, opacity_line_lists_HR, masses_isotopes
[docs] def get_line_lists(): """ Retrieve line lists, isotopes, opacity_line_list identifiers, and compute isotopic masses from molecular opacity data used by petitRADTRANS. This function performs the following steps: 1. Reads the periodic table data from a CSV file to obtain numbers of protons and neutrons for each element. 2. Scans the 'line_by_line' directory (within the opacities folder) for molecule directories. 3. For each molecule, lists its available isotopes (each isotope is represented by a directory). 4. Parses each isotope's name (which encodes its elemental composition) to determine the elemental symbols and their corresponding mass numbers, compute the total mass of the isotope using predefined atomic masses (proton: 1.007276 u, neutron: 1.008665 u, electron: 0.000548579909 u), and adjust the electron count based on the number of '+' symbols. 5. For each isotope, retrieves the available opacity_line_list file names (by removing the ".xsec.petitRADTRANS.h5" suffix). Returns: tuple: A tuple containing: - np.ndarray: Sorted array of molecule names. - dict: Dictionary mapping each molecule to an array of its isotope names. - dict: Dictionary mapping each isotope to an array of HR opacity_line_list identifiers. - dict: Dictionary mapping each isotope to an array of LR opacity_line_list identifiers. - dict: Dictionary mapping each isotope to its computed total mass (in atomic mass units). """ # Load required paths from ConstantVariables petitradtrans = ConstantVariables.path_petitradtrans path_default = ConstantVariables.path_default # Read the periodic table CSV file (assumed to return a pandas DataFrame) periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv") periodic_table = read_csv(str(periodic_table_path)) # Define the path containing line-by-line opacity data for molecules line_lists_path = Path(petitradtrans, "opacities", "lines", "line_by_line") # Initialize containers for molecules, isotopes, opacity_line_lists_HR, and computed masses molecs = [] isotopes = dict() opacity_line_lists_HR = dict() opacity_line_lists_LR = dict() masses_isotopes = dict() try: # Retrieve a sorted list of molecule directories molecs = sorted([d.name for d in line_lists_path.iterdir() if d.is_dir()]) # Loop over each molecule directory for molec in molecs: # Path to the current molecule's isotope directories path_isotope = line_lists_path / molec # List available isotopes for this molecule (each as a directory) isotopes[molec] = np.array([d.name for d in path_isotope.iterdir() if d.is_dir()]) # Process each isotope for the current molecule for isotope in isotopes[molec]: # The isotope name encodes its elemental composition separated by "-" elements = isotope.split("-") # Store the computed mass for the isotope masses_isotopes[isotope] = calculate_mass_molecule(elements, periodic_table) # Retrieve opacity_line_list file names (strip the specific suffix) from the isotope directory path_opacity_line_list = path_isotope / isotope opacity_line_lists_HR[isotope] = np.array([ d.name.removesuffix(".xsec.petitRADTRANS.h5") for d in path_opacity_line_list.iterdir() if not d.is_dir() ]) if len(opacity_line_lists_HR[isotope]) == 0: opacity_line_lists_HR[isotope] = np.array(["None"]) # LR path_opacity_line_list_LR = Path(str(path_opacity_line_list).replace("line_by_line", "correlated_k")) if path_opacity_line_list_LR.exists(): opacity_line_lists_LR[isotope] = np.array([ d.name.removesuffix(".ktable.petitRADTRANS.h5") for d in path_opacity_line_list_LR.iterdir() if not d.is_dir() ]) if len(opacity_line_lists_LR[isotope]) == 0: opacity_line_lists_LR[isotope] = ["None"] else: opacity_line_lists_LR[isotope] = ["None"] except Exception as e: print(str(traceback.format_exc()), "\nNo molecules present in opacities") # Return the sorted molecules array and the dictionaries for isotopes, opacity_line_lists_HR, and masses return np.array(molecs), isotopes, opacity_line_lists_HR, opacity_line_lists_LR, masses_isotopes
[docs] def generate_chemcat( min_pressure: float, max_pressure: float, nlayersf: int, path_default: Union[str, Path] ) -> Tuple[np.ndarray, chemcat.Network, List[str]]: """ Generate a chemical catalog and retrieve molecular masses for atmospheric modeling. This function creates a chemical network with specified pressure layers and temperature, and prepares molecular data for atmospheric modeling. Parameters: min_pressure (float): Logarithm of minimum pressure (bar) max_pressure (float): Logarithm of maximum pressure (bar) nlayersf (int): Number of pressure layers path_default (Union[str, Path]): Base path to default files directory Returns: Tuple containing: - np.ndarray: Array of molecular masses for selected molecules - chemcat.Network: Chemical network object - List[str]: List of species names compatible with petitRADTRANS """ # Construct path to molecules data file molecules_path = Path(path_default) / "GUIBRUSHR" / "Files" / "Molecules" / "molecules.dat" names, massf, diam = None, None, None try: # Read molecular data names, massf, diam = io.read_molecs(str(molecules_path)) except FileNotFoundError: print(f"Molecules data file not found at {molecules_path}") except Exception as e: print(f"Error reading molecular data: {str(e)}") # Build list of molecules from various chemical groups molecules = ( ConstantVariables.HCNO_NEUTRALS.split() + ConstantVariables.IONS.split() + ConstantVariables.ALKALI.split() + ConstantVariables.METALS.split() + ConstantVariables.METAL_OXIDES.split() ) # Format species names for petitRADTRANS compatibility species_compatible_with_prt = [ mol if mol in ConstantVariables.IONS else mol.replace("+", "_+") for mol in molecules ] # Generate pressure grid and temperature array pressuresf = np.logspace(min_pressure, max_pressure, nlayersf) temp = np.full(nlayersf, 1000.0) # Constant temperature of 1000 K # Create chemical network chemcat_objf = chemcat.Network(pressuresf, temp, molecules) # Extract molecular masses try: molecular_masses = np.array([massf[names == spec][0] for spec in molecules]) except IndexError: print("One or more molecules not found in molecular data") return molecular_masses, chemcat_objf, species_compatible_with_prt
[docs] def read_fits( path_fits: Union[str, Path], transpose: bool = False, intransit: Optional[np.ndarray] = None ) -> np.ndarray: """ Read data from a FITS file and optionally apply slicing and transposition. Parameters: path_fits (Union[str, Path]): Path to the FITS file transpose (bool, optional): Whether to transpose the data array. Defaults to False. intransit (Optional[np.ndarray], optional): Indices to slice the data along the second axis. If provided, data is sliced as data[:, intransit, :]. Defaults to None. Returns: np.ndarray: The processed data array from the FITS file """ try: with fits.open(path_fits) as hdul: data = hdul[0].data if intransit is not None: data = data[:, intransit, :] if transpose: data = np.transpose(data) return data except FileNotFoundError: print(f"FITS file not found at {path_fits}") except Exception as e: print(f"Error reading FITS file: {str(e)}")
[docs] def get_depth_filename(rad_mode: str) -> str: """Return the depth FITS filename based on the radiative transfer mode.""" if rad_mode == "Emission": return "emission_depth.fits" return "transmission_depth.fits"
[docs] def read_depth_fits(folder: Union[str, Path], rad_mode: str, **kwargs) -> np.ndarray: """Read the depth FITS file with fallback to legacy transit_depth.fits.""" folder = Path(folder) primary = folder / get_depth_filename(rad_mode) fallback = folder / "transit_depth.fits" if primary.exists(): return read_fits(str(primary), **kwargs) if fallback.exists(): print(f"{primary.name} not found, falling back to transit_depth.fits") return read_fits(str(fallback), **kwargs) return read_fits(str(primary), **kwargs)
[docs] def plot_vmr( pressures: np.ndarray, vmr_dict: Dict[str, np.ndarray], folder_plot: Union[str, Path] ) -> None: """ Plot volume mixing ratio (VMR) profiles for different chemical species groups. This function creates a 2x3 grid of plots showing VMR profiles for: - All species - HCNO neutrals - Ions - Alkali metals - Metals - Metal oxides Parameters: pressures (np.ndarray): Pressure values (in bar) vmr_dict (Dict[str, np.ndarray]): Dictionary mapping species names to their VMR profiles folder_plot (Union[str, Path]): Directory where the plot will be saved """ # Create figure with 2x3 subplots root = MyFigure("VMR profiles", type_data="Plot", personal_size=[0.8, 0.8]) # Define plot labels and titles xlabels = ["$Log_{10} VMR$"] * 6 ylabels = ["$Log_{10}$ Pressure (bar)"] * 6 titles = [ "VMR profiles all", "VMR profiles HCNO neutrals", "VMR profiles ions", "VMR profiles alkali", "VMR profiles metals", "VMR profiles metal oxides", ] # Create subplots ax = root.return_ax( titles, xlabels, ylabels, subplots=[2, 3], figsize=(14, 12), dpi=100, hspace=0.4, wspace=0.3, ) # Initialize lists for different species groups species_groups = { 'all': [], 'HCNO_neutrals': [], 'ions': [], 'alkali': [], 'metals': [], 'metal_oxides': [] } vmr_groups = {key: [] for key in species_groups.keys()} # Sort species into groups for molec, vmr in vmr_dict.items(): molec_name_compatible = molec.replace("_", "") # Add to all species species_groups['all'].append(molec) vmr_groups['all'].append(vmr) # Add to specific groups if molec_name_compatible in ConstantVariables.HCNO_NEUTRALS: species_groups['HCNO_neutrals'].append(molec) vmr_groups['HCNO_neutrals'].append(vmr) if molec_name_compatible in ConstantVariables.IONS: species_groups['ions'].append(molec) vmr_groups['ions'].append(vmr) if molec_name_compatible in ConstantVariables.ALKALI: species_groups['alkali'].append(molec) vmr_groups['alkali'].append(vmr) if molec_name_compatible in ConstantVariables.METALS: species_groups['metals'].append(molec) vmr_groups['metals'].append(vmr) if molec_name_compatible in ConstantVariables.METAL_OXIDES: species_groups['metal_oxides'].append(molec) vmr_groups['metal_oxides'].append(vmr) # Plot each group group_positions = { 'all': (0, 0), 'HCNO_neutrals': (0, 1), 'ions': (0, 2), 'alkali': (1, 0), 'metals': (1, 1), 'metal_oxides': (1, 2) } for group, (row, col) in group_positions.items(): if species_groups[group]: # Only plot if there are species in the group u.plot_vmr( pressures, np.transpose(vmr_groups[group]), species_groups[group], axis=ax[row, col], fontsize=12 ) try: # Save the plot root.create_figure(str(Path(folder_plot, 'vmr_profiles.jpg')), 600) except IOError as e: print(f"Failed to save VMR profiles plot: {str(e)}")
[docs] def get_medians( folder_retrieval_final: Union[str, Path], arr_values, remove_chain: bool, corner_plot_or_simulation: bool, best_fit_method: str, n_samples: int = 500 ) -> BestFitParametersAndResults: """ Extract and process parameter statistics from MCMC chains. This function processes MCMC chain data to compute median values and other statistics for fitted parameters. It handles chain rejection based on chi2 criteria and performs burn-in removal. Parameters: folder_retrieval_final (Union[str, Path]): Path to the final retrieval folder arr_values: Array containing parameter information remove_chain (bool): Whether to remove chains based on chi2 criteria corner_plot_or_simulation (bool): Whether to process data for corner plots best_fit_method: Method used to extract best fit parameters n_samples: Number of samples to use for random model extraction Returns: BestFitParametersAndResults: Object containing processed parameter statistics """ from scipy.stats import gaussian_kde from scipy.signal import find_peaks # Extract best-fit parameters list_best_pars = arr_values[ConstantVariables.DICT_FILTER_TABLE["Fitted Params"]["index"]].split(",") list_best_pars_labels = copy.copy(list_best_pars) # Process molecule parameters all_molec = ConstantVariables.ALL_MOLEC for i, param in enumerate(list_best_pars): name_par = param.replace("_", "") if name_par in all_molec: list_best_pars_labels[i] = name_par # Extract fixed parameters fix_params = arr_values[ConstantVariables.DICT_FILTER_TABLE["#Fixed Params"]["index"]].split(",") lista_par_fix = [] value_fix = [] for elem in fix_params: name, value = elem.split("=") lista_par_fix.append(name.strip()) value_fix.append(float(value)) parsc = None chi2 = None # Load chain data pkl_fname = str(Path(folder_retrieval_final, "partial_prob_and_chain_burnin.pkl")) try: with open(pkl_fname, "rb") as f: m = pickle.load(f) if isinstance(m, dict): parsc = m["parameters"] chi2 = m["chi2"] det_chain = m["det_chain"] else: parsc = np.array(m[0]) chi2 = np.array(m[1]) except FileNotFoundError: print(f"Chain data file not found at {pkl_fname}") except Exception as e: print(f"Error loading chain data: {str(e)}") # Compute chain statistics medchi2 = np.median(chi2) medchi2_chains = np.median(chi2, axis=0) err_chi2 = stats.median_abs_deviation(chi2, axis=None) # Identify bad chains aa_chains = np.where(medchi2_chains < medchi2 - 1.0 * err_chi2) number_bad_chains = len(aa_chains[0]) if np.size(aa_chains) > 0: print("At least one chain to remove") else: print("All chains seem good") # Select chains based on criteria bb_chains = np.where(medchi2_chains > medchi2 - 1.0 * err_chi2)[0] if remove_chain else np.where(medchi2_chains > -np.inf)[0] chi2 = chi2[:, bb_chains.T] # Determine burn-in index burnndx = 0 for jj in range(chi2.shape[1]): tmpndx = np.where(chi2[:, jj] > medchi2)[0] if len(tmpndx) > 0 and tmpndx[0] > burnndx: burnndx = tmpndx[0] # else: # burnndx = 0 if burnndx > (chi2.shape[0] - 3): burnndx = 0 print(f"Burn-in: {burnndx}") # Remove burn-in and reshape parsc = parsc[:, burnndx:, bb_chains.T] arr1 = parsc.reshape((parsc.shape[0], -1)).T chi2 = chi2[burnndx:, :] chi2_monodimensional = chi2.reshape(-1).T # Compute convergence statistics converged, gelmanrubin, tz = exofast_gelmanrubin.exofast_gelmanrubin(parsc) print(f"Shape of parameter vector: {arr1.shape}, {chi2_monodimensional.shape}") print(f"Convergence status: {converged}, {gelmanrubin}, {tz}") # Load and update likelihood dir_f = folder_retrieval_final max_likelihood_pkl_path = Path(dir_f, "max_likelihood.pkl") try: if os.path.exists(str(max_likelihood_pkl_path)): with open(str(max_likelihood_pkl_path), "rb") as fo: last_max_LH = pickle.load(fo) print(f"Last max likelihood: {last_max_LH}") else: last_max_LH = np.nan except Exception as e: print(f"Error loading likelihood: {str(e)}") last_max_LH = np.nan current_max_likelihood = np.max(chi2_monodimensional) try: with open(str(Path(dir_f, "max_likelihood.pkl")), "wb") as fo: pickle.dump(current_max_likelihood, fo) except Exception as e: print(f"Error saving likelihood: {str(e)}") print(f"Current Max likelihood: {current_max_likelihood}") # Process eccentricity parameters for corner plots if corner_plot_or_simulation: h_ecc_arr = None k_ecc_arr = None # Handle h_ecc parameter if "h_ecc" in list_best_pars: h_ecc_arr = np.array(arr1[:, list_best_pars.index("h_ecc")]) elif "h_ecc" in lista_par_fix: h_ecc = value_fix[lista_par_fix.index("h_ecc")] h_ecc_arr = np.full(arr1.shape[0], h_ecc) arr1 = np.insert(arr1, list_best_pars.index("k_ecc"), h_ecc_arr, axis=1) list_best_pars.insert(list_best_pars.index("k_ecc"), "h_ecc") # Handle k_ecc parameter if "k_ecc" in list_best_pars: k_ecc_arr = np.array(arr1[:, list_best_pars.index("k_ecc")]) elif "k_ecc" in lista_par_fix: k_ecc = value_fix[lista_par_fix.index("k_ecc")] k_ecc_arr = np.full(arr1.shape[0], k_ecc) arr1 = np.insert(arr1, list_best_pars.index("h_ecc") + 1, k_ecc_arr, axis=1) list_best_pars.insert(list_best_pars.index("h_ecc") + 1, "k_ecc") # Convert to eccentricity and argument of periastron if h_ecc_arr is not None and k_ecc_arr is not None: arr1[:, list_best_pars.index("h_ecc")] = np.power(h_ecc_arr, 2) + np.power(k_ecc_arr, 2) ecc_opi_par = np.arctan2(h_ecc_arr, k_ecc_arr) ecc_opi_par[ecc_opi_par < 0] += 2 * np.pi arr1[:, list_best_pars.index("k_ecc")] = ecc_opi_par # Compute final statistics if best_fit_method == "Median": # With the median best_fit_parameters = np.median(arr1, axis=0) elif best_fit_method == "Max LH": # With the max of likelihood # MAP = maximum of the posterior idx_MAP = np.argmax(chi2_monodimensional) params_MAP = arr1[idx_MAP] best_fit_parameters = np.array(params_MAP) print("MAP estimate:", params_MAP) elif best_fit_method == "KDE": # # With KDE n_params = arr1.shape[1] best_fit_parameters = np.empty(n_params) for i in range(n_params): samples = arr1[:, i] kde = gaussian_kde(samples) x_grid = np.linspace(samples.min(), samples.max(), 1000) posterior_density = kde(x_grid) idx_MAP = np.argmax(posterior_density) best_fit_parameters[i] = x_grid[idx_MAP] print(f"Parameter {i}: MAP = {best_fit_parameters[i]:.4f}, Median = {np.median(samples):.4f}") elif best_fit_method == "Max from param": # With histogram maximum bin median # For each parameter: find bin with most samples, then take median within that bin n_params = arr1.shape[1] best_fit_parameters = np.empty(n_params) for i in range(n_params): samples = arr1[:, i] # Use Freedman-Diaconis rule for optimal bin width # This automatically determines a good number of bins based on data statistics counts, bin_edges = np.histogram(samples, bins='fd') # Find the bin with maximum count idx_max_bin = np.argmax(counts) # Get samples within this bin bin_left = bin_edges[idx_max_bin] bin_right = bin_edges[idx_max_bin + 1] samples_in_max_bin = samples[(samples >= bin_left) & (samples < bin_right)] # Take median of samples in the maximum bin if len(samples_in_max_bin) > 0: best_fit_parameters[i] = np.median(samples_in_max_bin) else: # Fallback to overall median if no samples found (shouldn't happen) best_fit_parameters[i] = np.median(samples) print(f"Parameter {i}: Max bin median = {best_fit_parameters[i]:.4f}, " f"Bin range = [{bin_left:.4f}, {bin_right:.4f}], " f"N samples in bin = {len(samples_in_max_bin)}, " f"Overall median = {np.median(samples):.4f}") else: n_total = arr1.shape[0] sample_indices = np.random.choice(n_total, size=min(n_samples, n_total), replace=False) best_fit_parameters = arr1[sample_indices, :] # best_fit_parameters_obj = BestFitParametersAndResults( number_bad_chains, last_max_LH, current_max_likelihood, chi2_monodimensional, list_best_pars, arr1, best_fit_parameters, value_fix, lista_par_fix, [], # lista_parametri list_best_pars_labels ) return best_fit_parameters_obj
[docs] def pre_plot( folder_retrieval_final: Union[str, Path], arr_values: np.ndarray, remove_chain: [bool, int], corner_plot: [bool, int], best_fit_method : str ) -> Tuple[int, float, float, float, np.ndarray, List[str], np.ndarray, np.ndarray, List[float], List[str], str, str, List[str], np.ndarray]: """ Prepare posterior chain statistics and parameter labels for plotting. Parameters: folder_retrieval_final (Union[str, Path]): Path to the final retrieval folder arr_values (np.ndarray): Array containing parameter information remove_chain (bool): Whether to remove chains based on chi2 criteria corner_plot (bool): Whether to process data for corner plots best_fit_method (str): Method used to extract best fit parameters Returns: Tuple containing: - int: Number of bad chains - float: AIC value - float: Last maximum likelihood - float: Current maximum likelihood - np.ndarray: Flattened chi2 values - List[str]: Best-fit parameter names - np.ndarray: Chain parameters - np.ndarray: Parameter medians - List[float]: Fixed parameter values - List[str]: Fixed parameter names - str: Summary string - str: Parameter names string - List[str]: Parameter labels - np.ndarray: Parameter errors """ best_fit_parameters_obj = get_medians(folder_retrieval_final, arr_values, remove_chain, corner_plot, best_fit_method) # Compute AIC aic_var = 2 * len(best_fit_parameters_obj.list_best_pars) - 2 * best_fit_parameters_obj.current_max_likelihood # Format summary strings stringa = "" error_list = np.zeros(len(best_fit_parameters_obj.list_best_pars)) for i, param in enumerate(best_fit_parameters_obj.list_best_pars): error = np.std(best_fit_parameters_obj.arr1[:, i]) error_list[i] = error sorted_arr = np.sort(best_fit_parameters_obj.arr1[:, i]) pos = int(len(sorted_arr) * 0.95) upper_limits = sorted_arr[pos] if corner_plot: stringa += f"{param}: {best_fit_method}.={best_fit_parameters_obj.best_fit_parameters[i]:.6f}, E.+={error:.6f}, U.L.={upper_limits:.6f}\n" else: if best_fit_method == "From sampler": value_str = np.median(best_fit_parameters_obj.best_fit_parameters[:, i]) pre_str = "Median " else: pre_str = "" value_str = best_fit_parameters_obj.best_fit_parameters[i] stringa += f"{pre_str}{best_fit_method} {param} = {value_str:.6f}\n" # Create parameter labels for corner plots stringa_par = "" lista_parametri = [] if corner_plot: for elem in best_fit_parameters_obj.list_best_pars_labels: stringa_par += elem + "," no_number_elem = re.sub(r'\d+', '', elem) number = re.findall(r'\d+', elem) if no_number_elem in ConstantVariables.LIST_MULTIPLE_PARAM: string_form = ConstantVariables.DICT_LABELS[no_number_elem].replace("x", number[0]) else: #TODO remove for compatibility if elem == "csuo": elem = "co_ratio" string_form = ConstantVariables.DICT_LABELS[elem] string_form = string_form.replace('r"', "", 1).replace('"', "") lista_parametri.append(string_form) stringa_par = stringa_par[:-1] # Remove trailing comma print("Producing the plot...") return ( best_fit_parameters_obj.number_bad_chains, aic_var, best_fit_parameters_obj.last_max_LH, best_fit_parameters_obj.current_max_likelihood, best_fit_parameters_obj.chi2_monodimensional, best_fit_parameters_obj.list_best_pars, best_fit_parameters_obj.arr1, best_fit_parameters_obj.best_fit_parameters, best_fit_parameters_obj.value_fix, best_fit_parameters_obj.lista_par_fix, stringa, stringa_par, lista_parametri, error_list, )
[docs] def cross_correlation( type_cc: str, arr1: np.ndarray, arr2: np.ndarray, sigma: Optional[np.ndarray] = None ) -> float: """ Compute cross-correlation between two arrays using different methods. Parameters: type_cc (str): Type of correlation to compute ('numpy', 'normal', or 'weighted') arr1 (np.ndarray): First input array arr2 (np.ndarray): Second input array sigma (Optional[np.ndarray]): Error array for weighted correlation Returns: float: Correlation coefficient """ if type_cc == ConstantVariables.LIST_CORRELATION_METHODS[0]: # Numpy correlation_matrix = np.corrcoef(arr1, arr2) cc_result = correlation_matrix[0, 1] elif type_cc == ConstantVariables.LIST_CORRELATION_METHODS[1]: # Normal cc_result = c_correlate(arr1, arr2, np.array([0]))[0] else: # Weighted if sigma is None: print("sigma must be provided for weighted correlation") cc_result = w_correlate(arr1, arr2, sigma)[0] if not np.isfinite(cc_result): print(f"Warning: Non-finite correlation result: {cc_result}") cc_result = 0.0 return cc_result
[docs] def c_correlate( s_1: np.ndarray, s_2: np.ndarray, lags ) -> np.ndarray: """ Compute normalized cross-correlation of two signals at specified lags. Parameters: s_1 (np.ndarray): First input signal s_2 (np.ndarray): Second input signal lags: Array of lags at which to compute correlation Returns: np.ndarray: Array of correlation values for each lag """ if s_1.shape != s_2.shape: print("Input signals must have the same shape") n_samples = s_1.shape[0] s1_centered = s_1 - np.mean(s_1) s2_centered = s_2 - np.mean(s_2) correlation = np.empty(len(lags), dtype=float) norm_factor = np.sqrt(np.sum(s1_centered ** 2) * np.sum(s2_centered ** 2)) for idx, lag in enumerate(lags): if lag >= 0: product_sum = np.sum(s1_centered[:n_samples - lag] * s2_centered[lag:]) else: product_sum = np.sum(s1_centered[-lag:] * s2_centered[:n_samples + lag]) correlation[idx] = product_sum return correlation / norm_factor
[docs] def w_correlate( s_1: np.ndarray, s_2: np.ndarray, es_1: np.ndarray ) -> np.ndarray: """ Compute weighted cross-correlation between two signals. Parameters: s_1 (np.ndarray): First input signal s_2 (np.ndarray): Second input signal es_1 (np.ndarray): Error array for the first signal Returns: np.ndarray: Array containing the weighted correlation value """ if s_1.shape != s_2.shape: print("Input signals must have the same shape") s1_centered = s_1 - np.mean(s_1) s2_centered = s_2 - np.mean(s_2) correlation = np.empty(1, dtype=float) correlation[0] = np.sum((s1_centered * s2_centered) / es_1 ** 2) return correlation
[docs] def linear_solver( topca_l: np.ndarray, res_l: np.ndarray ): """ Solve a linear system using Singular Value Decomposition (SVD). This function solves the linear system Ax = b using SVD decomposition, where A is the design matrix (res_l) and b is the target vector (topca_l). The solution is computed in a numerically stable way using SVD. Parameters: topca_l (np.ndarray): Target vector b in the system Ax = b res_l (np.ndarray): Design matrix A in the system Ax = b Returns: Results: The reconstructed array after solving the linear system Error: Whether the linear system was solved successfully """ error = False try: # Perform SVD decomposition uf, s, vh = np.linalg.svd(res_l, full_matrices=False) # Compute intermediate matrices uT = np.transpose(uf) nD = np.diag(1 / s) vHcT = np.transpose(vh.conj()) # Solve the system in steps: # 1. U diag(s) Vh x = b <=> diag(s) Vh x = U.T b = c c = np.dot(uT, topca_l) # 2. diag(s) Vh x = c <=> Vh x = diag(1/s) c = w w = np.dot(nD, c) # 3. Vh x = w <=> x = Vh.H w x = np.dot(vHcT, w) # Reconstruct the array return np.dot(res_l, x), error except np.linalg.LinAlgError as e: print(f"SVD computation failed: {str(e)}") error = True return None, error
[docs] def skewed_gaussian(x, amp, center, width, skew): """ Calculate a skewed Gaussian distribution. Args: x: Input values amp: Amplitude of the distribution center: Center position width: Width parameter skew: Skewness parameter Returns: Values of the skewed Gaussian distribution """ return amp * skewnorm.pdf(x, skew, loc=center, scale=width)
[docs] def estimate_continuum(spectrum_new, num_windows=40, top_window=10, check_gaussian=False): """ Estimate and remove the continuum from a spectrum using windowed median fitting. Args: spectrum_new (np.ndarray): Input spectrum array. num_windows (int): Number of windows to divide the spectrum into. top_window (int): Number of top values per window used for continuum estimation. check_gaussian (bool): If True, fit a skewed Gaussian instead of a polynomial. Returns: tuple: (continuum, model_after_processing) where continuum is the estimated continuum and model_after_processing is the spectrum normalized by it. """ window_size = len(spectrum_new) // num_windows medians_continuum = [] wl_continuum = [] # Calculate medians_continuum for continuum fitting for i in range(num_windows): xv = (i * window_size + (i + 1) * window_size) / 2 window_element = spectrum_new[i * window_size: (i + 1) * window_size] top_10 = np.sort(window_element)[-top_window:] medians_continuum.append(np.median(top_10)) wl_continuum.append(xv) wl_continuum = np.array(wl_continuum) medians_continuum = np.array(medians_continuum) try: if check_gaussian: p0 = [1, np.mean(wl_continuum), np.std(wl_continuum) * 2, 0] params, _ = curve_fit(skewed_gaussian, wl_continuum, medians_continuum, p0=p0) # skewed_fit = skewed_gaussian(wl_continuum, *params) continuum = skewed_gaussian(np.arange(len(spectrum_new)), *params) else: par = np.polyfit(wl_continuum, medians_continuum, 4) continuum = np.polyval(par, np.arange(len(spectrum_new))) except Exception as e: par = np.polyfit(wl_continuum, medians_continuum, 4) continuum = np.polyval(par, np.arange(len(spectrum_new))) model_after_processing = spectrum_new / continuum # we want the continuum at one, not at 0.9 or similar return continuum, model_after_processing
[docs] def trpca( lcr_mask: np.ndarray, lcrm_mask_nomask: np.ndarray, good_pixel: np.ndarray, ncomp: int, smooth_on: bool, smooth_size: int, apply_standardize : bool, model_reprocessing_type: str, subtract_mean_spectrum: bool = False, pca_mode: str = "spatial" ): """ Process light curve residuals by removing systematic components using PCA. This function processes light curve residuals by: 1. Normalizing the data by subtracting column-wise means 2. Optionally applying PCA-based component removal 3. Updating the output array for good pixels 4. Optionally applying smoothing Parameters: lcr_mask (np.ndarray): 2D array of light curve residuals with telluric mask lcrm_mask_nomask (np.ndarray): 2D array to store processed residuals good_pixel (np.ndarray): Boolean mask or indices of good pixels ncomp (int): Number of PCA components to use smooth_on (bool): Whether to apply smoothing smooth_size (int): Window size for smoothing filter apply_standardize (bool): Whether to apply standardization model_reprocessing_type (str): Processing mode, either "hard" or "soft" subtract_mean_spectrum (bool): Whether to subtract mean spectrum pca_mode (str): PCA mode - "spatial" (samples=images, features=pixels) or "temporal" (samples=pixels, features=images). Default: "spatial" Returns: np.ndarray: Processed light curve residuals error: Whether the processing was successful """ error = False # Normalize the light curve residuals lcrm = np.copy(lcr_mask) means = np.mean(lcrm, axis=0) lcrm = lcrm - np.expand_dims(means, axis=0) # Center data to column if subtract_mean_spectrum: mean_spectrum = np.mean(lcrm, axis=1) lcrm = lcrm - np.expand_dims(mean_spectrum, axis=1) # Apply PCA-based component removal if in "hard" mode if model_reprocessing_type == "hard": try: # Prepare data based on PCA mode # Spatial mode (default): samples=images, features=pixels -> matrix shape (n_pixels, n_images) # Temporal mode: samples=pixels, features=images -> matrix shape (n_images, n_pixels) if pca_mode == "temporal": # Transpose for temporal PCA: now rows are images, columns are pixels pca_input_matrix = lcrm.T else: # Spatial PCA (default): rows are pixels, columns are images pca_input_matrix = lcrm # Create pipeline with optional standardization if apply_standardize: transformer = Pipeline([ ('scaler', StandardScaler()), ('pca', PCA(n_components=ncomp, svd_solver="full")) ]) else: transformer = Pipeline([ ('pca', PCA(n_components=ncomp, svd_solver="full")) ]) comps = transformer.fit_transform(pca_input_matrix) ones_column = np.ones((comps.shape[0], 1)) comps_with_ones = np.hstack((comps, ones_column)) except Exception as e: # except (np.linalg.LinAlgError, ValueError) as e: print(f"Error applying PCA-based component removal: {str(e)}") error = True return None, error # array_reconstruct, error_trpca = linear_solver(pca_input_matrix, comps_with_ones) if error_trpca: error = True print(f"PCA processing failed in solver") return None, error # diff_pca = pca_input_matrix - array_reconstruct # Transform back to original orientation if temporal mode was used if pca_mode == "temporal": lcrm_pca = diff_pca.T else: lcrm_pca = diff_pca else: lcrm_pca = lcrm # Update output array for good pixels lcrm_mask_nomask[good_pixel, :] = lcrm_pca # Apply smoothing if requested if smooth_on: # Note: Smoothing implementation is currently disabled # This could be reimplemented using scipy.ndimage.uniform_filter pass return lcrm_pca, error
[docs] def get_keys_by_value( dictionary: Dict[str, Any], target_value: Any ) -> Optional[str]: """ Find the first key in a dictionary whose value contains the target value. This function searches through the dictionary values to find the first key whose value contains the target value. It's useful for reverse lookups in dictionaries where values are collections. Parameters: dictionary (Dict[str, Any]): Dictionary to search through target_value (Any): Value to search for in the dictionary values Returns: Optional[str]: The first key whose value contains target_value, or None if no match is found """ try: return next( (key for key, value in dictionary.items() if target_value in value), None ) except TypeError: # Handle case where values are not iterable return None
[docs] def compute_adjusted_abundance( pressure: np.ndarray, temperature: np.ndarray, coeff, base_vmr ) -> np.ndarray: """ Compute adjusted abundance accounting for dissociation effects. This function calculates the adjusted abundance of a species by considering both the base volume mixing ratio and dissociation effects. The dissociation is modeled using a pressure and temperature dependent function. Parameters: pressure (np.ndarray): Pressure values in bar temperature (np.ndarray): Temperature values in Kelvin coeff: Coefficients [a, b, c] for dissociation calculation base_vmr: Base volume mixing ratio from reference layer Returns: np.ndarray: Adjusted abundance values The calculation follows these steps: 1. Compute dissociation factor (Ad) using pressure and temperature 2. Convert to inverse dissociation factor (1/Ad) 3. Combine with inverse base abundance (1/A0) in quadrature 4. Return final adjusted abundance (A) """ # Calculate dissociation factor log10Ad = coeff[0] * np.log10(pressure) + coeff[1] / temperature - coeff[2] one_over_Ad = 1.0 / (10 ** log10Ad) # Calculate inverse of base abundance one_over_A0 = 1.0 / base_vmr # Combine terms in quadrature one_over_A = (np.sqrt(one_over_A0) + np.sqrt(one_over_Ad)) ** 2 # Return final adjusted abundance return 1.0 / one_over_A
[docs] def spectral_convolution_ptr(wavelength, spectrum, pixel_dv): """Convolve a spectrum to instrumental resolution using petitRADTRANS.""" # Calculate resolution from pixel velocity width resolution = ConstantVariables.CLIGHT / (2 * pixel_dv) return SpectralModel.convolve(wavelength, spectrum, resolution)
[docs] def get_stellar_model(path_pkl, min_wl_cm, max_wl_cm): """Load a stellar spectrum from a pickle file and optionally trim to a wavelength range (cm).""" with open(path_pkl, "rb") as f: stellar_spectrum = pickle.load(f) wavelength = stellar_spectrum["wavelength"] spectrum = stellar_spectrum["spectrum"] if min_wl_cm is not None and max_wl_cm is not None: mask = (wavelength >= min_wl_cm) & (wavelength <= max_wl_cm) wavelength = wavelength[mask] spectrum = spectrum[mask] return wavelength, np.pi * spectrum
[docs] def generate_star_model_hr(path_pkl, pixel_dv, min_wl=None, max_wl=None): """ Generate a stellar spectrum model from a pickled file. This function loads a stellar spectrum from a pickle file, convolves it with the instrumental resolution, and creates a spline interpolation model. Parameters ---------- path_pkl : str or Path Path to the pickle file containing stellar spectrum data pixel_dv : float Pixel velocity width for resolution calculation min_wl : float Minimum wavelength for interpolation model in cm max_wl : float Maximum wavelength for interpolation model in cm Returns ------- tuple Spline representation of the stellar spectrum model """ # Load stellar spectrum data from pickle file wavelength, spectrum = get_stellar_model(path_pkl, min_wl, max_wl) # Convolve spectrum with instrumental resolution spc = spectral_convolution_ptr(wavelength, spectrum, pixel_dv) # Create spline interpolation model spline_model = splrep(wavelength, spc) return wavelength * 1e7, spectrum, spline_model
[docs] def rv_planet_and_star( eccentricity, target, ecc_retrieval, opi_retrieval, kp_retrieval, night_obj, mode, index_night_in_current_instrument, dVsys_arr, counter_derivative_params ): """ Compute planetary and stellar radial velocities for a given night. Args: eccentricity (bool): Whether to use eccentric orbit model. target: Target object with orbital parameters. ecc_retrieval (float): Retrieved eccentricity value. opi_retrieval (float): Retrieved argument of periastron. kp_retrieval (float): Retrieved planetary radial velocity semi-amplitude (km/s). night_obj: Night object containing phase and barycentric velocity data. mode (str): Operating mode ('CC', 'Retrieval', or synthetic). index_night_in_current_instrument (int): Index of the night within the instrument. dVsys_arr: Array of systemic velocity offsets per night. counter_derivative_params (int): Counter for derivative parameter indexing. Returns: tuple: (vtot, vtot_star, kpph) planetary velocity, stellar velocity, and Kp*phase arrays. """ phases = night_obj.phases_considered hjdk = night_obj.hjd_ck_considered barycentric = night_obj.barycentric_velocity_considered if mode == "CC": kp = target.kp ecc = target.eccentricity opi = target.argument_of_periastron coeff_vtot = -1 elif mode == "Retrieval": kp = kp_retrieval ecc = ecc_retrieval opi = opi_retrieval coeff_vtot = 1 else: # synthetic generation kp = target.kp ecc = target.eccentricity opi = target.argument_of_periastron coeff_vtot = 1 phases = night_obj.phases hjdk = night_obj.hjd_ck barycentric = night_obj.barycentric_velocity # SYSTEMIC VELOCITY CORRECTIONS # Apply night-specific systemic velocity offset (first night is reference) dvsys = 0 if (index_night_in_current_instrument == 0 or dVsys_arr is None or mode != "Retrieval") else dVsys_arr[counter_derivative_params] if -999 in night_obj.hjd_ck: hjd0 = 0 period = 1 times_of_observation = phases else: hjd0 = target.hjd0 period = target.period times_of_observation = hjdk systemic_velocity = target.systemic_velocity ks = target.ks if ks == 0: ks = target.kp * target.mass * np.sin(np.deg2rad(target.inclination)) / ( target.stellar_mass * ConstantVariables.SOLAR_TO_JUPITER_MASSES) if mode != "Retrieval": # TODO REMOVE print(f"Insert ks in yaml! ks_km_s: {ks}") vtot, vtot_star, kpph = calculate_rv_planet_and_star( hjd0, times_of_observation, eccentricity, period, ecc, opi, kp, ks, phases, coeff_vtot, systemic_velocity, barycentric, dvsys ) return vtot, vtot_star, kpph
[docs] def calculate_rv_planet_and_star( hjd0, times_of_observation, eccentricity, period, ecc, opi, kp, ks, phases, coeff_vtot, systemic_velocity, barycentric, dvsys, vsys_extra=0 ): """ Calculate planetary and stellar radial velocities from orbital parameters. Returns: tuple: (vtot, vtot_star, kpph) total velocity shift for planet, for star, and Kp*phase. """ if eccentricity: T_periastro = timetrans_to_timeperi( hjd0, period, ecc, opi - np.pi, ) orbital_element = np.array([ period, T_periastro, ecc, opi, kp ]) kpph = rv_drive( times_of_observation, orbital_element, ) else: # Simple circular orbit approximation kpph = kp * np.sin(2 * np.pi * phases) # Calculate total velocity shift for planet spectrum vtot = coeff_vtot * (-kpph - (systemic_velocity + vsys_extra + dvsys) + barycentric) # Calculate stellar radial velocity for stellar spectrum correction if ks is not None: vtot_star = ( ks * np.sin(2 * np.pi * phases) - systemic_velocity + barycentric ) else: vtot_star = None return vtot, vtot_star, kpph
[docs] def rv_DopplerShadow(target, phases): """ Compute the Rossiter-McLaughlin (Doppler shadow) radial velocity anomaly. Based on Cegla, H. M., et al. 2016, A&A, 588, A127. Args: target: Target object with projected_obliquity, a_Rs_ratio, inclination, v_sini, and systemic_velocity attributes. phases (np.ndarray): Orbital phase array. Returns: np.ndarray: Radial velocity anomaly plus systemic velocity (km/s). """ # formulae taken from # Cegla, H. M., et al. 2016, A&A, 588, A127 # ---- center of the planet at any given orbital phase ----# xp = target.a_Rs_ratio * np.sin(2 * np.pi * phases) # xp=a/R* sin(2*pi*phi) yp = -target.a_Rs_ratio * np.cos(2 * np.pi * phases) * np.cos(np.deg2rad(target.inclination)) # yp=-a/R* con(2*pi*phi)*cos(ip) l = np.deg2rad(target.projected_obliquity) # ---- orthogonal distance from the spin-axis ----# x = xp * np.cos(l) - yp * np.sin(l) # x_|_=xp * cos(lamda) - yp * sin(lamda) # IF differential rotation and alpha is known # diff. rot: Omega = Omega_eq*(1-alpha*sin2theta) # y = xp*np.sin(l.radian) - yp*np.cos(l.radian) # z = np.sqrt(1 - (x**2) - (y**2)) # beta = np.pi/2 + istar.radian.value # z1 = z*np.cos(beta) - y*sin(beta) # y1 = z*np.sin(beta) - y*cos(beta) # v = x*vsini*(1-alpha*(y1**2)) v = x * target.v_sini return v + target.systemic_velocity
# ---------------------------------------------------------------------- # convenience wrapper # ----------------------------------------------------------------------
[docs] def ask_stellar_params(parent=None, stellar_teff: Optional[float] = None) -> Tuple[Optional[float], Optional[float], Optional[float]]: """ Open stellar parameter dialog and return user-specified stellar properties. This convenience function creates and displays a modal dialog for entering stellar atmospheric parameters needed for synthetic spectrum generation. The dialog allows input of effective temperature, surface gravity, and metallicity with validation and reasonable defaults. Parameters ---------- parent : tk.Misc or None, optional Parent widget for the dialog. If None, uses default root window or creates a new Tk instance, by default None stellar_teff : float or None, optional Initial effective temperature value to populate in the dialog, by default None Returns ------- tuple[float or None, float or None, float or None] Tuple containing stellar parameters: - T_eff : Effective temperature in Kelvin (rounded to 5 digits) - log_g : Surface gravity in cgs units (log10(g)) - Fe_H : Metallicity [Fe/H] in dex Returns (None, None, None) if user cancels or closes dialog Notes ----- The stellar parameters are used to select appropriate stellar atmosphere models for synthetic spectrum generation. The dialog provides validation to ensure physically reasonable values are entered. """ # Use default root or create new instance if no parent provided import tkinter as tk if parent is None: parent = tk._default_root or tk.Tk() # Create and display stellar parameter dialog dialog = StellarDialog(parent, stellar_teff) # Return dialog result or defaults if dialog was cancelled return getattr(dialog, "result", (None, None, None))
[docs] def download_star_spectrum(figure_tk, target_obj, folder_stellar_spc): """ Download and save a PHOENIX stellar spectrum if not already present. Args: figure_tk: Parent Tkinter widget for the stellar parameter dialog. target_obj: Target object with stellar_effective_temperature attribute. folder_stellar_spc (str): Directory for storing the stellar spectrum file. Returns: str or None: Path to the stellar spectrum pickle file, or None if cancelled. """ # Download stellar spectrum if missing path_phoenixf = str(Path(folder_stellar_spc, "star_spectrum_default.pkl")) if not os.path.exists(path_phoenixf): print("Stellar spectrum not found. Requesting stellar parameters...") temperature, gravity, met = ask_stellar_params( parent=figure_tk, stellar_teff=target_obj.stellar_effective_temperature ) if temperature is not None: print(f"Downloading stellar spectrum: T_eff={temperature}K, log(g)={gravity}, [Fe/H]={met}") # Create directory if it doesn't exist os.makedirs(folder_stellar_spc, exist_ok=True) # Download stellar spectrum files file_wlen_phoenix = ( f"wget -P {folder_stellar_spc} " f"ftp://phoenix.astro.physik.uni-goettingen.de/HiResFITS/" f"PHOENIX-ACES-AGSS-COND-2011/Z{met}/" f"lte{temperature}-{gravity}0-0.0.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits" ) file_spectrum_phoenix = ( f"wget -P {folder_stellar_spc} " f"ftp://phoenix.astro.physik.uni-goettingen.de/HiResFITS/" f"WAVE_PHOENIX-ACES-AGSS-COND-2011.fits" ) print("Downloading spectrum files...") os.system(file_wlen_phoenix) os.system(file_spectrum_phoenix) # Process downloaded files path_spc = str(Path( folder_stellar_spc, f"lte{temperature}-{gravity}0-0.0.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits" )) path_wlen = str(Path(folder_stellar_spc, "WAVE_PHOENIX-ACES-AGSS-COND-2011.fits")) path_pkl = str(Path(folder_stellar_spc, "star_spectrum_default.pkl")) print("Processing and saving stellar spectrum...") # Load and process spectrum data spc = fits.open(path_spc)[0].data wlen = fits.open(path_wlen)[0].data star_model = { "wavelength": wlen * 1e-8, # angstrom to cm "spectrum": spc / np.pi } # Save processed spectrum with open(path_pkl, "wb") as file: pickle.dump(star_model, file) print("Stellar spectrum saved successfully.") return path_phoenixf else: print("Stellar parameter dialog cancelled. Aborting synthetic generation.") return None else: print("Using existing stellar spectrum.") return path_phoenixf
[docs] def compute_transit_durations(period, a_Rs, k, inc_deg, ecc=0.0, omega=np.pi / 2): """ Compute transit durations T14, T23, T12 (=T34) for circular and eccentric orbits. Parameters ---------- period : float Orbital period in days. a_Rs : float Semi-major axis in units of stellar radii (a / R_star). k : float Planet-to-star radius ratio (Rp / Rs). inc_deg : float Orbital inclination in degrees. ecc : float, optional Orbital eccentricity (default 0 for circular). omega : float, optional Argument of periastron of the STAR in radians (default pi/2). Returns ------- dict with keys: 'T14', 'T23', 'T12' : durations in days 'T14_hours', 'T23_hours', 'T12_hours' : durations in hours 'b' : impact parameter 'ecc_factor' : eccentricity correction factor (1.0 if circular) 'grazing' : True if grazing transit (T23 = 0) 'limph_T14', 'limph_T12' : phase limits = T / (2*P) References ---------- Winn (2010), "Transits and Occultations", arXiv:1001.2010, Eq. 14-16. Kipping (2010), MNRAS 407, 301, arXiv:1004.3819. """ inc = np.radians(inc_deg) sin_i = np.sin(inc) # Impact parameter # Eccentric: b = (a/Rs) * cos(i) * (1 - e^2) / (1 + e*sin(omega)) # Circular: b = (a/Rs) * cos(i) if ecc > 0: ecc_factor_b = (1.0 - ecc ** 2) / (1.0 + ecc * np.sin(omega)) else: ecc_factor_b = 1.0 b = a_Rs * np.cos(inc) * ecc_factor_b if b > (1.0 + k): raise ValueError( f"No transit: b = {b:.4f} > 1 + k = {1 + k:.4f}." ) # Winn (2010) Eq. 14-15 arg_T14_sq = (1.0 + k) ** 2 - b ** 2 arg_T23_sq = (1.0 - k) ** 2 - b ** 2 if arg_T14_sq < 0: raise ValueError( f"No transit: (1+k)^2 - b^2 = {arg_T14_sq:.6f} < 0" ) grazing = False if arg_T23_sq < 0: grazing = True arg_T23_sq = 0.0 sin_T14 = min((1.0 / a_Rs) * np.sqrt(arg_T14_sq) / sin_i, 1.0) sin_T23 = min((1.0 / a_Rs) * np.sqrt(arg_T23_sq) / sin_i, 1.0) T14_circ = (period / np.pi) * np.arcsin(sin_T14) T23_circ = (period / np.pi) * np.arcsin(sin_T23) # Eccentricity correction (Winn 2010, Eq. 16) # Psi = sqrt(1 - e^2) / (1 + e*sin(omega)) if ecc > 0: ecc_factor = np.sqrt(1.0 - ecc ** 2) / (1.0 + ecc * np.sin(omega)) else: ecc_factor = 1.0 T14 = T14_circ * ecc_factor T23 = T23_circ * ecc_factor T12 = (T14 - T23) / 2.0 limph_T14 = T14 / (2.0 * period) limph_T12 = T12 / (2.0 * period) limph_T23 = T23 / (2.0 * period) print(f"T14={T14 * 24:.2f}hours, T23={T23 * 24:.2f}hours, T12={T12 * 24:.2f}hours, limph_T14={limph_T14:.2f}, limph_T12={limph_T12:.2f}, limph_T23={limph_T23:.2f}") return { 'T14': T14, 'T23': T23, 'T12': T12, 'T14_hours': T14 * 24.0, 'T23_hours': T23 * 24.0, 'T12_hours': T12 * 24.0, 'b': b, 'ecc_factor': ecc_factor, 'grazing': grazing, 'limph_T14': limph_T14, 'limph_T12': limph_T12, 'limph_T23': limph_T23, }