Source code for GUIBRUSHR.Retrieval.ModelCalculation.ModelData

"""
ModelData Module

This module contains the core ModelData class for atmospheric retrieval calculations.
It handles parameter management, model calculations, and likelihood evaluations for
both high-resolution and low-resolution observations.

This is the CORE of the code - all logic and sequence of operations are preserved.
"""

import os
import pickle
import traceback
import multiprocessing
from pathlib import Path
from multiprocessing import Manager
from re import sub

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from petitRADTRANS import physical_constants as phys_const
import pyratbay.atmosphere as pa
from scipy.integrate import trapezoid
from scipy.interpolate import splrep, splev
import pyratbay.spectrum as ps

from GUIBRUSHR.General_Constants.Classes.UserTemperatureProfile import UserTemperatureProfile
from GUIBRUSHR.General_Constants.Classes.ValueErrorTP import ValueErrorTP
from GUIBRUSHR.General_Constants.FunctionsAndConstants.Constant_Variables import ConstantVariables
from GUIBRUSHR.General_Constants.Classes.Instrument import Instrument
from GUIBRUSHR.General_Constants.Classes.Night import Night
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Atmosphere import Atmosphere
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Retrieval import Retrieval
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Bestpars import Bestpars
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.Random import Random
from GUIBRUSHR.General_Constants.Classes.Target import Target
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.ParamForModel import ParamForModel
from GUIBRUSHR.Retrieval.ModelCalculation.Classes.NotRetrieval import NotRetrieval
from GUIBRUSHR.Retrieval.ExofastMCMC.StructReturnExofast import StructReturnExofast
from GUIBRUSHR.General_Constants.FunctionsAndConstants.general_functions import (
    get_csv_value, convolve_resolution, trpca, compute_adjusted_abundance,
    convolve_solid_body_rotation, kernel_solid_body_rotation, rv_planet_and_star, estimate_continuum
)

