"""
ModelData Module
This module contains the core ModelData class for atmospheric retrieval calculations.
It handles parameter management, model calculations, and likelihood evaluations for
both high-resolution and low-resolution observations.
This is the CORE of the code - all logic and sequence of operations are preserved.
"""
import os
import pickle
import traceback
import multiprocessing
from pathlib import Path
from multiprocessing import Manager
from re import sub
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from petitRADTRANS import physical_constants as phys_const
import pyratbay.atmosphere as pa
from scipy.integrate import trapezoid
from scipy.interpolate import splrep, splev
import pyratbay.spectrum as ps
from GUIBRUSHR.General_Constants.Classes.UserTemperatureProfile import UserTemperatureProfile
from GUIBRUSHR.General_Constants.Classes.ValueErrorTP import ValueErrorTP
from GUIBRUSHR.General_Constants.FunctionsAndConstants.Constant_Variables import ConstantVariables
from GUIBRUSHR.General_Constants.Classes.Instrument import Instrument
from GUIBRUSHR.General_Constants.Classes.Night import Night
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Atmosphere import Atmosphere
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Retrieval import Retrieval
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Bestpars import Bestpars
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Random import Random
from GUIBRUSHR.General_Constants.Classes.Target import Target
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.ParamForModel import ParamForModel
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.NotRetrieval import NotRetrieval
from GUIBRUSHR.Retrieval.ExofastMCMC.StructReturnExofast import StructReturnExofast
from GUIBRUSHR.General_Constants.FunctionsAndConstants.general_functions import (
get_csv_value, convolve_resolution, trpca, compute_adjusted_abundance,
convolve_solid_body_rotation, kernel_solid_body_rotation, rv_planet_and_star, estimate_continuum
)
[docs]
def get_param_array_initial_value(array, index, single_value=True):
"""
Get initial parameter value from array at specified index.
Args:
array: Parameter array
index: Index to retrieve
single_value: If True, return single value, else return array
Returns:
Initial parameter value or None if not found
"""
if array[index] is None:
return None
if single_value:
return array[index].get_starting_value()
else:
return array[index].get_starting_value_arr()
[docs]
class ModelData:
"""
Core class for atmospheric retrieval model data and calculations.
This class manages all aspects of atmospheric modeling including:
- Parameter initialization and management
- Model calculations for both high and low resolution
- Likelihood evaluations
- MCMC chain operations
Attributes:
path_default: Default path for operations
params_list: List of all possible parameters
list_multiple_param: Parameters that can have multiple values
clight: Speed of light constant
model_type: Type of model (Retrieval, Model, etc.)
atmosphere: Atmosphere object for calculations
retrieval_data: Retrieval configuration data
bestpars_data: Best parameters data
random_obj: Random number generator object
"""
[docs]
def __init__(
self,
path_params=None,
path_df=None,
id_process=None,
table_output_file=None,
minwlen_lr=None,
maxwlen_lr=None,
minwlen_hr=None,
maxwlen_hr=None,
model_type="Retrieval",
lbl_sampling_hr=None,
lbl_sampling_lr=None,
range_min=None,
range_max=None,
nlayers=None,
manual_model_obj=None,
load_new_opacities=True,
path_default=None,
):
"""
Initialize ModelData object.
Args:
path_params: Path to parameters file
path_df: Path to dataframe file
id_process: Process ID for parallel operations
table_output_file: Output file for table data
minwlen_lr: Minimum wavelength for low resolution
maxwlen_lr: Maximum wavelength for low resolution
minwlen_hr: Minimum wavelength for high resolution
maxwlen_hr: Maximum wavelength for high resolution
model_type: Type of model calculation
lbl_sampling_hr: Line-by-line sampling parameter hr
lbl_sampling_lr: Line-by-line sampling parameter lr
range_min: Minimum pressure range
range_max: Maximum pressure range
nlayers: Number of atmospheric layers
manual_model_obj: Manual model object for direct initialization
load_new_opacities: Whether to load new opacity data
path_default: Default working directory path
"""
# Initialize core attributes
self.lbl_sampling_hr = lbl_sampling_hr
self.lbl_sampling_lr = lbl_sampling_lr
os.chdir(path_default)
self.path_default = path_default
self.params_list = ConstantVariables.params_list
self.list_multiple_param = ConstantVariables.LIST_MULTIPLE_PARAM
self.clight = ConstantVariables.CLIGHT
# Initialize data objects
self.smooth_data = None
self.bestpars_data = None
self.retrieval_data = None
self.atmosphere = None
self.random_obj = None
self.wlen_list_overplot = None
self.jitter_list = None
self.table_output_file = None
# Initialize parameter array and indices
self.initial_param_array = [None for _ in self.params_list]
self.start_general_1 = self.get_index("kp")
self.start_elements = self.get_index(ConstantVariables.LIST_ELEMENT_FOR_HYBRID[0])
self.start_molec = self.get_index("H2")
self.start_condensed = ConstantVariables.start_condensed_molecs
# Store configuration parameters
self.model_type = model_type
# Initialize based on input method
if path_params is not None:
# Initialize from parameter files
df_parameters = self.read_df_parameters(path_params)
self.read_df_information(
path_df, df_parameters,
id_process, table_output_file,
minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min, range_max, nlayers
)
else:
# Initialize from manual model object
self.populate_from_manual_model(
manual_model_obj, load_new_opacities,
minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min, range_max, nlayers
)
[docs]
def populate_from_manual_model(
self,
manual_model_obj,
load_new_opacities,
minwlen_lr,
maxwlen_lr,
minwlen_hr,
maxwlen_hr,
range_min_press=None,
range_max_press=None,
nlayers=None,
):
"""
Populate model data from a manual model object.
This method initializes the model using parameters from a manual model object
instead of reading from parameter files. It processes both parameter arrays
and molecule lists to set up the atmospheric model.
Args:
manual_model_obj: Manual model object containing parameters and molecules
load_new_opacities: Whether to load new opacity data
minwlen_lr: Minimum wavelength for low resolution
maxwlen_lr: Maximum wavelength for low resolution
minwlen_hr: Minimum wavelength for high resolution
maxwlen_hr: Maximum wavelength for high resolution
range_min_press: Minimum pressure range
range_max_press: Maximum pressure range
nlayers: Number of atmospheric layers
"""
# Initialize parameter lists
scale = []
rayleigh_species = []
line_species = []
line_species_isotope = []
line_species_complete_name_hr = []
line_species_complete_name_lr = []
list_bestpars = []
list_bestpars_initial_value = []
list_fixed = []
list_condensed_molecules = []
isotope = None
# Process parameters from manual model object
for elem in manual_model_obj.param_array:
if elem is not None and elem.is_considered():
# Extract parameter information
name, name_for_list_molec, molec_formula = elem.get_name()
mass_elem = elem.mass_molec
value_during_retrieval = elem.get_value()
constant_VMR = elem.VMR
rayleigh = elem.get_rayleigh_value()
# Add parameter to model
parameters = self.add_param_manual_model(
name, value_during_retrieval, constant_VMR, mass_elem,
molec_formula, name_for_list_molec, scale, rayleigh,
rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, isotope, list_fixed,
list_condensed_molecules
)
# Update parameter lists with returned values
(
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed,
list_condensed_molecules
) = parameters
# Process molecules from manual model object
molec_list = manual_model_obj.molecs
for molec in molec_list.split("\n"):
string_comp = molec.split(",")
# Parse molecule information
name = molec_formula = string_comp[0]
isotope = string_comp[1]
name_for_list_molec = string_comp[2]
mass_elem = float(string_comp[4])
value_during_retrieval = float(string_comp[5])
constant_VMR = int(string_comp[6])
rayleigh = int(string_comp[8])
# Add molecule parameter to model
parameters = self.add_param_manual_model(
name, value_during_retrieval, constant_VMR, mass_elem,
molec_formula, name_for_list_molec, scale, rayleigh,
rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, isotope, list_fixed,
list_condensed_molecules
)
# Update parameter lists with returned values
(
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed,
list_condensed_molecules
) = parameters
# Process molecules from manual model object
condensed_list = manual_model_obj.condensed
if condensed_list != "":
for condensed in condensed_list.split("\n"):
string_comp = condensed.split(",")
# Parse molecule information
name = name_for_list_molec = string_comp[0]
molec_formula = string_comp[-1]
isotope = None
mass_elem = float(string_comp[2])
value_during_retrieval = float(string_comp[3])
constant_VMR = True
rayleigh = False
# Add molecule parameter to model
parameters = self.add_param_manual_model(
name, value_during_retrieval, constant_VMR, mass_elem,
molec_formula, name_for_list_molec, scale, rayleigh,
rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, isotope, list_fixed,
list_condensed_molecules
)
# Update parameter lists with returned values
(
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed,
list_condensed_molecules
) = parameters
# Process molecules from manual model object
hybrid_list = manual_model_obj.hybrid
if hybrid_list != "":
for hybrid in hybrid_list.split("\n"):
string_hybrid = hybrid.split(",")
value_during_retrieval = float(string_hybrid[2])
name = name_for_list_molec = string_hybrid[0]
molec_formula = None
isotope = None
mass_elem = None
constant_VMR = True
rayleigh = False
# Add molecule parameter to model
parameters = self.add_param_manual_model(
name, value_during_retrieval, constant_VMR, mass_elem,
molec_formula, name_for_list_molec, scale, rayleigh,
rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, isotope, list_fixed,
list_condensed_molecules
)
# Update parameter lists with returned values
(
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed,
list_condensed_molecules
) = parameters
# Initialize core objects
self.random_obj = Random(1990) # not needed in forward model
self.bestpars_data = Bestpars(list_bestpars, list_bestpars_initial_value)
# Initialize atmosphere if needed
if load_new_opacities:
self.atmosphere = Atmosphere()
# Update atmosphere parameters
self.atmosphere.update_params(
line_species,
line_species_isotope,
line_species_complete_name_hr,
line_species_complete_name_lr,
list_condensed_molecules,
rayleigh_species,
range_min_press,
range_max_press,
nlayers,
lbl_high_res=self.lbl_sampling_hr,
lbl_low_res=1,
manual_model_obj=manual_model_obj,
chemistry=manual_model_obj.chemistry,
start_molecs=self.start_molec
)
# Initialize retrieval data
self.retrieval_data = Retrieval(scale, manual_model_obj=manual_model_obj)
# Set up opacities and wavelength ranges if needed
if load_new_opacities:
self.atmosphere.set_wl_range(
minwlen_lr=minwlen_lr, maxwlen_lr=maxwlen_lr,
minwlen_hr=minwlen_hr, maxwlen_hr=maxwlen_hr,
)
self.atmosphere.read_opacities(
table_output_file=manual_model_obj.table_output_file,
path_target=manual_model_obj.path_targets,
stellar_spectrum_type_hr=manual_model_obj.path_stellar_spectrum
)
[docs]
def add_param_manual_model(
self,
name,
value_during_retrieval,
constant_VMR,
mass_elem,
molec_formula,
name_for_list_molec,
scale,
rayleigh,
rayleigh_species,
line_species,
line_species_isotope,
line_species_complete_name_hr,
line_species_complete_name_lr,
list_bestpars,
list_bestpars_initial_value,
isotope,
list_fixed,
list_condensed_molecules
):
"""
Add a parameter from manual model to the parameter management system.
This method processes a single parameter from a manual model object and
adds it to the appropriate parameter lists based on its properties.
Args:
name: Parameter name
value_during_retrieval: Parameter value for retrieval
constant_VMR: Whether VMR is constant
mass_elem: Molecular mass
molec_formula: Molecular formula
name_for_list_molec: Name for molecule list
scale: Scale factor list
rayleigh: Whether Rayleigh scattering applies
rayleigh_species: List of Rayleigh species
line_species: List of line species
line_species_isotope: List of isotopes
line_species_complete_name_hr: List of complete species names HR
line_species_complete_name_lr: List of complete species names LR
list_bestpars: List of best parameters
list_bestpars_initial_value: List of initial values
isotope: Isotope information
list_fixed: List of fixed parameters
list_condensed_molecules: List of condensed molecules
Returns:
Tuple of updated parameter lists
"""
# Set default parameter properties for manual model
in_best_pars = 0
scale_param = 0
sigma_prior_param = None
is_molec = mass_elem is not None
range_param = np.inf
opacity_name_lr = isotope
# Use parameter management system to process the parameter
return self.parameter_management(
name, in_best_pars, scale_param, constant_VMR,
-range_param, range_param, is_molec, molec_formula,
mass_elem, name_for_list_molec, rayleigh,
value_during_retrieval, sigma_prior_param, isotope, opacity_name_lr,
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed, list_condensed_molecules
)
[docs]
def read_df_parameters(self, path_df_parameters):
"""
Read df_parameters.csv file containing the fitting parameters.
This method reads a CSV file containing parameter definitions and processes
each parameter to build the complete parameter structure for the model.
Args:
path_df_parameters: Path to the parameters CSV file
Returns:
Tuple containing all processed parameter information:
- mass_vector: List of molecular masses
- scale: Array of scale factors
- rayleigh_species: List of Rayleigh scattering species
- line_species: List of line species
- line_species_isotope: List of isotope information
- line_species_complete_name_hr: List of complete HR species names
- line_species_complete_name_lr: List of complete LR species names
- list_bestpars: List of best fit parameters
- list_bestpars_initial_value: List of initial parameter values
- list_fixed: List of fixed parameters
- list_condensed_molecules: List of condensed phase molecules
"""
# Initialize parameter lists
rayleigh_species = []
line_species = []
line_species_isotope = []
line_species_complete_name_hr = []
line_species_complete_name_lr = []
list_condensed_molecules = []
list_bestpars = []
list_bestpars_initial_value = []
list_fixed = []
scale = []
mass_vector = []
# Read and process CSV file
df_parameters = pd.read_csv(path_df_parameters)
df_parameters = df_parameters.where(pd.notnull(df_parameters), None)
for _, row in df_parameters.iterrows():
name_for_retrieval = row.get('name')
name_for_list_molec = row.get('name')
molec_param = int(row.get('molec'))
value_during_retrieval = float(row.get('value'))
scale_param = float(row.get('scale'))
range_min_param = float(row.get('range_min'))
range_max_param = float(row.get('range_max'))
rayleigh_species_param = int(row.get('rayleigh_species'))
in_bestpars_param = int(row.get('in_bestpars'))
mass_param = row.get('mass')
sigma_prior_param = row.get('sigma_prior')
molec_formula_param = row.get('molec_formula')
constant_vmr_param = row.get('constant_vmr')
isotope = row.get('isotope')
opacity_name_lr = row.get('opacity_name_lr') or isotope
# Process mass parameter
if mass_param is None or np.isnan(mass_param):
mass_param = None
else:
mass_param = float(mass_param)
mass_vector.append(mass_param)
# Process sigma prior parameter
if sigma_prior_param is None or np.isnan(sigma_prior_param):
sigma_prior_param = None
else:
sigma_prior_param = float(sigma_prior_param)
# Process molecule formula and name
if molec_param == 0:
molec_formula_param = None
elif molec_param == 1:
name_for_retrieval = molec_formula_param
# Use parameter management to process this parameter
(
scale, rayleigh_species, line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed, list_condensed_molecules
) = self.parameter_management(
name_for_retrieval, in_bestpars_param,
scale_param, constant_vmr_param,
range_min_param, range_max_param,
molec_param, molec_formula_param,
mass_param, name_for_list_molec, rayleigh_species_param,
value_during_retrieval, sigma_prior_param,
isotope, opacity_name_lr, scale, rayleigh_species,
line_species, line_species_isotope,
line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars,
list_bestpars_initial_value, list_fixed, list_condensed_molecules
)
# Finalize processing
scale = np.array(scale)
if len(list_condensed_molecules) == 0:
list_condensed_molecules = None
return (
mass_vector,
scale,
rayleigh_species,
line_species,
line_species_isotope,
line_species_complete_name_hr,
line_species_complete_name_lr,
list_bestpars,
list_bestpars_initial_value,
list_fixed,
list_condensed_molecules
)
# noinspection PyUnresolvedReferences
[docs]
def parameter_management(
self,
name_for_retrieval,
in_bestpars_param,
scale_param,
constant_vmr_param,
range_min_param,
range_max_param,
molec_param,
molec_formula_param,
mass_param,
name_for_list_molec,
rayleigh_species_param,
value_during_retrieval,
sigma_prior_param,
isotope,
opacity_name_lr,
scale,
rayleigh_species,
line_species,
line_species_isotope,
line_species_complete_name_hr,
line_species_complete_name_lr,
list_bestpars,
list_bestpars_initial_value,
list_fixed,
list_condensed_molecules
):
"""
Manage parameter processing and categorization for the retrieval system.
This method handles the complex logic of processing individual parameters
and categorizing them into appropriate lists based on their properties.
It manages parameter creation, updates, and classification into different
species types (Rayleigh, line species, condensed molecules).
Args:
name_for_retrieval: Parameter name used in retrieval
in_bestpars_param: Whether parameter is in best parameters list
scale_param: Scale factor for parameter
constant_vmr_param: Whether VMR is constant
range_min_param: Minimum parameter range
range_max_param: Maximum parameter range
molec_param: Whether parameter is molecular
molec_formula_param: Molecular formula
mass_param: Molecular mass
name_for_list_molec: Name for molecule list
rayleigh_species_param: Whether parameter is Rayleigh species
value_during_retrieval: Parameter value during retrieval
sigma_prior_param: Sigma prior for parameter
isotope: Isotope information
opacity_name_lr: Opacity name for low resolution
scale: Scale factors list
rayleigh_species: Rayleigh species list
line_species: Line species list
line_species_isotope: Isotope list
line_species_complete_name_hr: Complete species names list HR
line_species_complete_name_lr: Complete species names list LR
list_bestpars: Best parameters list
list_bestpars_initial_value: Initial values list
list_fixed: Fixed parameters list
list_condensed_molecules: Condensed molecules list
Returns:
Tuple of updated parameter lists
"""
# Resolve parameter name (handle numbered parameters like jitter1, jitter2)
if name_for_retrieval not in self.params_list:
name = sub(r"\d+", "", name_for_retrieval)
else:
name = name_for_retrieval
# Create parameter object if it doesn't exist
if self.initial_param_array[self.get_index(name)] is None:
self.initial_param_array[self.get_index(name)] = ParamForModel(
name, in_bestpars_param, scale_param, constant_vmr_param,
range_min_param, range_max_param, molec_param, molec_formula_param,
mass_param, name_for_list_molec, isotope,
)
# Update parameter values based on parameter type
if name in self.list_multiple_param:
# Handle multi-value parameters (jitter, f_rot, etc.)
self.initial_param_array[self.get_index(name)].append_elem(
value_during_retrieval, sigma_prior_param
)
else:
# Handle single-value parameters
self.initial_param_array[self.get_index(name)].update_starting_value(
value_during_retrieval, sigma_prior_param
)
# Categorize parameter for fitting vs. fixed
if in_bestpars_param:
list_bestpars.append(name_for_retrieval)
list_bestpars_initial_value.append(value_during_retrieval)
scale.append(scale_param)
else:
list_fixed.append(name_for_retrieval)
# Categorize parameter by species type
if int(rayleigh_species_param) == 2:
# Both Rayleigh scattering and line opacity species
rayleigh_species.append(name_for_retrieval)
if molec_param and name.replace("_", "") in ConstantVariables.ALL_MOLEC:
line_species.append(name_for_retrieval)
line_species_isotope.append(isotope)
line_species_complete_name_hr.append(name_for_list_molec)
line_species_complete_name_lr.append(opacity_name_lr)
elif int(rayleigh_species_param) == 1:
# Rayleigh scattering species only
rayleigh_species.append(name_for_retrieval)
elif molec_param and name.replace("_", "") in ConstantVariables.ALL_MOLEC:
# Line opacity species
line_species.append(name_for_retrieval)
line_species_isotope.append(isotope)
line_species_complete_name_hr.append(name_for_list_molec)
line_species_complete_name_lr.append(opacity_name_lr)
elif name_for_retrieval in ConstantVariables.ALL_CONDENSED_MOLEC:
# Condensed phase molecules
list_condensed_molecules.append(name_for_retrieval)
return (
scale,
rayleigh_species,
line_species,
line_species_isotope,
line_species_complete_name_hr,
line_species_complete_name_lr,
list_bestpars,
list_bestpars_initial_value,
list_fixed,
list_condensed_molecules
)
[docs]
def get_value(self, params, name, single_value=True):
"""
Extract value of named parameter from params array.
Args:
params: Array of parameter objects
name: Parameter name to extract
single_value: If True, return single value, else return array
Returns:
Parameter value(s) or None if parameter not found
"""
index = self.get_index(name)
if params[index] is None:
return None
if single_value:
return params[index].get_retrieval_value()
return params[index].get_retrieval_value_arr()
[docs]
def get_index(self, param_name):
"""
Find index of parameter with given name in the parameters list.
This method handles both exact parameter names and parameter names
with numeric suffixes (e.g., 'jitter1', 'jitter2' -> 'jitter').
Args:
param_name: Name of parameter to find
Returns:
Index of parameter in params_list
"""
# Try exact match first
if param_name in self.params_list:
return self.params_list.index(param_name)
# If not found, try removing numbers from name
name_without_numbers = sub(r"\d+", "", param_name)
return int(self.params_list.index(name_without_numbers))
# noinspection PyUnresolvedReferences
[docs]
def calculate_mass_fraction(
self,
temperature,
metallicity,
c_o_ratio,
si_o_ratio_final,
vmr_peak_arr,
pressure_peak_arr,
width_peak_arr,
):
"""
Compute mass mixing ratios for atmospheric species.
This method calculates mass mixing ratios based on the selected chemistry
model (equilibrium, free chemistry, or hybrid). It handles volume mixing
ratio calculations, mean molecular weight computation, and species abundance
adjustments for dissociation effects.
Args:
temperature: 1D temperature profile array
metallicity: log(metallicity) enhancement factor for equilibrium chemistry
c_o_ratio: Carbon-to-oxygen ratio for equilibrium chemistry
si_o_ratio_final: Si-to-oxygen ratio for equilibrium chemistry
vmr_peak_arr: VMR peak values for variable abundance profiles
pressure_peak_arr: Pressure peak positions for variable profiles
width_peak_arr: Width parameters for variable abundance profiles
Returns:
Tuple containing:
- mass_fraction: Dictionary of mass mixing ratio profiles
{species name: 1D profile} for each species
- MMW: 1D mean molecular weight profile array
- vmr: 2D volume mixing ratio array of shape [nlayers, nmol]
- mean_VMR_and_MF_string: String containing mean VMR and mass fractions per species
- mean_VMR_and_MF_dict: Dictionary mapping species to their mean VMR, MF, and linelist name
"""
# Get parameters and molecular masses
params = self.initial_param_array
masses = self.atmosphere.species_obj.mass_vector
# Calculate VMR based on chemistry model
array_condensed_molecules = np.array(params[self.start_condensed:])
array_molecules_full = np.array(params[self.start_molec:self.start_elements])
array_molecules_excluded = np.array(
params[self.start_molec + self.atmosphere.species_obj.skip_molec_param_array:self.start_elements]
)
e_ratio = {}
if c_o_ratio is not None:
e_ratio["C_O"] = c_o_ratio
if si_o_ratio_final is not None:
e_ratio["Si_O"] = si_o_ratio_final
# EQUILIBRIUM CHEMISTRY: Use thermochemical equilibrium
if self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[0]: # Equilibrium
species = list(self.atmosphere.species_compatible_with_prt)
vmr = self.atmosphere.chemcat_obj.thermochemical_equilibrium(
temperature=temperature,
metallicity=metallicity,
e_ratio=e_ratio,
)
elif self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[1]:
pressure = self.atmosphere.pressure_data.pressures
nlayers = len(pressure)
species = list(self.atmosphere.species_obj.composition)
nmol = len(species)
vmr = np.zeros((nlayers, nmol))
# To correct indices of vmr_peak, pressure_peak, width_peak, a4
i_peak = 0
index_metals = []
# He and H2 are NOT included (+2 reason)
for mol in array_molecules_excluded:
if mol is None:
continue
imol = species.index(mol.name)
if mol.molec_formula not in ConstantVariables.NOT_VMR_METALS:
index_metals.append(imol)
vmr_value = 10 ** self.get_value(params, mol.molec_formula)
if mol.is_VMR_costant:
vmr[:, imol] = vmr_value
else:
vmr_peak_par = vmr_peak_arr[i_peak]
pressure_peak_par = pressure_peak_arr[i_peak]
width_peak_par = width_peak_arr[i_peak]
# a4_par = a4_arr[i_peak]
vmr_pf_log = np.log10(vmr_value) - vmr_peak_par * np.exp(
-np.power(
(np.log10(pressure) - pressure_peak_par)
/ width_peak_par,
2,
)
)
vmr_pf_log[np.where(vmr_pf_log > -1)] = -1
vmr_pf = 10 ** vmr_pf_log
i_peak += 1
vmr[:, imol] = vmr_pf
# condensed
for condensed_mol in array_condensed_molecules:
if condensed_mol is None:
continue
imol = species.index(condensed_mol.name)
index_metals.append(imol)
vmr_value = 10 ** self.get_value(params, condensed_mol.name)
vmr[:, imol] = vmr_value
vmr_metals = np.sum(vmr[:, index_metals], axis=1)
# TBD: Reject sample if np.any(vmr_metals>1)
if self.atmosphere.species_obj.include_h_m:
# Compute the H2/He ratio from the provided parameters
H2_He_ratio = self.get_value(params, "H2") / self.get_value(params, "He")
# Get indices for the relevant species in the vmr array
i_H2 = species.index('H2')
i_He = species.index('He')
i_H = species.index('H')
i_em = species.index('e-')
i_hm = species.index('H-')
# Update the helium (He) volume mixing ratio (vmr) based on the available non-metal abundance.
# The total abundance allocated to H2, and He is (1 - vmr_metals).
vmr[:, i_He] = (1.0 - vmr_metals) / (1.0 + H2_He_ratio)
# --- H2 Abundance ---
# Define coefficients for H2 dissociation adjustment
coeff_h2 = [1, 2.41e04, 6.5] # [1, 0.0001 * 2.41, 6.5]
# The reference H2 abundance is based on the H2/He ratio times the He abundance in the last layer.
base_vmr_H2 = H2_He_ratio * vmr[-1, i_He]
# Compute the adjusted H2 abundance across all layers
A_H2 = compute_adjusted_abundance(pressure, temperature, coeff_h2, base_vmr_H2)
vmr[:, i_H2] = A_H2
# For atomic hydrogen (H), the abundance profile is taken as the reverse of the H2 profile.
vmr[:, i_H] = A_H2[::-1]
# --- Free Electrons (e-) ---
# Define coefficients for electron dissociation adjustment
coeff_em = [-0.4, 0, 0]
base_vmr_em = vmr[-1, i_em]
A_em = compute_adjusted_abundance(pressure, temperature, coeff_em, base_vmr_em)
vmr[:, i_em] = A_em
# --- Negative Hydrogen Ions (H-) ---
# Define coefficients for H- dissociation adjustment
coeff_hm = [0.6, -0.14e4, 7.7] # [0.6, 0.0001 * -0.14, 7.7]
base_vmr_hm = vmr[-1, i_hm]
A_hm = compute_adjusted_abundance(pressure, temperature, coeff_hm, base_vmr_hm)
vmr[:, i_hm] = A_hm
# --- Water Vapor (H2O) ---
if 'H2O' in species:
i_h2o = species.index('H2O')
# Define coefficients for H2O dissociation adjustment
coeff_h2o = [2, 4.83e4, 15.9] # [2, 0.0001 * 4.83, 15.9]
base_vmr_h2o = vmr[-1, i_h2o]
A_h2o = compute_adjusted_abundance(pressure, temperature, coeff_h2o, base_vmr_h2o)
vmr[:, i_h2o] = A_h2o
else:
H2_He_ratio = self.get_value(params, "H2") / self.get_value(params, "He")
i_H2 = species.index('H2')
i_He = species.index('He')
# Distribute the remaining VMR (after metals) between H2 and He
# enforcing the fixed ratio R = vmr_H2 / vmr_He.
# From: vmr_H2 + vmr_He = (1 - vmr_metals) and vmr_H2 = R * vmr_He
# → vmr_He = (1 - vmr_metals) / (1 + R), vmr_H2 = R * vmr_He
vmr[:, i_He] = (1.0 - vmr_metals) / (1.0 + H2_He_ratio)
vmr[:, i_H2] = H2_He_ratio * vmr[:, i_He]
else:
species = list(self.atmosphere.species_compatible_with_prt)
e_abundances = dict()
for element in params[self.start_elements:]:
if element is None:
continue
e_abundances[element.molec_formula] = self.get_value(params, element.name)
vmr = self.atmosphere.chemcat_obj.thermochemical_equilibrium(
temperature=temperature,
metallicity=metallicity,
e_abundances=e_abundances,
e_ratio={"C_O": c_o_ratio},
)
# Mean molecular weight
MMW = pa.mean_weight(vmr, species, mass=masses)
# Mass-mixing ratio abundances:
mass_fractions_PTR = {}
mean_VMR_and_MF_dict = {}
mean_VMR_and_MF_string = ""
err = False
for mol in array_molecules_full:
if mol is not None:
name = mol.name_for_list_molec
i = species.index(mol.molec_formula)
mass_fractions_PTR[name] = vmr[:, i] * masses[i] / MMW
mean_VMR_and_MF_dict[mol.molec_formula] = {
"VMR": np.mean(vmr[:, i]),
"MF": np.mean(mass_fractions_PTR[name]),
"LineList": name
}
mean_VMR_and_MF_string += f"{mol.molec_formula}: Mean VMR = {np.mean(vmr[:, i])}:.7e; Mean MF {np.mean(mass_fractions_PTR[name]):.7e}\n"
if np.any(mass_fractions_PTR[name] < 0):
print(f"Warning: negative mass fraction values found for {name}")
err = True
for condensed_mol in array_condensed_molecules:
if condensed_mol is not None:
name = condensed_mol.name_for_list_molec
i = species.index(name)
mass_fractions_PTR[name] = vmr[:, i] * masses[i] / MMW
mean_VMR_and_MF_dict[condensed_mol.molec_formula] = {
"VMR": np.mean(vmr[:, i]),
"MF": np.mean(mass_fractions_PTR[name]),
"LineList": name
}
mean_VMR_and_MF_string += f"{condensed_mol.molec_formula}: Mean VMR = {np.mean(vmr[:, i])}:.7e; Mean MF {np.mean(mass_fractions_PTR[name]):.7e}\n"
if np.any(mass_fractions_PTR[name] < 0):
print(f"Warning: negative mass fraction values found for {name}")
err = True
return mass_fractions_PTR, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, err
def _save_trpca_error_debug_info(
self,
lcrm_mask,
lcrm_mask_nomask,
good_pixel,
night,
h,
dict_calc_model,
temperature,
mass_fraction,
vmr,
MMW,
wl_full_resolution_HR,
model_full_resolution_HR,
depth_full_resolution_HR,
):
"""
Save detailed debug information when TRPCA (Telluric Removal PCA) fails.
This method creates a comprehensive dictionary containing all relevant
data for troubleshooting PCA failures during telluric removal and saves
it to a pickle file for later analysis.
Args:
lcrm_mask: Masked telluric-removed model spectrum
lcrm_mask_nomask: Unmasked telluric-removed model spectrum
good_pixel: Boolean array of valid pixels
night: Night object containing observation data
h: Current spectral order index
dict_calc_model: Dictionary with calculated model parameters
temperature: Temperature profile array
mass_fraction: Mass fraction dictionary
vmr: Volume mixing ratio array
MMW: Mean molecular weight array
wl_full_resolution_HR: High-resolution wavelength array
model_full_resolution_HR: High-resolution model spectrum
depth_full_resolution_HR: High-resolution transit depth spectrum from model
Note:
Saves debug info to pickle file and prints a message.
"""
error_trpca_dictionary = {
"lcrm_mask": lcrm_mask,
"lcrm_mask_nomask": lcrm_mask_nomask,
"good_pixel": good_pixel,
"nfc": night.nfc[h],
"tell_rm_method": self.retrieval_data.tell_rm_method,
"smooth_on": night.smooth_on,
"smooth_size": night.smooth_size,
"model_reprocessing": self.retrieval_data.model_reprocessing,
"subtraction_of_the_average_spectrum": night.subtraction_of_the_average_spectrum,
"dict_calc_model": dict_calc_model,
"temperature": temperature,
"mass_fraction": mass_fraction,
"vmr": vmr,
"MMW": MMW,
"LIST_PT_PROFILE_TABLE": ConstantVariables.LIST_PT_PROFILE_TABLE,
"instruments_HR": self.atmosphere.resolution_obj.instruments_HR,
"retrieval_data": self.retrieval_data,
"wl_full_resolution_HR": wl_full_resolution_HR,
"model_full_resolution_HR": model_full_resolution_HR,
"depth_full_resolution_HR": depth_full_resolution_HR,
"stellar_spectrum": self.atmosphere.stellar_spectrum,
"stellar_spline_model_HR": self.atmosphere.stellar_spline_model_hr,
"atmospherehr": {
"pressures": self.atmosphere.pressure_data.pressures,
"line_species_complete_name_hr": self.atmosphere.species_obj.line_species_complete_name_hr,
"rayleigh_species": self.atmosphere.species_obj.rayleigh_species,
"gas_continuum_contributors": self.atmosphere.species_obj.continum_opacity,
"wavelength_boundaries": np.array([
self.atmosphere.wavelength.min_wl_hr,
self.atmosphere.wavelength.max_wl_hr
]),
"line_opacity_mode": "lbl",
"line_by_line_opacity_sampling": self.atmosphere.resolution_obj.lbl_high_res
},
"SOLAR_TO_JUPITER_MASSES": ConstantVariables.SOLAR_TO_JUPITER_MASSES,
"clight": ConstantVariables.CLIGHT,
"stellar_spectrum_type_hr": self.atmosphere.stellar_spectrum_type_hr,
"jitter_list": self.jitter_list
}
# Save debug information to pickle file
error_file_path = f"{self.retrieval_data.path_results}/error_trpca.pkl"
with open(error_file_path, "ab") as fo:
pickle.dump(error_trpca_dictionary, fo)
print(
f"TRPCA FAILED, check file: {self.retrieval_data.path_results}"
"/error_trpca.pkl\n"
)
[docs]
def calculate_det(self, params):
"""
Calculate the determinant for prior probability evaluation.
This method computes the determinant term used in Bayesian parameter
estimation, incorporating Gaussian priors for parameters that have
sigma_prior values defined.
Args:
params: Array of parameter objects
Returns:
det: Determinant value for prior probability calculation
"""
det = 1.0
for iel, param in enumerate(params):
# Skip parameters without priors
if param is None or param.sigma_prior is None:
continue
# Determine if parameter has single or multiple values
single_value = param.name not in self.list_multiple_param
value = self.get_value(params, param.name, single_value)
# Get initial parameter value for comparison
initial_value = get_param_array_initial_value(
self.initial_param_array, iel, single_value
)
sigma = params[iel].get_sigma_prior()
# Calculate Gaussian prior contribution
if single_value:
det *= np.exp(-((value - initial_value) ** 2) / (2 * sigma ** 2))
else:
# Handle array parameters (e.g., jitter, f_rot)
for index_arr in range(len(value)):
det *= np.exp(
-((value[index_arr] - initial_value[index_arr]) ** 2)
/ (2 * sigma[index_arr] ** 2)
)
return det
# fmt: off
[docs]
def high_resolution_lhood(
self,
temperature,
mass_fraction,
vmr,
MMW,
dict_calc_model
):
"""
Compute high-resolution likelihood with advanced spectral processing.
This method performs the most complex spectral processing in the atmospheric
retrieval system. It handles:
1. Full-resolution atmospheric model calculation
2. Instrumental resolution convolution
3. Stellar rotation broadening (solid body rotation)
4. Orbital dynamics and Doppler shifts
5. Telluric removal via Principal Component Analysis (PCA)
6. Chi-square likelihood evaluation against observations
The method preserves exact mathematical operations critical for
high-precision atmospheric spectroscopy and exoplanet detection.
Args:
temperature: Atmospheric temperature profile
mass_fraction: Mass fraction profiles for all species
vmr: Volume mixing ratios for all species
MMW: Mean molecular weight profile
dict_calc_model: Dictionary containing all model parameters
Returns:
Tuple containing:
- status: True if successful, False if errors occurred
- lhood: Log-likelihood value
- wl_full_resolution_HR: Full resolution wavelength array
- depth_full_resolution_HR: Full resolution spectrum
- opacity_contribution_HR: Opacity contributions (for plotting)
- lh_high_resolution: Dictionary of per-instrument per-night likelihood details
"""
# EXTRACT PARAMETERS FROM MODEL CALCULATION DICTIONARY
rp = dict_calc_model["rp"] # Planet radius
gravity = dict_calc_model["gravity"] # Surface gravity
omegad = dict_calc_model["omegad"] # Rotation velocity (log scale)
rv = dict_calc_model["rv"] # Systemic radial velocity
sf = dict_calc_model["sf"] # Single scaling factor
sf_arr = dict_calc_model["sf_arr"] # Multiple scaling factors
f_rot_arr = dict_calc_model["f_rot_arr"] # Rotation factors per night
T0 = dict_calc_model["T0"] # Reference temperature
T_low = dict_calc_model["T_low"] # Low pressure temperature
T3_node = dict_calc_model["T3_node"] # Temperature node 3
ecc = dict_calc_model["ecc"] # Orbital eccentricity
opi = dict_calc_model["opi"] # Argument of periastron
kp = dict_calc_model["kp"] # Planet velocity semi-amplitude
jitter_arr = dict_calc_model["jitter_arr"] # Noise jitter per night
dVsys_arr = dict_calc_model["dVsys_arr"] # Systemic velocity variations
# INITIALIZE VARIABLES
lhood = 0 # Total log-likelihood
csscaled = None # Spline for convolved spectrum
opacity_contribution_HR = None
lh_high_resolution = {}
# CALCULATE FULL-RESOLUTION ATMOSPHERIC MODEL
# This is the core atmospheric model calculation producing the theoretical spectrum
wl_full_resolution_HR, model_full_resolution_HR, depth_full_resolution_HR = self.atmosphere.calc_model(
temperature, mass_fraction, vmr, MMW, dict_calc_model, True,
)
# SETUP PLOTTING FOR MODEL ANALYSIS (if needed)
pdf = None
if "Model" in self.model_type:
os.system(f"mkdir -p {self.retrieval_data.path_results}/model_high_low_res/")
pdf = PdfPages(
f"{self.retrieval_data.path_results}/model_high_low_res/model_reprocessing.pdf"
)
if "Opacity" in self.model_type:
# Calculate opacity contributions for analysis
opacity_contribution_HR = self.atmosphere.opacity_contribution(
temperature, mass_fraction, MMW, dict_calc_model, True
)
# PROCESS EACH HIGH-RESOLUTION INSTRUMENT
counter_nights_total = -1 # Global night counter across all instruments
counter_derivative_params = -1
for index_instrument, instrument in enumerate(self.atmosphere.resolution_obj.instruments_HR):
lh_night = {}
# INSTRUMENTAL RESOLUTION CONVOLUTION (if no stellar rotation)
if omegad is None:
# Convolve theoretical spectrum with instrument resolution function
wl_convolved_resolution, model_convolved_resolution = convolve_resolution(
wl_full_resolution_HR, model_full_resolution_HR, instrument.hwhm_km_s, rv, self.atmosphere.rv_sampling
)
# Create spline interpolation for fast evaluation at arbitrary wavelengths
csscaled = splrep(wl_convolved_resolution, model_convolved_resolution)
# PROCESS EACH OBSERVATION NIGHT
for index_night_in_current_instrument, night in enumerate(instrument.night_arr):
counter_nights_total += 1
if index_night_in_current_instrument != 0:
counter_derivative_params += 1
# DETERMINE SCALING FACTOR FOR THIS NIGHT
scale_night = 0
if sf is not None:
# Use single scaling factor for all nights
scale_night = sf
elif sf_arr is not None:
# Use night-specific scaling factor
scale_night = sf_arr[counter_nights_total]
scale_HR = 10 ** scale_night # Convert from log to linear scale
# STELLAR ROTATION BROADENING (if included)
if omegad is not None:
omegade = 10 ** omegad # Convert rotation velocity from log scale
# Determine reference temperature for rotation kernel calculation
# Different temperature profile formats use different reference temperatures
if self.retrieval_data.format_temperature == ConstantVariables.LIST_PT_PROFILE_TABLE[3]:
T0 = T_low
elif self.retrieval_data.format_temperature == ConstantVariables.LIST_PT_PROFILE_TABLE[4]:
T0 = T3_node
# Calculate solid body rotation kernel
ker_rot = kernel_solid_body_rotation(
omegade, f_rot_arr, T0, np.mean(MMW), gravity, rp,
self.atmosphere.rad_mode, counter_nights_total, self.atmosphere.rv_sampling
)
# Apply rotation broadening to the spectrum
wl_convolved_rot, model_convolved_rot = convolve_solid_body_rotation(
wl_full_resolution_HR, model_full_resolution_HR, ker_rot, self.atmosphere.rv_sampling
)
# Apply instrumental resolution convolution after rotation
wl_convolved_resolution_after_rot, model_convolved_resolution_after_rot = convolve_resolution(
wl_convolved_rot, model_convolved_rot, instrument.hwhm_km_s, rv, self.atmosphere.rv_sampling
)
# Create spline for the rotation+resolution convolved spectrum
csscaled = splrep(wl_convolved_resolution_after_rot, model_convolved_resolution_after_rot)
# ORBITAL DYNAMICS AND RADIAL VELOCITY CALCULATION
vtot, vtot_star, _ = rv_planet_and_star(
self.retrieval_data.eccentricity, self.atmosphere.target, ecc, opi, kp, night, "Retrieval",
index_night_in_current_instrument, dVsys_arr, counter_derivative_params
)
lh_current_night = 0
# PROCESS EACH SPECTRAL ORDER
for h in range(night.n_good_orders):
# Extract observational data for this order
wavelengths_night = night.lambdas[:, h, :] # Wavelength array
spectrum_only_planet_night = night.spectra[:, h, :] # Observed spectra
error_spectrum_only_planet_night = night.sigma_spectra[:, h, :] # Error bars
good_pixels = night.maskinvm[:, h, :] == 0 # Valid pixel mask
# APPLY DOPPLER SHIFTS TO WAVELENGTH GRID
# Reshape wavelength array to match good pixels structure
wl_data_masked = wavelengths_night[np.where(good_pixels)].reshape(
-1, np.shape(good_pixels)[1]
)
# Calculate Doppler shifts for planet and star
dl_planet = wl_data_masked * vtot / self.clight # Planet velocity shift
wlen_shifted_planet = dl_planet + wl_data_masked # Planet rest frame wavelengths
dl_star = wl_data_masked * vtot_star / self.clight # Stellar velocity shift
wlen_shifted_star = dl_star + wl_data_masked # Stellar rest frame wavelengths
Fscaled = None
mol_modf = None
stellar_spectrum_HR = None
# CALCULATE OBSERVED SPECTRUM BASED ON OBSERVATION MODE
if self.atmosphere.rad_mode == 'Transmission':
# TRANSMISSION SPECTROSCOPY
# Evaluate atmospheric model at Doppler-shifted wavelengths
model_eval = splev(wlen_shifted_planet, csscaled, der=0)
# Convert from altitude to transit depth
mol_mod = model_eval / phys_const.r_jup_mean
mol_modf = (mol_mod / (self.atmosphere.target.stellar_radius * ConstantVariables.RATIO_RSUN_RJUP_MEAN)) ** 2
spectrum = 1 - mol_modf # Transit depth spectrum
else:
# EMISSION SPECTROSCOPY
# Calculate stellar spectrum contribution
stellar_spectrum_HR = splev(wlen_shifted_star*1e-7, self.atmosphere.stellar_spline_model_hr)
# Evaluate planetary emission model
model_eval = splev(wlen_shifted_planet, csscaled, der=0)
# Calculate planet-to-star flux ratio
# Stellar spc is already multiplied by np.pi to go from erg/cm^2/cm/s/sr to erg/cm^2/cm/s
Fscaled = (model_eval / stellar_spectrum_HR) * (
rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2
spectrum = 1 + Fscaled # Combined stellar + planetary flux
if np.sum(spectrum < 0) > 0:
if "Model" in self.model_type:
os.system(f"mkdir -p {self.retrieval_data.path_results}/error_spectrum/")
with open(f"{self.retrieval_data.path_results}/error_spectrum/spectrum.pkl", "wb") as pkl_spectrum:
error_spc_pkl = {
"spectrum": spectrum,
"Fscaled": Fscaled,
"mol_modf": mol_modf,
"stellar_spectrum_HR": stellar_spectrum_HR
}
pickle.dump(error_spc_pkl, pkl_spectrum)
print(f"Negative values in spectrum for order {h} of night {index_night_in_current_instrument}.")
return False, -np.inf, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution
# PREPARE DATA FOR TELLURIC REMOVAL VIA PCA
model_in_log = np.log10(spectrum) # Convert to log scale for PCA processing
good_pixel = np.where(good_pixels[:, -1])
good_pixel = good_pixel[0]
# SETUP PCA PROCESSING BASED ON METHOD # TODO can be moved in night_extraction
lcrm_mask_nomask = np.zeros_like(night.array_reconstructed_from_PCA_components[:, h, :])
if self.retrieval_data.model_reprocessing == "hard":
# Hard PCA: Use existing telluric removal data
lcrm_mask = night.array_reconstructed_from_PCA_components[good_pixel, h, :]
if night.subtraction_of_the_average_spectrum:
lcrm_mask = lcrm_mask + night.average_spectrum_to_be_added[h]
else: # "soft" method
# Soft PCA: Simplified approach with minimal telluric removal
lcrm_mask = np.zeros_like(night.array_reconstructed_from_PCA_components[good_pixel, h, :])
# INJECT ATMOSPHERIC MODEL INTO IN-TRANSIT DATA
# Add theoretical atmospheric signal to in-transit observations
lcrm_mask[:, night.intransit] = (
model_in_log + (lcrm_mask[:, night.intransit])
)
# EXECUTE TELLURIC REMOVAL PCA
# Apply Time-Resolved Principal Component Analysis (TRPCA)
# This removes telluric contamination while preserving the atmospheric signal
lcrm_pca, error_trpca = trpca(
lcrm_mask,
lcrm_mask_nomask,
good_pixel,
night.nfc[h], # Number of components
night.smooth_on, # Smoothing flag
night.smooth_size, # Smoothing size
night.apply_standardize,
self.retrieval_data.model_reprocessing, # Processing method
night.subtraction_of_the_average_spectrum,
night.pca_mode # PCA mode: spatial or temporal
)
if error_trpca:
if "Model" in self.model_type:
print("Error in TRPCA processing.")
# Handle PCA failure with detailed error logging
self._save_trpca_error_debug_info(
lcrm_mask, lcrm_mask_nomask, good_pixel, night, h,
dict_calc_model, temperature, mass_fraction, vmr, MMW,
wl_full_resolution_HR, model_full_resolution_HR,
depth_full_resolution_HR,
)
return False, -np.inf, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution
# CALCULATE CHI-SQUARE LIKELIHOOD
# Extract only in-transit data for likelihood calculation
lcrm_pca = lcrm_pca[:, night.intransit]
# Add jitter noise to error bars if specified
if self.jitter_list is not None and self.jitter_list.size > 0:
errplusjit = np.sqrt(
error_spectrum_only_planet_night[good_pixel, :] ** 2 + jitter_arr[counter_nights_total] ** 2
)
else:
errplusjit = error_spectrum_only_planet_night[good_pixel, :]
if (
self.retrieval_data.mask_phase_enabled and
self.retrieval_data.mask_night_index[counter_nights_total]
):
if self.retrieval_data.mask_inside_range:
mask_phases_indices_current_night = np.where(np.logical_and(
night.phase_new_range_considered >= self.retrieval_data.min_phase,
night.phase_new_range_considered <= self.retrieval_data.max_phase
))[0]
else:
mask_phases_indices_current_night = np.where(np.logical_or(
night.phase_new_range_considered <= self.retrieval_data.min_phase,
night.phase_new_range_considered >= self.retrieval_data.max_phase
))[0]
spectrum_only_planet_night[:, mask_phases_indices_current_night] = 0
lcrm_pca[:, mask_phases_indices_current_night] = 0
errplusjit[:, mask_phases_indices_current_night] = 1
# Calculate chi-square for this spectral order
chiquadro_HR = np.sum(
(
(
spectrum_only_planet_night[good_pixel, :] # Observed data
- (lcrm_pca * scale_HR) # Scaled model
)
/ errplusjit # Error bars
)
** 2
)
n_good_pixels = len(good_pixel)
n_spectra = spectrum_only_planet_night.shape[1] # or the temporal dimension
# Total number of data points
dof = n_good_pixels * n_spectra
chi2_reduced = chiquadro_HR / dof
# Add to total log-likelihood (including error normalization)
lh_current_order = (-np.sum(np.log(errplusjit)) - 0.5 * chiquadro_HR)
lh_current_night += lh_current_order
lh_night[night.date] = f"{chiquadro_HR};{lh_current_night};{chi2_reduced};{dof};{self.bestpars_data.nfit}"
lhood += lh_current_order
# GENERATE DIAGNOSTIC PLOTS (for model analysis mode)
if "Model" in self.model_type:
# Create two-panel plot showing model injection and PCA result
fig, axs = plt.subplots(2, 1, figsize=(12, 5))
axs[0].set_title(
"Night: "
+ instrument.nights[index_night_in_current_instrument]
+ " Order: "
+ str(h)
)
# Prepare phase array for plotting (center around transit)
phforplot = night.phases_considered
phforplot[phforplot > 0.5] -= 1
# TOP PANEL: Show injected atmospheric model
model_in_logforplot = np.zeros(
np.shape(night.array_reconstructed_from_PCA_components[:, h, night.intransit])
)
# Fill good pixels with model signal, bad pixels with mean
model_in_logforplot[np.where(good_pixels[:, 0] == 1)] = model_in_log
model_in_logforplot[np.where(good_pixels[:, 0] == 0)] = np.mean(model_in_log)
# Plot model as 2D image (wavelength vs. orbital phase)
result_im = axs[0].imshow(
model_in_logforplot.T[:, 500:701],
aspect="auto",
origin="lower",
interpolation="None",
extent=[
wavelengths_night[500, 0],
wavelengths_night[701, 0],
phforplot[0],
phforplot[-1],
],
)
_ = fig.colorbar(result_im, ax=[axs[0]])
# BOTTOM PANEL: Show PCA-processed result
lcrm_pcaforplot = np.zeros(
np.shape(night.array_reconstructed_from_PCA_components[:, h, night.intransit])
)
# Fill good pixels with PCA result, bad pixels with mean
lcrm_pcaforplot[np.where(good_pixels[:, 0] == 1)] = lcrm_pca
lcrm_pcaforplot[np.where(good_pixels[:, 0] == 0)] = np.mean(lcrm_pca)
# Plot PCA result as 2D image
result_im2 = axs[1].imshow(
lcrm_pcaforplot.T[:, 500:701],
aspect="auto",
origin="lower",
interpolation="None",
extent=[
wavelengths_night[500, 0],
wavelengths_night[701, 0],
phforplot[0],
phforplot[-1],
],
)
_ = fig.colorbar(result_im2, ax=[axs[1]])
# Save plot and clean up
pdf.savefig(fig)
fig.clear()
plt.close(fig)
lh_high_resolution[instrument.name] = lh_night
# RETURN RESULTS
return True, lhood, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution
def _bin_emission_lr(self, instrument, ind_planet, model_LR, wl_full_resolution_LR, i, rp):
# Get stellar flux for this instrument and bin
# Note: stellar_spectrum_LR[instrument.name] can be either:
# - A single integrated flux value (if stellar_spectrum_type_lr == "Integral")
# - An array of point values (if stellar_spectrum_type_lr == "Planck" or "Phoenix")
flux_star_instrument_in_bin = self.atmosphere.stellar_spectrum[instrument.name]
# Extract planet flux values within this wavelength bin
flux_planet_in_bin = model_LR[ind_planet]
# INTEGRAL METHOD: Integrate planet flux over bin and normalize by stellar flux
if self.atmosphere.stellar_spectrum_type_lr_mode == "Integral":
if len(ind_planet) == 0:
flux_planet_full_in_bin = 0
else:
if len(ind_planet) == 1:
# Create simple 2-point integration using bin boundaries
x = np.array([instrument.lr_data.lbottom[i], instrument.lr_data.ltop[i]])
y_temp = model_LR[ind_planet]
# Duplicate flux values at boundaries for integration
y = np.append(y_temp, y_temp)
else:
y = flux_planet_in_bin
x = wl_full_resolution_LR[ind_planet]
# Perform trapezoidal integration of planet flux over wavelength bin
flux_planet_full_in_bin = trapezoid(y, x)
# Calculate eclipse depth: (integrated planet flux) / (stellar flux)
eclipse_ratio_bin = flux_planet_full_in_bin / flux_star_instrument_in_bin[i]
# POINT-SAMPLED METHOD: Use flux ratios at individual wavelength points
else: # Planck or Phoenix stellar spectrum
# Direct ratio of planet to stellar flux
# stellar_spectrum_LR contains point-by-point stellar flux values
eclipse_ratio_bin = np.mean(flux_planet_in_bin / flux_star_instrument_in_bin[i])
# Scale eclipse depth by (Rp/R*)^2 to get observed contrast
# This converts from flux ratio to observable transit/eclipse depth
Fscaled = (
eclipse_ratio_bin *
(rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2
)
# Store as relative spectrum: 1 + eclipse_depth
# (1.0 = stellar continuum, values > 1 indicate planetary emission)
return 1 + Fscaled
@staticmethod
def _bin_transmission_lr(instrument, model_after_preprocessing, depth_full_resolution_LR, ind_planet):
if instrument.lr_data.flux_norm:
value = np.mean(model_after_preprocessing[ind_planet])
else:
# Average all spectrum values within the wavelength bin
# This represents the mean transit depth in the bin
value = np.mean(depth_full_resolution_LR[ind_planet]) # already considered the 1 - transit_depth
return value
def _preprocess_model_lr_transmission(self, instrument, wl_full_resolution_LR, depth_full_resolution_LR, num_windows=40, top_window=10):
if instrument.lr_data.flux_norm:
if instrument.hwhm_km_s is not None and self.atmosphere.resolution_obj.HR_resolution > instrument.lr_data.resolution_dataset:
wl_new, spectrum_new = convolve_resolution(
wl_full_resolution_LR, depth_full_resolution_LR, instrument.hwhm_km_s, 0,
self.atmosphere.rv_sampling
)
else:
wl_new = instrument.lr_data.ldata_LR
spline_model = splrep(wl_full_resolution_LR, depth_full_resolution_LR)
spectrum_new = splev(wl_new, spline_model)
_, model_after_processing = estimate_continuum(spectrum_new, num_windows=num_windows, top_window=top_window, check_gaussian=False)
wl_after_processing = wl_new
else: # Emission or simple transit not in form of normalixed flux
wl_after_processing = wl_full_resolution_LR
model_after_processing = depth_full_resolution_LR
return wl_after_processing, model_after_processing
def _bin_spectrum_to_instrument_resolution(
self,
instrument,
wl_full_resolution_LR,
model_LR,
depth_full_resolution_LR,
rp,
):
"""
Bin a full-resolution spectrum to the instrument's spectral resolution.
This method performs spectral binning by integrating the high-resolution
atmospheric model spectrum over the wavelength bins defined by the
instrument's resolution. It handles both emission and transmission modes,
with special treatment for emission spectroscopy including stellar flux
normalization and eclipse depth calculations.
The binning process:
1. For each wavelength bin defined by the instrument
2. Find all full-resolution wavelength points within the bin
3. For emission: calculate eclipse depth using planet/star flux ratio
4. For transmission: average the spectrum values in the bin
5. Handle edge cases (e.g., zero flux) with appropriate fallbacks
Args:
instrument: Instrument object containing wavelength bin definitions
(lr_data.lbottom, lr_data.ltop, lr_data.npix)
wl_full_resolution_LR: Full-resolution wavelength array [microns]
model_LR: Full-resolution model flux array (planet flux for emission,
transit depth for transmission)
depth_full_resolution_LR: Full-resolution spectrum array
rp: Planet radius in cm
Returns:
spectrum_binned_LR: 1D array of binned spectrum values with length
equal to instrument.lr_data.npix. Values are set
to unity (1.0) where NaN occurs, representing a
neutral (featureless) spectrum.
Notes:
- For emission mode with integral stellar spectrum type, uses
trapezoidal integration over each bin
- For emission mode with point-sampled stellar spectrum (Planck/Phoenix),
uses point-by-point ratios
- NaN values in the output are replaced with 1.0 (neutral spectrum)
- The returned spectrum is in relative units (1.0 = continuum level)
"""
# Pre process data
if self.atmosphere.rad_mode == "Emission":
wl_after_processing = wl_full_resolution_LR
model_after_processing = model_LR
else:
wl_after_processing, model_after_processing = self._preprocess_model_lr_transmission(
instrument, wl_full_resolution_LR, depth_full_resolution_LR
)
if self.atmosphere.use_hr_linelists_for_lr or self.atmosphere.temporary_force_new_binning:
spectrum_binned_LR = ps.bin_spectrum(
instrument.lr_data.ldata_LR_micron, wl_after_processing / 1e3, model_after_processing,
half_widths=instrument.lr_data.step_LR_micron, gaps=None
)
if self.atmosphere.rad_mode == "Emission":
eclipse_ratio_bin = spectrum_binned_LR / self.atmosphere.stellar_spectrum[instrument.name]
spectrum_binned_LR_scaled = (
eclipse_ratio_bin *
(rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2
)
spectrum_binned_LR = 1 + spectrum_binned_LR_scaled
else:
# Initialize binned spectrum array with unity (neutral spectrum baseline)
spectrum_binned_LR = np.ones(instrument.lr_data.npix)
# Iterate over each wavelength bin defined by the instrument
for i, (bottom, top) in enumerate(zip(instrument.lr_data.lbottom, instrument.lr_data.ltop)):
# Find indices of full-resolution wavelengths that fall within this bin
# Using open-closed interval: (bottom, top]
ind_planet = np.where(
(wl_after_processing > bottom) & (wl_after_processing <= top)
)[0]
# EMISSION SPECTROSCOPY: Calculate planet-to-star flux ratio
if self.atmosphere.rad_mode == "Emission":
spectrum_binned_LR[i] = self._bin_emission_lr(instrument, ind_planet, model_after_processing, wl_after_processing, i, rp)
# TRANSMISSION SPECTROSCOPY: Simple averaging over bin
else:
spectrum_binned_LR[i] = self._bin_transmission_lr(instrument, model_after_processing, depth_full_resolution_LR, ind_planet)
# CLEANUP: Replace any NaN values with unity (neutral spectrum)
# NaN can occur from division by zero or empty bins
spectrum_binned_LR[np.isnan(spectrum_binned_LR)] = 1
return spectrum_binned_LR
[docs]
def low_resolution_lhood(
self,
temperature,
mass_fraction,
vmr,
MMW,
dict_calc_model,
rp,
):
"""
Compute likelihood for low-resolution observations.
This method calculates the likelihood by comparing the binned theoretical
spectrum with low-resolution observational data. It handles spectral
binning, offset corrections, and chi-square calculation for each instrument.
Args:
temperature: Temperature profile
mass_fraction: Mass fraction profiles for species
vmr: Volume mixing ratios
MMW: Mean molecular weight profile
dict_calc_model: Dictionary containing model calculation parameters
rp: Planet radius in cm
Returns:
Tuple containing:
- lhood: Log-likelihood value
- wl_full_resolution_LR: Full resolution wavelength array
- depth_full_resolution_LR: Full resolution spectrum
- wl_binned_LR_complete: Binned wavelength arrays for all instruments
- spectrum_binned_LR_complete: Binned spectra for all instruments
- opacity_contribution_LR: Opacity contributions (if model plotting)
- lh_low_resolution: Dictionary of per-instrument likelihood details
"""
lhood = 0
opacity_contribution_LR = None
wl_binned_LR_complete = []
spectrum_binned_LR_complete = []
# Calculate full resolution model spectrum
wl_full_resolution_LR, model_LR, depth_full_resolution_LR = self.atmosphere.calc_model(
temperature, mass_fraction, vmr, MMW, dict_calc_model, False,
)
# Calculate opacity contributions for model plotting
if "Opacity" in self.model_type:
opacity_contribution_LR = self.atmosphere.opacity_contribution(
temperature, mass_fraction, MMW, dict_calc_model, False
)
lh_low_resolution = {}
# Process each low-resolution instrument
for index_LR, instrument in enumerate(self.atmosphere.resolution_obj.instruments_LR):
# Bin the full-resolution spectrum to instrument resolution
spectrum_binned_LR = self._bin_spectrum_to_instrument_resolution(
instrument=instrument,
wl_full_resolution_LR=wl_full_resolution_LR,
model_LR=model_LR,
depth_full_resolution_LR=depth_full_resolution_LR,
rp=rp
)
# Apply instrument-specific offset correction
offsetLR_arr = dict_calc_model["offsetLR_arr"]
offsetLR = 0 if (index_LR == 0 or offsetLR_arr is None) else offsetLR_arr[index_LR - 1]
spectrum_binned_LR += offsetLR
# Store results for output
wl_binned_LR_complete.append(instrument.lr_data.ldata_LR)
spectrum_binned_LR_complete.append(spectrum_binned_LR)
# Calculate chi-square and likelihood contribution
# Compute residuals
residuals = instrument.lr_data.rdata_LR - spectrum_binned_LR
# Select error array element-wise based on the sign of each residual:
# - positive residual → upper error (sigma_plus)
# - negative residual → lower error (sigma_minus)
sigma_used = np.where(
residuals >= 0,
instrument.lr_data.errdata_LR_plus,
instrument.lr_data.errdata_LR_minus
)
chiquadro_LR = np.sum((residuals / sigma_used) ** 2)
dof = instrument.lr_data.npix
chi2_reduced = chiquadro_LR / dof
lh_single_instrument = (-np.sum(np.log(sigma_used)) - 0.5 * chiquadro_LR)
lhood += lh_single_instrument
lh_low_resolution[instrument.name] = f"{chiquadro_LR};{lh_single_instrument};{chi2_reduced};{instrument.lr_data.npix};{self.bestpars_data.nfit}"
return (
lhood, wl_full_resolution_LR, depth_full_resolution_LR, wl_binned_LR_complete, spectrum_binned_LR_complete,
opacity_contribution_LR, lh_low_resolution
)
[docs]
def lh_function_gib(self, params):
"""
Evaluate the core likelihood function for atmospheric retrieval.
This is the HEART of the atmospheric retrieval system. It takes input
parameters, performs boundary checks, extracts all parameter values,
calculates temperature profiles, computes mass fractions, and evaluates
both high-resolution and low-resolution likelihoods.
The function preserves the exact sequence of operations critical for
atmospheric modeling and MCMC retrieval.
Args:
params: List of ParamForModel objects containing all retrieval parameters
Returns:
Tuple containing:
- lhood: Log-likelihood value (float)
- det: Determinant value for Bayesian priors (float)
- not_retrieval_obj: NotRetrieval object with model results (or None)
"""
# BOUNDARY CHECKING: Reject parameters outside valid ranges
if "Retrieval" in self.model_type:
in_bounds = np.all([
param.boundaries_check()
for param in params
if param is not None
])
if not in_bounds:
return -np.inf, 1, None
# EXTRACT ORBITAL AND OBSERVATIONAL PARAMETERS
# Radial velocity parameters
kp = self.get_value(params, 'kp') # Planet orbital velocity semi-amplitude
rv = self.get_value(params, "rv") # Systemic radial velocity
# Scaling factors for spectral fitting
sf = self.get_value(params, "sf") # Single scaling factor
sf_arr = self.get_value(params, "sf_multi", single_value=False) # Multiple scaling factors
# Orbital eccentricity parameters (Thiele-Innes elements)
h_ecc = self.get_value(params, "h_ecc") # h = e*sin(ω)
k_ecc = self.get_value(params, "k_ecc") # k = e*cos(ω)
if self.retrieval_data.eccentricity:
# Convert Thiele-Innes elements to eccentricity and argument of periastron
ecc = np.power(h_ecc, 2) + np.power(k_ecc, 2) # e² = h² + k²
opi = np.arctan2(h_ecc, k_ecc) # ω = arctan2(h, k)
# Physical constraint: eccentricity must be between 0 and 1
if not 0 <= ecc <= 1:
return -np.inf, 1, None
else:
ecc = None
opi = None
# EXTRACT ATMOSPHERIC AND CLOUD PARAMETERS
# Cloud deck parameters
Pc = self.get_value(params, "Pc") # Cloud deck pressure
Pc = None if Pc is None else 10**Pc # Convert from log to linear
# Haze and cloud properties
haze_factor = self.get_value(params, "haze_factor") # Haze enhancement factor
haze_factor = 1 if haze_factor is None else 10**haze_factor
cloud_fraction = self.get_value(params, "cloud_fraction") # Cloud coverage fraction
cloud_fraction = 1 if cloud_fraction is None else cloud_fraction
# Opacity parameters
k0 = self.get_value(params, "k0") # Reference opacity
k0 = None if k0 is None else 10**k0
gamma = self.get_value(params, "gamma") # Power law index for opacity
k_cond = self.get_value(params, "k_cond") # Condensate opacity
k_cond = None if k_cond is None else 10**k_cond
k_opac = self.get_value(params, "k_opac") # Additional opacity parameter
k_opac = None if k_opac is None else 10**k_opac
# Wavelength-dependent parameters
lambda0_micron = self.get_value(params, "lambda0_micron") # Reference wavelength
xi = self.get_value(params, "xi") # Wavelength dependency parameter
omega_scale_micron = self.get_value(params, "omega_scale_micron") # Scale factor
omega_scale_micron = None if omega_scale_micron is None else 10**omega_scale_micron
# EXTRACT ARRAY PARAMETERS (multiple values per night/instrument)
jitter_arr = self.get_value(params, "jitter", single_value=False) # Noise jitter per night
vmr_peak_arr = self.get_value(params, "vmr_peak", single_value=False) # VMR peak values
pressure_peak_arr = self.get_value(params, "pressure_peak", single_value=False) # Pressure peaks
width_peak_arr = self.get_value(params, "width_peak", single_value=False) # Peak widths
offsetLR_arr = self.get_value(params, "offsetLR", single_value=False) # LR instrument offsets
# EXTRACT PLANETARY PARAMETERS
# Reference pressure level
P_ref = self.get_value(params, "Pref")
P_ref = 0.1 if P_ref is None else 10**P_ref # Default 0.1 bar or converted from log
# Planet radius and gravity calculation
rp = self.get_value(params, "rp") # Planet radius in Jupiter radii
rp_jup = rp
rp *= phys_const.r_jup_mean # Convert to meters
gravity = ( # Surface gravity calculation
phys_const.G * (self.atmosphere.target.mass * phys_const.m_jup)
/ np.power(rp, 2)
)
# EXTRACT TEMPERATURE PROFILE PARAMETERS
# Basic temperature parameters
T0 = self.get_value(params, "T0") # Reference temperature
kappa_IR = self.get_value(params, "kappa_IR") # IR opacity
kappa_IR_e = None if kappa_IR is None else 10 ** kappa_IR
gamma_g = self.get_value(params, "gamma_g") # Adiabatic index
gamma_g_e = None if gamma_g is None else 10 ** gamma_g
T_int = self.get_value(params, "T_int") # Internal temperature
# Madhu temperature profile parameters
p1 = self.get_value(params, "p1") # Pressure point 1
p2 = self.get_value(params, "p2") # Pressure point 2
p3 = self.get_value(params, "p3") # Pressure point 3
alpha1 = self.get_value(params, "alpha1") # Temperature gradient 1
alpha2 = self.get_value(params, "alpha2") # Temperature gradient 2
# Physical constraint for Madhu profile: p1 must be < p3
if self.retrieval_data.format_temperature == "madhu" and p1 > p3:
return -np.inf, 1, None
# EXTRACT CHEMISTRY AND NODE PARAMETERS
# Atmospheric chemistry parameters
metallicity = self.get_value(params, "met") # Metallicity enhancement
c_o_ratio = self.get_value(params, "co_ratio") # Carbon-to-oxygen ratio
c_o_ratio_linear = self.get_value(params, "co_ratio_linear") # Carbon-to-oxygen ratio
si_o_ratio_linear = self.get_value(params, "sio_ratio_linear") # Si-to-oxygen ratio
c_o_ratio_final = 10 ** c_o_ratio if c_o_ratio is not None else c_o_ratio_linear
si_o_ratio_final = si_o_ratio_linear
# Temperature-pressure profile nodes
T_low = self.get_value(params, "T_low") # Low pressure temperature
T_high = self.get_value(params, "T_high") # High pressure temperature
P_low = self.get_value(params, "P_low") # Low pressure boundary
P_high = self.get_value(params, "P_high") # High pressure boundary
# Additional temperature nodes
T0_node = self.get_value(params, "T0_node")
T1_node = self.get_value(params, "T1_node")
T2_node = self.get_value(params, "T2_node")
T3_node = self.get_value(params, "T3_node")
P1_node = self.get_value(params, "P1_node")
P2_node = self.get_value(params, "P2_node")
# Physical constraint: high pressure must be greater than low pressure
if P_high is not None and P_high >= P_low:
return -np.inf, 1, None
# EXTRACT ROTATION AND ADDITIONAL PARAMETERS
# Stellar rotation and line broadening
omegad = self.get_value(params, "omega") # Rotation velocity
f_rot_arr = self.get_value(params, "f_rot", single_value=False) # Rotation factors per night
# Systemic velocity variations per night
dVsys_arr = self.get_value(params, "dVsys", single_value=False)
# Cloud microphysics parameters
std_radius_distribution = self.get_value(params, "std_radius_distribution") # Particle size distribution
cloud_fsed = self.get_value(params, "cloud_fsed") # Sedimentation efficiency
# Atmospheric mixing
eddy_diff_coeff = self.get_value(params, "eddy_diff_coeff") # Eddy diffusion coefficient
eddy_diff_coeff_e = None if eddy_diff_coeff is None else np.ones_like(
self.atmosphere.pressure_data.pressures) * (10 ** eddy_diff_coeff)
# TEMPERATURE PROFILE SETUP
# Create parameter dictionary for temperature profile calculation
parameters_tp_profile = {
"T0": ValueErrorTP(T0),
"kappa_IR": ValueErrorTP(kappa_IR_e),
"gamma_g": ValueErrorTP(gamma_g_e),
"T_int": ValueErrorTP(T_int),
"T_low": ValueErrorTP(T_low),
"T_high": ValueErrorTP(T_high),
"P_low": ValueErrorTP(P_low),
"P_high": ValueErrorTP(P_high),
"p1": ValueErrorTP(p1),
"p2": ValueErrorTP(p2),
"p3": ValueErrorTP(p3),
"alpha1": ValueErrorTP(alpha1),
"alpha2": ValueErrorTP(alpha2),
"T0_node": ValueErrorTP(T0_node),
"T1_node": ValueErrorTP(T1_node),
"T2_node": ValueErrorTP(T2_node),
"T3_node": ValueErrorTP(T3_node),
"P1_node": ValueErrorTP(P1_node),
"P2_node": ValueErrorTP(P2_node),
}
# TEMPERATURE PROFILE CALCULATION
# Create temperature profile object with all parameters
tp_profile_obj = UserTemperatureProfile(
self.atmosphere.pressure_data.pressures,
parameters_tp_profile,
gravity,
False,
None
)
# Get the appropriate temperature profile function and calculate temperature
function_tp_profile = getattr(tp_profile_obj, self.retrieval_data.format_temperature)
temperature, _ = function_tp_profile()
# ATMOSPHERIC COMPOSITION CALCULATION
# Calculate mass fractions, mean molecular weight, and VMR profiles
mass_fraction_names_hr, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, negative_mass_fractions = self.calculate_mass_fraction(
temperature, metallicity, c_o_ratio_final, si_o_ratio_final,
vmr_peak_arr, pressure_peak_arr, width_peak_arr,
)
if negative_mass_fractions:
return -np.inf, 1, None
# INITIALIZE OUTPUT VARIABLES
lhood = 0 # Total log-likelihood
wl_full_resolution_HR = 0
depth_full_resolution_HR = 0
wl_full_resolution_LR = 0
depth_full_resolution_LR = 0
wl_binned_LR = 0
spectrum_binned_LR = 0
wlen_mu_contribution_HR = 0
contribution_HR = 0
wlen_mu_contribution_LR = 0
contribution_LR = 0
opacity_contribution_HR = None
opacity_contribution_LR = None
# CREATE MODEL CALCULATION DICTIONARY
# Package all parameters needed for atmospheric model calculations
dict_calc_model = {
"offsetLR_arr": offsetLR_arr,
"Pc": Pc,
"rp": rp,
"k0": k0,
"gamma": gamma,
"gravity": gravity,
"omegad": omegad,
"rv": rv,
"sf": sf,
"sf_arr": sf_arr,
"f_rot_arr": f_rot_arr,
"T0": T0,
"T_low": T_low,
"T3_node": T3_node,
"ecc": ecc,
"opi": opi,
"kp": kp,
"jitter_arr": jitter_arr,
"dVsys_arr": dVsys_arr,
"P_ref": P_ref,
"haze_factor": haze_factor,
"cloud_fraction": cloud_fraction,
"k_cond": k_cond,
"k_opac": k_opac,
"lambda0_micron": lambda0_micron,
"xi": xi,
"omega_scale_micron": omega_scale_micron,
"std_radius_distribution": std_radius_distribution,
"cloud_fsed": cloud_fsed,
"eddy_diff_coeff": eddy_diff_coeff_e,
}
lh_high_resolution = None
lh_low_resolution = None
# HIGH-RESOLUTION LIKELIHOOD CALCULATION
if self.atmosphere.resolution_obj.high_resolution():
if "Contribution" in self.model_type:
# Calculate opacity contributions for analysis
os.system(f"mkdir -p {self.retrieval_data.path_results}/contribution/")
wl_full_resolution_HR, contribution_HR = self.atmosphere.calc_model_contribution(
temperature, mass_fraction_names_hr, MMW, dict_calc_model
)
wlen_mu_contribution_HR = wl_full_resolution_HR
else:
# Calculate high-resolution likelihood
hr_likelihood = self.high_resolution_lhood(
temperature, mass_fraction_names_hr, vmr, MMW, dict_calc_model
)
all_ok = hr_likelihood[0]
lhood += hr_likelihood[1]
wl_full_resolution_HR = hr_likelihood[2]
depth_full_resolution_HR = hr_likelihood[3]
opacity_contribution_HR = hr_likelihood[4]
lh_high_resolution = hr_likelihood[5]
# Exit if high-resolution calculation failed
if not all_ok:
return lhood, 1, None
mass_fraction_names_lr = None
# LOW-RESOLUTION LIKELIHOOD CALCULATION
if self.atmosphere.resolution_obj.low_resolution():
# Prepare mass fraction names for low-resolution (different naming convention)
mass_fraction_names_lr = mass_fraction_names_hr.copy()
for counter_species, elem in enumerate(self.atmosphere.species_obj.line_species_complete_name_hr):
mass_fraction_names_lr[self.atmosphere.species_obj.line_species_complete_name_lr[counter_species]] = mass_fraction_names_lr.pop(elem)
if "Contribution" in self.model_type:
# Calculate opacity contributions for low-resolution analysis
os.system(f"mkdir -p {self.retrieval_data.path_results}/contribution/")
wl_full_resolution_LR, contribution_LR = self.atmosphere.calc_model_contribution(
temperature, mass_fraction_names_lr, MMW, dict_calc_model, False
)
wlen_mu_contribution_LR = wl_full_resolution_LR
# Calculate low-resolution likelihood
lr_likelihood = self.low_resolution_lhood(
temperature, mass_fraction_names_lr, vmr, MMW, dict_calc_model, rp
)
lhood += lr_likelihood[0]
wl_full_resolution_LR = lr_likelihood[1]
depth_full_resolution_LR = lr_likelihood[2]
wl_binned_LR = lr_likelihood[3]
spectrum_binned_LR = lr_likelihood[4]
opacity_contribution_LR = lr_likelihood[5]
lh_low_resolution = lr_likelihood[6]
# CALCULATE BAYESIAN PRIOR DETERMINANT
det = self.calculate_det(params)
# DETERMINE SPECIES LIST BASED ON CHEMISTRY MODEL
if self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[1]:
species = mass_fraction_names_hr.keys()
else:
species = list(self.atmosphere.species_compatible_with_prt)
# CREATE OUTPUT OBJECT FOR NON-RETRIEVAL MODES
not_retrieval_obj = None
if "Manual" in self.model_type or "Model" in self.model_type:
# print("Construction of Not Retrieval OBJ")
not_retrieval_obj = NotRetrieval(
mass_fraction_names_hr, mass_fraction_names_lr, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, wl_full_resolution_HR, depth_full_resolution_HR,
wlen_mu_contribution_HR, contribution_HR, wl_full_resolution_LR,
depth_full_resolution_LR, wl_binned_LR, spectrum_binned_LR,
offsetLR_arr, wlen_mu_contribution_LR, contribution_LR,
self.atmosphere.chemistry, params[self.start_molec:self.start_elements],
species, opacity_contribution_HR, opacity_contribution_LR, self.atmosphere.resolution_obj.instruments_LR, rp_jup, rad_mode=self.atmosphere.rad_mode,
stellar_radius=self.atmosphere.target.stellar_radius, stellar_spectrum=self.atmosphere.stellar_spectrum,
lh_HR=lh_high_resolution, lh_LR=lh_low_resolution, HR_res_present=self.atmosphere.resolution_obj.high_resolution(),
LR_res_present=self.atmosphere.resolution_obj.low_resolution(), use_hr_linelists_for_lr=self.atmosphere.use_hr_linelists_for_lr
)
return lhood, det, not_retrieval_obj
[docs]
def parallel_chain(
self,
index_core,
j,
return_dict,
oldpars,
oldchi2,
olddet,
pars_r1,
pars_r2
):
"""
Execute parallel MCMC chain step for differential evolution.
This method implements one step of the differential evolution MCMC algorithm
in parallel. It calculates new parameter values using the DE formula and
evaluates the likelihood to decide whether to accept or reject the step.
Args:
index_core: Index of the core.
j: Chain index
return_dict: Shared dictionary for returning results
oldpars: Current parameter values for this chain
oldchi2: Current chi-square value for this chain
olddet: Current determinant value for this chain
pars_r1: Parameter values from random chain 1
pars_r2: Parameter values from random chain 2
"""
arr_chains = np.empty_like(j, dtype=StructReturnExofast)
try:
for i in range(len(j)):
# Calculate new parameter values using differential evolution formula
newpars_chain = (
oldpars[:, i]
+ self.bestpars_data.gamma_coeff * np.reshape(pars_r1[:, i] - pars_r2[:, i], (self.bestpars_data.nfit,))
+ self.random_obj.rng.standard_normal(self.bestpars_data.nfit) * self.retrieval_data.scale_vector_params / 100
)
# Create full parameter object and evaluate likelihood
param_full = self.create_param_full(newpars_chain)
newchi2, newdet, _ = self.lh_function_gib(param_full)
# Calculate acceptance probability
C = (newdet / olddet[i]) * np.exp(newchi2 - oldchi2[i])
# Accept or reject the step based on Metropolis criterion
if self.random_obj.rng.uniform() < C:
temp = StructReturnExofast(1, newpars_chain, newchi2, newdet)
else:
temp = StructReturnExofast(0, oldpars[:, i], oldchi2[i], olddet[i])
arr_chains[i] = temp
except Exception as e:
# If likelihood calculation fails, reject all steps in this batch
print(f"\n!!! Worker process {index_core} crashed in parallel_chain !!!")
print(f"Process ID: {self.retrieval_data.id_process}, Core: {index_core}")
print(f"Exception: {e}")
print(traceback.format_exc())
# print("Rejecting all steps in this batch and continuing...\n")
# # Fill remaining chains with rejected steps (keep old parameters)
# for i in range(len(j)):
# arr_chains[i] = StructReturnExofast(0, oldpars[:, i], oldchi2[i], olddet[i])
# Store result in shared dictionary
try:
return_dict[index_core] = arr_chains
except Exception as e:
print(
f"Error on process {self.retrieval_data.id_process}, "
f"Exception: {str(e)}"
)
print(traceback.format_exc())
exit()
# def parallel_chain_pool_version(self, j, oldpars, oldchi2, olddet, pars_r1, pars_r2):
# """
# Pool version of parallel_chain - returns result instead of writing to shared dict.
#
# Args:
# j: Chain index
# oldpars: Current parameter values for this chain
# oldchi2: Current chi-square value for this chain
# olddet: Current determinant value for this chain
# pars_r1: Parameter values from random chain 1
# pars_r2: Parameter values from random chain 2
#
# Returns:
# tuple: (j, StructReturnExofast_result)
# """
# # Calculate new parameter values using differential evolution formula
# newpars_chain = (
# oldpars
# + self.bestpars_data.gamma_coeff
# * np.reshape(pars_r1 - pars_r2, (self.bestpars_data.nfit,))
# + self.random_obj.rng.standard_normal(self.bestpars_data.nfit)
# * self.retrieval_data.scale_vector_params
# / 100
# )
#
# # Create full parameter object and evaluate likelihood
# param_full = self.create_param_full(newpars_chain)
# newchi2, newdet, _ = self.lh_function_gib(param_full)
#
# # Calculate acceptance probability
# C = (newdet / olddet) * np.exp(newchi2 - oldchi2)
#
# # Accept or reject the step based on Metropolis criterion
# if self.random_obj.rng.uniform() < C:
# temp = StructReturnExofast(1, newpars_chain, newchi2, newdet)
# else:
# temp = StructReturnExofast(0, oldpars, oldchi2, olddet)
#
# # Return result instead of writing to shared dict
# return j, temp
# def run_multiple_processes_pool_version(self, old_pars, old_chi, old_det):
# """
# Pool version that works EXACTLY like the original Manager version.
# One process per chain, one core per chain.
#
# Args:
# old_pars: Current parameter values for all chains
# old_chi: Current chi-square values for all chains
# old_det: Current determinant values for all chains
#
# Returns:
# return_dict: Dictionary containing results from all processes
# """
# nchains = self.bestpars_data.nchains
# ncores = self.bestpars_data.ncores
#
# # Prepare arguments for each chain
# chain_args = []
# for j in range(nchains):
# # Select two random chains different from current chain for DE
# while True:
# r1 = self.random_obj.rng.integers(0, nchains, 1)[0]
# if r1 != j:
# break
#
# while True:
# r2 = self.random_obj.rng.integers(0, nchains, 1)[0]
# if r2 != j and r2 != r1:
# break
#
# # Add arguments for this chain
# chain_args.append((
# j,
# old_pars[:, j],
# old_chi[j],
# old_det[j],
# old_pars[:, r1],
# old_pars[:, r2]
# ))
#
# # Execute in parallel - ONE PROCESS PER CHAIN, exactly like Manager
# try:
# with multiprocessing.Pool(processes=ncores) as pool:
# results = pool.starmap(self.parallel_chain_pool_version, chain_args)
#
# # Convert results back to dictionary format
# return_dict = {}
# for j, result in results:
# return_dict[j] = result
#
# return return_dict
#
# except Exception as e:
# print(f"Pool execution failed: {e}")
# # Fallback to original Manager version
# return self.run_multiple_processes_manager_version(old_pars, old_chi, old_det)
[docs]
def run_multiple_processes_manager_version(self, old_pars, old_chi, old_det):
"""
Run MCMC chain steps in parallel using multiprocessing Manager.
This method distributes chain steps across multiple processes using
differential evolution MCMC, with a shared Manager dictionary for
collecting results.
Args:
old_pars: Current parameter values for all chains, shape (nfit, nchains)
old_chi: Current chi-square values for all chains, shape (nchains,)
old_det: Current determinant values for all chains, shape (nchains,)
Returns:
return_dict: Manager dictionary mapping core index to array of
StructReturnExofast results
"""
proc = []
manager = Manager()
return_dict = manager.dict()
chain_per_core = int(self.bestpars_data.nchains / self.bestpars_data.ncores)
for i in range(self.bestpars_data.ncores):
r1 = np.zeros(chain_per_core, dtype=int)
r2 = np.zeros(chain_per_core, dtype=int)
j = np.zeros(chain_per_core, dtype=int)
for k in range(chain_per_core):
# Select two random chains different from current chain for DE
j[k] = int(i * chain_per_core + k)
while True:
r1[k] = self.random_obj.rng.integers(0, self.bestpars_data.nchains, 1)
if r1[k] != j[k]:
break
while True:
r2[k] = self.random_obj.rng.integers(0, self.bestpars_data.nchains, 1)
if r2[k] != j[k]:
break
# Create and start process for this chain
proc.append(
multiprocessing.Process(
target=self.parallel_chain,
args=(
i,
j,
return_dict,
old_pars[:, j],
old_chi[j],
old_det[j],
old_pars[:, r1],
old_pars[:, r2],
),
)
)
proc[i].start()
# Wait for all processes to complete and clean up
for i in range(self.bestpars_data.ncores):
proc[i].join()
proc[i].terminate()
return return_dict
[docs]
def run_multiple_processes(self, old_pars, old_chi, old_det, use_pool=False):
"""
Run MCMC chain steps in parallel using the Manager-based approach.
Args:
old_pars: Current parameter values for all chains, shape (nfit, nchains)
old_chi: Current chi-square values for all chains, shape (nchains,)
old_det: Current determinant values for all chains, shape (nchains,)
use_pool: Currently unused; Manager approach is always used
Returns:
return_dict: Dictionary containing results from all processes
"""
# if use_pool:
# return self.run_multiple_processes_pool_version(old_pars, old_chi, old_det)
# else:
return self.run_multiple_processes_manager_version(old_pars, old_chi, old_det)
# noinspection PyUnresolvedReferences
[docs]
def create_param_full(self, newpars_chain):
"""
Create full parameter array from chain parameter values.
This method reconstructs the complete parameter array from the reduced
set of parameters used in the MCMC chain. It handles both single-value
parameters and multi-value parameters (like jitter, f_rot arrays).
The logic preserves the exact sequence of operations:
1. Single parameters get assigned directly
2. Multi-value parameters (jitter, f_rot, etc.) get collected into arrays
Args:
newpars_chain: Array of parameter values from MCMC chain
Returns:
param_full: Complete parameter array with all values assigned
"""
param_full = self.initial_param_array
counter_newpars = 0
for i in range(len(param_full)):
if param_full[i] is not None and param_full[i].status:
# Handle single-value parameters (NOT f_rot or jitter arrays)
if param_full[i].starting_value_variable is not None:
param_full[i].value_in_retrieval = newpars_chain[counter_newpars]
counter_newpars += 1
else:
# Handle multi-value parameters (jitter, f_rot, etc.)
#
# self.list_multiple_param contains names of repeated parameters.
# This section processes parameters that have multiple instances
# (e.g., jitter1, jitter2, jitter3 -> jitter array).
#
# The logic:
# - Check if current parameter name is in multiple_param list
# - Collect all sequential values with same base name
# - Create array: [1, [2,3,4], 5, [6,7], 8] for ["kp", "jitter", "omega", "f_rot", "H2O"]
arr_same_param = []
for multiple_param in self.list_multiple_param:
if (
multiple_param
in self.bestpars_data.list_bestpars[counter_newpars]
):
# Collect all consecutive parameters with same base name
while (
multiple_param
in self.bestpars_data.list_bestpars[counter_newpars]
):
arr_same_param.append(newpars_chain[counter_newpars])
counter_newpars += 1
if counter_newpars >= self.bestpars_data.nfit:
break
# Assign collected array to parameter
param_full[i].value_arr_in_retrieval = arr_same_param
break
return param_full