"""
General utility functions for GUIBRUSHR atmospheric retrieval analysis.
This module contains a comprehensive collection of utility functions used throughout
the GUIBRUSHR application for various tasks including:
- Molecular data processing and opacity calculations
- Spectroscopic data analysis and convolution operations
- FITS file I/O operations
- Cross-correlation analysis
- Principal Component Analysis (PCA) for systematic noise removal
- Temperature profile calculations
- Plotting and visualization utilities
- Mathematical operations for atmospheric modeling
The functions in this module support both high-resolution and low-resolution
spectroscopic data processing, and are designed to work with the petitRADTRANS
and PyratBay radiative transfer codes.
Dependencies:
- numpy: Numerical computations
- astropy: FITS file handling
- matplotlib: Plotting
- pandas: Data structure handling
- scipy: Scientific computing
- sklearn: Machine learning algorithms (PCA)
- petitRADTRANS: Radiative transfer calculations
- pyratbay: Alternative radiative transfer code
- chemcat: Chemical abundance calculations
Author: GUIBRUSHR Development Team
"""
import copy
import os
import pickle
import re
import traceback
from pathlib import Path
from re import findall
from astropy.io import fits
import numpy as np
import chemcat
import pyratbay.io as io
from corner import corner
from matplotlib import pyplot as plt
import matplotlib.colors as mcolors
from pandas import read_csv, DataFrame
from petitRADTRANS.config import petitradtrans_config_parser
from petitRADTRANS import physical_constants as pc
from petitRADTRANS.spectral_model import SpectralModel
from re import sub
from scipy.stats import skewnorm
from scipy.optimize import curve_fit
from scipy import stats
import chemcat.utils as u
from scipy.signal import convolve
from scipy.interpolate import splrep, splev
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Optional, Tuple, Union, Any
from petitRADTRANS.plotlib import get_species_color
from petitRADTRANS.plotlib.style import default_color
from GUIBRUSHR.GUI.Input_Output_Panels.Input_Panels.TabPanels.FrameGenerationHRData.StellarDialog import StellarDialog
# from petitRADTRANS._input_data_loader import (
# get_cia_aliases, get_species_basename, get_species_scientific_name, split_species_all_info
# )
from GUIBRUSHR.GUI.WIDGET.MyFigure import MyFigure
from GUIBRUSHR.GUI.LAYOUT.GraphicsConfig import GraphicsConfig
from GUIBRUSHR.General_Constants.Classes.BestFitParametersAndResults import BestFitParametersAndResults
from GUIBRUSHR.General_Constants.Classes.UserTemperatureProfile import UserTemperatureProfile
from GUIBRUSHR.General_Constants.FunctionsAndConstants.Constant_Variables import ConstantVariables
from GUIBRUSHR.General_Constants.FunctionsAndConstants.PTR_Modules import split_species_all_info, \
get_species_scientific_name, get_cia_aliases, get_species_basename
from GUIBRUSHR.General_Constants.FunctionsAndConstants.radvel_g import rv_drive, timetrans_to_timeperi
from GUIBRUSHR.Retrieval.ExofastMCMC import exofast_gelmanrubin
[docs]
def make_smooth_mono_cmap(color: str, name: str = 'custom') -> mcolors.LinearSegmentedColormap:
"""
Create a smooth monochromatic colormap with 5 control points.
The colormap transitions from black through dark shade, saturated color,
light shade, to white, creating a smooth gradient suitable for visualization.
Parameters:
color (str): Hex color code or named color for the saturated color point
name (str, optional): Name for the colormap. Defaults to 'custom'.
Returns:
LinearSegmentedColormap: Matplotlib colormap object with 256 color levels
"""
coeff = 0.4
rgb = mcolors.to_rgb(color)
dark = tuple(c * coeff for c in rgb)
light = tuple(1 - (1 - c) * coeff for c in rgb)
return mcolors.LinearSegmentedColormap.from_list(
name,
['black', dark, color, light, 'white'],
N=256
)
[docs]
def make_pyratbay_config(
output_folder: str,
target: Any,
pressure: Any,
species: Any,
wavelength: Any,
resolution: Any,
params
) -> str:
"""
Create a pyratbay configuration file based on guibrush configuration.
Parameters:
output_folder (str): Folder where to store the outputs.
target: Target object containing stellar and planetary parameters.
pressure: Pressure configuration object.
species: Species configuration object.
wavelength: Wavelength configuration object.
resolution: Resolution configuration object.
params: List of input parameters.
Returns:
str: Path to the created configuration file.
"""
try:
param_values = {
param.name: param.get_starting_value()
for param in params
if param is not None
}
log_file = f'{output_folder}/giano_b_retrieval.log'
cfg_file = f'{output_folder}/giano_b_retrieval.cfg'
# Build configuration text
cfg_text = (
"[pyrat]\n\nrunmode = spectrum\n"
f"logfile = {log_file}\n"
"rt_path = transit\n\n"
)
# System parameters
logp_ref = target.p0
cfg_text += (
"# System parameters\n"
f"rstar = {target.stellar_radius} rsun\n"
f"mstar = {target.stellar_mass} msun\n"
f"rplanet = {target.radius} rjup\n"
f"mplanet = {target.mass} mjup\n"
f"refpressure = {logp_ref} bar\n\n"
)
# Wavelength sampling
cfg_text += (
"# Spectral sampling\n"
f"wllow = {wavelength.min_wl_hr} um\n"
f"wlhigh = {wavelength.max_wl_hr} um\n"
f"wn_thinning = {resolution.lbl_high_res}\n\n"
)
# Atmospheric model
p_max = 10 ** pressure.range_max_pressures
p_min = 10 ** pressure.range_min_pressures
cfg_text += (
"# Atmospheric model\n"
f"nlayers = {pressure.n_layers}\n"
f"ptop = {p_min} bar\n"
f"pbottom = {p_max} bar\n\n"
)
# Chemistry configuration
atm_species = ' '.join(list(species.composition))
atm_vmr = []
for mol, vmr in species.composition.items():
if mol in ['H2', 'He']:
atm_vmr.append(str(vmr))
else:
atm_vmr.append('1e-10')
mass_fraction = ' '.join(atm_vmr)
cfg_text += (
"chemistry = uniform\n"
f"species = {atm_species}\n"
f"uniform = {mass_fraction}\n"
"radmodel = hydro_m\n\n"
)
# Temperature model
cfg_text += (
"tmodel = isothermal\n"
"tpars = 1000.0\n\n"
)
# Opacities configuration
cfg_text += "# Opacities\n"
lbl_models = ''
prt_path = petitradtrans_config_parser.get_input_data_path()
for i, mol in enumerate(species.line_species):
opacity_file = '\n ' + str(
Path(prt_path, "opacities", "lines", "line_by_line", mol,
species.line_species_isotope[i],
species.line_species_complete_name_hr[i] + ".xsec.petitRADTRANS.h5"))
lbl_models += opacity_file
if len(species.line_species) > 0:
cfg_text += (
f'extfile ={lbl_models}\n\n'
'tmin = 300\n'
'tmax = 3000\n'
'tstep = 150\n\n'
)
# CIA configuration
root = '{ROOT}/pyratbay/data/CIA/'
cia_dict = {
'H2-H2': root + 'CIA_Borysow_H2H2_0060-7000K_0.6-500um.dat',
'H2-He': root + 'CIA_Borysow_H2He_0050-7000K_0.5-031um.dat',
}
cia_models = ''
for pair in species.continum_opacity:
cia_models += f'\n {cia_dict[pair]}'
if len(species.continum_opacity) > 0:
cfg_text += f'csfile ={cia_models}\n\n'
# Rayleigh scattering configuration
rayleigh = None
if len(species.rayleigh_species) > 0:
rayleigh = [
f'\n dalgarno_{spec}'
for spec in species.rayleigh_species
]
ray_params = ''
if "k0" in param_values:
rayleigh.append('\n lecavelier')
ray_params = f"\nrpars = {param_values['k0']} {param_values['gamma']}"
if len(rayleigh) > 0:
rayleigh_models = ''.join(rayleigh)
cfg_text += f'rayleigh ={rayleigh_models}{ray_params}\n\n'
# Cloud configuration
if 'Pc' in param_values:
cfg_text += f'clouds = deck\ncpars = {param_values["Pc"]}\n\n'
# Write configuration to file
with open(cfg_file, 'w') as f:
f.write(cfg_text)
return cfg_file
except IOError as e:
print(f"Failed to write configuration file: {str(e)}")
[docs]
def create_fits(data: np.ndarray, pathf: Union[str, Path]) -> None:
"""
Create and save a FITS file from the provided data.
This function creates a primary HDU (Header/Data Unit) using the input data,
encapsulates it in an HDUList, and writes the FITS file to the specified path.
If a file with the same name exists, it will be overwritten.
Parameters:
data (np.ndarray): The data array to be stored in the FITS file.
pathf (Union[str, Path]): The destination file path for the FITS file.
"""
try:
if not isinstance(data, np.ndarray):
print("Input data must be a numpy array")
# Create a primary HDU from the input data
hdu = fits.PrimaryHDU(data)
# Create an HDUList to hold the primary HDU
hdul = fits.HDUList([hdu])
# Write the HDUList to the file, overwriting any existing file
hdul.writeto(pathf, overwrite=True)
except IOError as e:
print(f"Failed to write FITS file: {str(e)}")
except Exception as e:
print(f"Invalid data format: {str(e)}")
[docs]
def kernel_solid_body_rotation(
omegad: float,
f_rot_arr: np.ndarray,
Teq: float,
mu: float,
gs: float,
Rpj: float,
rad_mode: str,
index_nights_total: int,
rv_sampling: float = 0.1
) -> np.ndarray:
"""
Compute a rotational broadening kernel for spectroscopic observations.
A planet rotating as a rigid body with angular frequency omega broadens
spectral lines because different surface elements of the visible disk move
at different line-of-sight velocities: an element at projected distance x
from the rotation axis contributes flux at a radial velocity offset
v = omega * x. The rotational kernel K(v) is the disk-integrated weight
of all surface elements emitting at velocity v, and convolving a model
spectrum with K(v) reproduces the line broadening induced by planetary
rotation. The overall width of the kernel is set by the equatorial
rotational velocity v_eq = omega * Rp.
The rotation rate omega is a free parameter of the retrieval and is not
constrained to any particular value. For a tidally locked planet it equals
2*pi / P_orb, but the kernel is valid for any rotation rate.
The kernel geometry differs between emission and transmission spectroscopy:
- In emission spectroscopy, only the planetary disk itself contributes.
Each surface strip at projected distance x has a chord length
2*sqrt(Rp^2 - x^2), so K(v) is proportional to sqrt(1 - (v/v_eq)^2)
for \|v\| < v_eq, and zero outside. This is the classical solid-body
rotational kernel for a uniformly emitting disk: it peaks at v=0
(sub-observer point, longest chord) and drops to zero at v=±v_eq
(planetary limb, chord=0).
- In transmission spectroscopy, the relevant region is the atmospheric
annulus between Rp and Rp + z, where z = 5*H is the effective
atmospheric thickness and H = k_B * T_eq / (mu * m_H * g) is the
scale height. Defining v_eq_atm = omega * (Rp + z), two velocity
regions contribute differently:
- v_eq <= \|v\| < v_eq_atm (annulus only): the line of sight passes
only through the annulus, with weight
sqrt((Rp+z)^2 - (v/omega)^2) / z.
- \|v\| < v_eq (annulus above opaque disk): the line of sight passes
through the full annulus minus the opaque planetary disk, with
weight (sqrt((Rp+z)^2 - (v/omega)^2) - sqrt(Rp^2 - (v/omega)^2)) / z.
An asymmetry parameter f_rot (stored as log10 in f_rot_arr) is applied
to the receding hemisphere (v > 0) to allow for day/night asymmetries
in the transmission signal. It is a free parameter of the retrieval.
In both modes the kernel is normalized to unit sum before being returned,
so that convolution preserves the total flux of the spectrum.
Parameters
----------
omegad : float
Rotational angular frequency of the planet, in inverse days (d^-1).
The equatorial rotational velocity is v_eq = omega * Rp, where omega
is omegad converted to s^-1. This is a free parameter of the retrieval
and is not assumed to equal any particular value.
f_rot_arr : np.ndarray
Array of base-10 logarithms of the rotational asymmetry scaling
factors, one per observing night. Used only in transmission mode:
f_rot = 10^f_rot_arr[index_nights_total] is multiplied onto the
kernel for v > 0 to model day/night hemisphere asymmetries. It is
a free parameter of the retrieval.
Teq : float
Equilibrium temperature of the planet, in Kelvin. Used to compute
the atmospheric scale height H = k_B * T_eq / (mu * m_H * g),
which defines the vertical extent z = 5*H of the transmission
annulus. Only used in transmission mode.
mu : float
Mean molecular weight of the atmosphere, in units of the hydrogen
mass m_H. Used to compute the scale height. Only used in transmission
mode.
gs : float
Surface gravity of the planet, in cgs units (cm s^-2). Used to
compute the scale height. Only used in transmission mode.
Rpj : float
Planetary radius, in centimeters. Defines v_eq = omega * Rp (emission)
and the inner boundary of the transmission annulus (transmission).
rad_mode : str
Observing mode. Must be one of ConstantVariables.LIST_RAD_MODE:
- LIST_RAD_MODE[0]: transmission spectroscopy (annulus kernel).
- any other value: emission spectroscopy (solid disk kernel).
index_nights_total : int
Index into f_rot_arr selecting the asymmetry factor for the current
observing night. Only used in transmission mode.
rv_sampling : float, optional
Velocity step of the radial velocity grid on which the kernel is
computed, in km/s. Default is 0.1 km/s, fine enough to well-sample
the kernel profile for any realistic v_eq.
Returns
-------
np.ndarray
Normalized rotational broadening kernel, sampled on a velocity grid
spanning -30 to +30 km/s with step rv_sampling. The kernel sums to
unity by construction.
"""
# Physical constants
kb = pc.kB # Boltzmann constant (erg/K)
mH = 1.6738e-24 # Hydrogen mass (g)
# --- Atmospheric scale height ---
# H = k_B * T_eq / (mu * m_H * g), in cm.
# Used only in transmission mode to define the vertical extent of the
# absorbing annulus: z = 5*H is a standard approximation for the
# effective atmospheric thickness probed in transmission.
H = kb * Teq / (mu * mH * gs)
z = 5 * H
# --- Radial velocity grid ---
# The kernel is computed on a grid from -30 to +30 km/s with step
# rv_sampling (default 0.1 km/s), giving 601 points. This range is
# sufficient for any realistic v_eq: for WASP-77Ab (omega=0.665 d^-1,
# Rpj=1.21 Rj), v_eq ~ 0.67 km/s, well within ±30 km/s.
rv_array = np.arange(-30, 30 + rv_sampling, rv_sampling)
rv_array_cm = rv_array * 1e5 # convert km/s to cm/s
# --- Convert omega from d^-1 to s^-1 ---
# 1 d^-1 = 1/86400 s^-1 = 1.1574e-5 s^-1
omega = omegad * 1.157407407407407e-05 # s^-1
# --- Projected distance x on the disk ---
# For a surface element moving at line-of-sight velocity v = omega * x,
# the projected distance from the rotation axis is x = v / omega.
# Here rv_array_cm plays the role of v (in cm/s), so x is in cm.
# x = rv_array_cm / omega
ker_rot = np.zeros(len(rv_array))
# Equatorial rotational velocity: v_eq = omega * Rp (cm/s)
# Defines the velocity at the planetary limb (x = ±Rp).
# In transmission, also define v_eq_atm = omega * (Rp + z),
# the velocity at the outer edge of the atmospheric annulus.
v_eq = omega * Rpj # cm/s
v_eq_atm = omega * (Rpj + z) # cm/s
if rad_mode == ConstantVariables.LIST_RAD_MODE[0]: # Transmission mode
f_rot = 10 ** f_rot_arr[index_nights_total]
# Region 1 (annulus only): v_eq <= |v| < v_eq_atm
# The line of sight passes only through the atmospheric annulus.
mask1 = (np.abs(rv_array_cm) >= v_eq) & (np.abs(rv_array_cm) < v_eq_atm)
# Region 2 (annulus above opaque disk): |v| < v_eq
# The line of sight passes through the full annulus minus the opaque disk.
mask2 = np.abs(rv_array_cm) < v_eq
ker_rot[mask1] = np.sqrt((Rpj + z) ** 2 - (rv_array_cm[mask1] / omega) ** 2) / z
ker_rot[mask2] = (
np.sqrt((Rpj + z) ** 2 - (rv_array_cm[mask2] / omega) ** 2) -
np.sqrt(Rpj ** 2 - (rv_array_cm[mask2] / omega) ** 2)
) / z
ker_rot[rv_array_cm > 0] *= f_rot
else: # Emission mode
# K(v) ∝ sqrt(1 - (v/v_eq)^2), zero for |v| > v_eq
mask = np.abs(rv_array_cm) < v_eq
ker_rot[mask] = np.sqrt(1 - (rv_array_cm[mask] / v_eq) ** 2)
# Normalize to unit sum so that convolution preserves total flux.
return ker_rot / np.sum(ker_rot)
[docs]
def convolve_solid_body_rotation(
wl: np.ndarray,
model: np.ndarray,
ker_rot: np.ndarray,
rv_sampling: float = 0.1,
n_el: int = 321
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convolve a model spectrum with a rotational broadening kernel.
This function is the second convolution step in the model preparation
pipeline, applied after convolve_resolution. It takes the rotational
kernel produced by kernel_solid_body_rotation (defined on a fine velocity
grid with step rv_sampling) and convolves it with the model spectrum.
The procedure mirrors that of convolve_resolution:
1. The pixel velocity step of the model wavelength grid is computed as
dv = c * dlambda / lambda, constant by construction for a log-uniform
grid (constant resolving power R_model).
2. The kernel is resampled from its fine velocity grid (step rv_sampling,
default 0.1 km/s) onto a coarser grid whose spacing matches the model
pixel velocity step, using spline interpolation. This step is mandatory
because discrete convolution requires kernel and signal to share the
same grid spacing.
3. The resampled kernel is convolved with the model in 'valid' mode
(scipy.signal.convolve), discarding the n_el//2 edge pixels on each
side where the kernel does not fully overlap the signal, and the result
is normalized by the kernel sum to ensure flux conservation.
Note: unlike convolve_resolution, this function does not apply a radial
velocity shift. The shift is handled entirely by convolve_resolution via
kernel translation. This function is responsible only for the rotational
line broadening.
Parameters
----------
wl : np.ndarray
1D array of wavelength values for the model spectrum. Must be
log-uniform (constant resolving power R_model), so that the pixel
velocity step dv = c / R_model is constant across the grid.
model : np.ndarray
1D array of spectral flux values corresponding to wl. This is the
spectrum already convolved with the instrumental LSF by
convolve_resolution.
ker_rot : np.ndarray
Rotational broadening kernel computed by kernel_solid_body_rotation,
sampled on a velocity grid from -30 to +30 km/s with step rv_sampling.
Must be normalized to unit sum.
rv_sampling : float, optional
Velocity step of the input kernel grid, in km/s. Must match the
rv_sampling used in kernel_solid_body_rotation. Default is 0.1 km/s.
n_el : int, optional
Number of elements in the resampled kernel. Must be odd to ensure
symmetry around zero and avoid introducing a spurious velocity shift.
The half-extent of the resampled grid is (n_el - 1) / 2 pixels.
Default is 321, giving a half-extent of 160 pixels.
Returns
-------
wavelength : np.ndarray
Wavelength array of the convolved spectrum, shorter than the input
by n_el // 2 = 160 points on each side due to 'valid' mode convolution.
rconv : np.ndarray
Rotationally broadened flux array, flux-conserving by construction.
"""
# --- Radial velocity grid for the input kernel ---
# This must match the grid used in kernel_solid_body_rotation.
rv_array = np.arange(-30, 30 + rv_sampling, rv_sampling)
# --- Pixel velocity step of the model wavelength grid ---
# dv = c * dlambda / lambda, constant by construction for a log-uniform
# grid. Computed on each adjacent pixel pair and averaged as a numerical
# verification (see convolve_resolution Step 2 for a detailed discussion).
dv = ConstantVariables.CLIGHT / 1e5 * (wl[1:] - wl[:-1]) / wl[:-1]
rv_pix = np.mean(dv)
# --- Resampled velocity grid matching the model pixel step ---
# A grid of n_el points (odd, default 321) with spacing rv_pix, spanning
# ±(n_el-1)/2 * rv_pix = ±160 * rv_pix km/s. The odd number of points
# ensures symmetry around zero, preventing spurious velocity offsets.
# (See convolve_resolution Step 3 for a detailed discussion.)
max_val = ((n_el - 1) / 2) * rv_pix
rv_array_mod = np.linspace(-max_val, max_val, n_el)
n_rv = len(rv_array_mod)
# --- Resample the kernel onto the model grid via spline interpolation ---
# splrep builds a B-spline representation of ker_rot on rv_array, and
# splev evaluates it on rv_array_mod. After this step the kernel has
# the same velocity spacing as the model, making discrete convolution
# well-defined. (See convolve_resolution Step 4b for a detailed discussion.)
csscaled = splrep(rv_array, ker_rot)
ker_rot_pix = splev(rv_array_mod, csscaled, der=0)
# --- Convolve in 'valid' mode and normalize ---
# scipy.signal.convolve in 'valid' mode returns only the n - K + 1 central
# output points where the kernel fully overlaps the signal, discarding the
# n_rv // 2 = 160 edge pixels on each side. The wavelength array is trimmed
# by the same amount. A final normalization by np.sum(ker_rot_pix) corrects
# for any residual deviation from unity introduced by the spline resampling,
# ensuring strict flux conservation. (See convolve_resolution Step 5 for a
# detailed discussion.)
ini = int(n_rv / 2)
wavelength = wl[ini:-ini]
rconv = convolve(model, ker_rot_pix, mode="valid") / np.sum(ker_rot_pix)
return wavelength, rconv
[docs]
def convolve_resolution(
wavelength_model: np.ndarray,
flux_model: np.ndarray,
lsf_hwhm: float,
radial_velocity_shift: float,
lsf_grid_step: float = 0.1,
kernel_size: int = 321
) -> Tuple[np.ndarray, np.ndarray]:
"""
Degrade a high-resolution model spectrum to instrumental resolution and apply
a Doppler shift.
A real spectrograph cannot reproduce an infinitely narrow spectral line: even
a perfectly monochromatic source produces a broadened profile on the detector,
due to diffraction, optical aberrations, slit width, and pixel sampling. This
broadening is described by the Line Spread Function (LSF). To compare a
synthetic model with observed data, the model must be degraded to the same
instrumental resolution, i.e. convolved with the LSF.
The LSF is approximated as a Gaussian whose width is expressed in velocity
units. Working in velocity space is convenient because the LSF width is
approximately constant in velocity across the full spectral range, whereas
in wavelength units it varies with lambda. The Half Width at Half Maximum
is HWHM = c / (2R), where R is the nominal resolving power of the instrument,
and the corresponding Gaussian standard deviation is sigma = HWHM / sqrt(2*ln(2)).
The factor sqrt(2*ln(2)) ~ 1.177 comes from setting the Gaussian equal to
half its peak value and solving for v: G(v_1/2) = exp(-v_1/2^2 / 2*sigma^2) = 1/2
gives v_1/2 = sigma * sqrt(2*ln(2)), which is by definition the HWHM.
The Doppler shift of the planet is applied by translating the kernel by
+v_rad rather than shifting the spectrum itself. By the translation property
of convolution these two operations are mathematically equivalent, but acting
on the smooth 321-point kernel avoids interpolating the full model flux array,
concentrating any numerical error on the kernel alone.
Parameters
----------
wavelength_model : np.ndarray
1D array of wavelength values for the model spectrum. Must be computed
at constant resolving power R_model, so that the ratio dlambda/lambda
= 1/R_model is constant by construction (log-uniform grid), yielding
a constant velocity step dv = c / R_model per pixel.
flux_model : np.ndarray
1D array of flux values corresponding to wavelength_model.
lsf_hwhm : float
Half Width at Half Maximum of the instrumental LSF, in km/s.
Related to spectral resolution by: HWHM = c / (2R).
Example: IGRINS (R ~ 45000) has HWHM ~ 3.3 km/s.
radial_velocity_shift : float
Radial velocity shift to apply to the model, in km/s. Positive values
correspond to redshift (planet receding), negative to blueshift.
This is typically a free parameter of the MCMC retrieval, representing
the velocity offset between model and observed data.
lsf_grid_step : float, optional
Velocity step for the initial fine LSF kernel grid, in km/s. Must be
small enough to well-sample the Gaussian peak. Default is 0.1 km/s.
kernel_size : int, optional
Number of elements in the resampled convolution kernel. Must be odd
to ensure the grid is symmetric around zero, which avoids introducing
a spurious velocity shift during convolution. The half-extent of the
resampled grid is (kernel_size - 1) / 2 pixels. Default is 321,
giving a half-extent of 160 pixels.
Returns
-------
wavelength_out : np.ndarray
Wavelength array of the convolved spectrum. Shorter than the input by
kernel_size // 2 = 160 points on each side, because scipy.signal.convolve
in 'valid' mode discards the edge pixels where the kernel does not fully
overlap the signal (see Step 5).
flux_convolved : np.ndarray
Convolved and velocity-shifted flux array, flux-conserving by construction.
"""
# --- Step 1: Build the Gaussian LSF kernel on a fine velocity grid ---
# The grid spans ±30 km/s with a step of lsf_grid_step (default 0.1 km/s),
# yielding 601 points. This range is intentionally much wider than the
# Gaussian itself: for IGRINS (HWHM ~ 3.3 km/s) the kernel drops to
# negligible values already beyond ±10 km/s, so nothing is lost at the edges.
#
# HWHM_TO_SIGMA = sqrt(2 * ln(2)) ~ 1.177 converts HWHM to Gaussian sigma.
# It comes from solving G(v_1/2) = 1/2:
# exp(-v_1/2^2 / 2*sigma^2) = 1/2 => v_1/2 = sigma * sqrt(2*ln(2)) = HWHM
# therefore: sigma = HWHM / sqrt(2*ln(2))
#
# The kernel is normalized to unit sum so that the convolution preserves
# the total flux of the spectrum.
lsf_velocity_grid = np.arange(-30, 30 + lsf_grid_step, lsf_grid_step)
HWHM_TO_SIGMA = 1.177 # = sqrt(2 * ln(2))
lsf_sigma = lsf_hwhm / HWHM_TO_SIGMA
lsf_kernel = np.exp(-0.5 * (lsf_velocity_grid / lsf_sigma) ** 2)
lsf_kernel = lsf_kernel / np.sum(lsf_kernel)
# --- Step 2: Compute the velocity step of the model wavelength grid ---
# The model is computed at constant resolving power R_model, so its
# wavelength grid has dlambda/lambda = 1/R_model = constant by construction
# (log-uniform grid). The corresponding pixel velocity step is therefore
# dv = c * dlambda/lambda = c / R_model, constant across the full grid.
# We compute it explicitly on each pair of adjacent pixels and take the
# mean as a numerical verification. This value dv defines the spacing of
# the resampled kernel in the next step.
# Example: for R_model = 2.5e5 (petitRADTRANS at R=1e6, downsampled by 4),
# dv = 3e5 / 2.5e5 = 1.2 km/s per pixel.
velocity_step_per_pixel = ConstantVariables.CLIGHT * (wavelength_model[1:] - wavelength_model[:-1]) / wavelength_model[:-1]
mean_velocity_step = np.mean(velocity_step_per_pixel)
# --- Step 3: Build the resampled velocity grid matching the model ---
# The discrete convolution requires that the kernel and the model spectrum
# share the same velocity grid spacing. We therefore create a new grid of
# kernel_size points (odd, default 321) with spacing mean_velocity_step,
# so that each kernel element corresponds to exactly one model pixel.
# The half-extent is (kernel_size - 1) / 2 = 160 pixels, corresponding to
# ±160 * 1.2 = ±192 km/s for R_model = 2.5e5. This is sufficient to
# accommodate both the LSF width (a few km/s) and any realistic planetary
# radial velocity shift.
# The odd kernel_size ensures the grid is symmetric around zero, preventing
# any spurious velocity offset from being introduced during convolution.
half_kernel_extent = ((kernel_size - 1) / 2) * mean_velocity_step
resampled_velocity_grid = np.linspace(-half_kernel_extent, half_kernel_extent, kernel_size)
# --- Step 4: Shift and resample the LSF kernel ---
# Two operations are performed simultaneously:
#
# (a) SHIFT: adding radial_velocity_shift to the fine grid coordinates
# translates the Gaussian peak from 0 to +v_rad km/s. By the
# translation property of convolution, this is mathematically equivalent
# to shifting the model spectrum by +v_rad, but acts only on the smooth
# 321-point kernel instead of the full model array, concentrating any
# numerical error on the kernel alone.
#
# (b) RESAMPLE: splrep/splev interpolates the shifted kernel from the fine
# grid (step = 0.1 km/s, 601 points) onto the coarser grid built in
# Step 3 (step = mean_velocity_step, 321 points). After this step,
# kernel and model share the same grid spacing and discrete convolution
# is well-defined.
shifted_kernel_spline = splrep(lsf_velocity_grid + radial_velocity_shift, lsf_kernel)
resampled_lsf_kernel = splev(resampled_velocity_grid, shifted_kernel_spline, der=0)
# --- Step 5: Convolve and return ---
# scipy.signal.convolve is called in 'valid' mode. In a discrete convolution
# between a signal of N points and a kernel of K points, the output has
# N + K - 1 points in 'full' mode. However, the first and last K//2 = 160
# output points correspond to positions where the kernel extends beyond the
# signal boundary, requiring the signal to be padded with zeros — which
# introduces boundary artefacts. 'valid' mode discards these edge points
# and returns only the N - K + 1 central values where the kernel overlaps
# the signal completely. The wavelength array is trimmed by the same amount.
#
# A final normalization by np.sum(resampled_lsf_kernel) corrects for any
# residual deviation from unity introduced by the spline resampling in
# Step 4, ensuring strict flux conservation in the output.
half_kernel_size = kernel_size // 2
wavelength_out = wavelength_model[half_kernel_size:-half_kernel_size]
flux_convolved = convolve(flux_model, resampled_lsf_kernel, mode="valid") / np.sum(resampled_lsf_kernel)
return wavelength_out, flux_convolved
[docs]
def create_path_night(
pathfolder: Union[str, Path],
target: str,
rad_mode: str,
instrument: str
) -> str:
"""
Construct and return the file path for a specific night's observation data.
The path is built by concatenating the following components:
- `pathfolder`: The base directory containing observation data.
- `target`: The target object or observation name.
- A constant subdirectory defined by `ConstantVariables.HR_INSTRUMENTS`, representing high-resolution instruments.
- `rad_mode`: The radiative mode, which specifies the type of observation.
This can be either 'emission' or 'transmission'.
- `instrument`: The instrument identifier.
Parameters:
pathfolder (Union[str, Path]): The base directory path.
target (str): The target object or observation name.
rad_mode (str): The radiative mode; valid options are 'emission' or 'transmission'.
instrument (str): The instrument identifier.
Returns:
str: The fully constructed file path as a string.
"""
# Validate rad_mode
if rad_mode not in ConstantVariables.LIST_RAD_MODE:
print(f"Invalid rad_mode: {rad_mode}. Must be 'Emission' or 'Transmission'")
# Construct the full path using pathlib.Path
full_path = Path(pathfolder, target, ConstantVariables.HR_INSTRUMENTS, rad_mode, instrument)
# Convert the Path object to a string and return it
return str(full_path)
[docs]
def get_csv_value(
df: DataFrame,
search_value: Any,
search_column: str = "Variable",
target_column: str = "Value"
) -> Optional[Any]:
"""
Retrieve a value from a DataFrame based on a search condition.
This function searches for rows in the provided DataFrame where the value in
`search_column` matches `search_value`. If a matching row is found, the function
returns the value in the corresponding `target_column` from the first match.
If no match is found, the function returns None.
Parameters:
df (pd.DataFrame): The DataFrame to search.
search_value (Any): The value to look for in the `search_column`.
search_column (str, optional): The column name to search within.
Defaults to "Variable".
target_column (str, optional): The column name from which to retrieve the value.
Defaults to "Value".
Returns:
Optional[Any]: The value from the `target_column` of the first matching row,
or None if no match is found.
"""
# Validate column names
if search_column not in df.columns:
print(f"Search column '{search_column}' not found in DataFrame")
if target_column not in df.columns:
print(f"Target column '{target_column}' not found in DataFrame")
# Filter the DataFrame to find rows where the search_column matches search_value
matching_rows = df[df[search_column] == search_value]
# Return None if no match is found, otherwise return the value from target_column
return None if matching_rows.empty else matching_rows[target_column].values[0]
[docs]
def plot_tp_profile(
range_min_pressures: float,
range_max_pressures: float,
n_layers: int,
parameters_tp_profile: Any,
gravity: float,
format_temperature: str,
path_imm: Union[str, Path],
err: Any,
rng: Any
) -> None:
"""
Plot the pressure-temperature (PT) profile for an atmosphere and save the figure.
Parameters:
range_min_pressures (float): Log10 of the minimum pressure (bar).
range_max_pressures (float): Log10 of the maximum pressure (bar).
n_layers (int): Number of pressure layers.
parameters_tp_profile (Any): Parameters for the temperature profile.
gravity (float): Surface gravity.
format_temperature (str): Name of the temperature profile method to use.
path_imm (Union[str, Path]): Path to save the output image.
err (Any): Error parameter for the temperature profile.
rng (Any): Random number generator or seed for the temperature profile.
Returns:
None
"""
pressures = np.logspace(
range_min_pressures, range_max_pressures, n_layers
)
root = MyFigure("Pressure-Temperature", type_data="Plot")
ax = root.return_ax(["PT profile"], ["Temperature [K]"], ["Pressure [bar]"])
ax.set_xlabel("Temperature [K]", fontsize=GraphicsConfig.PLOT_FONT_SIZE_LABEL)
ax.set_ylabel("Pressure [bar]", fontsize=GraphicsConfig.PLOT_FONT_SIZE_LABEL)
ax.set_title("PT profile", fontsize=GraphicsConfig.PLOT_FONT_SIZE_TITLE)
ax.tick_params(axis='both', labelsize=GraphicsConfig.PLOT_FONT_SIZE_TICK)
ax.set_yscale("log")
ax.set_ylim([max(pressures), min(pressures)])
tp_profile_obj = UserTemperatureProfile(
pressures,
parameters_tp_profile,
gravity,
err,
rng
)
function_tp_profile = getattr(tp_profile_obj, format_temperature)
temperature, temp_layer_i = function_tp_profile()
ax.plot(temperature, pressures, alpha=0.5, color="red", label="PT profile")
if temp_layer_i is not None:
pressures_sample = np.logspace(
range_min_pressures, range_max_pressures, 1000
)
n_profiles = len(temp_layer_i[0, :])
n_samples = 1000
temperature_sample = np.zeros([n_profiles + 1, n_samples])
for index in range(n_profiles):
temperature_sample[index, :] = splev(pressures_sample, splrep(pressures, temp_layer_i[:, index]))
temperature_sample[-1, :] = splev(pressures_sample, splrep(pressures, temperature))
sigmas = np.std(temperature_sample, ddof=1, axis=0)
ax.fill_betweenx(pressures_sample, temperature_sample[-1, :] - sigmas, temperature_sample[-1, :] + sigmas, alpha=0.3, color="purple", label=r"$\sigma$")
ax.fill_betweenx(pressures_sample, temperature_sample[-1, :] - 2 * sigmas, temperature_sample[-1, :] + 2 * sigmas, alpha=0.3, color="blue", label=r"2$\sigma$")
ax.legend(fontsize=GraphicsConfig.PLOT_FONT_SIZE_LEGEND)
root.create_figure(str(Path(path_imm, "tp_profile.jpg")), 600)
# Save plot data to pkl for later reconstruction
pkl_data = {
"range_min_pressures": range_min_pressures,
"range_max_pressures": range_max_pressures,
"n_layers": n_layers,
"parameters_tp_profile": parameters_tp_profile,
"gravity": gravity,
"format_temperature": format_temperature,
"pressures": pressures,
"temperature": temperature,
"temp_layer_i": temp_layer_i,
}
with open(str(Path(path_imm, "tp_profile.pkl")), "wb") as _f:
pickle.dump(pkl_data, _f)
[docs]
def opacities_contribution_plot(
path_results: Union[str, Path],
opacities_contribution: Dict[str, Any],
resolution: str,
function: str = "plot",
wl_scale: str = "log",
invert_y_axis: int = 0,
instruments_LR=None,
offsetLR_arr=None,
rp=None,
rs=None,
rad_mode: str = "Transmission",
) -> None:
"""
Plot and save the contribution of different opacity sources to the atmospheric model.
This function creates a visualization showing how different opacity sources (line species,
gas continua, clouds, etc.) contribute to the total atmospheric opacity. The plot is saved
as a high-resolution image in the specified output directory.
Parameters:
path_results (Union[str, Path]): Directory path where the output plot will be saved.
opacities_contribution (Dict[str, Any]): Dictionary containing opacity contributions from
different sources. Keys are source types (e.g., 'line_species', 'cloud_species'),
values are dictionaries mapping species names to their contribution data.
resolution (str): Resolution identifier used in the plot title and filename.
function (str, optional): Plotting function to use ('plot', 'scatter', etc.). Defaults to "plot".
wl_scale (str, optional): Scale for the wavelength axis ('log' or 'linear'). Defaults to "log".
invert_y_axis (int, optional): Whether to invert the y-axis. Defaults to False.
instruments_LR: Dictionary containing the instruments and their LR values.
offsetLR_arr: Array containing the offset LR values.
rp: Radius of the planet.
rs: Radius of the star.
rad_mode: Radiative mode.
Returns:
None
"""
# TODO
print("TODO opacities contribution for Emission")
if rad_mode == "Emission":
return
#
if rad_mode == "Transmission":
ylabel = r"Transit radius [$R_{{Jup}}$]"
else:
ylabel = r'1 + Emission lines'
# Validate input parameters
if wl_scale not in ['log', 'linear']:
print(f"Invalid wl_scale: {wl_scale}. Must be 'log' or 'linear'")
# Initialize color and style mappings for different opacity sources
opacity_sources_colors = {
'line_species': None,
'gas_continuum_contributors': None,
'cloud_species': None,
'rayleigh_species': None,
'opaque_cloud_top_pressure': 'darkgray',
'power_law': 'red',
'gray_opacity': 'gray',
'cloud_photosphere_median_optical_depth': 'gold',
'additional_absorption_opacities_function': 'darkviolet',
'additional_scattering_opacities_function': 'magenta',
'stellar_intensities': 'yellow',
'Total': 'k'
}
opacity_sources_linestyles = {
'line_species': '-',
'gas_continuum_contributors': '--',
'cloud_species': '-.',
'rayleigh_species': (0, (1, 0.66)), # densely dotted
'opaque_cloud_top_pressure': '-.',
'power_law': ':',
'gray_opacity': ':',
'cloud_photosphere_median_optical_depth': '-.',
'additional_absorption_opacities_function': ':',
'additional_scattering_opacities_function': ':',
'stellar_intensities': '-',
'Total': '-'
}
# Initialize color generator for species without predefined colors
default_species_color_index = (i for i in range(1001))
# Process species colors for each opacity source
for opacity_source, species_list in opacities_contribution.items():
if not isinstance(species_list, dict):
continue
if opacity_sources_colors[opacity_source] is None:
opacity_sources_colors[opacity_source] = {}
for species in species_list:
# Handle special case for gas continuum contributors
if opacity_source == 'gas_continuum_contributors':
_species = split_species_all_info(get_cia_aliases(species))[0]
_species = _species.split('--', 1)
if _species[0] == _species[1]:
_species = _species[0]
elif 'H2' in _species:
_species = _species[1] if _species[0] == 'H2' else _species[0]
else:
_species = _species[0]
else:
_species = species
# Assign color if not already assigned
if species not in opacity_sources_colors[opacity_source]:
species_color = get_species_color(get_species_basename(_species), implemented_only=False)
if species_color == default_color:
i = next(default_species_color_index)
if i == 1000:
print("Cannot support more than 1000 species")
species_color = f"C{i}"
opacity_sources_colors[opacity_source][species] = species_color
# Create figure and axis
root = MyFigure(
resolution + " Opacity Contribution",
type_data="Plot",
personal_size=[0.9, 0.6]
)
axs = root.return_ax(
[resolution + " Opacity Contribution"],
["Wavelength [nm]"],
[ylabel],
subplots=(1, 1),
figsize=(12, 6)
)
# Initialize tracking variables for y-axis limits
min_y = np.inf
max_y = 0
# Plot each opacity source
for key, value in opacities_contribution.items():
if value is None:
continue
if key == "Total":
wavelengths = value[0]
contribution = value[1]
# Plot total contribution
axs.plot(
wavelengths,
contribution,
label=key,
color=opacity_sources_colors[key],
linestyle=opacity_sources_linestyles[key]
)
# max_y = np.nanmax([max_y, np.nanmax(contribution)])
else:
if isinstance(value, dict):
# Plot individual species contributions
for species_name, species_data in value.items():
if species_data is None:
continue
# Format species label
_species = get_species_scientific_name(species_name)
if key == 'rayleigh_species':
_species += ' (Rayleigh)'
__species = species_name + ' (Rayleigh)'
elif key == 'gas_continuum_contributors':
__species = species_name.rsplit('-NatAbund', 1)[0]
else:
__species = species_name
if species_data is not None:
wavelengths = species_data[0]
contribution = species_data[1]
# Plot species contribution
getattr(axs, function)(
wavelengths,
contribution,
label=_species,
color=opacity_sources_colors[key][species_name],
linestyle=opacity_sources_linestyles[key],
alpha=0.5
)
# min_y = np.nanmin([min_y, np.nanmin(contribution)])
else:
# Plot non-species opacity sources
key_name = key.replace('_', ' ')
wavelengths = value[0]
contribution = value[1]
getattr(axs, function)(
wavelengths,
contribution,
label=key_name,
color=opacity_sources_colors[key],
linestyle=opacity_sources_linestyles[key],
alpha=0.5
)
# min_y = np.nanmin([min_y, np.nanmin(contribution)])
min_y = np.nanmin([min_y, np.nanmin(contribution)])
max_y = np.nanmax([max_y, np.nanmax(contribution)])
if instruments_LR:
for index_LR, instrument in enumerate(instruments_LR):
# Calculate offset for current instrument (first instrument has no offset)
offsetLR = 0 if (index_LR == 0 or offsetLR_arr is None) else offsetLR_arr[index_LR - 1]
# Extract instrument data
ldata = instrument.lr_data.ldata_LR
rdata = instrument.lr_data.rdata_LR - offsetLR # Shift data to model level
errdata = instrument.lr_data.errdata_LR
step = instrument.lr_data.step_LR
if rad_mode == "Transmission":
rdata_temp = 1 - rdata
rdata = np.sqrt(rdata_temp) * rs * ConstantVariables.RATIO_RSUN_RJUP_MEAN
errdata = rs * ConstantVariables.RATIO_RSUN_RJUP_MEAN * errdata / (2 * np.sqrt(rdata_temp)) # error propagation
# Plot observational data with error bars
axs.errorbar(
ldata, rdata, yerr=errdata, xerr=step, ls='none',
label=f"{instrument.name} + {-offsetLR:.5f}",
alpha=0.5, color="Red"
)
# Configure plot appearance
axs.legend(loc='center left', bbox_to_anchor=(1, 0.5))
delta_y = max_y - min_y
axs.set_ylim([min_y - delta_y * 0.05, max_y + delta_y * 0.05])
if wl_scale == 'log':
axs.set_xscale("log")
if invert_y_axis:
axs.invert_yaxis()
plt.subplots_adjust(right=0.5)
# Create output directory and save plot
path_imm = Path(path_results, "opacity_contribution")
path_imm.mkdir(parents=True, exist_ok=True)
try:
root.create_figure(str(Path(path_imm, resolution + "_img.png")), 600, False)
except IOError as e:
print(f"Failed to save opacity contribution plot: {str(e)}")
[docs]
def calculate_mass_molecule(elements, periodic_table):
"""
Calculate the total molecular mass of a molecule from its elemental composition.
This function parses element strings that may contain mass numbers, atom counts,
and ionization states, then calculates the total molecular mass using atomic
masses from the periodic table.
Args:
elements: List of element strings with format like 'H2', '16O', 'C1+', etc.
periodic_table: DataFrame containing periodic table data with columns
'Symbol', 'NumberofProtons', 'NumberofNeutrons'
Returns:
float: Total molecular mass in atomic mass units (u)
"""
total_mass = 0
# Define atomic masses (in atomic mass units, u)
proton_mass = 1.007276 # u
neutron_mass = 1.008665 # u
electron_mass = 0.000548579909 # u
#
for element in elements:
# Count the number of removed electrons indicated by '+' symbols
electrons_removed = element.count("+")
# Remove numerical parts and extra characters to obtain the elemental symbol
symbol = sub(r'[0-9]+', '', element).replace("+", "").replace("_", "")
if len(symbol) > 4:
# Skip elements with an unexpectedly long symbol
continue
# Extract all numeric parts from the element string (e.g., mass numbers and counts)
amounts = findall(r'\d+', element)
if len(amounts) > 1:
# If two numbers are present: the first is the mass number, the second is the count
proton_plus_neutron = amounts[0]
atom_in_molecule = amounts[1]
elif len(amounts) == 1:
# If one number is present, determine its role by checking its position relative to the symbol
if symbol in element[:len(symbol)]:
# Use periodic table values if the number is part of standard element notation
table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]]
proton_plus_neutron = int(table_row["NumberofNeutrons"].iloc[0]) + int(table_row["NumberofProtons"].iloc[0])
atom_in_molecule = amounts[0]
else:
proton_plus_neutron = amounts[0]
atom_in_molecule = "1"
else:
# If no numeric value is found, default to using the periodic table (assume count = 1)
table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]]
proton_plus_neutron = int(table_row["NumberofNeutrons"].iloc[0]) + int(table_row["NumberofProtons"].iloc[0])
atom_in_molecule = "1"
# Retrieve the number of protons (and by extension, electrons) for the element
number_protons = number_electrons = int(
periodic_table[periodic_table["Symbol"] == symbol]["NumberofProtons"].iloc[0]
)
# Calculate the number of neutrons from the total mass number minus the number of protons
number_neutrons = int(proton_plus_neutron) - number_protons
# Add the mass contribution of this element to the total mass of the isotope
total_mass += ((number_protons * proton_mass +
number_neutrons * neutron_mass +
(number_electrons - electrons_removed) * electron_mass) *
int(atom_in_molecule))
return total_mass
[docs]
def get_condensed_line_list():
"""
Retrieve condensed molecule information from petitRADTRANS opacity data.
This function scans the condensed molecule opacity files in the petitRADTRANS
installation and extracts information about available condensed species
including their chemical formulas, names, masses, and visualization names.
Returns:
tuple: A tuple containing:
- np.ndarray: Array of condensed molecule chemical formulas
- np.ndarray: Array of condensed molecule file names
- np.ndarray: Array of calculated molecular masses (in u)
- np.ndarray: Array of visualization names for display
Note:
If no condensed molecules are found, returns arrays with ["None"]
and mass [-999] as placeholders.
"""
# Load required paths from ConstantVariables
petitradtrans = ConstantVariables.path_petitradtrans
path_default = ConstantVariables.path_default
# Read the periodic table CSV file (assumed to return a pandas DataFrame)
periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv")
periodic_table = read_csv(str(periodic_table_path))
# Define the path containing line-by-line opacity data for molecules
clouds_path = Path(petitradtrans, "opacities", "continuum", "clouds")
# Initialize containers for molecules, isotopes, opacity_line_lists_HR, and computed masses
all_condensed_molec_from_yaml = ConstantVariables.ALL_CONDENSED_MOLEC
condensed_molecs = []
condensed_names = []
condensed_masses = []
condensed_visualization_name = []
#
try:
# Retrieve a sorted list of molecule directories
condensed_names = [cloud.name.removesuffix(".cotable.petitRADTRANS.h5") for cloud in sorted(clouds_path.glob('**/*.h5'))]
for i, condensed_name in enumerate(condensed_names):
name_split = condensed_name.split("-")
chemical_formula = []
for nn in name_split:
if "NatAbund" in nn:
break
else:
chemical_formula.append(nn)
condensed_molecs.append("".join(chemical_formula))
condensed_masses.append(calculate_mass_molecule(chemical_formula, periodic_table))
if len(condensed_names) == 0:
condensed_visualization_name = ["None"]
condensed_molecs = ["None"]
condensed_names = ["None"]
condensed_masses = [-999]
else:
condensed_visualization_name = [all_condensed_molec_from_yaml[key] for key in condensed_names]
except Exception as e:
print(e, "\nNo condensed molecules present in opacities")
# Return the sorted molecules array and the dictionaries for isotopes, opacity_line_lists_HR, and masses
return np.array(condensed_molecs), np.array(condensed_names), np.array(condensed_masses), np.array(condensed_visualization_name)
# def get_line_lists() -> Tuple[np.ndarray, Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, float]]:
# """
# Retrieve line lists, isotopes, opacity_line_list identifiers, and compute isotopic masses
# from molecular opacity data used by petitRADTRANS.
#
# This function performs the following steps:
# 1. Reads the periodic table data from a CSV file to obtain atomic properties
# 2. Scans the 'line_by_line' directory for molecule directories
# 3. For each molecule, processes its isotopes and computes their masses
# 4. Extracts opacity_line_list information from the opacity files
#
# Returns:
# Tuple containing:
# - np.ndarray: Sorted array of molecule names
# - Dict[str, np.ndarray]: Dictionary mapping molecules to their isotope names
# - Dict[str, np.ndarray]: Dictionary mapping isotopes to their opacity_line_list identifiers
# - Dict[str, float]: Dictionary mapping isotopes to their computed masses (in atomic mass units)
#
# Raises:
# FileNotFoundError: If the periodic table CSV file is not found
# IOError: If there are issues reading the files
# ValueError: If the periodic table data is malformed
# """
# # Load required paths from ConstantVariables
# petitradtrans = ConstantVariables.path_petitradtrans
# path_default = ConstantVariables.path_default
#
# # Define atomic masses (in atomic mass units, u)
# proton_mass = 1.007276 # u
# neutron_mass = 1.008665 # u
# electron_mass = 0.000548579909 # u
#
# # Read the periodic table CSV file
# periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv")
# try:
# periodic_table = read_csv(str(periodic_table_path))
# except FileNotFoundError:
# print(f"Periodic table file not found at {periodic_table_path}")
# except Exception as e:
# print(f"Error reading periodic table file: {str(e)}")
#
# # Define the path containing line-by-line opacity data
# line_lists_path = Path(petitradtrans, "opacities", "lines", "line_by_line")
#
# # Initialize containers for molecules, isotopes, opacity_line_lists_HR, and masses
# isotopes = {}
# opacity_line_lists_HR = {}
# masses_isotopes = {}
#
# try:
# # Get sorted list of molecule directories
# molecs = sorted([d.name for d in line_lists_path.iterdir() if d.is_dir()])
#
# # Process each molecule directory
# for molec in molecs:
# path_isotope = line_lists_path / molec
# isotopes[molec] = np.array([d.name for d in path_isotope.iterdir() if d.is_dir()])
#
# # Process each isotope for the current molecule
# for isotope in isotopes[molec]:
# elements = isotope.split("-")
# total_mass = 0.0
#
# # Calculate mass for each element in the isotope
# for element in elements:
# electrons_removed = element.count("+")
# symbol = sub(r'[0-9]+', '', element).replace("+", "").replace("_", "")
#
# if len(symbol) > 4:
# continue
#
# # Extract numeric parts from element string
# amounts = findall(r'\d+', element)
#
# # Determine mass number and atom count
# if len(amounts) > 1:
# proton_plus_neutron = amounts[0]
# atom_in_molecule = amounts[1]
# elif len(amounts) == 1:
# if symbol in element[:len(symbol)]:
# table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]]
# proton_plus_neutron = int(table_row["NumberofNeutrons"]) + int(table_row["NumberofProtons"])
# atom_in_molecule = amounts[0]
# else:
# proton_plus_neutron = amounts[0]
# atom_in_molecule = "1"
# else:
# table_row = periodic_table[periodic_table["Symbol"] == element[:len(symbol)]]
# proton_plus_neutron = int(table_row["NumberofNeutrons"]) + int(table_row["NumberofProtons"])
# atom_in_molecule = "1"
#
# # Get number of protons and calculate neutrons
# number_protons = number_electrons = int(
# periodic_table[periodic_table["Symbol"] == symbol]["NumberofProtons"]
# )
# number_neutrons = int(proton_plus_neutron) - number_protons
#
# # Add element's mass contribution to total
# total_mass += (
# (number_protons * proton_mass +
# number_neutrons * neutron_mass +
# (number_electrons - electrons_removed) * electron_mass) *
# int(atom_in_molecule)
# )
#
# # Store computed mass and get opacity_line_list information
# masses_isotopes[isotope] = total_mass
# path_opacity_line_list = path_isotope / isotope
# opacity_line_lists_HR[isotope] = np.array([
# d.name.removesuffix(".xsec.petitRADTRANS.h5")
# for d in path_opacity_line_list.iterdir() if not d.is_dir()
# ])
#
# except Exception as e:
# print(f"Error processing molecules: {str(e)}\nNo molecules present in opacities")
# return np.array([]), {}, {}, {}
#
# return np.array(molecs), isotopes, opacity_line_lists_HR, masses_isotopes
[docs]
def get_line_lists():
"""
Retrieve line lists, isotopes, opacity_line_list identifiers, and compute isotopic masses
from molecular opacity data used by petitRADTRANS.
This function performs the following steps:
1. Reads the periodic table data from a CSV file to obtain numbers of protons
and neutrons for each element.
2. Scans the 'line_by_line' directory (within the opacities folder) for molecule directories.
3. For each molecule, lists its available isotopes (each isotope is represented by a directory).
4. Parses each isotope's name (which encodes its elemental composition) to
determine the elemental symbols and their corresponding mass numbers,
compute the total mass of the isotope using predefined atomic masses
(proton: 1.007276 u, neutron: 1.008665 u, electron: 0.000548579909 u),
and adjust the electron count based on the number of '+' symbols.
5. For each isotope, retrieves the available opacity_line_list file names (by removing the
".xsec.petitRADTRANS.h5" suffix).
Returns:
tuple: A tuple containing:
- np.ndarray: Sorted array of molecule names.
- dict: Dictionary mapping each molecule to an array of its isotope names.
- dict: Dictionary mapping each isotope to an array of HR opacity_line_list identifiers.
- dict: Dictionary mapping each isotope to an array of LR opacity_line_list identifiers.
- dict: Dictionary mapping each isotope to its computed total mass (in atomic mass units).
"""
# Load required paths from ConstantVariables
petitradtrans = ConstantVariables.path_petitradtrans
path_default = ConstantVariables.path_default
# Read the periodic table CSV file (assumed to return a pandas DataFrame)
periodic_table_path = Path(path_default, "GUIBRUSHR", "Files", "Molecules", "periodic_table.csv")
periodic_table = read_csv(str(periodic_table_path))
# Define the path containing line-by-line opacity data for molecules
line_lists_path = Path(petitradtrans, "opacities", "lines", "line_by_line")
# Initialize containers for molecules, isotopes, opacity_line_lists_HR, and computed masses
molecs = []
isotopes = dict()
opacity_line_lists_HR = dict()
opacity_line_lists_LR = dict()
masses_isotopes = dict()
try:
# Retrieve a sorted list of molecule directories
molecs = sorted([d.name for d in line_lists_path.iterdir() if d.is_dir()])
# Loop over each molecule directory
for molec in molecs:
# Path to the current molecule's isotope directories
path_isotope = line_lists_path / molec
# List available isotopes for this molecule (each as a directory)
isotopes[molec] = np.array([d.name for d in path_isotope.iterdir() if d.is_dir()])
# Process each isotope for the current molecule
for isotope in isotopes[molec]:
# The isotope name encodes its elemental composition separated by "-"
elements = isotope.split("-")
# Store the computed mass for the isotope
masses_isotopes[isotope] = calculate_mass_molecule(elements, periodic_table)
# Retrieve opacity_line_list file names (strip the specific suffix) from the isotope directory
path_opacity_line_list = path_isotope / isotope
opacity_line_lists_HR[isotope] = np.array([
d.name.removesuffix(".xsec.petitRADTRANS.h5")
for d in path_opacity_line_list.iterdir() if not d.is_dir()
])
if len(opacity_line_lists_HR[isotope]) == 0:
opacity_line_lists_HR[isotope] = np.array(["None"])
# LR
path_opacity_line_list_LR = Path(str(path_opacity_line_list).replace("line_by_line", "correlated_k"))
if path_opacity_line_list_LR.exists():
opacity_line_lists_LR[isotope] = np.array([
d.name.removesuffix(".ktable.petitRADTRANS.h5")
for d in path_opacity_line_list_LR.iterdir() if not d.is_dir()
])
if len(opacity_line_lists_LR[isotope]) == 0:
opacity_line_lists_LR[isotope] = ["None"]
else:
opacity_line_lists_LR[isotope] = ["None"]
except Exception as e:
print(str(traceback.format_exc()), "\nNo molecules present in opacities")
# Return the sorted molecules array and the dictionaries for isotopes, opacity_line_lists_HR, and masses
return np.array(molecs), isotopes, opacity_line_lists_HR, opacity_line_lists_LR, masses_isotopes
[docs]
def generate_chemcat(
min_pressure: float,
max_pressure: float,
nlayersf: int,
path_default: Union[str, Path]
) -> Tuple[np.ndarray, chemcat.Network, List[str]]:
"""
Generate a chemical catalog and retrieve molecular masses for atmospheric modeling.
This function creates a chemical network with specified pressure layers and temperature,
and prepares molecular data for atmospheric modeling.
Parameters:
min_pressure (float): Logarithm of minimum pressure (bar)
max_pressure (float): Logarithm of maximum pressure (bar)
nlayersf (int): Number of pressure layers
path_default (Union[str, Path]): Base path to default files directory
Returns:
Tuple containing:
- np.ndarray: Array of molecular masses for selected molecules
- chemcat.Network: Chemical network object
- List[str]: List of species names compatible with petitRADTRANS
"""
# Construct path to molecules data file
molecules_path = Path(path_default) / "GUIBRUSHR" / "Files" / "Molecules" / "molecules.dat"
names, massf, diam = None, None, None
try:
# Read molecular data
names, massf, diam = io.read_molecs(str(molecules_path))
except FileNotFoundError:
print(f"Molecules data file not found at {molecules_path}")
except Exception as e:
print(f"Error reading molecular data: {str(e)}")
# Build list of molecules from various chemical groups
molecules = (
ConstantVariables.HCNO_NEUTRALS.split() +
ConstantVariables.IONS.split() +
ConstantVariables.ALKALI.split() +
ConstantVariables.METALS.split() +
ConstantVariables.METAL_OXIDES.split()
)
# Format species names for petitRADTRANS compatibility
species_compatible_with_prt = [
mol if mol in ConstantVariables.IONS else mol.replace("+", "_+")
for mol in molecules
]
# Generate pressure grid and temperature array
pressuresf = np.logspace(min_pressure, max_pressure, nlayersf)
temp = np.full(nlayersf, 1000.0) # Constant temperature of 1000 K
# Create chemical network
chemcat_objf = chemcat.Network(pressuresf, temp, molecules)
# Extract molecular masses
try:
molecular_masses = np.array([massf[names == spec][0] for spec in molecules])
except IndexError:
print("One or more molecules not found in molecular data")
return molecular_masses, chemcat_objf, species_compatible_with_prt
[docs]
def read_fits(
path_fits: Union[str, Path],
transpose: bool = False,
intransit: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Read data from a FITS file and optionally apply slicing and transposition.
Parameters:
path_fits (Union[str, Path]): Path to the FITS file
transpose (bool, optional): Whether to transpose the data array. Defaults to False.
intransit (Optional[np.ndarray], optional): Indices to slice the data along the second axis.
If provided, data is sliced as data[:, intransit, :].
Defaults to None.
Returns:
np.ndarray: The processed data array from the FITS file
"""
try:
with fits.open(path_fits) as hdul:
data = hdul[0].data
if intransit is not None:
data = data[:, intransit, :]
if transpose:
data = np.transpose(data)
return data
except FileNotFoundError:
print(f"FITS file not found at {path_fits}")
except Exception as e:
print(f"Error reading FITS file: {str(e)}")
[docs]
def get_depth_filename(rad_mode: str) -> str:
"""Return the depth FITS filename based on the radiative transfer mode."""
if rad_mode == "Emission":
return "emission_depth.fits"
return "transmission_depth.fits"
[docs]
def read_depth_fits(folder: Union[str, Path], rad_mode: str, **kwargs) -> np.ndarray:
"""Read the depth FITS file with fallback to legacy transit_depth.fits."""
folder = Path(folder)
primary = folder / get_depth_filename(rad_mode)
fallback = folder / "transit_depth.fits"
if primary.exists():
return read_fits(str(primary), **kwargs)
if fallback.exists():
print(f"{primary.name} not found, falling back to transit_depth.fits")
return read_fits(str(fallback), **kwargs)
return read_fits(str(primary), **kwargs)
[docs]
def plot_vmr(
pressures: np.ndarray,
vmr_dict: Dict[str, np.ndarray],
folder_plot: Union[str, Path]
) -> None:
"""
Plot volume mixing ratio (VMR) profiles for different chemical species groups.
This function creates a 2x3 grid of plots showing VMR profiles for:
- All species
- HCNO neutrals
- Ions
- Alkali metals
- Metals
- Metal oxides
Parameters:
pressures (np.ndarray): Pressure values (in bar)
vmr_dict (Dict[str, np.ndarray]): Dictionary mapping species names to their VMR profiles
folder_plot (Union[str, Path]): Directory where the plot will be saved
"""
# Create figure with 2x3 subplots
root = MyFigure("VMR profiles", type_data="Plot", personal_size=[0.8, 0.8])
# Define plot labels and titles
xlabels = ["$Log_{10} VMR$"] * 6
ylabels = ["$Log_{10}$ Pressure (bar)"] * 6
titles = [
"VMR profiles all",
"VMR profiles HCNO neutrals",
"VMR profiles ions",
"VMR profiles alkali",
"VMR profiles metals",
"VMR profiles metal oxides",
]
# Create subplots
ax = root.return_ax(
titles,
xlabels,
ylabels,
subplots=[2, 3],
figsize=(14, 12),
dpi=100,
hspace=0.4,
wspace=0.3,
)
# Initialize lists for different species groups
species_groups = {
'all': [],
'HCNO_neutrals': [],
'ions': [],
'alkali': [],
'metals': [],
'metal_oxides': []
}
vmr_groups = {key: [] for key in species_groups.keys()}
# Sort species into groups
for molec, vmr in vmr_dict.items():
molec_name_compatible = molec.replace("_", "")
# Add to all species
species_groups['all'].append(molec)
vmr_groups['all'].append(vmr)
# Add to specific groups
if molec_name_compatible in ConstantVariables.HCNO_NEUTRALS:
species_groups['HCNO_neutrals'].append(molec)
vmr_groups['HCNO_neutrals'].append(vmr)
if molec_name_compatible in ConstantVariables.IONS:
species_groups['ions'].append(molec)
vmr_groups['ions'].append(vmr)
if molec_name_compatible in ConstantVariables.ALKALI:
species_groups['alkali'].append(molec)
vmr_groups['alkali'].append(vmr)
if molec_name_compatible in ConstantVariables.METALS:
species_groups['metals'].append(molec)
vmr_groups['metals'].append(vmr)
if molec_name_compatible in ConstantVariables.METAL_OXIDES:
species_groups['metal_oxides'].append(molec)
vmr_groups['metal_oxides'].append(vmr)
# Plot each group
group_positions = {
'all': (0, 0),
'HCNO_neutrals': (0, 1),
'ions': (0, 2),
'alkali': (1, 0),
'metals': (1, 1),
'metal_oxides': (1, 2)
}
for group, (row, col) in group_positions.items():
if species_groups[group]: # Only plot if there are species in the group
u.plot_vmr(
pressures,
np.transpose(vmr_groups[group]),
species_groups[group],
axis=ax[row, col],
fontsize=12
)
try:
# Save the plot
root.create_figure(str(Path(folder_plot, 'vmr_profiles.jpg')), 600)
except IOError as e:
print(f"Failed to save VMR profiles plot: {str(e)}")
[docs]
def pre_plot(
folder_retrieval_final: Union[str, Path],
arr_values: np.ndarray,
remove_chain: [bool, int],
corner_plot: [bool, int],
best_fit_method : str
) -> Tuple[int, float, float, float, np.ndarray, List[str], np.ndarray, np.ndarray, List[float], List[str], str, str, List[str], np.ndarray]:
"""
Prepare posterior chain statistics and parameter labels for plotting.
Parameters:
folder_retrieval_final (Union[str, Path]): Path to the final retrieval folder
arr_values (np.ndarray): Array containing parameter information
remove_chain (bool): Whether to remove chains based on chi2 criteria
corner_plot (bool): Whether to process data for corner plots
best_fit_method (str): Method used to extract best fit parameters
Returns:
Tuple containing:
- int: Number of bad chains
- float: AIC value
- float: Last maximum likelihood
- float: Current maximum likelihood
- np.ndarray: Flattened chi2 values
- List[str]: Best-fit parameter names
- np.ndarray: Chain parameters
- np.ndarray: Parameter medians
- List[float]: Fixed parameter values
- List[str]: Fixed parameter names
- str: Summary string
- str: Parameter names string
- List[str]: Parameter labels
- np.ndarray: Parameter errors
"""
best_fit_parameters_obj = get_medians(folder_retrieval_final, arr_values, remove_chain, corner_plot, best_fit_method)
# Compute AIC
aic_var = 2 * len(best_fit_parameters_obj.list_best_pars) - 2 * best_fit_parameters_obj.current_max_likelihood
# Format summary strings
stringa = ""
error_list = np.zeros(len(best_fit_parameters_obj.list_best_pars))
for i, param in enumerate(best_fit_parameters_obj.list_best_pars):
error = np.std(best_fit_parameters_obj.arr1[:, i])
error_list[i] = error
sorted_arr = np.sort(best_fit_parameters_obj.arr1[:, i])
pos = int(len(sorted_arr) * 0.95)
upper_limits = sorted_arr[pos]
if corner_plot:
stringa += f"{param}: {best_fit_method}.={best_fit_parameters_obj.best_fit_parameters[i]:.6f}, E.+={error:.6f}, U.L.={upper_limits:.6f}\n"
else:
if best_fit_method == "From sampler":
value_str = np.median(best_fit_parameters_obj.best_fit_parameters[:, i])
pre_str = "Median "
else:
pre_str = ""
value_str = best_fit_parameters_obj.best_fit_parameters[i]
stringa += f"{pre_str}{best_fit_method} {param} = {value_str:.6f}\n"
# Create parameter labels for corner plots
stringa_par = ""
lista_parametri = []
if corner_plot:
for elem in best_fit_parameters_obj.list_best_pars_labels:
stringa_par += elem + ","
no_number_elem = re.sub(r'\d+', '', elem)
number = re.findall(r'\d+', elem)
if no_number_elem in ConstantVariables.LIST_MULTIPLE_PARAM:
string_form = ConstantVariables.DICT_LABELS[no_number_elem].replace("x", number[0])
else:
#TODO remove for compatibility
if elem == "csuo":
elem = "co_ratio"
string_form = ConstantVariables.DICT_LABELS[elem]
string_form = string_form.replace('r"', "", 1).replace('"', "")
lista_parametri.append(string_form)
stringa_par = stringa_par[:-1] # Remove trailing comma
print("Producing the plot...")
return (
best_fit_parameters_obj.number_bad_chains,
aic_var,
best_fit_parameters_obj.last_max_LH,
best_fit_parameters_obj.current_max_likelihood,
best_fit_parameters_obj.chi2_monodimensional,
best_fit_parameters_obj.list_best_pars,
best_fit_parameters_obj.arr1,
best_fit_parameters_obj.best_fit_parameters,
best_fit_parameters_obj.value_fix,
best_fit_parameters_obj.lista_par_fix,
stringa,
stringa_par,
lista_parametri,
error_list,
)
[docs]
def cross_correlation(
type_cc: str,
arr1: np.ndarray,
arr2: np.ndarray,
sigma: Optional[np.ndarray] = None
) -> float:
"""
Compute cross-correlation between two arrays using different methods.
Parameters:
type_cc (str): Type of correlation to compute ('numpy', 'normal', or 'weighted')
arr1 (np.ndarray): First input array
arr2 (np.ndarray): Second input array
sigma (Optional[np.ndarray]): Error array for weighted correlation
Returns:
float: Correlation coefficient
"""
if type_cc == ConstantVariables.LIST_CORRELATION_METHODS[0]: # Numpy
correlation_matrix = np.corrcoef(arr1, arr2)
cc_result = correlation_matrix[0, 1]
elif type_cc == ConstantVariables.LIST_CORRELATION_METHODS[1]: # Normal
cc_result = c_correlate(arr1, arr2, np.array([0]))[0]
else: # Weighted
if sigma is None:
print("sigma must be provided for weighted correlation")
cc_result = w_correlate(arr1, arr2, sigma)[0]
if not np.isfinite(cc_result):
print(f"Warning: Non-finite correlation result: {cc_result}")
cc_result = 0.0
return cc_result
[docs]
def c_correlate(
s_1: np.ndarray,
s_2: np.ndarray,
lags
) -> np.ndarray:
"""
Compute normalized cross-correlation of two signals at specified lags.
Parameters:
s_1 (np.ndarray): First input signal
s_2 (np.ndarray): Second input signal
lags: Array of lags at which to compute correlation
Returns:
np.ndarray: Array of correlation values for each lag
"""
if s_1.shape != s_2.shape:
print("Input signals must have the same shape")
n_samples = s_1.shape[0]
s1_centered = s_1 - np.mean(s_1)
s2_centered = s_2 - np.mean(s_2)
correlation = np.empty(len(lags), dtype=float)
norm_factor = np.sqrt(np.sum(s1_centered ** 2) * np.sum(s2_centered ** 2))
for idx, lag in enumerate(lags):
if lag >= 0:
product_sum = np.sum(s1_centered[:n_samples - lag] * s2_centered[lag:])
else:
product_sum = np.sum(s1_centered[-lag:] * s2_centered[:n_samples + lag])
correlation[idx] = product_sum
return correlation / norm_factor
[docs]
def w_correlate(
s_1: np.ndarray,
s_2: np.ndarray,
es_1: np.ndarray
) -> np.ndarray:
"""
Compute weighted cross-correlation between two signals.
Parameters:
s_1 (np.ndarray): First input signal
s_2 (np.ndarray): Second input signal
es_1 (np.ndarray): Error array for the first signal
Returns:
np.ndarray: Array containing the weighted correlation value
"""
if s_1.shape != s_2.shape:
print("Input signals must have the same shape")
s1_centered = s_1 - np.mean(s_1)
s2_centered = s_2 - np.mean(s_2)
correlation = np.empty(1, dtype=float)
correlation[0] = np.sum((s1_centered * s2_centered) / es_1 ** 2)
return correlation
[docs]
def linear_solver(
topca_l: np.ndarray,
res_l: np.ndarray
):
"""
Solve a linear system using Singular Value Decomposition (SVD).
This function solves the linear system Ax = b using SVD decomposition,
where A is the design matrix (res_l) and b is the target vector (topca_l).
The solution is computed in a numerically stable way using SVD.
Parameters:
topca_l (np.ndarray): Target vector b in the system Ax = b
res_l (np.ndarray): Design matrix A in the system Ax = b
Returns:
Results: The reconstructed array after solving the linear system
Error: Whether the linear system was solved successfully
"""
error = False
try:
# Perform SVD decomposition
uf, s, vh = np.linalg.svd(res_l, full_matrices=False)
# Compute intermediate matrices
uT = np.transpose(uf)
nD = np.diag(1 / s)
vHcT = np.transpose(vh.conj())
# Solve the system in steps:
# 1. U diag(s) Vh x = b <=> diag(s) Vh x = U.T b = c
c = np.dot(uT, topca_l)
# 2. diag(s) Vh x = c <=> Vh x = diag(1/s) c = w
w = np.dot(nD, c)
# 3. Vh x = w <=> x = Vh.H w
x = np.dot(vHcT, w)
# Reconstruct the array
return np.dot(res_l, x), error
except np.linalg.LinAlgError as e:
print(f"SVD computation failed: {str(e)}")
error = True
return None, error
[docs]
def skewed_gaussian(x, amp, center, width, skew):
"""
Calculate a skewed Gaussian distribution.
Args:
x: Input values
amp: Amplitude of the distribution
center: Center position
width: Width parameter
skew: Skewness parameter
Returns:
Values of the skewed Gaussian distribution
"""
return amp * skewnorm.pdf(x, skew, loc=center, scale=width)
[docs]
def estimate_continuum(spectrum_new, num_windows=40, top_window=10, check_gaussian=False):
"""
Estimate and remove the continuum from a spectrum using windowed median fitting.
Args:
spectrum_new (np.ndarray): Input spectrum array.
num_windows (int): Number of windows to divide the spectrum into.
top_window (int): Number of top values per window used for continuum estimation.
check_gaussian (bool): If True, fit a skewed Gaussian instead of a polynomial.
Returns:
tuple: (continuum, model_after_processing) where continuum is the estimated
continuum and model_after_processing is the spectrum normalized by it.
"""
window_size = len(spectrum_new) // num_windows
medians_continuum = []
wl_continuum = []
# Calculate medians_continuum for continuum fitting
for i in range(num_windows):
xv = (i * window_size + (i + 1) * window_size) / 2
window_element = spectrum_new[i * window_size: (i + 1) * window_size]
top_10 = np.sort(window_element)[-top_window:]
medians_continuum.append(np.median(top_10))
wl_continuum.append(xv)
wl_continuum = np.array(wl_continuum)
medians_continuum = np.array(medians_continuum)
try:
if check_gaussian:
p0 = [1, np.mean(wl_continuum), np.std(wl_continuum) * 2, 0]
params, _ = curve_fit(skewed_gaussian, wl_continuum, medians_continuum, p0=p0)
# skewed_fit = skewed_gaussian(wl_continuum, *params)
continuum = skewed_gaussian(np.arange(len(spectrum_new)), *params)
else:
par = np.polyfit(wl_continuum, medians_continuum, 4)
continuum = np.polyval(par, np.arange(len(spectrum_new)))
except Exception as e:
par = np.polyfit(wl_continuum, medians_continuum, 4)
continuum = np.polyval(par, np.arange(len(spectrum_new)))
model_after_processing = spectrum_new / continuum # we want the continuum at one, not at 0.9 or similar
return continuum, model_after_processing
[docs]
def trpca(
lcr_mask: np.ndarray,
lcrm_mask_nomask: np.ndarray,
good_pixel: np.ndarray,
ncomp: int,
smooth_on: bool,
smooth_size: int,
apply_standardize : bool,
model_reprocessing_type: str,
subtract_mean_spectrum: bool = False,
pca_mode: str = "spatial"
):
"""
Process light curve residuals by removing systematic components using PCA.
This function processes light curve residuals by:
1. Normalizing the data by subtracting column-wise means
2. Optionally applying PCA-based component removal
3. Updating the output array for good pixels
4. Optionally applying smoothing
Parameters:
lcr_mask (np.ndarray): 2D array of light curve residuals with telluric mask
lcrm_mask_nomask (np.ndarray): 2D array to store processed residuals
good_pixel (np.ndarray): Boolean mask or indices of good pixels
ncomp (int): Number of PCA components to use
smooth_on (bool): Whether to apply smoothing
smooth_size (int): Window size for smoothing filter
apply_standardize (bool): Whether to apply standardization
model_reprocessing_type (str): Processing mode, either "hard" or "soft"
subtract_mean_spectrum (bool): Whether to subtract mean spectrum
pca_mode (str): PCA mode - "spatial" (samples=images, features=pixels) or
"temporal" (samples=pixels, features=images). Default: "spatial"
Returns:
np.ndarray: Processed light curve residuals
error: Whether the processing was successful
"""
error = False
# Normalize the light curve residuals
lcrm = np.copy(lcr_mask)
means = np.mean(lcrm, axis=0)
lcrm = lcrm - np.expand_dims(means, axis=0)
# Center data to column
if subtract_mean_spectrum:
mean_spectrum = np.mean(lcrm, axis=1)
lcrm = lcrm - np.expand_dims(mean_spectrum, axis=1)
# Apply PCA-based component removal if in "hard" mode
if model_reprocessing_type == "hard":
try:
# Prepare data based on PCA mode
# Spatial mode (default): samples=images, features=pixels -> matrix shape (n_pixels, n_images)
# Temporal mode: samples=pixels, features=images -> matrix shape (n_images, n_pixels)
if pca_mode == "temporal":
# Transpose for temporal PCA: now rows are images, columns are pixels
pca_input_matrix = lcrm.T
else:
# Spatial PCA (default): rows are pixels, columns are images
pca_input_matrix = lcrm
# Create pipeline with optional standardization
if apply_standardize:
transformer = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=ncomp, svd_solver="full"))
])
else:
transformer = Pipeline([
('pca', PCA(n_components=ncomp, svd_solver="full"))
])
comps = transformer.fit_transform(pca_input_matrix)
ones_column = np.ones((comps.shape[0], 1))
comps_with_ones = np.hstack((comps, ones_column))
except Exception as e: # except (np.linalg.LinAlgError, ValueError) as e:
print(f"Error applying PCA-based component removal: {str(e)}")
error = True
return None, error
#
array_reconstruct, error_trpca = linear_solver(pca_input_matrix, comps_with_ones)
if error_trpca:
error = True
print(f"PCA processing failed in solver")
return None, error
#
diff_pca = pca_input_matrix - array_reconstruct
# Transform back to original orientation if temporal mode was used
if pca_mode == "temporal":
lcrm_pca = diff_pca.T
else:
lcrm_pca = diff_pca
else:
lcrm_pca = lcrm
# Update output array for good pixels
lcrm_mask_nomask[good_pixel, :] = lcrm_pca
# Apply smoothing if requested
if smooth_on:
# Note: Smoothing implementation is currently disabled
# This could be reimplemented using scipy.ndimage.uniform_filter
pass
return lcrm_pca, error
[docs]
def get_keys_by_value(
dictionary: Dict[str, Any],
target_value: Any
) -> Optional[str]:
"""
Find the first key in a dictionary whose value contains the target value.
This function searches through the dictionary values to find the first key
whose value contains the target value. It's useful for reverse lookups in
dictionaries where values are collections.
Parameters:
dictionary (Dict[str, Any]): Dictionary to search through
target_value (Any): Value to search for in the dictionary values
Returns:
Optional[str]: The first key whose value contains target_value,
or None if no match is found
"""
try:
return next(
(key for key, value in dictionary.items() if target_value in value),
None
)
except TypeError:
# Handle case where values are not iterable
return None
[docs]
def compute_adjusted_abundance(
pressure: np.ndarray,
temperature: np.ndarray,
coeff,
base_vmr
) -> np.ndarray:
"""
Compute adjusted abundance accounting for dissociation effects.
This function calculates the adjusted abundance of a species by considering
both the base volume mixing ratio and dissociation effects. The dissociation
is modeled using a pressure and temperature dependent function.
Parameters:
pressure (np.ndarray): Pressure values in bar
temperature (np.ndarray): Temperature values in Kelvin
coeff: Coefficients [a, b, c] for dissociation calculation
base_vmr: Base volume mixing ratio from reference layer
Returns:
np.ndarray: Adjusted abundance values
The calculation follows these steps:
1. Compute dissociation factor (Ad) using pressure and temperature
2. Convert to inverse dissociation factor (1/Ad)
3. Combine with inverse base abundance (1/A0) in quadrature
4. Return final adjusted abundance (A)
"""
# Calculate dissociation factor
log10Ad = coeff[0] * np.log10(pressure) + coeff[1] / temperature - coeff[2]
one_over_Ad = 1.0 / (10 ** log10Ad)
# Calculate inverse of base abundance
one_over_A0 = 1.0 / base_vmr
# Combine terms in quadrature
one_over_A = (np.sqrt(one_over_A0) + np.sqrt(one_over_Ad)) ** 2
# Return final adjusted abundance
return 1.0 / one_over_A
[docs]
def spectral_convolution_ptr(wavelength, spectrum, pixel_dv):
"""Convolve a spectrum to instrumental resolution using petitRADTRANS."""
# Calculate resolution from pixel velocity width
resolution = ConstantVariables.CLIGHT / (2 * pixel_dv)
return SpectralModel.convolve(wavelength, spectrum, resolution)
[docs]
def get_stellar_model(path_pkl, min_wl_cm, max_wl_cm):
"""Load a stellar spectrum from a pickle file and optionally trim to a wavelength range (cm)."""
with open(path_pkl, "rb") as f:
stellar_spectrum = pickle.load(f)
wavelength = stellar_spectrum["wavelength"]
spectrum = stellar_spectrum["spectrum"]
if min_wl_cm is not None and max_wl_cm is not None:
mask = (wavelength >= min_wl_cm) & (wavelength <= max_wl_cm)
wavelength = wavelength[mask]
spectrum = spectrum[mask]
return wavelength, np.pi * spectrum
[docs]
def generate_star_model_hr(path_pkl, pixel_dv, min_wl=None, max_wl=None):
"""
Generate a stellar spectrum model from a pickled file.
This function loads a stellar spectrum from a pickle file, convolves it
with the instrumental resolution, and creates a spline interpolation model.
Parameters
----------
path_pkl : str or Path
Path to the pickle file containing stellar spectrum data
pixel_dv : float
Pixel velocity width for resolution calculation
min_wl : float
Minimum wavelength for interpolation model in cm
max_wl : float
Maximum wavelength for interpolation model in cm
Returns
-------
tuple
Spline representation of the stellar spectrum model
"""
# Load stellar spectrum data from pickle file
wavelength, spectrum = get_stellar_model(path_pkl, min_wl, max_wl)
# Convolve spectrum with instrumental resolution
spc = spectral_convolution_ptr(wavelength, spectrum, pixel_dv)
# Create spline interpolation model
spline_model = splrep(wavelength, spc)
return wavelength * 1e7, spectrum, spline_model
[docs]
def rv_planet_and_star(
eccentricity, target, ecc_retrieval, opi_retrieval, kp_retrieval, night_obj, mode,
index_night_in_current_instrument, dVsys_arr, counter_derivative_params
):
"""
Compute planetary and stellar radial velocities for a given night.
Args:
eccentricity (bool): Whether to use eccentric orbit model.
target: Target object with orbital parameters.
ecc_retrieval (float): Retrieved eccentricity value.
opi_retrieval (float): Retrieved argument of periastron.
kp_retrieval (float): Retrieved planetary radial velocity semi-amplitude (km/s).
night_obj: Night object containing phase and barycentric velocity data.
mode (str): Operating mode ('CC', 'Retrieval', or synthetic).
index_night_in_current_instrument (int): Index of the night within the instrument.
dVsys_arr: Array of systemic velocity offsets per night.
counter_derivative_params (int): Counter for derivative parameter indexing.
Returns:
tuple: (vtot, vtot_star, kpph) planetary velocity, stellar velocity, and Kp*phase arrays.
"""
phases = night_obj.phases_considered
hjdk = night_obj.hjd_ck_considered
barycentric = night_obj.barycentric_velocity_considered
if mode == "CC":
kp = target.kp
ecc = target.eccentricity
opi = target.argument_of_periastron
coeff_vtot = -1
elif mode == "Retrieval":
kp = kp_retrieval
ecc = ecc_retrieval
opi = opi_retrieval
coeff_vtot = 1
else: # synthetic generation
kp = target.kp
ecc = target.eccentricity
opi = target.argument_of_periastron
coeff_vtot = 1
phases = night_obj.phases
hjdk = night_obj.hjd_ck
barycentric = night_obj.barycentric_velocity
# SYSTEMIC VELOCITY CORRECTIONS
# Apply night-specific systemic velocity offset (first night is reference)
dvsys = 0 if (index_night_in_current_instrument == 0 or dVsys_arr is None or mode != "Retrieval") else dVsys_arr[counter_derivative_params]
if -999 in night_obj.hjd_ck:
hjd0 = 0
period = 1
times_of_observation = phases
else:
hjd0 = target.hjd0
period = target.period
times_of_observation = hjdk
systemic_velocity = target.systemic_velocity
ks = target.ks
if ks == 0:
ks = target.kp * target.mass * np.sin(np.deg2rad(target.inclination)) / (
target.stellar_mass * ConstantVariables.SOLAR_TO_JUPITER_MASSES)
if mode != "Retrieval":
# TODO REMOVE
print(f"Insert ks in yaml! ks_km_s: {ks}")
vtot, vtot_star, kpph = calculate_rv_planet_and_star(
hjd0, times_of_observation, eccentricity, period, ecc, opi, kp, ks, phases,
coeff_vtot, systemic_velocity, barycentric, dvsys
)
return vtot, vtot_star, kpph
[docs]
def calculate_rv_planet_and_star(
hjd0, times_of_observation, eccentricity, period, ecc, opi, kp, ks, phases,
coeff_vtot, systemic_velocity, barycentric, dvsys, vsys_extra=0
):
"""
Calculate planetary and stellar radial velocities from orbital parameters.
Returns:
tuple: (vtot, vtot_star, kpph) total velocity shift for planet, for star, and Kp*phase.
"""
if eccentricity:
T_periastro = timetrans_to_timeperi(
hjd0, period, ecc, opi - np.pi,
)
orbital_element = np.array([
period, T_periastro, ecc, opi, kp
])
kpph = rv_drive(
times_of_observation, orbital_element,
)
else:
# Simple circular orbit approximation
kpph = kp * np.sin(2 * np.pi * phases)
# Calculate total velocity shift for planet spectrum
vtot = coeff_vtot * (-kpph - (systemic_velocity + vsys_extra + dvsys) + barycentric)
# Calculate stellar radial velocity for stellar spectrum correction
if ks is not None:
vtot_star = (
ks * np.sin(2 * np.pi * phases) - systemic_velocity + barycentric
)
else:
vtot_star = None
return vtot, vtot_star, kpph
[docs]
def rv_DopplerShadow(target, phases):
"""
Compute the Rossiter-McLaughlin (Doppler shadow) radial velocity anomaly.
Based on Cegla, H. M., et al. 2016, A&A, 588, A127.
Args:
target: Target object with projected_obliquity, a_Rs_ratio, inclination, v_sini,
and systemic_velocity attributes.
phases (np.ndarray): Orbital phase array.
Returns:
np.ndarray: Radial velocity anomaly plus systemic velocity (km/s).
"""
# formulae taken from
# Cegla, H. M., et al. 2016, A&A, 588, A127
# ---- center of the planet at any given orbital phase ----#
xp = target.a_Rs_ratio * np.sin(2 * np.pi * phases) # xp=a/R* sin(2*pi*phi)
yp = -target.a_Rs_ratio * np.cos(2 * np.pi * phases) * np.cos(np.deg2rad(target.inclination)) # yp=-a/R* con(2*pi*phi)*cos(ip)
l = np.deg2rad(target.projected_obliquity)
# ---- orthogonal distance from the spin-axis ----#
x = xp * np.cos(l) - yp * np.sin(l) # x_|_=xp * cos(lamda) - yp * sin(lamda)
# IF differential rotation and alpha is known
# diff. rot: Omega = Omega_eq*(1-alpha*sin2theta)
# y = xp*np.sin(l.radian) - yp*np.cos(l.radian)
# z = np.sqrt(1 - (x**2) - (y**2))
# beta = np.pi/2 + istar.radian.value
# z1 = z*np.cos(beta) - y*sin(beta)
# y1 = z*np.sin(beta) - y*cos(beta)
# v = x*vsini*(1-alpha*(y1**2))
v = x * target.v_sini
return v + target.systemic_velocity
# ----------------------------------------------------------------------
# convenience wrapper
# ----------------------------------------------------------------------
[docs]
def ask_stellar_params(parent=None, stellar_teff: Optional[float] = None) -> Tuple[Optional[float], Optional[float], Optional[float]]:
"""
Open stellar parameter dialog and return user-specified stellar properties.
This convenience function creates and displays a modal dialog for entering
stellar atmospheric parameters needed for synthetic spectrum generation.
The dialog allows input of effective temperature, surface gravity, and
metallicity with validation and reasonable defaults.
Parameters
----------
parent : tk.Misc or None, optional
Parent widget for the dialog. If None, uses default root window
or creates a new Tk instance, by default None
stellar_teff : float or None, optional
Initial effective temperature value to populate in the dialog,
by default None
Returns
-------
tuple[float or None, float or None, float or None]
Tuple containing stellar parameters:
- T_eff : Effective temperature in Kelvin (rounded to 5 digits)
- log_g : Surface gravity in cgs units (log10(g))
- Fe_H : Metallicity [Fe/H] in dex
Returns (None, None, None) if user cancels or closes dialog
Notes
-----
The stellar parameters are used to select appropriate stellar atmosphere
models for synthetic spectrum generation. The dialog provides validation
to ensure physically reasonable values are entered.
"""
# Use default root or create new instance if no parent provided
import tkinter as tk
if parent is None:
parent = tk._default_root or tk.Tk()
# Create and display stellar parameter dialog
dialog = StellarDialog(parent, stellar_teff)
# Return dialog result or defaults if dialog was cancelled
return getattr(dialog, "result", (None, None, None))
[docs]
def download_star_spectrum(figure_tk, target_obj, folder_stellar_spc):
"""
Download and save a PHOENIX stellar spectrum if not already present.
Args:
figure_tk: Parent Tkinter widget for the stellar parameter dialog.
target_obj: Target object with stellar_effective_temperature attribute.
folder_stellar_spc (str): Directory for storing the stellar spectrum file.
Returns:
str or None: Path to the stellar spectrum pickle file, or None if cancelled.
"""
# Download stellar spectrum if missing
path_phoenixf = str(Path(folder_stellar_spc, "star_spectrum_default.pkl"))
if not os.path.exists(path_phoenixf):
print("Stellar spectrum not found. Requesting stellar parameters...")
temperature, gravity, met = ask_stellar_params(
parent=figure_tk,
stellar_teff=target_obj.stellar_effective_temperature
)
if temperature is not None:
print(f"Downloading stellar spectrum: T_eff={temperature}K, log(g)={gravity}, [Fe/H]={met}")
# Create directory if it doesn't exist
os.makedirs(folder_stellar_spc, exist_ok=True)
# Download stellar spectrum files
file_wlen_phoenix = (
f"wget -P {folder_stellar_spc} "
f"ftp://phoenix.astro.physik.uni-goettingen.de/HiResFITS/"
f"PHOENIX-ACES-AGSS-COND-2011/Z{met}/"
f"lte{temperature}-{gravity}0-0.0.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits"
)
file_spectrum_phoenix = (
f"wget -P {folder_stellar_spc} "
f"ftp://phoenix.astro.physik.uni-goettingen.de/HiResFITS/"
f"WAVE_PHOENIX-ACES-AGSS-COND-2011.fits"
)
print("Downloading spectrum files...")
os.system(file_wlen_phoenix)
os.system(file_spectrum_phoenix)
# Process downloaded files
path_spc = str(Path(
folder_stellar_spc,
f"lte{temperature}-{gravity}0-0.0.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits"
))
path_wlen = str(Path(folder_stellar_spc, "WAVE_PHOENIX-ACES-AGSS-COND-2011.fits"))
path_pkl = str(Path(folder_stellar_spc, "star_spectrum_default.pkl"))
print("Processing and saving stellar spectrum...")
# Load and process spectrum data
spc = fits.open(path_spc)[0].data
wlen = fits.open(path_wlen)[0].data
star_model = {
"wavelength": wlen * 1e-8, # angstrom to cm
"spectrum": spc / np.pi
}
# Save processed spectrum
with open(path_pkl, "wb") as file:
pickle.dump(star_model, file)
print("Stellar spectrum saved successfully.")
return path_phoenixf
else:
print("Stellar parameter dialog cancelled. Aborting synthetic generation.")
return None
else:
print("Using existing stellar spectrum.")
return path_phoenixf
[docs]
def compute_transit_durations(period, a_Rs, k, inc_deg, ecc=0.0, omega=np.pi / 2):
"""
Compute transit durations T14, T23, T12 (=T34) for circular and eccentric orbits.
Parameters
----------
period : float
Orbital period in days.
a_Rs : float
Semi-major axis in units of stellar radii (a / R_star).
k : float
Planet-to-star radius ratio (Rp / Rs).
inc_deg : float
Orbital inclination in degrees.
ecc : float, optional
Orbital eccentricity (default 0 for circular).
omega : float, optional
Argument of periastron of the STAR in radians (default pi/2).
Returns
-------
dict with keys:
'T14', 'T23', 'T12' : durations in days
'T14_hours', 'T23_hours', 'T12_hours' : durations in hours
'b' : impact parameter
'ecc_factor' : eccentricity correction factor (1.0 if circular)
'grazing' : True if grazing transit (T23 = 0)
'limph_T14', 'limph_T12' : phase limits = T / (2*P)
References
----------
Winn (2010), "Transits and Occultations", arXiv:1001.2010, Eq. 14-16.
Kipping (2010), MNRAS 407, 301, arXiv:1004.3819.
"""
inc = np.radians(inc_deg)
sin_i = np.sin(inc)
# Impact parameter
# Eccentric: b = (a/Rs) * cos(i) * (1 - e^2) / (1 + e*sin(omega))
# Circular: b = (a/Rs) * cos(i)
if ecc > 0:
ecc_factor_b = (1.0 - ecc ** 2) / (1.0 + ecc * np.sin(omega))
else:
ecc_factor_b = 1.0
b = a_Rs * np.cos(inc) * ecc_factor_b
if b > (1.0 + k):
raise ValueError(
f"No transit: b = {b:.4f} > 1 + k = {1 + k:.4f}."
)
# Winn (2010) Eq. 14-15
arg_T14_sq = (1.0 + k) ** 2 - b ** 2
arg_T23_sq = (1.0 - k) ** 2 - b ** 2
if arg_T14_sq < 0:
raise ValueError(
f"No transit: (1+k)^2 - b^2 = {arg_T14_sq:.6f} < 0"
)
grazing = False
if arg_T23_sq < 0:
grazing = True
arg_T23_sq = 0.0
sin_T14 = min((1.0 / a_Rs) * np.sqrt(arg_T14_sq) / sin_i, 1.0)
sin_T23 = min((1.0 / a_Rs) * np.sqrt(arg_T23_sq) / sin_i, 1.0)
T14_circ = (period / np.pi) * np.arcsin(sin_T14)
T23_circ = (period / np.pi) * np.arcsin(sin_T23)
# Eccentricity correction (Winn 2010, Eq. 16)
# Psi = sqrt(1 - e^2) / (1 + e*sin(omega))
if ecc > 0:
ecc_factor = np.sqrt(1.0 - ecc ** 2) / (1.0 + ecc * np.sin(omega))
else:
ecc_factor = 1.0
T14 = T14_circ * ecc_factor
T23 = T23_circ * ecc_factor
T12 = (T14 - T23) / 2.0
limph_T14 = T14 / (2.0 * period)
limph_T12 = T12 / (2.0 * period)
limph_T23 = T23 / (2.0 * period)
print(f"T14={T14 * 24:.2f}hours, T23={T23 * 24:.2f}hours, T12={T12 * 24:.2f}hours, limph_T14={limph_T14:.2f}, limph_T12={limph_T12:.2f}, limph_T23={limph_T23:.2f}")
return {
'T14': T14,
'T23': T23,
'T12': T12,
'T14_hours': T14 * 24.0,
'T23_hours': T23 * 24.0,
'T12_hours': T12 * 24.0,
'b': b,
'ecc_factor': ecc_factor,
'grazing': grazing,
'limph_T14': limph_T14,
'limph_T12': limph_T12,
'limph_T23': limph_T23,
}