[docs] def get_param_array_initial_value(array, index, single_value=True): """ Get initial parameter value from array at specified index. Args: array: Parameter array index: Index to retrieve single_value: If True, return single value, else return array Returns: Initial parameter value or None if not found """ if array[index] is None: return None if single_value: return array[index].get_starting_value() else: return array[index].get_starting_value_arr()
[docs] class ModelData: """ Core class for atmospheric retrieval model data and calculations. This class manages all aspects of atmospheric modeling including: - Parameter initialization and management - Model calculations for both high and low resolution - Likelihood evaluations - MCMC chain operations Attributes: path_default: Default path for operations params_list: List of all possible parameters list_multiple_param: Parameters that can have multiple values clight: Speed of light constant model_type: Type of model (Retrieval, Model, etc.) atmosphere: Atmosphere object for calculations retrieval_data: Retrieval configuration data bestpars_data: Best parameters data random_obj: Random number generator object """
[docs] def __init__( self, path_params=None, path_df=None, id_process=None, table_output_file=None, minwlen_lr=None, maxwlen_lr=None, minwlen_hr=None, maxwlen_hr=None, model_type="Retrieval", lbl_sampling_hr=None, lbl_sampling_lr=None, range_min=None, range_max=None, nlayers=None, manual_model_obj=None, load_new_opacities=True, path_default=None, ): """ Initialize ModelData object. Args: path_params: Path to parameters file path_df: Path to dataframe file id_process: Process ID for parallel operations table_output_file: Output file for table data minwlen_lr: Minimum wavelength for low resolution maxwlen_lr: Maximum wavelength for low resolution minwlen_hr: Minimum wavelength for high resolution maxwlen_hr: Maximum wavelength for high resolution model_type: Type of model calculation lbl_sampling_hr: Line-by-line sampling parameter hr lbl_sampling_lr: Line-by-line sampling parameter lr range_min: Minimum pressure range range_max: Maximum pressure range nlayers: Number of atmospheric layers manual_model_obj: Manual model object for direct initialization load_new_opacities: Whether to load new opacity data path_default: Default working directory path """ # Initialize core attributes self.lbl_sampling_hr = lbl_sampling_hr self.lbl_sampling_lr = lbl_sampling_lr os.chdir(path_default) self.path_default = path_default self.params_list = ConstantVariables.params_list self.list_multiple_param = ConstantVariables.LIST_MULTIPLE_PARAM self.clight = ConstantVariables.CLIGHT # Initialize data objects self.smooth_data = None self.bestpars_data = None self.retrieval_data = None self.atmosphere = None self.random_obj = None self.wlen_list_overplot = None self.jitter_list = None self.table_output_file = None # Initialize parameter array and indices self.initial_param_array = [None for _ in self.params_list] self.start_general_1 = self.get_index("kp") self.start_elements = self.get_index(ConstantVariables.LIST_ELEMENT_FOR_HYBRID[0]) self.start_molec = self.get_index("H2") self.start_condensed = ConstantVariables.start_condensed_molecs # Store configuration parameters self.model_type = model_type # Initialize based on input method if path_params is not None: # Initialize from parameter files df_parameters = self.read_df_parameters(path_params) self.read_df_information( path_df, df_parameters, id_process, table_output_file, minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min, range_max, nlayers ) else: # Initialize from manual model object self.populate_from_manual_model( manual_model_obj, load_new_opacities, minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min, range_max, nlayers )
[docs] def populate_from_manual_model( self, manual_model_obj, load_new_opacities, minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min_press=None, range_max_press=None, nlayers=None, ): """ Populate model data from a manual model object. This method initializes the model using parameters from a manual model object instead of reading from parameter files. It processes both parameter arrays and molecule lists to set up the atmospheric model. Args: manual_model_obj: Manual model object containing parameters and molecules load_new_opacities: Whether to load new opacity data minwlen_lr: Minimum wavelength for low resolution maxwlen_lr: Maximum wavelength for low resolution minwlen_hr: Minimum wavelength for high resolution maxwlen_hr: Maximum wavelength for high resolution range_min_press: Minimum pressure range range_max_press: Maximum pressure range nlayers: Number of atmospheric layers """ # Initialize parameter lists scale = [] rayleigh_species = [] line_species = [] line_species_isotope = [] line_species_complete_name_hr = [] line_species_complete_name_lr = [] list_bestpars = [] list_bestpars_initial_value = [] list_fixed = [] list_condensed_molecules = [] isotope = None # Process parameters from manual model object for elem in manual_model_obj.param_array: if elem is not None and elem.is_considered(): # Extract parameter information name, name_for_list_molec, molec_formula = elem.get_name() mass_elem = elem.mass_molec value_during_retrieval = elem.get_value() constant_VMR = elem.VMR rayleigh = elem.get_rayleigh_value() # Add parameter to model parameters = self.add_param_manual_model( name, value_during_retrieval, constant_VMR, mass_elem, molec_formula, name_for_list_molec, scale, rayleigh, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, isotope, list_fixed, list_condensed_molecules ) # Update parameter lists with returned values ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) = parameters # Process molecules from manual model object molec_list = manual_model_obj.molecs for molec in molec_list.split("\n"): string_comp = molec.split(",") # Parse molecule information name = molec_formula = string_comp[0] isotope = string_comp[1] name_for_list_molec = string_comp[2] mass_elem = float(string_comp[4]) value_during_retrieval = float(string_comp[5]) constant_VMR = int(string_comp[6]) rayleigh = int(string_comp[8]) # Add molecule parameter to model parameters = self.add_param_manual_model( name, value_during_retrieval, constant_VMR, mass_elem, molec_formula, name_for_list_molec, scale, rayleigh, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, isotope, list_fixed, list_condensed_molecules ) # Update parameter lists with returned values ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) = parameters # Process molecules from manual model object condensed_list = manual_model_obj.condensed if condensed_list != "": for condensed in condensed_list.split("\n"): string_comp = condensed.split(",") # Parse molecule information name = name_for_list_molec = string_comp[0] molec_formula = string_comp[-1] isotope = None mass_elem = float(string_comp[2]) value_during_retrieval = float(string_comp[3]) constant_VMR = True rayleigh = False # Add molecule parameter to model parameters = self.add_param_manual_model( name, value_during_retrieval, constant_VMR, mass_elem, molec_formula, name_for_list_molec, scale, rayleigh, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, isotope, list_fixed, list_condensed_molecules ) # Update parameter lists with returned values ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) = parameters # Process molecules from manual model object hybrid_list = manual_model_obj.hybrid if hybrid_list != "": for hybrid in hybrid_list.split("\n"): string_hybrid = hybrid.split(",") value_during_retrieval = float(string_hybrid[2]) name = name_for_list_molec = string_hybrid[0] molec_formula = None isotope = None mass_elem = None constant_VMR = True rayleigh = False # Add molecule parameter to model parameters = self.add_param_manual_model( name, value_during_retrieval, constant_VMR, mass_elem, molec_formula, name_for_list_molec, scale, rayleigh, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, isotope, list_fixed, list_condensed_molecules ) # Update parameter lists with returned values ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) = parameters # Initialize core objects self.random_obj = Random(1990) # not needed in forward model self.bestpars_data = Bestpars(list_bestpars, list_bestpars_initial_value) # Initialize atmosphere if needed if load_new_opacities: self.atmosphere = Atmosphere() # Update atmosphere parameters self.atmosphere.update_params( line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_condensed_molecules, rayleigh_species, range_min_press, range_max_press, nlayers, lbl_high_res=self.lbl_sampling_hr, lbl_low_res=1, manual_model_obj=manual_model_obj, chemistry=manual_model_obj.chemistry, start_molecs=self.start_molec ) # Initialize retrieval data self.retrieval_data = Retrieval(scale, manual_model_obj=manual_model_obj) # Set up opacities and wavelength ranges if needed if load_new_opacities: self.atmosphere.set_wl_range( minwlen_lr=minwlen_lr, maxwlen_lr=maxwlen_lr, minwlen_hr=minwlen_hr, maxwlen_hr=maxwlen_hr, ) self.atmosphere.read_opacities( table_output_file=manual_model_obj.table_output_file, path_target=manual_model_obj.path_targets, stellar_spectrum_type_hr=manual_model_obj.path_stellar_spectrum )
[docs] def add_param_manual_model( self, name, value_during_retrieval, constant_VMR, mass_elem, molec_formula, name_for_list_molec, scale, rayleigh, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, isotope, list_fixed, list_condensed_molecules ): """ Add a parameter from manual model to the parameter management system. This method processes a single parameter from a manual model object and adds it to the appropriate parameter lists based on its properties. Args: name: Parameter name value_during_retrieval: Parameter value for retrieval constant_VMR: Whether VMR is constant mass_elem: Molecular mass molec_formula: Molecular formula name_for_list_molec: Name for molecule list scale: Scale factor list rayleigh: Whether Rayleigh scattering applies rayleigh_species: List of Rayleigh species line_species: List of line species line_species_isotope: List of isotopes line_species_complete_name_hr: List of complete species names HR line_species_complete_name_lr: List of complete species names LR list_bestpars: List of best parameters list_bestpars_initial_value: List of initial values isotope: Isotope information list_fixed: List of fixed parameters list_condensed_molecules: List of condensed molecules Returns: Tuple of updated parameter lists """ # Set default parameter properties for manual model in_best_pars = 0 scale_param = 0 sigma_prior_param = None is_molec = mass_elem is not None range_param = np.inf opacity_name_lr = isotope # Use parameter management system to process the parameter return self.parameter_management( name, in_best_pars, scale_param, constant_VMR, -range_param, range_param, is_molec, molec_formula, mass_elem, name_for_list_molec, rayleigh, value_during_retrieval, sigma_prior_param, isotope, opacity_name_lr, scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules )
[docs] def read_df_parameters(self, path_df_parameters): """ Read df_parameters.csv file containing the fitting parameters. This method reads a CSV file containing parameter definitions and processes each parameter to build the complete parameter structure for the model. Args: path_df_parameters: Path to the parameters CSV file Returns: Tuple containing all processed parameter information: - mass_vector: List of molecular masses - scale: Array of scale factors - rayleigh_species: List of Rayleigh scattering species - line_species: List of line species - line_species_isotope: List of isotope information - line_species_complete_name_hr: List of complete HR species names - line_species_complete_name_lr: List of complete LR species names - list_bestpars: List of best fit parameters - list_bestpars_initial_value: List of initial parameter values - list_fixed: List of fixed parameters - list_condensed_molecules: List of condensed phase molecules """ # Initialize parameter lists rayleigh_species = [] line_species = [] line_species_isotope = [] line_species_complete_name_hr = [] line_species_complete_name_lr = [] list_condensed_molecules = [] list_bestpars = [] list_bestpars_initial_value = [] list_fixed = [] scale = [] mass_vector = [] # Read and process CSV file df_parameters = pd.read_csv(path_df_parameters) df_parameters = df_parameters.where(pd.notnull(df_parameters), None) for _, row in df_parameters.iterrows(): name_for_retrieval = row.get('name') name_for_list_molec = row.get('name') molec_param = int(row.get('molec')) value_during_retrieval = float(row.get('value')) scale_param = float(row.get('scale')) range_min_param = float(row.get('range_min')) range_max_param = float(row.get('range_max')) rayleigh_species_param = int(row.get('rayleigh_species')) in_bestpars_param = int(row.get('in_bestpars')) mass_param = row.get('mass') sigma_prior_param = row.get('sigma_prior') molec_formula_param = row.get('molec_formula') constant_vmr_param = row.get('constant_vmr') isotope = row.get('isotope') opacity_name_lr = row.get('opacity_name_lr') or isotope # Process mass parameter if mass_param is None or np.isnan(mass_param): mass_param = None else: mass_param = float(mass_param) mass_vector.append(mass_param) # Process sigma prior parameter if sigma_prior_param is None or np.isnan(sigma_prior_param): sigma_prior_param = None else: sigma_prior_param = float(sigma_prior_param) # Process molecule formula and name if molec_param == 0: molec_formula_param = None elif molec_param == 1: name_for_retrieval = molec_formula_param # Use parameter management to process this parameter ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) = self.parameter_management( name_for_retrieval, in_bestpars_param, scale_param, constant_vmr_param, range_min_param, range_max_param, molec_param, molec_formula_param, mass_param, name_for_list_molec, rayleigh_species_param, value_during_retrieval, sigma_prior_param, isotope, opacity_name_lr, scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ) # Finalize processing scale = np.array(scale) if len(list_condensed_molecules) == 0: list_condensed_molecules = None return ( mass_vector, scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules )
# noinspection PyUnresolvedReferences
[docs] def parameter_management( self, name_for_retrieval, in_bestpars_param, scale_param, constant_vmr_param, range_min_param, range_max_param, molec_param, molec_formula_param, mass_param, name_for_list_molec, rayleigh_species_param, value_during_retrieval, sigma_prior_param, isotope, opacity_name_lr, scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules ): """ Manage parameter processing and categorization for the retrieval system. This method handles the complex logic of processing individual parameters and categorizing them into appropriate lists based on their properties. It manages parameter creation, updates, and classification into different species types (Rayleigh, line species, condensed molecules). Args: name_for_retrieval: Parameter name used in retrieval in_bestpars_param: Whether parameter is in best parameters list scale_param: Scale factor for parameter constant_vmr_param: Whether VMR is constant range_min_param: Minimum parameter range range_max_param: Maximum parameter range molec_param: Whether parameter is molecular molec_formula_param: Molecular formula mass_param: Molecular mass name_for_list_molec: Name for molecule list rayleigh_species_param: Whether parameter is Rayleigh species value_during_retrieval: Parameter value during retrieval sigma_prior_param: Sigma prior for parameter isotope: Isotope information opacity_name_lr: Opacity name for low resolution scale: Scale factors list rayleigh_species: Rayleigh species list line_species: Line species list line_species_isotope: Isotope list line_species_complete_name_hr: Complete species names list HR line_species_complete_name_lr: Complete species names list LR list_bestpars: Best parameters list list_bestpars_initial_value: Initial values list list_fixed: Fixed parameters list list_condensed_molecules: Condensed molecules list Returns: Tuple of updated parameter lists """ # Resolve parameter name (handle numbered parameters like jitter1, jitter2) if name_for_retrieval not in self.params_list: name = sub(r"\d+", "", name_for_retrieval) else: name = name_for_retrieval # Create parameter object if it doesn't exist if self.initial_param_array[self.get_index(name)] is None: self.initial_param_array[self.get_index(name)] = ParamForModel( name, in_bestpars_param, scale_param, constant_vmr_param, range_min_param, range_max_param, molec_param, molec_formula_param, mass_param, name_for_list_molec, isotope, ) # Update parameter values based on parameter type if name in self.list_multiple_param: # Handle multi-value parameters (jitter, f_rot, etc.) self.initial_param_array[self.get_index(name)].append_elem( value_during_retrieval, sigma_prior_param ) else: # Handle single-value parameters self.initial_param_array[self.get_index(name)].update_starting_value( value_during_retrieval, sigma_prior_param ) # Categorize parameter for fitting vs. fixed if in_bestpars_param: list_bestpars.append(name_for_retrieval) list_bestpars_initial_value.append(value_during_retrieval) scale.append(scale_param) else: list_fixed.append(name_for_retrieval) # Categorize parameter by species type if int(rayleigh_species_param) == 2: # Both Rayleigh scattering and line opacity species rayleigh_species.append(name_for_retrieval) if molec_param and name.replace("_", "") in ConstantVariables.ALL_MOLEC: line_species.append(name_for_retrieval) line_species_isotope.append(isotope) line_species_complete_name_hr.append(name_for_list_molec) line_species_complete_name_lr.append(opacity_name_lr) elif int(rayleigh_species_param) == 1: # Rayleigh scattering species only rayleigh_species.append(name_for_retrieval) elif molec_param and name.replace("_", "") in ConstantVariables.ALL_MOLEC: # Line opacity species line_species.append(name_for_retrieval) line_species_isotope.append(isotope) line_species_complete_name_hr.append(name_for_list_molec) line_species_complete_name_lr.append(opacity_name_lr) elif name_for_retrieval in ConstantVariables.ALL_CONDENSED_MOLEC: # Condensed phase molecules list_condensed_molecules.append(name_for_retrieval) return ( scale, rayleigh_species, line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_bestpars, list_bestpars_initial_value, list_fixed, list_condensed_molecules )
[docs] def read_df_information( self, path_df_information, df_parameters, id_process, table_output_file, minwlen_lr, maxwlen_lr, minwlen_hr, maxwlen_hr, range_min, range_max, nlayers ): """ Read and process general information from configuration CSV file. This method reads the main configuration file containing instrument setup, atmospheric parameters, target properties, and retrieval settings. It initializes all core objects needed for the atmospheric retrieval including instruments, atmosphere, target, and retrieval configuration. Args: path_df_information: Path to the general information CSV file df_parameters: Processed parameter data from read_df_parameters id_process: Process ID for parallel operations table_output_file: Output file for logging minwlen_lr: Minimum wavelength for low resolution maxwlen_lr: Maximum wavelength for low resolution minwlen_hr: Minimum wavelength for high resolution maxwlen_hr: Maximum wavelength for high resolution range_min: Minimum pressure range override (overrides CSV value if provided) range_max: Maximum pressure range override (overrides CSV value if provided) nlayers: Number of atmospheric layers for retrieval """ # Extract processed parameter data from previous step mass_vector = df_parameters[0] scale = df_parameters[1] rayleigh_species = df_parameters[2] line_species = df_parameters[3] line_species_isotope = df_parameters[4] line_species_complete_name_hr = df_parameters[5] line_species_complete_name_lr = df_parameters[6] list_bestpars = df_parameters[7] list_bestpars_initial_value = df_parameters[8] list_fixed = df_parameters[9] list_condensed_molecules = df_parameters[10] # Read general configuration information df_general_info = pd.read_csv(path_df_information) # Set up petitRADTRANS data path environment input_data_folder = get_csv_value(df_general_info, "path_petitRadTrans") if input_data_folder[-1] != os.sep: input_data_folder += os.sep os.environ["pRT_input_data_path"] = input_data_folder # Initialize high-resolution instruments instrument_HR = [] instrument_hr_str = get_csv_value(df_general_info, "instruments_hr_list") total_nights = 0 total_hr_instruments = 0 min_hwhm_hr = 999 # use_hr_linelist_for_lr = get_csv_value(df_general_info, "use_hr_linelist_for_lr") use_hr_linelist_for_lr = use_hr_linelist_for_lr == "1" if instrument_hr_str != "Nothing": # Parse instrument string format: name&path&pixel_dv&min_wl&max_wl&nights list_instruments = instrument_hr_str.split("|") for index_inst, curr_instrument in enumerate(list_instruments): curr_instrument = curr_instrument.split("&") pixel_dv = float(curr_instrument[2]) min_hwhm_hr = min(min_hwhm_hr, pixel_dv) instrument_HR.append( Instrument( curr_instrument[0], curr_instrument[1], "HR", pixel_dv, float(curr_instrument[3]), float(curr_instrument[4]), nights=curr_instrument[5] ) ) total_hr_instruments += 1 total_nights += instrument_HR[index_inst].ndate # Initialize low-resolution instruments instrument_LR = [] instrument_lr_str = get_csv_value(df_general_info, "instruments_lr_list") if instrument_lr_str != "Nothing": list_instruments = instrument_lr_str.split("|") for curr_instrument in list_instruments: curr_instrument = curr_instrument.split("&") # Handle optional pixel_dv parameter if curr_instrument[2] != "": pixel_dv = float(curr_instrument[2]) # min_hwhm_lr = min(min_hwhm_lr, pixel_dv) else: pixel_dv = None # Handle optional wavelength ranges min_wl = float(curr_instrument[3]) if curr_instrument[3] != "" else None max_wl = float(curr_instrument[4]) if curr_instrument[4] != "" else None instrument_LR.append( Instrument( curr_instrument[0], curr_instrument[1], "LR", pixel_dv, min_wl, max_wl ) ) # chemistry = get_csv_value(df_general_info, "chemistry") # chemcat_bool = get_csv_value(df_general_info, "chemcat_bool") # chemcat_bool = chemcat_bool == "True" # format_temperature = get_csv_value(df_general_info, "t_p_profile") # continum_opacity_arr = get_csv_value( df_general_info, "continum_opacity" ) continum_opacity = continum_opacity_arr.split(",") # maxsteps = int(get_csv_value(df_general_info, "maxsteps")) # seme = int(get_csv_value(df_general_info, "seed")) # multiplier_chains = get_csv_value(df_general_info, "multiplier_chains") multiplier_chains = 2 if multiplier_chains is None else int(multiplier_chains) # multiplier_cores = get_csv_value(df_general_info, "multiplier_cores") multiplier_cores = 1 if multiplier_cores is None else int(multiplier_cores) # use_pool = get_csv_value(df_general_info, "use_pool") use_pool = False if use_pool is None else int(use_pool) # use_parallel_init = get_csv_value(df_general_info, "use_parallel_init") use_parallel_init = False if use_parallel_init is None else int(use_parallel_init) # order_sel = get_csv_value(df_general_info, "order_sel") # # Transmission or emission rad_mode = get_csv_value(df_general_info, "rad_mode") # target = Target(path_df_information, rad_mode=rad_mode) path_results = get_csv_value(df_general_info, "path_results") path_target = Path(path_results).parent.parent # ecc_opi = get_csv_value(df_general_info, "ecc_opi") ecc_opi = ecc_opi == "1" # tell_rm_method = get_csv_value(df_general_info, "tell_rm_method") # resolution = get_csv_value(df_general_info, "resolution") # stellar_spectrum_type_hr = get_csv_value(df_general_info, "stellar_spectrum_type_hr") stellar_spectrum_type_lr = get_csv_value(df_general_info, "stellar_spectrum_type_lr") stellar_spectrum_type_lr_mode = get_csv_value(df_general_info, "stellar_spectrum_type_lr_mode") if stellar_spectrum_type_hr == "None": stellar_spectrum_type_lr, = stellar_spectrum_type_hr = get_csv_value(df_general_info, "stellar_spectrum") print("stellar_spectrum_type_lr:", stellar_spectrum_type_lr, " Mode:", stellar_spectrum_type_lr_mode) print("stellar_spectrum_type_hr:", stellar_spectrum_type_hr) if rad_mode == "Transmission": stellar_spectrum_type_hr = None stellar_spectrum_type_lr = None # range_min_pressures = ( float(get_csv_value(df_general_info, "range_min_pressures")) if range_min is None else range_min ) # range_max_pressures = ( float(get_csv_value(df_general_info, "range_max_pressures")) if range_max is None else range_max ) # layers = ( int(get_csv_value(df_general_info, "layers")) if nlayers is None else nlayers ) # lbl_hr = get_csv_value(df_general_info, "lbl_hr") if lbl_hr is None: lbl_hr = int(get_csv_value(df_general_info, "lbl")) # RETROCOMPATIBILITY else: lbl_hr = int(lbl_hr) lbl_hr = ( lbl_hr if self.lbl_sampling_hr is None else self.lbl_sampling_hr ) # lbl_lr = get_csv_value(df_general_info, "lbl_lr") lbl_lr = ( lbl_lr if self.lbl_sampling_lr is None else self.lbl_sampling_lr ) if lbl_lr is None: lbl_lr = 4 else: lbl_lr = int(lbl_lr) # model_reprocessing = get_csv_value(df_general_info, "model_reprocessing") # standardize_PCA = get_csv_value(df_general_info, "standardize_PCA") if standardize_PCA is None: standardize_PCA = "False" # temp_min = get_csv_value(df_general_info, "temp_min") temp_max = get_csv_value(df_general_info, "temp_max") temp_step = get_csv_value(df_general_info, "temp_step") if temp_step is not None: temp_min = float(temp_min) temp_max = float(temp_max) temp_step = float(temp_step) # rv_sampling = get_csv_value(df_general_info, "rv_sampling") rv_sampling = float(rv_sampling) if rv_sampling is not None else 0.1 # self.random_obj = Random(seme) # retrieval_model = get_csv_value(df_general_info, "retrieval_model") # # Mask phase settings (with backup mode for older configurations) mask_phase_enabled = get_csv_value(df_general_info, "mask_phase_enabled") if mask_phase_enabled is None: mask_phase_enabled = False min_phase = None max_phase = None mask_night_index = None mask_inside_range = None else: mask_phase_enabled = mask_phase_enabled == "1" mask_night = get_csv_value(df_general_info, "mask_night") mask_night_index = [int(value) for value in mask_night.split(",")] min_phase = float(get_csv_value(df_general_info, "min_phase")) max_phase = float(get_csv_value(df_general_info, "max_phase")) mask_inside_range = get_csv_value(df_general_info, "which_mask") if mask_inside_range is None or mask_inside_range == "Inside range": mask_inside_range = True else: mask_inside_range = False transit_limit_mode = get_csv_value(df_general_info, "transit_limit_mode") if transit_limit_mode is None: transit_limit_mode = "T14" # # fmt: off self.bestpars_data = Bestpars( list_bestpars, list_bestpars_initial_value, multiplier_chains, multiplier_cores, use_pool, use_parallel_init ) # additional_opac = False for name in ["k_opac", "k_cond", "lambda0_micron", "omega_scale_micron", "xi"]: if name in list_bestpars or name in list_fixed: additional_opac = True # self.atmosphere = Atmosphere() self.atmosphere.update_params( line_species, line_species_isotope, line_species_complete_name_hr, line_species_complete_name_lr, list_condensed_molecules, rayleigh_species, range_min_pressures, range_max_pressures, layers, lbl_high_res=lbl_hr, lbl_low_res=lbl_lr, resolution=resolution, instruments_HR=instrument_HR, instruments_LR=instrument_LR, continum_opacity=continum_opacity, mass_vector=mass_vector, target=target, temp_min=temp_min, temp_max=temp_max, temp_step=temp_step, initial_params=self.initial_param_array, path_default=self.path_default, rad_mode=rad_mode, retrieval_model=retrieval_model, chemistry=chemistry, rv_sampling=rv_sampling, start_molecs=self.start_molec, additional_opac=additional_opac ) self.retrieval_data = Retrieval( scale, ecc_opi=ecc_opi, tell_rm_method=tell_rm_method, model_reprocessing=model_reprocessing, standardize_PCA=standardize_PCA, format_temperature=format_temperature, table_output_file=table_output_file, maxsteps=maxsteps, order_sel=order_sel, id_process=id_process, path_results=path_results, retrieval_model=retrieval_model, total_nights=total_nights, total_hr_instruments=total_hr_instruments, mask_phase_enabled=mask_phase_enabled, mask_night_index=mask_night_index, min_phase=min_phase, max_phase=max_phase, mask_inside_range=mask_inside_range, transit_limit_mode=transit_limit_mode ) self.night_data_extraction() self.atmosphere.set_wl_range( minwlen_lr=minwlen_lr, maxwlen_lr=maxwlen_lr, minwlen_hr=minwlen_hr, maxwlen_hr=maxwlen_hr, ) if "Retrieval" in self.model_type: with open(table_output_file, "w") as f: f.write(f"Initializing {retrieval_model} module") if retrieval_model == 'petitRADTRANS': self.atmosphere.read_opacities( table_output_file=table_output_file , path_target=path_target, stellar_spectrum_type_lr=stellar_spectrum_type_lr, stellar_spectrum_type_hr=stellar_spectrum_type_hr, min_hwhm_hr=min_hwhm_hr, instrument_LR=instrument_LR, stellar_spectrum_type_lr_mode=stellar_spectrum_type_lr_mode, use_hr_linelists_for_lr=use_hr_linelist_for_lr ) elif retrieval_model == 'PyratBay': output_folder = str(Path(table_output_file).parent) self.atmosphere.init_pyratbay(output_folder, self.initial_param_array)
[docs] def night_data_extraction(self): """ Extract and process night observation data for each instrument. This method handles the extraction and organization of night observation data, including path setup, file archiving (for retrieval mode), and creation of Night objects for each observation date. The method handles different processing modes (Model, Manual, Retrieval) with appropriate file handling. """ tar_folder = None # Set up temporary folder for model plotting if "Model" in self.model_type: username = os.environ.get("USER") tar_folder = str( Path(self.path_default, "GUIBRUSHR", "Files", "Temp_Files_GUI", username, "tar_unpack") ) # Initialize wavelength list for overplotting self.wlen_list_overplot = [] # Exit if no high-resolution instruments available if self.atmosphere.resolution_obj.instruments_HR is None: return # Process each high-resolution instrument for instrument in self.atmosphere.resolution_obj.instruments_HR: for i in range(instrument.ndate): # Handle Model and Manual modes (use extracted tar files) if "Model" in self.model_type or "Manual" in self.model_type: path_night_reference = str( Path(tar_folder, instrument.name) ) path_order_tell = str( Path(path_night_reference, instrument.dates[i]) ) if "Model" in self.model_type: # Clean and extract tar archive for model plotting os.system("rm -rf " + path_order_tell) os.system("mkdir -p " + path_order_tell) os.system( "tar -xzf " + self.retrieval_data.path_results + instrument.dates[i] + "_" + instrument.name + ".tar.gz --transform='s/.*\///' -C " + path_order_tell ) path_order_test = path_order_tell path_good_order = str( Path(path_order_test, "good_order_" + self.retrieval_data.order_sel + ".pkl") ) else: # Handle Retrieval mode (create and archive files) # Set up paths for order selection and processing path_night_reference = instrument.path_instrument path_order = str(Path( instrument.path_instrument, instrument.dates[i], "orders_selection", self.retrieval_data.order_sel )) # Define paths for different processing stages path_good_order = str( Path(path_order, "good_order_" + self.retrieval_data.order_sel + ".pkl") ) path_order_tell = str( Path(path_order, self.retrieval_data.tell_rm_method) ) path_order_test = str(Path(path_order_tell, "test")) # Define paths for observation files path_aligned = str( Path(instrument.path_instrument, instrument.dates[i], "aligned.fits") ) path_wlen = str( Path(instrument.path_instrument, instrument.dates[i], "wlen_refined.fits") ) path_fase = str( Path(instrument.path_instrument, instrument.dates[i], "phase.pkl") ) if not os.path.exists(path_fase): path_fase = str( Path(instrument.path_instrument, instrument.dates[i], "fase.pkl") ) # Create archive with all necessary files final_folder = str( Path(self.retrieval_data.path_results, instrument.dates[i] + "_" + instrument.name + ".tar.gz") ) # Archive telluric removal results and observation files os.system( f"tar -zcvf - {path_order_tell} {path_good_order} " f"{path_aligned} {path_wlen} {path_fase} > {final_folder}" ) # Copy resolution string file os.system( "cp " + str(Path(path_order_tell, "string_res.txt")) + " " + self.retrieval_data.path_results ) # Create Night object for this observation target_limit_phase = ( self.atmosphere.target.limit_phase_t14 if self.retrieval_data.transit_limit_mode == "T14" else self.atmosphere.target.limit_phase_t23 ) night = Night( self.atmosphere.target.limit_phase_t14, path_night_reference, instrument.dates[i], self.atmosphere.rad_mode, path_good_order, path_order_tell, path_order_test, table_output_file=self.table_output_file, CC=False, simulation=False, synthetic=False, retrieval_standardize=self.retrieval_data.standardize_PCA ) # Store wavelength list and add night to instrument self.wlen_list_overplot.append(night.wlen_list_overplot) instrument.append_night_obj(night)
[docs] def get_value(self, params, name, single_value=True): """ Extract value of named parameter from params array. Args: params: Array of parameter objects name: Parameter name to extract single_value: If True, return single value, else return array Returns: Parameter value(s) or None if parameter not found """ index = self.get_index(name) if params[index] is None: return None if single_value: return params[index].get_retrieval_value() return params[index].get_retrieval_value_arr()
[docs] def get_index(self, param_name): """ Find index of parameter with given name in the parameters list. This method handles both exact parameter names and parameter names with numeric suffixes (e.g., 'jitter1', 'jitter2' -> 'jitter'). Args: param_name: Name of parameter to find Returns: Index of parameter in params_list """ # Try exact match first if param_name in self.params_list: return self.params_list.index(param_name) # If not found, try removing numbers from name name_without_numbers = sub(r"\d+", "", param_name) return int(self.params_list.index(name_without_numbers))
# noinspection PyUnresolvedReferences
[docs] def calculate_mass_fraction( self, temperature, metallicity, c_o_ratio, si_o_ratio_final, vmr_peak_arr, pressure_peak_arr, width_peak_arr, ): """ Compute mass mixing ratios for atmospheric species. This method calculates mass mixing ratios based on the selected chemistry model (equilibrium, free chemistry, or hybrid). It handles volume mixing ratio calculations, mean molecular weight computation, and species abundance adjustments for dissociation effects. Args: temperature: 1D temperature profile array metallicity: log(metallicity) enhancement factor for equilibrium chemistry c_o_ratio: Carbon-to-oxygen ratio for equilibrium chemistry si_o_ratio_final: Si-to-oxygen ratio for equilibrium chemistry vmr_peak_arr: VMR peak values for variable abundance profiles pressure_peak_arr: Pressure peak positions for variable profiles width_peak_arr: Width parameters for variable abundance profiles Returns: Tuple containing: - mass_fraction: Dictionary of mass mixing ratio profiles {species name: 1D profile} for each species - MMW: 1D mean molecular weight profile array - vmr: 2D volume mixing ratio array of shape [nlayers, nmol] - mean_VMR_and_MF_string: String containing mean VMR and mass fractions per species - mean_VMR_and_MF_dict: Dictionary mapping species to their mean VMR, MF, and linelist name """ # Get parameters and molecular masses params = self.initial_param_array masses = self.atmosphere.species_obj.mass_vector # Calculate VMR based on chemistry model array_condensed_molecules = np.array(params[self.start_condensed:]) array_molecules_full = np.array(params[self.start_molec:self.start_elements]) array_molecules_excluded = np.array( params[self.start_molec + self.atmosphere.species_obj.skip_molec_param_array:self.start_elements] ) e_ratio = {} if c_o_ratio is not None: e_ratio["C_O"] = c_o_ratio if si_o_ratio_final is not None: e_ratio["Si_O"] = si_o_ratio_final # EQUILIBRIUM CHEMISTRY: Use thermochemical equilibrium if self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[0]: # Equilibrium species = list(self.atmosphere.species_compatible_with_prt) vmr = self.atmosphere.chemcat_obj.thermochemical_equilibrium( temperature=temperature, metallicity=metallicity, e_ratio=e_ratio, ) elif self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[1]: pressure = self.atmosphere.pressure_data.pressures nlayers = len(pressure) species = list(self.atmosphere.species_obj.composition) nmol = len(species) vmr = np.zeros((nlayers, nmol)) # To correct indices of vmr_peak, pressure_peak, width_peak, a4 i_peak = 0 index_metals = [] # He and H2 are NOT included (+2 reason) for mol in array_molecules_excluded: if mol is None: continue imol = species.index(mol.name) if mol.molec_formula not in ConstantVariables.NOT_VMR_METALS: index_metals.append(imol) vmr_value = 10 ** self.get_value(params, mol.molec_formula) if mol.is_VMR_costant: vmr[:, imol] = vmr_value else: vmr_peak_par = vmr_peak_arr[i_peak] pressure_peak_par = pressure_peak_arr[i_peak] width_peak_par = width_peak_arr[i_peak] # a4_par = a4_arr[i_peak] vmr_pf_log = np.log10(vmr_value) - vmr_peak_par * np.exp( -np.power( (np.log10(pressure) - pressure_peak_par) / width_peak_par, 2, ) ) vmr_pf_log[np.where(vmr_pf_log > -1)] = -1 vmr_pf = 10 ** vmr_pf_log i_peak += 1 vmr[:, imol] = vmr_pf # condensed for condensed_mol in array_condensed_molecules: if condensed_mol is None: continue imol = species.index(condensed_mol.name) index_metals.append(imol) vmr_value = 10 ** self.get_value(params, condensed_mol.name) vmr[:, imol] = vmr_value vmr_metals = np.sum(vmr[:, index_metals], axis=1) # TBD: Reject sample if np.any(vmr_metals>1) if self.atmosphere.species_obj.include_h_m: # Compute the H2/He ratio from the provided parameters H2_He_ratio = self.get_value(params, "H2") / self.get_value(params, "He") # Get indices for the relevant species in the vmr array i_H2 = species.index('H2') i_He = species.index('He') i_H = species.index('H') i_em = species.index('e-') i_hm = species.index('H-') # Update the helium (He) volume mixing ratio (vmr) based on the available non-metal abundance. # The total abundance allocated to H2, and He is (1 - vmr_metals). vmr[:, i_He] = (1.0 - vmr_metals) / (1.0 + H2_He_ratio) # --- H2 Abundance --- # Define coefficients for H2 dissociation adjustment coeff_h2 = [1, 2.41e04, 6.5] # [1, 0.0001 * 2.41, 6.5] # The reference H2 abundance is based on the H2/He ratio times the He abundance in the last layer. base_vmr_H2 = H2_He_ratio * vmr[-1, i_He] # Compute the adjusted H2 abundance across all layers A_H2 = compute_adjusted_abundance(pressure, temperature, coeff_h2, base_vmr_H2) vmr[:, i_H2] = A_H2 # For atomic hydrogen (H), the abundance profile is taken as the reverse of the H2 profile. vmr[:, i_H] = A_H2[::-1] # --- Free Electrons (e-) --- # Define coefficients for electron dissociation adjustment coeff_em = [-0.4, 0, 0] base_vmr_em = vmr[-1, i_em] A_em = compute_adjusted_abundance(pressure, temperature, coeff_em, base_vmr_em) vmr[:, i_em] = A_em # --- Negative Hydrogen Ions (H-) --- # Define coefficients for H- dissociation adjustment coeff_hm = [0.6, -0.14e4, 7.7] # [0.6, 0.0001 * -0.14, 7.7] base_vmr_hm = vmr[-1, i_hm] A_hm = compute_adjusted_abundance(pressure, temperature, coeff_hm, base_vmr_hm) vmr[:, i_hm] = A_hm # --- Water Vapor (H2O) --- if 'H2O' in species: i_h2o = species.index('H2O') # Define coefficients for H2O dissociation adjustment coeff_h2o = [2, 4.83e4, 15.9] # [2, 0.0001 * 4.83, 15.9] base_vmr_h2o = vmr[-1, i_h2o] A_h2o = compute_adjusted_abundance(pressure, temperature, coeff_h2o, base_vmr_h2o) vmr[:, i_h2o] = A_h2o else: H2_He_ratio = self.get_value(params, "H2") / self.get_value(params, "He") i_H2 = species.index('H2') i_He = species.index('He') # Distribute the remaining VMR (after metals) between H2 and He # enforcing the fixed ratio R = vmr_H2 / vmr_He. # From: vmr_H2 + vmr_He = (1 - vmr_metals) and vmr_H2 = R * vmr_He # → vmr_He = (1 - vmr_metals) / (1 + R), vmr_H2 = R * vmr_He vmr[:, i_He] = (1.0 - vmr_metals) / (1.0 + H2_He_ratio) vmr[:, i_H2] = H2_He_ratio * vmr[:, i_He] else: species = list(self.atmosphere.species_compatible_with_prt) e_abundances = dict() for element in params[self.start_elements:]: if element is None: continue e_abundances[element.molec_formula] = self.get_value(params, element.name) vmr = self.atmosphere.chemcat_obj.thermochemical_equilibrium( temperature=temperature, metallicity=metallicity, e_abundances=e_abundances, e_ratio={"C_O": c_o_ratio}, ) # Mean molecular weight MMW = pa.mean_weight(vmr, species, mass=masses) # Mass-mixing ratio abundances: mass_fractions_PTR = {} mean_VMR_and_MF_dict = {} mean_VMR_and_MF_string = "" err = False for mol in array_molecules_full: if mol is not None: name = mol.name_for_list_molec i = species.index(mol.molec_formula) mass_fractions_PTR[name] = vmr[:, i] * masses[i] / MMW mean_VMR_and_MF_dict[mol.molec_formula] = { "VMR": np.mean(vmr[:, i]), "MF": np.mean(mass_fractions_PTR[name]), "LineList": name } mean_VMR_and_MF_string += f"{mol.molec_formula}: Mean VMR = {np.mean(vmr[:, i])}:.7e; Mean MF {np.mean(mass_fractions_PTR[name]):.7e}\n" if np.any(mass_fractions_PTR[name] < 0): print(f"Warning: negative mass fraction values found for {name}") err = True for condensed_mol in array_condensed_molecules: if condensed_mol is not None: name = condensed_mol.name_for_list_molec i = species.index(name) mass_fractions_PTR[name] = vmr[:, i] * masses[i] / MMW mean_VMR_and_MF_dict[condensed_mol.molec_formula] = { "VMR": np.mean(vmr[:, i]), "MF": np.mean(mass_fractions_PTR[name]), "LineList": name } mean_VMR_and_MF_string += f"{condensed_mol.molec_formula}: Mean VMR = {np.mean(vmr[:, i])}:.7e; Mean MF {np.mean(mass_fractions_PTR[name]):.7e}\n" if np.any(mass_fractions_PTR[name] < 0): print(f"Warning: negative mass fraction values found for {name}") err = True return mass_fractions_PTR, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, err
def _save_trpca_error_debug_info( self, lcrm_mask, lcrm_mask_nomask, good_pixel, night, h, dict_calc_model, temperature, mass_fraction, vmr, MMW, wl_full_resolution_HR, model_full_resolution_HR, depth_full_resolution_HR, ): """ Save detailed debug information when TRPCA (Telluric Removal PCA) fails. This method creates a comprehensive dictionary containing all relevant data for troubleshooting PCA failures during telluric removal and saves it to a pickle file for later analysis. Args: lcrm_mask: Masked telluric-removed model spectrum lcrm_mask_nomask: Unmasked telluric-removed model spectrum good_pixel: Boolean array of valid pixels night: Night object containing observation data h: Current spectral order index dict_calc_model: Dictionary with calculated model parameters temperature: Temperature profile array mass_fraction: Mass fraction dictionary vmr: Volume mixing ratio array MMW: Mean molecular weight array wl_full_resolution_HR: High-resolution wavelength array model_full_resolution_HR: High-resolution model spectrum depth_full_resolution_HR: High-resolution transit depth spectrum from model Note: Saves debug info to pickle file and prints a message. """ error_trpca_dictionary = { "lcrm_mask": lcrm_mask, "lcrm_mask_nomask": lcrm_mask_nomask, "good_pixel": good_pixel, "nfc": night.nfc[h], "tell_rm_method": self.retrieval_data.tell_rm_method, "smooth_on": night.smooth_on, "smooth_size": night.smooth_size, "model_reprocessing": self.retrieval_data.model_reprocessing, "subtraction_of_the_average_spectrum": night.subtraction_of_the_average_spectrum, "dict_calc_model": dict_calc_model, "temperature": temperature, "mass_fraction": mass_fraction, "vmr": vmr, "MMW": MMW, "LIST_PT_PROFILE_TABLE": ConstantVariables.LIST_PT_PROFILE_TABLE, "instruments_HR": self.atmosphere.resolution_obj.instruments_HR, "retrieval_data": self.retrieval_data, "wl_full_resolution_HR": wl_full_resolution_HR, "model_full_resolution_HR": model_full_resolution_HR, "depth_full_resolution_HR": depth_full_resolution_HR, "stellar_spectrum": self.atmosphere.stellar_spectrum, "stellar_spline_model_HR": self.atmosphere.stellar_spline_model_hr, "atmospherehr": { "pressures": self.atmosphere.pressure_data.pressures, "line_species_complete_name_hr": self.atmosphere.species_obj.line_species_complete_name_hr, "rayleigh_species": self.atmosphere.species_obj.rayleigh_species, "gas_continuum_contributors": self.atmosphere.species_obj.continum_opacity, "wavelength_boundaries": np.array([ self.atmosphere.wavelength.min_wl_hr, self.atmosphere.wavelength.max_wl_hr ]), "line_opacity_mode": "lbl", "line_by_line_opacity_sampling": self.atmosphere.resolution_obj.lbl_high_res }, "SOLAR_TO_JUPITER_MASSES": ConstantVariables.SOLAR_TO_JUPITER_MASSES, "clight": ConstantVariables.CLIGHT, "stellar_spectrum_type_hr": self.atmosphere.stellar_spectrum_type_hr, "jitter_list": self.jitter_list } # Save debug information to pickle file error_file_path = f"{self.retrieval_data.path_results}/error_trpca.pkl" with open(error_file_path, "ab") as fo: pickle.dump(error_trpca_dictionary, fo) print( f"TRPCA FAILED, check file: {self.retrieval_data.path_results}" "/error_trpca.pkl\n" )
[docs] def calculate_det(self, params): """ Calculate the determinant for prior probability evaluation. This method computes the determinant term used in Bayesian parameter estimation, incorporating Gaussian priors for parameters that have sigma_prior values defined. Args: params: Array of parameter objects Returns: det: Determinant value for prior probability calculation """ det = 1.0 for iel, param in enumerate(params): # Skip parameters without priors if param is None or param.sigma_prior is None: continue # Determine if parameter has single or multiple values single_value = param.name not in self.list_multiple_param value = self.get_value(params, param.name, single_value) # Get initial parameter value for comparison initial_value = get_param_array_initial_value( self.initial_param_array, iel, single_value ) sigma = params[iel].get_sigma_prior() # Calculate Gaussian prior contribution if single_value: det *= np.exp(-((value - initial_value) ** 2) / (2 * sigma ** 2)) else: # Handle array parameters (e.g., jitter, f_rot) for index_arr in range(len(value)): det *= np.exp( -((value[index_arr] - initial_value[index_arr]) ** 2) / (2 * sigma[index_arr] ** 2) ) return det
# fmt: off
[docs] def high_resolution_lhood( self, temperature, mass_fraction, vmr, MMW, dict_calc_model ): """ Compute high-resolution likelihood with advanced spectral processing. This method performs the most complex spectral processing in the atmospheric retrieval system. It handles: 1. Full-resolution atmospheric model calculation 2. Instrumental resolution convolution 3. Stellar rotation broadening (solid body rotation) 4. Orbital dynamics and Doppler shifts 5. Telluric removal via Principal Component Analysis (PCA) 6. Chi-square likelihood evaluation against observations The method preserves exact mathematical operations critical for high-precision atmospheric spectroscopy and exoplanet detection. Args: temperature: Atmospheric temperature profile mass_fraction: Mass fraction profiles for all species vmr: Volume mixing ratios for all species MMW: Mean molecular weight profile dict_calc_model: Dictionary containing all model parameters Returns: Tuple containing: - status: True if successful, False if errors occurred - lhood: Log-likelihood value - wl_full_resolution_HR: Full resolution wavelength array - depth_full_resolution_HR: Full resolution spectrum - opacity_contribution_HR: Opacity contributions (for plotting) - lh_high_resolution: Dictionary of per-instrument per-night likelihood details """ # EXTRACT PARAMETERS FROM MODEL CALCULATION DICTIONARY rp = dict_calc_model["rp"] # Planet radius gravity = dict_calc_model["gravity"] # Surface gravity omegad = dict_calc_model["omegad"] # Rotation velocity (log scale) rv = dict_calc_model["rv"] # Systemic radial velocity sf = dict_calc_model["sf"] # Single scaling factor sf_arr = dict_calc_model["sf_arr"] # Multiple scaling factors f_rot_arr = dict_calc_model["f_rot_arr"] # Rotation factors per night T0 = dict_calc_model["T0"] # Reference temperature T_low = dict_calc_model["T_low"] # Low pressure temperature T3_node = dict_calc_model["T3_node"] # Temperature node 3 ecc = dict_calc_model["ecc"] # Orbital eccentricity opi = dict_calc_model["opi"] # Argument of periastron kp = dict_calc_model["kp"] # Planet velocity semi-amplitude jitter_arr = dict_calc_model["jitter_arr"] # Noise jitter per night dVsys_arr = dict_calc_model["dVsys_arr"] # Systemic velocity variations # INITIALIZE VARIABLES lhood = 0 # Total log-likelihood csscaled = None # Spline for convolved spectrum opacity_contribution_HR = None lh_high_resolution = {} # CALCULATE FULL-RESOLUTION ATMOSPHERIC MODEL # This is the core atmospheric model calculation producing the theoretical spectrum wl_full_resolution_HR, model_full_resolution_HR, depth_full_resolution_HR = self.atmosphere.calc_model( temperature, mass_fraction, vmr, MMW, dict_calc_model, True, ) # SETUP PLOTTING FOR MODEL ANALYSIS (if needed) pdf = None if "Model" in self.model_type: os.system(f"mkdir -p {self.retrieval_data.path_results}/model_high_low_res/") pdf = PdfPages( f"{self.retrieval_data.path_results}/model_high_low_res/model_reprocessing.pdf" ) if "Opacity" in self.model_type: # Calculate opacity contributions for analysis opacity_contribution_HR = self.atmosphere.opacity_contribution( temperature, mass_fraction, MMW, dict_calc_model, True ) # PROCESS EACH HIGH-RESOLUTION INSTRUMENT counter_nights_total = -1 # Global night counter across all instruments counter_derivative_params = -1 for index_instrument, instrument in enumerate(self.atmosphere.resolution_obj.instruments_HR): lh_night = {} # INSTRUMENTAL RESOLUTION CONVOLUTION (if no stellar rotation) if omegad is None: # Convolve theoretical spectrum with instrument resolution function wl_convolved_resolution, model_convolved_resolution = convolve_resolution( wl_full_resolution_HR, model_full_resolution_HR, instrument.hwhm_km_s, rv, self.atmosphere.rv_sampling ) # Create spline interpolation for fast evaluation at arbitrary wavelengths csscaled = splrep(wl_convolved_resolution, model_convolved_resolution) # PROCESS EACH OBSERVATION NIGHT for index_night_in_current_instrument, night in enumerate(instrument.night_arr): counter_nights_total += 1 if index_night_in_current_instrument != 0: counter_derivative_params += 1 # DETERMINE SCALING FACTOR FOR THIS NIGHT scale_night = 0 if sf is not None: # Use single scaling factor for all nights scale_night = sf elif sf_arr is not None: # Use night-specific scaling factor scale_night = sf_arr[counter_nights_total] scale_HR = 10 ** scale_night # Convert from log to linear scale # STELLAR ROTATION BROADENING (if included) if omegad is not None: omegade = 10 ** omegad # Convert rotation velocity from log scale # Determine reference temperature for rotation kernel calculation # Different temperature profile formats use different reference temperatures if self.retrieval_data.format_temperature == ConstantVariables.LIST_PT_PROFILE_TABLE[3]: T0 = T_low elif self.retrieval_data.format_temperature == ConstantVariables.LIST_PT_PROFILE_TABLE[4]: T0 = T3_node # Calculate solid body rotation kernel ker_rot = kernel_solid_body_rotation( omegade, f_rot_arr, T0, np.mean(MMW), gravity, rp, self.atmosphere.rad_mode, counter_nights_total, self.atmosphere.rv_sampling ) # Apply rotation broadening to the spectrum wl_convolved_rot, model_convolved_rot = convolve_solid_body_rotation( wl_full_resolution_HR, model_full_resolution_HR, ker_rot, self.atmosphere.rv_sampling ) # Apply instrumental resolution convolution after rotation wl_convolved_resolution_after_rot, model_convolved_resolution_after_rot = convolve_resolution( wl_convolved_rot, model_convolved_rot, instrument.hwhm_km_s, rv, self.atmosphere.rv_sampling ) # Create spline for the rotation+resolution convolved spectrum csscaled = splrep(wl_convolved_resolution_after_rot, model_convolved_resolution_after_rot) # ORBITAL DYNAMICS AND RADIAL VELOCITY CALCULATION vtot, vtot_star, _ = rv_planet_and_star( self.retrieval_data.eccentricity, self.atmosphere.target, ecc, opi, kp, night, "Retrieval", index_night_in_current_instrument, dVsys_arr, counter_derivative_params ) lh_current_night = 0 # PROCESS EACH SPECTRAL ORDER for h in range(night.n_good_orders): # Extract observational data for this order wavelengths_night = night.lambdas[:, h, :] # Wavelength array spectrum_only_planet_night = night.spectra[:, h, :] # Observed spectra error_spectrum_only_planet_night = night.sigma_spectra[:, h, :] # Error bars good_pixels = night.maskinvm[:, h, :] == 0 # Valid pixel mask # APPLY DOPPLER SHIFTS TO WAVELENGTH GRID # Reshape wavelength array to match good pixels structure wl_data_masked = wavelengths_night[np.where(good_pixels)].reshape( -1, np.shape(good_pixels)[1] ) # Calculate Doppler shifts for planet and star dl_planet = wl_data_masked * vtot / self.clight # Planet velocity shift wlen_shifted_planet = dl_planet + wl_data_masked # Planet rest frame wavelengths dl_star = wl_data_masked * vtot_star / self.clight # Stellar velocity shift wlen_shifted_star = dl_star + wl_data_masked # Stellar rest frame wavelengths Fscaled = None mol_modf = None stellar_spectrum_HR = None # CALCULATE OBSERVED SPECTRUM BASED ON OBSERVATION MODE if self.atmosphere.rad_mode == 'Transmission': # TRANSMISSION SPECTROSCOPY # Evaluate atmospheric model at Doppler-shifted wavelengths model_eval = splev(wlen_shifted_planet, csscaled, der=0) # Convert from altitude to transit depth mol_mod = model_eval / phys_const.r_jup_mean mol_modf = (mol_mod / (self.atmosphere.target.stellar_radius * ConstantVariables.RATIO_RSUN_RJUP_MEAN)) ** 2 spectrum = 1 - mol_modf # Transit depth spectrum else: # EMISSION SPECTROSCOPY # Calculate stellar spectrum contribution stellar_spectrum_HR = splev(wlen_shifted_star*1e-7, self.atmosphere.stellar_spline_model_hr) # Evaluate planetary emission model model_eval = splev(wlen_shifted_planet, csscaled, der=0) # Calculate planet-to-star flux ratio # Stellar spc is already multiplied by np.pi to go from erg/cm^2/cm/s/sr to erg/cm^2/cm/s Fscaled = (model_eval / stellar_spectrum_HR) * ( rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2 spectrum = 1 + Fscaled # Combined stellar + planetary flux if np.sum(spectrum < 0) > 0: if "Model" in self.model_type: os.system(f"mkdir -p {self.retrieval_data.path_results}/error_spectrum/") with open(f"{self.retrieval_data.path_results}/error_spectrum/spectrum.pkl", "wb") as pkl_spectrum: error_spc_pkl = { "spectrum": spectrum, "Fscaled": Fscaled, "mol_modf": mol_modf, "stellar_spectrum_HR": stellar_spectrum_HR } pickle.dump(error_spc_pkl, pkl_spectrum) print(f"Negative values in spectrum for order {h} of night {index_night_in_current_instrument}.") return False, -np.inf, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution # PREPARE DATA FOR TELLURIC REMOVAL VIA PCA model_in_log = np.log10(spectrum) # Convert to log scale for PCA processing good_pixel = np.where(good_pixels[:, -1]) good_pixel = good_pixel[0] # SETUP PCA PROCESSING BASED ON METHOD # TODO can be moved in night_extraction lcrm_mask_nomask = np.zeros_like(night.array_reconstructed_from_PCA_components[:, h, :]) if self.retrieval_data.model_reprocessing == "hard": # Hard PCA: Use existing telluric removal data lcrm_mask = night.array_reconstructed_from_PCA_components[good_pixel, h, :] if night.subtraction_of_the_average_spectrum: lcrm_mask = lcrm_mask + night.average_spectrum_to_be_added[h] else: # "soft" method # Soft PCA: Simplified approach with minimal telluric removal lcrm_mask = np.zeros_like(night.array_reconstructed_from_PCA_components[good_pixel, h, :]) # INJECT ATMOSPHERIC MODEL INTO IN-TRANSIT DATA # Add theoretical atmospheric signal to in-transit observations lcrm_mask[:, night.intransit] = ( model_in_log + (lcrm_mask[:, night.intransit]) ) # EXECUTE TELLURIC REMOVAL PCA # Apply Time-Resolved Principal Component Analysis (TRPCA) # This removes telluric contamination while preserving the atmospheric signal lcrm_pca, error_trpca = trpca( lcrm_mask, lcrm_mask_nomask, good_pixel, night.nfc[h], # Number of components night.smooth_on, # Smoothing flag night.smooth_size, # Smoothing size night.apply_standardize, self.retrieval_data.model_reprocessing, # Processing method night.subtraction_of_the_average_spectrum, night.pca_mode # PCA mode: spatial or temporal ) if error_trpca: if "Model" in self.model_type: print("Error in TRPCA processing.") # Handle PCA failure with detailed error logging self._save_trpca_error_debug_info( lcrm_mask, lcrm_mask_nomask, good_pixel, night, h, dict_calc_model, temperature, mass_fraction, vmr, MMW, wl_full_resolution_HR, model_full_resolution_HR, depth_full_resolution_HR, ) return False, -np.inf, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution # CALCULATE CHI-SQUARE LIKELIHOOD # Extract only in-transit data for likelihood calculation lcrm_pca = lcrm_pca[:, night.intransit] # Add jitter noise to error bars if specified if self.jitter_list is not None and self.jitter_list.size > 0: errplusjit = np.sqrt( error_spectrum_only_planet_night[good_pixel, :] ** 2 + jitter_arr[counter_nights_total] ** 2 ) else: errplusjit = error_spectrum_only_planet_night[good_pixel, :] if ( self.retrieval_data.mask_phase_enabled and self.retrieval_data.mask_night_index[counter_nights_total] ): if self.retrieval_data.mask_inside_range: mask_phases_indices_current_night = np.where(np.logical_and( night.phase_new_range_considered >= self.retrieval_data.min_phase, night.phase_new_range_considered <= self.retrieval_data.max_phase ))[0] else: mask_phases_indices_current_night = np.where(np.logical_or( night.phase_new_range_considered <= self.retrieval_data.min_phase, night.phase_new_range_considered >= self.retrieval_data.max_phase ))[0] spectrum_only_planet_night[:, mask_phases_indices_current_night] = 0 lcrm_pca[:, mask_phases_indices_current_night] = 0 errplusjit[:, mask_phases_indices_current_night] = 1 # Calculate chi-square for this spectral order chiquadro_HR = np.sum( ( ( spectrum_only_planet_night[good_pixel, :] # Observed data - (lcrm_pca * scale_HR) # Scaled model ) / errplusjit # Error bars ) ** 2 ) n_good_pixels = len(good_pixel) n_spectra = spectrum_only_planet_night.shape[1] # or the temporal dimension # Total number of data points dof = n_good_pixels * n_spectra chi2_reduced = chiquadro_HR / dof # Add to total log-likelihood (including error normalization) lh_current_order = (-np.sum(np.log(errplusjit)) - 0.5 * chiquadro_HR) lh_current_night += lh_current_order lh_night[night.date] = f"{chiquadro_HR};{lh_current_night};{chi2_reduced};{dof};{self.bestpars_data.nfit}" lhood += lh_current_order # GENERATE DIAGNOSTIC PLOTS (for model analysis mode) if "Model" in self.model_type: # Create two-panel plot showing model injection and PCA result fig, axs = plt.subplots(2, 1, figsize=(12, 5)) axs[0].set_title( "Night: " + instrument.nights[index_night_in_current_instrument] + " Order: " + str(h) ) # Prepare phase array for plotting (center around transit) phforplot = night.phases_considered phforplot[phforplot > 0.5] -= 1 # TOP PANEL: Show injected atmospheric model model_in_logforplot = np.zeros( np.shape(night.array_reconstructed_from_PCA_components[:, h, night.intransit]) ) # Fill good pixels with model signal, bad pixels with mean model_in_logforplot[np.where(good_pixels[:, 0] == 1)] = model_in_log model_in_logforplot[np.where(good_pixels[:, 0] == 0)] = np.mean(model_in_log) # Plot model as 2D image (wavelength vs. orbital phase) result_im = axs[0].imshow( model_in_logforplot.T[:, 500:701], aspect="auto", origin="lower", interpolation="None", extent=[ wavelengths_night[500, 0], wavelengths_night[701, 0], phforplot[0], phforplot[-1], ], ) _ = fig.colorbar(result_im, ax=[axs[0]]) # BOTTOM PANEL: Show PCA-processed result lcrm_pcaforplot = np.zeros( np.shape(night.array_reconstructed_from_PCA_components[:, h, night.intransit]) ) # Fill good pixels with PCA result, bad pixels with mean lcrm_pcaforplot[np.where(good_pixels[:, 0] == 1)] = lcrm_pca lcrm_pcaforplot[np.where(good_pixels[:, 0] == 0)] = np.mean(lcrm_pca) # Plot PCA result as 2D image result_im2 = axs[1].imshow( lcrm_pcaforplot.T[:, 500:701], aspect="auto", origin="lower", interpolation="None", extent=[ wavelengths_night[500, 0], wavelengths_night[701, 0], phforplot[0], phforplot[-1], ], ) _ = fig.colorbar(result_im2, ax=[axs[1]]) # Save plot and clean up pdf.savefig(fig) fig.clear() plt.close(fig) lh_high_resolution[instrument.name] = lh_night # RETURN RESULTS return True, lhood, wl_full_resolution_HR, depth_full_resolution_HR, opacity_contribution_HR, lh_high_resolution
def _bin_emission_lr(self, instrument, ind_planet, model_LR, wl_full_resolution_LR, i, rp): # Get stellar flux for this instrument and bin # Note: stellar_spectrum_LR[instrument.name] can be either: # - A single integrated flux value (if stellar_spectrum_type_lr == "Integral") # - An array of point values (if stellar_spectrum_type_lr == "Planck" or "Phoenix") flux_star_instrument_in_bin = self.atmosphere.stellar_spectrum[instrument.name] # Extract planet flux values within this wavelength bin flux_planet_in_bin = model_LR[ind_planet] # INTEGRAL METHOD: Integrate planet flux over bin and normalize by stellar flux if self.atmosphere.stellar_spectrum_type_lr_mode == "Integral": if len(ind_planet) == 0: flux_planet_full_in_bin = 0 else: if len(ind_planet) == 1: # Create simple 2-point integration using bin boundaries x = np.array([instrument.lr_data.lbottom[i], instrument.lr_data.ltop[i]]) y_temp = model_LR[ind_planet] # Duplicate flux values at boundaries for integration y = np.append(y_temp, y_temp) else: y = flux_planet_in_bin x = wl_full_resolution_LR[ind_planet] # Perform trapezoidal integration of planet flux over wavelength bin flux_planet_full_in_bin = trapezoid(y, x) # Calculate eclipse depth: (integrated planet flux) / (stellar flux) eclipse_ratio_bin = flux_planet_full_in_bin / flux_star_instrument_in_bin[i] # POINT-SAMPLED METHOD: Use flux ratios at individual wavelength points else: # Planck or Phoenix stellar spectrum # Direct ratio of planet to stellar flux # stellar_spectrum_LR contains point-by-point stellar flux values eclipse_ratio_bin = np.mean(flux_planet_in_bin / flux_star_instrument_in_bin[i]) # Scale eclipse depth by (Rp/R*)^2 to get observed contrast # This converts from flux ratio to observable transit/eclipse depth Fscaled = ( eclipse_ratio_bin * (rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2 ) # Store as relative spectrum: 1 + eclipse_depth # (1.0 = stellar continuum, values > 1 indicate planetary emission) return 1 + Fscaled @staticmethod def _bin_transmission_lr(instrument, model_after_preprocessing, depth_full_resolution_LR, ind_planet): if instrument.lr_data.flux_norm: value = np.mean(model_after_preprocessing[ind_planet]) else: # Average all spectrum values within the wavelength bin # This represents the mean transit depth in the bin value = np.mean(depth_full_resolution_LR[ind_planet]) # already considered the 1 - transit_depth return value def _preprocess_model_lr_transmission(self, instrument, wl_full_resolution_LR, depth_full_resolution_LR, num_windows=40, top_window=10): if instrument.lr_data.flux_norm: if instrument.hwhm_km_s is not None and self.atmosphere.resolution_obj.HR_resolution > instrument.lr_data.resolution_dataset: wl_new, spectrum_new = convolve_resolution( wl_full_resolution_LR, depth_full_resolution_LR, instrument.hwhm_km_s, 0, self.atmosphere.rv_sampling ) else: wl_new = instrument.lr_data.ldata_LR spline_model = splrep(wl_full_resolution_LR, depth_full_resolution_LR) spectrum_new = splev(wl_new, spline_model) _, model_after_processing = estimate_continuum(spectrum_new, num_windows=num_windows, top_window=top_window, check_gaussian=False) wl_after_processing = wl_new else: # Emission or simple transit not in form of normalixed flux wl_after_processing = wl_full_resolution_LR model_after_processing = depth_full_resolution_LR return wl_after_processing, model_after_processing def _bin_spectrum_to_instrument_resolution( self, instrument, wl_full_resolution_LR, model_LR, depth_full_resolution_LR, rp, ): """ Bin a full-resolution spectrum to the instrument's spectral resolution. This method performs spectral binning by integrating the high-resolution atmospheric model spectrum over the wavelength bins defined by the instrument's resolution. It handles both emission and transmission modes, with special treatment for emission spectroscopy including stellar flux normalization and eclipse depth calculations. The binning process: 1. For each wavelength bin defined by the instrument 2. Find all full-resolution wavelength points within the bin 3. For emission: calculate eclipse depth using planet/star flux ratio 4. For transmission: average the spectrum values in the bin 5. Handle edge cases (e.g., zero flux) with appropriate fallbacks Args: instrument: Instrument object containing wavelength bin definitions (lr_data.lbottom, lr_data.ltop, lr_data.npix) wl_full_resolution_LR: Full-resolution wavelength array [microns] model_LR: Full-resolution model flux array (planet flux for emission, transit depth for transmission) depth_full_resolution_LR: Full-resolution spectrum array rp: Planet radius in cm Returns: spectrum_binned_LR: 1D array of binned spectrum values with length equal to instrument.lr_data.npix. Values are set to unity (1.0) where NaN occurs, representing a neutral (featureless) spectrum. Notes: - For emission mode with integral stellar spectrum type, uses trapezoidal integration over each bin - For emission mode with point-sampled stellar spectrum (Planck/Phoenix), uses point-by-point ratios - NaN values in the output are replaced with 1.0 (neutral spectrum) - The returned spectrum is in relative units (1.0 = continuum level) """ # Pre process data if self.atmosphere.rad_mode == "Emission": wl_after_processing = wl_full_resolution_LR model_after_processing = model_LR else: wl_after_processing, model_after_processing = self._preprocess_model_lr_transmission( instrument, wl_full_resolution_LR, depth_full_resolution_LR ) if self.atmosphere.use_hr_linelists_for_lr or self.atmosphere.temporary_force_new_binning: spectrum_binned_LR = ps.bin_spectrum( instrument.lr_data.ldata_LR_micron, wl_after_processing / 1e3, model_after_processing, half_widths=instrument.lr_data.step_LR_micron, gaps=None ) if self.atmosphere.rad_mode == "Emission": eclipse_ratio_bin = spectrum_binned_LR / self.atmosphere.stellar_spectrum[instrument.name] spectrum_binned_LR_scaled = ( eclipse_ratio_bin * (rp / (self.atmosphere.target.stellar_radius * phys_const.r_sun)) ** 2 ) spectrum_binned_LR = 1 + spectrum_binned_LR_scaled else: # Initialize binned spectrum array with unity (neutral spectrum baseline) spectrum_binned_LR = np.ones(instrument.lr_data.npix) # Iterate over each wavelength bin defined by the instrument for i, (bottom, top) in enumerate(zip(instrument.lr_data.lbottom, instrument.lr_data.ltop)): # Find indices of full-resolution wavelengths that fall within this bin # Using open-closed interval: (bottom, top] ind_planet = np.where( (wl_after_processing > bottom) & (wl_after_processing <= top) )[0] # EMISSION SPECTROSCOPY: Calculate planet-to-star flux ratio if self.atmosphere.rad_mode == "Emission": spectrum_binned_LR[i] = self._bin_emission_lr(instrument, ind_planet, model_after_processing, wl_after_processing, i, rp) # TRANSMISSION SPECTROSCOPY: Simple averaging over bin else: spectrum_binned_LR[i] = self._bin_transmission_lr(instrument, model_after_processing, depth_full_resolution_LR, ind_planet) # CLEANUP: Replace any NaN values with unity (neutral spectrum) # NaN can occur from division by zero or empty bins spectrum_binned_LR[np.isnan(spectrum_binned_LR)] = 1 return spectrum_binned_LR
[docs] def low_resolution_lhood( self, temperature, mass_fraction, vmr, MMW, dict_calc_model, rp, ): """ Compute likelihood for low-resolution observations. This method calculates the likelihood by comparing the binned theoretical spectrum with low-resolution observational data. It handles spectral binning, offset corrections, and chi-square calculation for each instrument. Args: temperature: Temperature profile mass_fraction: Mass fraction profiles for species vmr: Volume mixing ratios MMW: Mean molecular weight profile dict_calc_model: Dictionary containing model calculation parameters rp: Planet radius in cm Returns: Tuple containing: - lhood: Log-likelihood value - wl_full_resolution_LR: Full resolution wavelength array - depth_full_resolution_LR: Full resolution spectrum - wl_binned_LR_complete: Binned wavelength arrays for all instruments - spectrum_binned_LR_complete: Binned spectra for all instruments - opacity_contribution_LR: Opacity contributions (if model plotting) - lh_low_resolution: Dictionary of per-instrument likelihood details """ lhood = 0 opacity_contribution_LR = None wl_binned_LR_complete = [] spectrum_binned_LR_complete = [] # Calculate full resolution model spectrum wl_full_resolution_LR, model_LR, depth_full_resolution_LR = self.atmosphere.calc_model( temperature, mass_fraction, vmr, MMW, dict_calc_model, False, ) # Calculate opacity contributions for model plotting if "Opacity" in self.model_type: opacity_contribution_LR = self.atmosphere.opacity_contribution( temperature, mass_fraction, MMW, dict_calc_model, False ) lh_low_resolution = {} # Process each low-resolution instrument for index_LR, instrument in enumerate(self.atmosphere.resolution_obj.instruments_LR): # Bin the full-resolution spectrum to instrument resolution spectrum_binned_LR = self._bin_spectrum_to_instrument_resolution( instrument=instrument, wl_full_resolution_LR=wl_full_resolution_LR, model_LR=model_LR, depth_full_resolution_LR=depth_full_resolution_LR, rp=rp ) # Apply instrument-specific offset correction offsetLR_arr = dict_calc_model["offsetLR_arr"] offsetLR = 0 if (index_LR == 0 or offsetLR_arr is None) else offsetLR_arr[index_LR - 1] spectrum_binned_LR += offsetLR # Store results for output wl_binned_LR_complete.append(instrument.lr_data.ldata_LR) spectrum_binned_LR_complete.append(spectrum_binned_LR) # Calculate chi-square and likelihood contribution # Compute residuals residuals = instrument.lr_data.rdata_LR - spectrum_binned_LR # Select error array element-wise based on the sign of each residual: # - positive residual → upper error (sigma_plus) # - negative residual → lower error (sigma_minus) sigma_used = np.where( residuals >= 0, instrument.lr_data.errdata_LR_plus, instrument.lr_data.errdata_LR_minus ) chiquadro_LR = np.sum((residuals / sigma_used) ** 2) dof = instrument.lr_data.npix chi2_reduced = chiquadro_LR / dof lh_single_instrument = (-np.sum(np.log(sigma_used)) - 0.5 * chiquadro_LR) lhood += lh_single_instrument lh_low_resolution[instrument.name] = f"{chiquadro_LR};{lh_single_instrument};{chi2_reduced};{instrument.lr_data.npix};{self.bestpars_data.nfit}" return ( lhood, wl_full_resolution_LR, depth_full_resolution_LR, wl_binned_LR_complete, spectrum_binned_LR_complete, opacity_contribution_LR, lh_low_resolution )
[docs] def lh_function_gib(self, params): """ Evaluate the core likelihood function for atmospheric retrieval. This is the HEART of the atmospheric retrieval system. It takes input parameters, performs boundary checks, extracts all parameter values, calculates temperature profiles, computes mass fractions, and evaluates both high-resolution and low-resolution likelihoods. The function preserves the exact sequence of operations critical for atmospheric modeling and MCMC retrieval. Args: params: List of ParamForModel objects containing all retrieval parameters Returns: Tuple containing: - lhood: Log-likelihood value (float) - det: Determinant value for Bayesian priors (float) - not_retrieval_obj: NotRetrieval object with model results (or None) """ # BOUNDARY CHECKING: Reject parameters outside valid ranges if "Retrieval" in self.model_type: in_bounds = np.all([ param.boundaries_check() for param in params if param is not None ]) if not in_bounds: return -np.inf, 1, None # EXTRACT ORBITAL AND OBSERVATIONAL PARAMETERS # Radial velocity parameters kp = self.get_value(params, 'kp') # Planet orbital velocity semi-amplitude rv = self.get_value(params, "rv") # Systemic radial velocity # Scaling factors for spectral fitting sf = self.get_value(params, "sf") # Single scaling factor sf_arr = self.get_value(params, "sf_multi", single_value=False) # Multiple scaling factors # Orbital eccentricity parameters (Thiele-Innes elements) h_ecc = self.get_value(params, "h_ecc") # h = e*sin(ω) k_ecc = self.get_value(params, "k_ecc") # k = e*cos(ω) if self.retrieval_data.eccentricity: # Convert Thiele-Innes elements to eccentricity and argument of periastron ecc = np.power(h_ecc, 2) + np.power(k_ecc, 2) # e² = h² + k² opi = np.arctan2(h_ecc, k_ecc) # ω = arctan2(h, k) # Physical constraint: eccentricity must be between 0 and 1 if not 0 <= ecc <= 1: return -np.inf, 1, None else: ecc = None opi = None # EXTRACT ATMOSPHERIC AND CLOUD PARAMETERS # Cloud deck parameters Pc = self.get_value(params, "Pc") # Cloud deck pressure Pc = None if Pc is None else 10**Pc # Convert from log to linear # Haze and cloud properties haze_factor = self.get_value(params, "haze_factor") # Haze enhancement factor haze_factor = 1 if haze_factor is None else 10**haze_factor cloud_fraction = self.get_value(params, "cloud_fraction") # Cloud coverage fraction cloud_fraction = 1 if cloud_fraction is None else cloud_fraction # Opacity parameters k0 = self.get_value(params, "k0") # Reference opacity k0 = None if k0 is None else 10**k0 gamma = self.get_value(params, "gamma") # Power law index for opacity k_cond = self.get_value(params, "k_cond") # Condensate opacity k_cond = None if k_cond is None else 10**k_cond k_opac = self.get_value(params, "k_opac") # Additional opacity parameter k_opac = None if k_opac is None else 10**k_opac # Wavelength-dependent parameters lambda0_micron = self.get_value(params, "lambda0_micron") # Reference wavelength xi = self.get_value(params, "xi") # Wavelength dependency parameter omega_scale_micron = self.get_value(params, "omega_scale_micron") # Scale factor omega_scale_micron = None if omega_scale_micron is None else 10**omega_scale_micron # EXTRACT ARRAY PARAMETERS (multiple values per night/instrument) jitter_arr = self.get_value(params, "jitter", single_value=False) # Noise jitter per night vmr_peak_arr = self.get_value(params, "vmr_peak", single_value=False) # VMR peak values pressure_peak_arr = self.get_value(params, "pressure_peak", single_value=False) # Pressure peaks width_peak_arr = self.get_value(params, "width_peak", single_value=False) # Peak widths offsetLR_arr = self.get_value(params, "offsetLR", single_value=False) # LR instrument offsets # EXTRACT PLANETARY PARAMETERS # Reference pressure level P_ref = self.get_value(params, "Pref") P_ref = 0.1 if P_ref is None else 10**P_ref # Default 0.1 bar or converted from log # Planet radius and gravity calculation rp = self.get_value(params, "rp") # Planet radius in Jupiter radii rp_jup = rp rp *= phys_const.r_jup_mean # Convert to meters gravity = ( # Surface gravity calculation phys_const.G * (self.atmosphere.target.mass * phys_const.m_jup) / np.power(rp, 2) ) # EXTRACT TEMPERATURE PROFILE PARAMETERS # Basic temperature parameters T0 = self.get_value(params, "T0") # Reference temperature kappa_IR = self.get_value(params, "kappa_IR") # IR opacity kappa_IR_e = None if kappa_IR is None else 10 ** kappa_IR gamma_g = self.get_value(params, "gamma_g") # Adiabatic index gamma_g_e = None if gamma_g is None else 10 ** gamma_g T_int = self.get_value(params, "T_int") # Internal temperature # Madhu temperature profile parameters p1 = self.get_value(params, "p1") # Pressure point 1 p2 = self.get_value(params, "p2") # Pressure point 2 p3 = self.get_value(params, "p3") # Pressure point 3 alpha1 = self.get_value(params, "alpha1") # Temperature gradient 1 alpha2 = self.get_value(params, "alpha2") # Temperature gradient 2 # Physical constraint for Madhu profile: p1 must be < p3 if self.retrieval_data.format_temperature == "madhu" and p1 > p3: return -np.inf, 1, None # EXTRACT CHEMISTRY AND NODE PARAMETERS # Atmospheric chemistry parameters metallicity = self.get_value(params, "met") # Metallicity enhancement c_o_ratio = self.get_value(params, "co_ratio") # Carbon-to-oxygen ratio c_o_ratio_linear = self.get_value(params, "co_ratio_linear") # Carbon-to-oxygen ratio si_o_ratio_linear = self.get_value(params, "sio_ratio_linear") # Si-to-oxygen ratio c_o_ratio_final = 10 ** c_o_ratio if c_o_ratio is not None else c_o_ratio_linear si_o_ratio_final = si_o_ratio_linear # Temperature-pressure profile nodes T_low = self.get_value(params, "T_low") # Low pressure temperature T_high = self.get_value(params, "T_high") # High pressure temperature P_low = self.get_value(params, "P_low") # Low pressure boundary P_high = self.get_value(params, "P_high") # High pressure boundary # Additional temperature nodes T0_node = self.get_value(params, "T0_node") T1_node = self.get_value(params, "T1_node") T2_node = self.get_value(params, "T2_node") T3_node = self.get_value(params, "T3_node") P1_node = self.get_value(params, "P1_node") P2_node = self.get_value(params, "P2_node") # Physical constraint: high pressure must be greater than low pressure if P_high is not None and P_high >= P_low: return -np.inf, 1, None # EXTRACT ROTATION AND ADDITIONAL PARAMETERS # Stellar rotation and line broadening omegad = self.get_value(params, "omega") # Rotation velocity f_rot_arr = self.get_value(params, "f_rot", single_value=False) # Rotation factors per night # Systemic velocity variations per night dVsys_arr = self.get_value(params, "dVsys", single_value=False) # Cloud microphysics parameters std_radius_distribution = self.get_value(params, "std_radius_distribution") # Particle size distribution cloud_fsed = self.get_value(params, "cloud_fsed") # Sedimentation efficiency # Atmospheric mixing eddy_diff_coeff = self.get_value(params, "eddy_diff_coeff") # Eddy diffusion coefficient eddy_diff_coeff_e = None if eddy_diff_coeff is None else np.ones_like( self.atmosphere.pressure_data.pressures) * (10 ** eddy_diff_coeff) # TEMPERATURE PROFILE SETUP # Create parameter dictionary for temperature profile calculation parameters_tp_profile = { "T0": ValueErrorTP(T0), "kappa_IR": ValueErrorTP(kappa_IR_e), "gamma_g": ValueErrorTP(gamma_g_e), "T_int": ValueErrorTP(T_int), "T_low": ValueErrorTP(T_low), "T_high": ValueErrorTP(T_high), "P_low": ValueErrorTP(P_low), "P_high": ValueErrorTP(P_high), "p1": ValueErrorTP(p1), "p2": ValueErrorTP(p2), "p3": ValueErrorTP(p3), "alpha1": ValueErrorTP(alpha1), "alpha2": ValueErrorTP(alpha2), "T0_node": ValueErrorTP(T0_node), "T1_node": ValueErrorTP(T1_node), "T2_node": ValueErrorTP(T2_node), "T3_node": ValueErrorTP(T3_node), "P1_node": ValueErrorTP(P1_node), "P2_node": ValueErrorTP(P2_node), } # TEMPERATURE PROFILE CALCULATION # Create temperature profile object with all parameters tp_profile_obj = UserTemperatureProfile( self.atmosphere.pressure_data.pressures, parameters_tp_profile, gravity, False, None ) # Get the appropriate temperature profile function and calculate temperature function_tp_profile = getattr(tp_profile_obj, self.retrieval_data.format_temperature) temperature, _ = function_tp_profile() # ATMOSPHERIC COMPOSITION CALCULATION # Calculate mass fractions, mean molecular weight, and VMR profiles mass_fraction_names_hr, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, negative_mass_fractions = self.calculate_mass_fraction( temperature, metallicity, c_o_ratio_final, si_o_ratio_final, vmr_peak_arr, pressure_peak_arr, width_peak_arr, ) if negative_mass_fractions: return -np.inf, 1, None # INITIALIZE OUTPUT VARIABLES lhood = 0 # Total log-likelihood wl_full_resolution_HR = 0 depth_full_resolution_HR = 0 wl_full_resolution_LR = 0 depth_full_resolution_LR = 0 wl_binned_LR = 0 spectrum_binned_LR = 0 wlen_mu_contribution_HR = 0 contribution_HR = 0 wlen_mu_contribution_LR = 0 contribution_LR = 0 opacity_contribution_HR = None opacity_contribution_LR = None # CREATE MODEL CALCULATION DICTIONARY # Package all parameters needed for atmospheric model calculations dict_calc_model = { "offsetLR_arr": offsetLR_arr, "Pc": Pc, "rp": rp, "k0": k0, "gamma": gamma, "gravity": gravity, "omegad": omegad, "rv": rv, "sf": sf, "sf_arr": sf_arr, "f_rot_arr": f_rot_arr, "T0": T0, "T_low": T_low, "T3_node": T3_node, "ecc": ecc, "opi": opi, "kp": kp, "jitter_arr": jitter_arr, "dVsys_arr": dVsys_arr, "P_ref": P_ref, "haze_factor": haze_factor, "cloud_fraction": cloud_fraction, "k_cond": k_cond, "k_opac": k_opac, "lambda0_micron": lambda0_micron, "xi": xi, "omega_scale_micron": omega_scale_micron, "std_radius_distribution": std_radius_distribution, "cloud_fsed": cloud_fsed, "eddy_diff_coeff": eddy_diff_coeff_e, } lh_high_resolution = None lh_low_resolution = None # HIGH-RESOLUTION LIKELIHOOD CALCULATION if self.atmosphere.resolution_obj.high_resolution(): if "Contribution" in self.model_type: # Calculate opacity contributions for analysis os.system(f"mkdir -p {self.retrieval_data.path_results}/contribution/") wl_full_resolution_HR, contribution_HR = self.atmosphere.calc_model_contribution( temperature, mass_fraction_names_hr, MMW, dict_calc_model ) wlen_mu_contribution_HR = wl_full_resolution_HR else: # Calculate high-resolution likelihood hr_likelihood = self.high_resolution_lhood( temperature, mass_fraction_names_hr, vmr, MMW, dict_calc_model ) all_ok = hr_likelihood[0] lhood += hr_likelihood[1] wl_full_resolution_HR = hr_likelihood[2] depth_full_resolution_HR = hr_likelihood[3] opacity_contribution_HR = hr_likelihood[4] lh_high_resolution = hr_likelihood[5] # Exit if high-resolution calculation failed if not all_ok: return lhood, 1, None mass_fraction_names_lr = None # LOW-RESOLUTION LIKELIHOOD CALCULATION if self.atmosphere.resolution_obj.low_resolution(): # Prepare mass fraction names for low-resolution (different naming convention) mass_fraction_names_lr = mass_fraction_names_hr.copy() for counter_species, elem in enumerate(self.atmosphere.species_obj.line_species_complete_name_hr): mass_fraction_names_lr[self.atmosphere.species_obj.line_species_complete_name_lr[counter_species]] = mass_fraction_names_lr.pop(elem) if "Contribution" in self.model_type: # Calculate opacity contributions for low-resolution analysis os.system(f"mkdir -p {self.retrieval_data.path_results}/contribution/") wl_full_resolution_LR, contribution_LR = self.atmosphere.calc_model_contribution( temperature, mass_fraction_names_lr, MMW, dict_calc_model, False ) wlen_mu_contribution_LR = wl_full_resolution_LR # Calculate low-resolution likelihood lr_likelihood = self.low_resolution_lhood( temperature, mass_fraction_names_lr, vmr, MMW, dict_calc_model, rp ) lhood += lr_likelihood[0] wl_full_resolution_LR = lr_likelihood[1] depth_full_resolution_LR = lr_likelihood[2] wl_binned_LR = lr_likelihood[3] spectrum_binned_LR = lr_likelihood[4] opacity_contribution_LR = lr_likelihood[5] lh_low_resolution = lr_likelihood[6] # CALCULATE BAYESIAN PRIOR DETERMINANT det = self.calculate_det(params) # DETERMINE SPECIES LIST BASED ON CHEMISTRY MODEL if self.atmosphere.chemistry == ConstantVariables.LIST_CHEMISTRY_TABLE[1]: species = mass_fraction_names_hr.keys() else: species = list(self.atmosphere.species_compatible_with_prt) # CREATE OUTPUT OBJECT FOR NON-RETRIEVAL MODES not_retrieval_obj = None if "Manual" in self.model_type or "Model" in self.model_type: # print("Construction of Not Retrieval OBJ") not_retrieval_obj = NotRetrieval( mass_fraction_names_hr, mass_fraction_names_lr, MMW, vmr, mean_VMR_and_MF_string, mean_VMR_and_MF_dict, wl_full_resolution_HR, depth_full_resolution_HR, wlen_mu_contribution_HR, contribution_HR, wl_full_resolution_LR, depth_full_resolution_LR, wl_binned_LR, spectrum_binned_LR, offsetLR_arr, wlen_mu_contribution_LR, contribution_LR, self.atmosphere.chemistry, params[self.start_molec:self.start_elements], species, opacity_contribution_HR, opacity_contribution_LR, self.atmosphere.resolution_obj.instruments_LR, rp_jup, rad_mode=self.atmosphere.rad_mode, stellar_radius=self.atmosphere.target.stellar_radius, stellar_spectrum=self.atmosphere.stellar_spectrum, lh_HR=lh_high_resolution, lh_LR=lh_low_resolution, HR_res_present=self.atmosphere.resolution_obj.high_resolution(), LR_res_present=self.atmosphere.resolution_obj.low_resolution(), use_hr_linelists_for_lr=self.atmosphere.use_hr_linelists_for_lr ) return lhood, det, not_retrieval_obj
[docs] def parallel_chain( self, index_core, j, return_dict, oldpars, oldchi2, olddet, pars_r1, pars_r2 ): """ Execute parallel MCMC chain step for differential evolution. This method implements one step of the differential evolution MCMC algorithm in parallel. It calculates new parameter values using the DE formula and evaluates the likelihood to decide whether to accept or reject the step. Args: index_core: Index of the core. j: Chain index return_dict: Shared dictionary for returning results oldpars: Current parameter values for this chain oldchi2: Current chi-square value for this chain olddet: Current determinant value for this chain pars_r1: Parameter values from random chain 1 pars_r2: Parameter values from random chain 2 """ arr_chains = np.empty_like(j, dtype=StructReturnExofast) try: for i in range(len(j)): # Calculate new parameter values using differential evolution formula newpars_chain = ( oldpars[:, i] + self.bestpars_data.gamma_coeff * np.reshape(pars_r1[:, i] - pars_r2[:, i], (self.bestpars_data.nfit,)) + self.random_obj.rng.standard_normal(self.bestpars_data.nfit) * self.retrieval_data.scale_vector_params / 100 ) # Create full parameter object and evaluate likelihood param_full = self.create_param_full(newpars_chain) newchi2, newdet, _ = self.lh_function_gib(param_full) # Calculate acceptance probability C = (newdet / olddet[i]) * np.exp(newchi2 - oldchi2[i]) # Accept or reject the step based on Metropolis criterion if self.random_obj.rng.uniform() < C: temp = StructReturnExofast(1, newpars_chain, newchi2, newdet) else: temp = StructReturnExofast(0, oldpars[:, i], oldchi2[i], olddet[i]) arr_chains[i] = temp except Exception as e: # If likelihood calculation fails, reject all steps in this batch print(f"\n!!! Worker process {index_core} crashed in parallel_chain !!!") print(f"Process ID: {self.retrieval_data.id_process}, Core: {index_core}") print(f"Exception: {e}") print(traceback.format_exc()) # print("Rejecting all steps in this batch and continuing...\n") # # Fill remaining chains with rejected steps (keep old parameters) # for i in range(len(j)): # arr_chains[i] = StructReturnExofast(0, oldpars[:, i], oldchi2[i], olddet[i]) # Store result in shared dictionary try: return_dict[index_core] = arr_chains except Exception as e: print( f"Error on process {self.retrieval_data.id_process}, " f"Exception: {str(e)}" ) print(traceback.format_exc()) exit()
# def parallel_chain_pool_version(self, j, oldpars, oldchi2, olddet, pars_r1, pars_r2): # """ # Pool version of parallel_chain - returns result instead of writing to shared dict. # # Args: # j: Chain index # oldpars: Current parameter values for this chain # oldchi2: Current chi-square value for this chain # olddet: Current determinant value for this chain # pars_r1: Parameter values from random chain 1 # pars_r2: Parameter values from random chain 2 # # Returns: # tuple: (j, StructReturnExofast_result) # """ # # Calculate new parameter values using differential evolution formula # newpars_chain = ( # oldpars # + self.bestpars_data.gamma_coeff # * np.reshape(pars_r1 - pars_r2, (self.bestpars_data.nfit,)) # + self.random_obj.rng.standard_normal(self.bestpars_data.nfit) # * self.retrieval_data.scale_vector_params # / 100 # ) # # # Create full parameter object and evaluate likelihood # param_full = self.create_param_full(newpars_chain) # newchi2, newdet, _ = self.lh_function_gib(param_full) # # # Calculate acceptance probability # C = (newdet / olddet) * np.exp(newchi2 - oldchi2) # # # Accept or reject the step based on Metropolis criterion # if self.random_obj.rng.uniform() < C: # temp = StructReturnExofast(1, newpars_chain, newchi2, newdet) # else: # temp = StructReturnExofast(0, oldpars, oldchi2, olddet) # # # Return result instead of writing to shared dict # return j, temp # def run_multiple_processes_pool_version(self, old_pars, old_chi, old_det): # """ # Pool version that works EXACTLY like the original Manager version. # One process per chain, one core per chain. # # Args: # old_pars: Current parameter values for all chains # old_chi: Current chi-square values for all chains # old_det: Current determinant values for all chains # # Returns: # return_dict: Dictionary containing results from all processes # """ # nchains = self.bestpars_data.nchains # ncores = self.bestpars_data.ncores # # # Prepare arguments for each chain # chain_args = [] # for j in range(nchains): # # Select two random chains different from current chain for DE # while True: # r1 = self.random_obj.rng.integers(0, nchains, 1)[0] # if r1 != j: # break # # while True: # r2 = self.random_obj.rng.integers(0, nchains, 1)[0] # if r2 != j and r2 != r1: # break # # # Add arguments for this chain # chain_args.append(( # j, # old_pars[:, j], # old_chi[j], # old_det[j], # old_pars[:, r1], # old_pars[:, r2] # )) # # # Execute in parallel - ONE PROCESS PER CHAIN, exactly like Manager # try: # with multiprocessing.Pool(processes=ncores) as pool: # results = pool.starmap(self.parallel_chain_pool_version, chain_args) # # # Convert results back to dictionary format # return_dict = {} # for j, result in results: # return_dict[j] = result # # return return_dict # # except Exception as e: # print(f"Pool execution failed: {e}") # # Fallback to original Manager version # return self.run_multiple_processes_manager_version(old_pars, old_chi, old_det)
[docs] def run_multiple_processes_manager_version(self, old_pars, old_chi, old_det): """ Run MCMC chain steps in parallel using multiprocessing Manager. This method distributes chain steps across multiple processes using differential evolution MCMC, with a shared Manager dictionary for collecting results. Args: old_pars: Current parameter values for all chains, shape (nfit, nchains) old_chi: Current chi-square values for all chains, shape (nchains,) old_det: Current determinant values for all chains, shape (nchains,) Returns: return_dict: Manager dictionary mapping core index to array of StructReturnExofast results """ proc = [] manager = Manager() return_dict = manager.dict() chain_per_core = int(self.bestpars_data.nchains / self.bestpars_data.ncores) for i in range(self.bestpars_data.ncores): r1 = np.zeros(chain_per_core, dtype=int) r2 = np.zeros(chain_per_core, dtype=int) j = np.zeros(chain_per_core, dtype=int) for k in range(chain_per_core): # Select two random chains different from current chain for DE j[k] = int(i * chain_per_core + k) while True: r1[k] = self.random_obj.rng.integers(0, self.bestpars_data.nchains, 1) if r1[k] != j[k]: break while True: r2[k] = self.random_obj.rng.integers(0, self.bestpars_data.nchains, 1) if r2[k] != j[k]: break # Create and start process for this chain proc.append( multiprocessing.Process( target=self.parallel_chain, args=( i, j, return_dict, old_pars[:, j], old_chi[j], old_det[j], old_pars[:, r1], old_pars[:, r2], ), ) ) proc[i].start() # Wait for all processes to complete and clean up for i in range(self.bestpars_data.ncores): proc[i].join() proc[i].terminate() return return_dict
[docs] def run_multiple_processes(self, old_pars, old_chi, old_det, use_pool=False): """ Run MCMC chain steps in parallel using the Manager-based approach. Args: old_pars: Current parameter values for all chains, shape (nfit, nchains) old_chi: Current chi-square values for all chains, shape (nchains,) old_det: Current determinant values for all chains, shape (nchains,) use_pool: Currently unused; Manager approach is always used Returns: return_dict: Dictionary containing results from all processes """ # if use_pool: # return self.run_multiple_processes_pool_version(old_pars, old_chi, old_det) # else: return self.run_multiple_processes_manager_version(old_pars, old_chi, old_det)
# noinspection PyUnresolvedReferences
[docs] def create_param_full(self, newpars_chain): """ Create full parameter array from chain parameter values. This method reconstructs the complete parameter array from the reduced set of parameters used in the MCMC chain. It handles both single-value parameters and multi-value parameters (like jitter, f_rot arrays). The logic preserves the exact sequence of operations: 1. Single parameters get assigned directly 2. Multi-value parameters (jitter, f_rot, etc.) get collected into arrays Args: newpars_chain: Array of parameter values from MCMC chain Returns: param_full: Complete parameter array with all values assigned """ param_full = self.initial_param_array counter_newpars = 0 for i in range(len(param_full)): if param_full[i] is not None and param_full[i].status: # Handle single-value parameters (NOT f_rot or jitter arrays) if param_full[i].starting_value_variable is not None: param_full[i].value_in_retrieval = newpars_chain[counter_newpars] counter_newpars += 1 else: # Handle multi-value parameters (jitter, f_rot, etc.) # # self.list_multiple_param contains names of repeated parameters. # This section processes parameters that have multiple instances # (e.g., jitter1, jitter2, jitter3 -> jitter array). # # The logic: # - Check if current parameter name is in multiple_param list # - Collect all sequential values with same base name # - Create array: [1, [2,3,4], 5, [6,7], 8] for ["kp", "jitter", "omega", "f_rot", "H2O"] arr_same_param = [] for multiple_param in self.list_multiple_param: if ( multiple_param in self.bestpars_data.list_bestpars[counter_newpars] ): # Collect all consecutive parameters with same base name while ( multiple_param in self.bestpars_data.list_bestpars[counter_newpars] ): arr_same_param.append(newpars_chain[counter_newpars]) counter_newpars += 1 if counter_newpars >= self.bestpars_data.nfit: break # Assign collected array to parameter param_full[i].value_arr_in_retrieval = arr_same_param break return param_full