Source code for GUIBRUSHR.General_Constants.Classes.Night

# cython: language_level=3
import os
import pickle
import logging
from pathlib import Path
from typing import Optional, Union
import numpy as np
from scipy.stats import stats

from GUIBRUSHR.General_Constants.FunctionsAndConstants.general_functions import read_fits

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


[docs] class NightError(Exception): """Base exception class for Night-related errors.""" pass
[docs] class FileNotFoundErrorPers(NightError): """Raised when a required file is not found.""" pass
[docs] class DataValidationError(NightError): """Raised when data validation fails.""" pass
[docs] def sigma_calc(spectra, simulation=False): """Calculate sigma values for spectral data.""" std_spectral_channel = np.std(spectra, axis=2, ddof=1) std_spectr = np.std(spectra, axis=0, ddof=1).T std_spect_norm_factor = ( std_spectr / np.mean(std_spectr, axis=0) ).T # Normalize to the mean value so that spectra with standard deviation greater than the mean will have increased error sigma_spectra = ( std_spectral_channel[:, :, np.newaxis] * std_spect_norm_factor[np.newaxis, :, :] ) err_substitute = 0 if simulation else 1 sigma_spectra[sigma_spectra <= 1e-5] = err_substitute return sigma_spectra
[docs] class Night: """ A class to manage and process astronomical night observation data. This class handles the processing of spectroscopic data, including phase calculations, spectral order management, and various data transformations. Attributes: intransit (NDArray): Indices of spectra within transit maskinvm (NDArray): Mask inversion data wlen_list_overplot (NDArray): Wavelength list for overplotting sigma_spectra (NDArray): Spectral sigma values lambdas (NDArray): Wavelength data spectra (NDArray): Spectral data n_good_orders (int): Number of spectral orders n_pixels (int): Number of pixels n_phases_considered (int): Number of in-transit phases considered good_order (NDArray): Good order indices n_tot_phases (int): Total number of phases barycentric_velocity (NDArray): Barycentric velocity corrections phases (NDArray): Phase values """
[docs] def __init__( self, chosen_limit_phase: float, path_instrument: Union[str, Path], date: str, rad_mode: str, path_good_order: Union[str, Path], path_order_tell: Union[str, Path], path_order_test: Union[str, Path], table_output_file: Optional[Union[str, Path]] = None, CC: bool = False, simulation: bool = False, synthetic: bool = False, retrieval_standardize: str = "From pkl", ) -> None: """ Initialize the Night object with observation parameters. Args: chosen_limit_phase: Phase limit for transit path_instrument: Path to instrument data date: Observation date rad_mode: Radiation mode path_good_order: Path to good order data path_order_tell: Path to telluric order data path_order_test: Path to test order data table_output_file: Optional path for output table CC: Cross-correlation flag simulation: If True, simulate data synthetic: If True, synthetic data retrieval_standardize: Standardize option for retrieval data Raises: NightError: If initialization of any data processing step fails """ # Convert paths to Path objects self.mean_aligned = None self.average_spectrum_to_be_added = None self.apply_standardize = None self.nfc = None self.array_reconstructed_from_PCA_components = None self.real_error_matrix = None self.snr_total = None self.mask_error_pixels = None self.aligned = None self.no_intransit = None self.intransit_real = None self.count_in_transit_real = None self.subtraction_of_the_average_spectrum = None self.path_instrument = Path(path_instrument) self.path_good_order = Path(path_good_order) self.path_order_tell = Path(path_order_tell) self.path_order_test = Path(path_order_test) self.table_output_file = Path(table_output_file) if table_output_file else None # Initialize instance variables self.date = date self.rad_mode = rad_mode self.retrieval_standardize = retrieval_standardize self.intransit = None self.maskinvm = None # self.good_order = None self.n_good_orders = None self.n_orders_tot = None self.n_pixels = None # self.spectra_full = None self.complete_wavelength = None self.lambdas = None self.spectra = None self.wlen_list_overplot = None self.sigma_spectra = None self.sigma_spectra_full = None # self.n_tot_phases = None self.n_phases_considered = None self.phases = None self.phases_considered = None self.phase_new_range = None self.phase_new_range_considered = None self.barycentric_velocity = None self.barycentric_velocity_considered = None self.hjd_ck = None self.hjd_ck_considered = None # self.smooth_on = None self.smooth_size = None # Initialize data processing try: self.smooth_management(synthetic) self.fase_operation(chosen_limit_phase, CC) self.good_order_operation(synthetic) self.management_spAbi(synthetic) self.read_maskinv(synthetic) self.read_wlen_sp(synthetic) self.load_aligned_and_spAb_pca(synthetic) self.sigma_spectra_calc(simulation, synthetic) except Exception as e: raise NightError(f"Failed to initialize Night object: {str(e)}") from e
[docs] def load_aligned_and_spAb_pca(self, synthetic) -> None: """Load additional data files required for processing.""" try: self.aligned = np.transpose(read_fits(str(self.path_instrument / self.date / "aligned.fits"), transpose=True), axes=(0, 2, 1)) path_snr = str(self.path_instrument / self.date / "snr_total.fits") if os.path.exists(path_snr): self.snr_total = np.transpose(read_fits(path_snr, transpose=True), axes=(0, 2, 1)) # Create mask for snr_total # True = masked (to exclude), False = valid # Simple sigma clipping clipped_data, lower_bound, upper_bound = stats.sigmaclip(self.snr_total, low=3, high=3) # Get the mask self.mask_error_pixels = (self.snr_total < 1) | (self.snr_total > upper_bound) # Apply mask to snr_total self.snr_total = np.ma.masked_array(self.snr_total, mask=self.mask_error_pixels) # this value is used to create a more or less realistic value for the signal. # In principle I could have used S = 1 in R = S / SNR, the important thing is to use S the same everywhere self.mean_aligned = np.mean(self.aligned) # Perform division only on unmasked pixels self.real_error_matrix = (self.mean_aligned / self.snr_total).filled(0) else: self.snr_total = None self.real_error_matrix = None self.n_pixels, self.n_orders_tot, _ = self.aligned.shape if not synthetic: # Get matrix reconstructed containing hypothetical telluric lines, stellar lines, # instruments systematic, but no the planetary signal path_pca = self.path_order_test / "array_reconstructed_from_PCA_components.fits" # old compatibility TODO remove in future if not path_pca.exists(): path_pca_old = self.path_order_test / "spectrum_AB_PCA.fits" if path_pca_old.exists(): os.system(f"mv {path_pca_old} {path_pca}") else: # Very old compatibility TODO remove in future path_pca_very_old = self.path_order_test / "spAb_pca.fits" os.system(f"mv {path_pca_very_old} {path_pca}") # self.array_reconstructed_from_PCA_components = np.transpose( read_fits(str(path_pca), transpose=True), (0, 2, 1) ) self.nfc = read_fits(str(self.path_order_test / "nfc.fits"), transpose=True) except Exception as e: raise FileNotFoundErrorPers(f"Failed to load additional data: {str(e)}") from e
[docs] def fase_operation(self, chosen_limit_phase: float, CC: bool) -> None: """ Process phase data and determine transit status. Args: chosen_limit_phase: Phase limit for transit determination CC: Cross-correlation flag Raises: FileNotFoundErrorPers: If phase file is not found """ path_phase = self.path_instrument / self.date / "phase.pkl" try: if path_phase.exists(): with open(path_phase, "rb") as file_obj: phase_data = pickle.load(file_obj) corr = phase_data["barycentric_velocity"] fase_fase = phase_data["transit_phase"] try: hjd_ck = phase_data["hjd"] except KeyError: logger.warning("hjd not found in phase data, using default ones array") hjd_ck = np.ones_like(fase_fase) try: # For backward compatibility, also check for BJD_TDB if needed _ = phase_data["BJD_TDB"] except KeyError: pass # BJD_TDB not used in this function else: path_fase = self.path_instrument / self.date / "fase.pkl" with open(path_fase, "rb") as file_obj: corr = pickle.load(file_obj) fase_fase = pickle.load(file_obj) try: hjd_ck = pickle.load(file_obj) except EOFError: logger.warning("hjd_ck not found, using default ones array") hjd_ck = np.ones_like(fase_fase) try: bjd = pickle.load(file_obj) except EOFError: logger.warning("BJD not found, using default ones array") bjd = np.ones_like(fase_fase) phase_dictionary = { "barycentric_velocity": corr, "transit_phase": fase_fase, "hjd": hjd_ck, "BJD_TDB": bjd } with open(path_phase, "wb") as fof: pickle.dump(phase_dictionary, fof) # with open(path_phase, "rb") as file_obj: # phase_data = pickle.load(file_obj) # corr = phase_data.get("barycentric_velocity", np.array([])) # fase_fase = phase_data.get("transit_phase", np.array([])) # hjd_ck = phase_data.get("hjd", np.ones_like(fase_fase)) # _ = phase_data.get("BJD_TDB", np.ones_like(fase_fase)) except FileNotFoundErrorPers: raise FileNotFoundErrorPers(f"Phase file not found: {path_phase}") self.n_tot_phases = fase_fase.size # Handle negative phases if np.any(fase_fase < 0): fase_fase[fase_fase < 0] += 1 logger.warning("Negative phase values detected and corrected") if self.table_output_file: self.table_output_file.write_text("WARNING: Negative phase values were corrected") self.phases = np.array(fase_fase) if self.rad_mode == "Transmission": self.phase_new_range = np.where(self.phases > 0.5, self.phases - 1, self.phases) else: self.phase_new_range = self.phases self.barycentric_velocity = np.array(corr) self.hjd_ck = np.array(hjd_ck) self.intransit_real = np.where( (self.phases < chosen_limit_phase) | (self.phases > (1.0 - chosen_limit_phase)) )[0] self.count_in_transit_real = len(self.intransit_real) # Determine transit status if self.rad_mode == "Transmission" and not CC: self.intransit = np.array(self.intransit_real, dtype=int) self.no_intransit = [i for i in range(len(self.phases)) if i not in self.intransit] else: self.intransit = np.arange(len(self.phases), dtype=int) self.no_intransit = np.array([], dtype=int) # Update considered values self._update_considered_values()
def _update_considered_values(self) -> None: """Update all 'considered' attributes based on intransit indices.""" self.phases_considered = self.phases[self.intransit] self.phase_new_range_considered = self.phase_new_range[self.intransit] self.barycentric_velocity_considered = self.barycentric_velocity[self.intransit] self.hjd_ck_considered = self.hjd_ck[self.intransit] self.n_phases_considered = len(self.intransit)
[docs] def good_order_operation(self, synthetic) -> None: """ Load good order data from pickle file. Args: synthetic: If True, skip loading good order data. Raises: FileNotFoundErrorPers: If good order file is not found """ if synthetic: return try: with open(self.path_good_order, "rb") as file_obj: self.good_order = pickle.load(file_obj) self.n_good_orders = len(self.good_order) except FileNotFoundErrorPers: raise FileNotFoundErrorPers(f"Good order file not found: {self.path_good_order}")
[docs] def management_spAbi(self, synthetic) -> None: """ Process spectral data and set dimensional attributes. Args: synthetic: If True, skip loading spectral data. Raises: FileNotFoundErrorPers: If spectral data file is not found """ if synthetic: return try: path_signal_residuals_after_PCA_subtraction = str(self.path_order_tell / "signal_residuals_after_PCA_subtraction.fits") # Old compatibility TODO remove in future if not os.path.exists(path_signal_residuals_after_PCA_subtraction): old_path = str(self.path_order_tell / "spAbest_pca.fits") os.system(f"mv {old_path} {path_signal_residuals_after_PCA_subtraction}") # spAbi = read_fits( path_signal_residuals_after_PCA_subtraction, transpose=True, intransit=self.intransit, ) spAbi_full = read_fits( path_signal_residuals_after_PCA_subtraction, transpose=True, ) self.spectra = np.transpose(spAbi, (0, 2, 1)) self.spectra_full = np.transpose(spAbi_full, (0, 2, 1)) except Exception as e: raise FileNotFoundErrorPers(f"Failed to load spectral data: {str(e)}") from e
[docs] def read_maskinv(self, synthetic) -> None: """ Load and process mask inversion data. Args: synthetic: If True, skip loading mask inversion data. Raises: FileNotFoundErrorPers: If mask inversion file is not found """ if synthetic: return path_complete = self.path_order_tell / "mask_inv_complete.pkl" if not path_complete.exists(): os.system( f"mv {self.path_order_tell / 'maskinvm.pkl'} {path_complete}" ) os.system(f"rm -f {self.path_order_tell / 'maskinvm.pkl'}") try: with path_complete.open("rb") as f: maskinvm = np.array(pickle.load(f), dtype=bool) except FileNotFoundErrorPers: raise FileNotFoundErrorPers(f"Mask inversion file not found: {path_complete}") # Process mask data maskinvmf = np.tile(maskinvm[:, np.newaxis, :], (1, self.n_tot_phases, 1)) if self.intransit is not None: maskinvmf = maskinvmf[:, self.intransit, :] self.maskinvm = np.transpose(maskinvmf, (0, 2, 1))
[docs] def read_wlen_sp(self, synthetic) -> None: """ Read and process wavelength data. Args: synthetic: If True, skip some loading wavelength data. Raises: FileNotFoundErrorPers: If wavelength file is not found """ try: wlenspf = read_fits( str(self.path_instrument / self.date / "wlen_refined.fits"), transpose=True, ) if np.ndim(wlenspf) == 2: if not synthetic: self.wlen_list_overplot = wlenspf[self.good_order, :] wlenspfi = np.tile(wlenspf[:, np.newaxis, :], (1, self.n_phases_considered, 1)) self.complete_wavelength = np.array( np.tile(wlenspf[:, :, np.newaxis], (1, 1, self.n_tot_phases)) ) else: wlenspfi = np.array(wlenspf) if not synthetic: self.wlen_list_overplot = wlenspfi[self.good_order, 0, :] self.complete_wavelength = np.transpose(wlenspf, (0, 2, 1)) if not synthetic: wlenspfi = np.array(wlenspfi[:, :, self.good_order]) self.lambdas = np.transpose(wlenspfi, (0, 2, 1)) except Exception as e: raise FileNotFoundErrorPers(f"Failed to load wavelength data: {str(e)}") from e
[docs] def sigma_spectra_calc(self, simulation=False, synthetic=False): """ Calculate sigma (uncertainty) arrays for the spectral data. Args: simulation: If True, use simulation-specific error handling. synthetic: If True, skip calculation entirely. """ if synthetic: return self.sigma_spectra = sigma_calc(self.spectra, simulation) self.sigma_spectra_full = sigma_calc(self.spectra_full, simulation)
[docs] def smooth_management(self, synthetic) -> None: """ Load smoothing parameters from either smooth_file.pkl or temp_params_tell.pkl. Args: synthetic: If True, skip loading smoothing parameters. Raises: FileNotFoundErrorPers: If smoothing parameter file is not found """ if synthetic: self.pca_mode = "spatial" # default value for synthetic data return smooth_file = self.path_order_tell / "smooth_file.pkl" try: if smooth_file.exists(): with open(smooth_file, "rb") as fo: smooth_data = pickle.load(fo) # self.smooth_on = smooth_data.get("smooth_on", False) self.smooth_size = smooth_data.get("smooth_size", 0) if "subtraction_of_the_average_spectrum_checkbox" in smooth_data.keys(): self.subtraction_of_the_average_spectrum = smooth_data.get("subtraction_of_the_average_spectrum_checkbox", False) else: # old compatibility # TODO remove in future self.subtraction_of_the_average_spectrum = smooth_data.get("meancol_checkbox", False) apply_standardize_pkl = smooth_data.get("apply_standardize", False) self.average_spectrum_to_be_added = smooth_data.get("average_spectrum_to_be_added", None) # PCA mode: "spatial" (loop over orders) or "temporal" (loop over spectral channels) self.pca_mode = smooth_data.get("pca_mode", "spatial") # if self.average_spectrum_to_be_added is None: # old compatibility # TODO remove in future self.subtraction_of_the_average_spectrum = False else: raise FileNotFoundErrorPers(f"No smoothing parameter {smooth_file} files found") if (apply_standardize_pkl and self.retrieval_standardize == "From pkl") or (self.retrieval_standardize == "True"): self.apply_standardize = True else: self.apply_standardize = False except Exception as e: raise FileNotFoundErrorPers(f"Failed to load {smooth_file} smoothing parameters: {str(e)}") from e