# cython: language_level=3
import os
import pickle
import logging
from pathlib import Path
from typing import Optional, Union
import numpy as np
from scipy.stats import stats
from GUIBRUSHR.General_Constants.FunctionsAndConstants.general_functions import read_fits
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
[docs]
class NightError(Exception):
"""Base exception class for Night-related errors."""
pass
[docs]
class FileNotFoundErrorPers(NightError):
"""Raised when a required file is not found."""
pass
[docs]
class DataValidationError(NightError):
"""Raised when data validation fails."""
pass
[docs]
def sigma_calc(spectra, simulation=False):
"""Calculate sigma values for spectral data."""
std_spectral_channel = np.std(spectra, axis=2, ddof=1)
std_spectr = np.std(spectra, axis=0, ddof=1).T
std_spect_norm_factor = (
std_spectr / np.mean(std_spectr, axis=0)
).T # Normalize to the mean value so that spectra with standard deviation greater than the mean will have increased error
sigma_spectra = (
std_spectral_channel[:, :, np.newaxis] * std_spect_norm_factor[np.newaxis, :, :]
)
err_substitute = 0 if simulation else 1
sigma_spectra[sigma_spectra <= 1e-5] = err_substitute
return sigma_spectra
[docs]
class Night:
"""
A class to manage and process astronomical night observation data.
This class handles the processing of spectroscopic data, including phase calculations,
spectral order management, and various data transformations.
Attributes:
intransit (NDArray): Indices of spectra within transit
maskinvm (NDArray): Mask inversion data
wlen_list_overplot (NDArray): Wavelength list for overplotting
sigma_spectra (NDArray): Spectral sigma values
lambdas (NDArray): Wavelength data
spectra (NDArray): Spectral data
n_good_orders (int): Number of spectral orders
n_pixels (int): Number of pixels
n_phases_considered (int): Number of in-transit phases considered
good_order (NDArray): Good order indices
n_tot_phases (int): Total number of phases
barycentric_velocity (NDArray): Barycentric velocity corrections
phases (NDArray): Phase values
"""
[docs]
def __init__(
self,
chosen_limit_phase: float,
path_instrument: Union[str, Path],
date: str,
rad_mode: str,
path_good_order: Union[str, Path],
path_order_tell: Union[str, Path],
path_order_test: Union[str, Path],
table_output_file: Optional[Union[str, Path]] = None,
CC: bool = False,
simulation: bool = False,
synthetic: bool = False,
retrieval_standardize: str = "From pkl",
) -> None:
"""
Initialize the Night object with observation parameters.
Args:
chosen_limit_phase: Phase limit for transit
path_instrument: Path to instrument data
date: Observation date
rad_mode: Radiation mode
path_good_order: Path to good order data
path_order_tell: Path to telluric order data
path_order_test: Path to test order data
table_output_file: Optional path for output table
CC: Cross-correlation flag
simulation: If True, simulate data
synthetic: If True, synthetic data
retrieval_standardize: Standardize option for retrieval data
Raises:
NightError: If initialization of any data processing step fails
"""
# Convert paths to Path objects
self.mean_aligned = None
self.average_spectrum_to_be_added = None
self.apply_standardize = None
self.nfc = None
self.array_reconstructed_from_PCA_components = None
self.real_error_matrix = None
self.snr_total = None
self.mask_error_pixels = None
self.aligned = None
self.no_intransit = None
self.intransit_real = None
self.count_in_transit_real = None
self.subtraction_of_the_average_spectrum = None
self.path_instrument = Path(path_instrument)
self.path_good_order = Path(path_good_order)
self.path_order_tell = Path(path_order_tell)
self.path_order_test = Path(path_order_test)
self.table_output_file = Path(table_output_file) if table_output_file else None
# Initialize instance variables
self.date = date
self.rad_mode = rad_mode
self.retrieval_standardize = retrieval_standardize
self.intransit = None
self.maskinvm = None
#
self.good_order = None
self.n_good_orders = None
self.n_orders_tot = None
self.n_pixels = None
#
self.spectra_full = None
self.complete_wavelength = None
self.lambdas = None
self.spectra = None
self.wlen_list_overplot = None
self.sigma_spectra = None
self.sigma_spectra_full = None
#
self.n_tot_phases = None
self.n_phases_considered = None
self.phases = None
self.phases_considered = None
self.phase_new_range = None
self.phase_new_range_considered = None
self.barycentric_velocity = None
self.barycentric_velocity_considered = None
self.hjd_ck = None
self.hjd_ck_considered = None
#
self.smooth_on = None
self.smooth_size = None
# Initialize data processing
try:
self.smooth_management(synthetic)
self.fase_operation(chosen_limit_phase, CC)
self.good_order_operation(synthetic)
self.management_spAbi(synthetic)
self.read_maskinv(synthetic)
self.read_wlen_sp(synthetic)
self.load_aligned_and_spAb_pca(synthetic)
self.sigma_spectra_calc(simulation, synthetic)
except Exception as e:
raise NightError(f"Failed to initialize Night object: {str(e)}") from e
[docs]
def load_aligned_and_spAb_pca(self, synthetic) -> None:
"""Load additional data files required for processing."""
try:
self.aligned = np.transpose(read_fits(str(self.path_instrument / self.date / "aligned.fits"), transpose=True), axes=(0, 2, 1))
path_snr = str(self.path_instrument / self.date / "snr_total.fits")
if os.path.exists(path_snr):
self.snr_total = np.transpose(read_fits(path_snr, transpose=True), axes=(0, 2, 1))
# Create mask for snr_total
# True = masked (to exclude), False = valid
# Simple sigma clipping
clipped_data, lower_bound, upper_bound = stats.sigmaclip(self.snr_total, low=3, high=3)
# Get the mask
self.mask_error_pixels = (self.snr_total < 1) | (self.snr_total > upper_bound)
# Apply mask to snr_total
self.snr_total = np.ma.masked_array(self.snr_total, mask=self.mask_error_pixels)
# this value is used to create a more or less realistic value for the signal.
# In principle I could have used S = 1 in R = S / SNR, the important thing is to use S the same everywhere
self.mean_aligned = np.mean(self.aligned)
# Perform division only on unmasked pixels
self.real_error_matrix = (self.mean_aligned / self.snr_total).filled(0)
else:
self.snr_total = None
self.real_error_matrix = None
self.n_pixels, self.n_orders_tot, _ = self.aligned.shape
if not synthetic:
# Get matrix reconstructed containing hypothetical telluric lines, stellar lines,
# instruments systematic, but no the planetary signal
path_pca = self.path_order_test / "array_reconstructed_from_PCA_components.fits"
# old compatibility TODO remove in future
if not path_pca.exists():
path_pca_old = self.path_order_test / "spectrum_AB_PCA.fits"
if path_pca_old.exists():
os.system(f"mv {path_pca_old} {path_pca}")
else: # Very old compatibility TODO remove in future
path_pca_very_old = self.path_order_test / "spAb_pca.fits"
os.system(f"mv {path_pca_very_old} {path_pca}")
#
self.array_reconstructed_from_PCA_components = np.transpose(
read_fits(str(path_pca), transpose=True),
(0, 2, 1)
)
self.nfc = read_fits(str(self.path_order_test / "nfc.fits"), transpose=True)
except Exception as e:
raise FileNotFoundErrorPers(f"Failed to load additional data: {str(e)}") from e
[docs]
def fase_operation(self, chosen_limit_phase: float, CC: bool) -> None:
"""
Process phase data and determine transit status.
Args:
chosen_limit_phase: Phase limit for transit determination
CC: Cross-correlation flag
Raises:
FileNotFoundErrorPers: If phase file is not found
"""
path_phase = self.path_instrument / self.date / "phase.pkl"
try:
if path_phase.exists():
with open(path_phase, "rb") as file_obj:
phase_data = pickle.load(file_obj)
corr = phase_data["barycentric_velocity"]
fase_fase = phase_data["transit_phase"]
try:
hjd_ck = phase_data["hjd"]
except KeyError:
logger.warning("hjd not found in phase data, using default ones array")
hjd_ck = np.ones_like(fase_fase)
try:
# For backward compatibility, also check for BJD_TDB if needed
_ = phase_data["BJD_TDB"]
except KeyError:
pass # BJD_TDB not used in this function
else:
path_fase = self.path_instrument / self.date / "fase.pkl"
with open(path_fase, "rb") as file_obj:
corr = pickle.load(file_obj)
fase_fase = pickle.load(file_obj)
try:
hjd_ck = pickle.load(file_obj)
except EOFError:
logger.warning("hjd_ck not found, using default ones array")
hjd_ck = np.ones_like(fase_fase)
try:
bjd = pickle.load(file_obj)
except EOFError:
logger.warning("BJD not found, using default ones array")
bjd = np.ones_like(fase_fase)
phase_dictionary = {
"barycentric_velocity": corr,
"transit_phase": fase_fase,
"hjd": hjd_ck,
"BJD_TDB": bjd
}
with open(path_phase, "wb") as fof:
pickle.dump(phase_dictionary, fof)
# with open(path_phase, "rb") as file_obj:
# phase_data = pickle.load(file_obj)
# corr = phase_data.get("barycentric_velocity", np.array([]))
# fase_fase = phase_data.get("transit_phase", np.array([]))
# hjd_ck = phase_data.get("hjd", np.ones_like(fase_fase))
# _ = phase_data.get("BJD_TDB", np.ones_like(fase_fase))
except FileNotFoundErrorPers:
raise FileNotFoundErrorPers(f"Phase file not found: {path_phase}")
self.n_tot_phases = fase_fase.size
# Handle negative phases
if np.any(fase_fase < 0):
fase_fase[fase_fase < 0] += 1
logger.warning("Negative phase values detected and corrected")
if self.table_output_file:
self.table_output_file.write_text("WARNING: Negative phase values were corrected")
self.phases = np.array(fase_fase)
if self.rad_mode == "Transmission":
self.phase_new_range = np.where(self.phases > 0.5, self.phases - 1, self.phases)
else:
self.phase_new_range = self.phases
self.barycentric_velocity = np.array(corr)
self.hjd_ck = np.array(hjd_ck)
self.intransit_real = np.where(
(self.phases < chosen_limit_phase) | (self.phases > (1.0 - chosen_limit_phase))
)[0]
self.count_in_transit_real = len(self.intransit_real)
# Determine transit status
if self.rad_mode == "Transmission" and not CC:
self.intransit = np.array(self.intransit_real, dtype=int)
self.no_intransit = [i for i in range(len(self.phases)) if i not in self.intransit]
else:
self.intransit = np.arange(len(self.phases), dtype=int)
self.no_intransit = np.array([], dtype=int)
# Update considered values
self._update_considered_values()
def _update_considered_values(self) -> None:
"""Update all 'considered' attributes based on intransit indices."""
self.phases_considered = self.phases[self.intransit]
self.phase_new_range_considered = self.phase_new_range[self.intransit]
self.barycentric_velocity_considered = self.barycentric_velocity[self.intransit]
self.hjd_ck_considered = self.hjd_ck[self.intransit]
self.n_phases_considered = len(self.intransit)
[docs]
def good_order_operation(self, synthetic) -> None:
"""
Load good order data from pickle file.
Args:
synthetic: If True, skip loading good order data.
Raises:
FileNotFoundErrorPers: If good order file is not found
"""
if synthetic:
return
try:
with open(self.path_good_order, "rb") as file_obj:
self.good_order = pickle.load(file_obj)
self.n_good_orders = len(self.good_order)
except FileNotFoundErrorPers:
raise FileNotFoundErrorPers(f"Good order file not found: {self.path_good_order}")
[docs]
def management_spAbi(self, synthetic) -> None:
"""
Process spectral data and set dimensional attributes.
Args:
synthetic: If True, skip loading spectral data.
Raises:
FileNotFoundErrorPers: If spectral data file is not found
"""
if synthetic:
return
try:
path_signal_residuals_after_PCA_subtraction = str(self.path_order_tell / "signal_residuals_after_PCA_subtraction.fits")
# Old compatibility TODO remove in future
if not os.path.exists(path_signal_residuals_after_PCA_subtraction):
old_path = str(self.path_order_tell / "spAbest_pca.fits")
os.system(f"mv {old_path} {path_signal_residuals_after_PCA_subtraction}")
#
spAbi = read_fits(
path_signal_residuals_after_PCA_subtraction,
transpose=True,
intransit=self.intransit,
)
spAbi_full = read_fits(
path_signal_residuals_after_PCA_subtraction,
transpose=True,
)
self.spectra = np.transpose(spAbi, (0, 2, 1))
self.spectra_full = np.transpose(spAbi_full, (0, 2, 1))
except Exception as e:
raise FileNotFoundErrorPers(f"Failed to load spectral data: {str(e)}") from e
[docs]
def read_maskinv(self, synthetic) -> None:
"""
Load and process mask inversion data.
Args:
synthetic: If True, skip loading mask inversion data.
Raises:
FileNotFoundErrorPers: If mask inversion file is not found
"""
if synthetic:
return
path_complete = self.path_order_tell / "mask_inv_complete.pkl"
if not path_complete.exists():
os.system(
f"mv {self.path_order_tell / 'maskinvm.pkl'} {path_complete}"
)
os.system(f"rm -f {self.path_order_tell / 'maskinvm.pkl'}")
try:
with path_complete.open("rb") as f:
maskinvm = np.array(pickle.load(f), dtype=bool)
except FileNotFoundErrorPers:
raise FileNotFoundErrorPers(f"Mask inversion file not found: {path_complete}")
# Process mask data
maskinvmf = np.tile(maskinvm[:, np.newaxis, :], (1, self.n_tot_phases, 1))
if self.intransit is not None:
maskinvmf = maskinvmf[:, self.intransit, :]
self.maskinvm = np.transpose(maskinvmf, (0, 2, 1))
[docs]
def read_wlen_sp(self, synthetic) -> None:
"""
Read and process wavelength data.
Args:
synthetic: If True, skip some loading wavelength data.
Raises:
FileNotFoundErrorPers: If wavelength file is not found
"""
try:
wlenspf = read_fits(
str(self.path_instrument / self.date / "wlen_refined.fits"),
transpose=True,
)
if np.ndim(wlenspf) == 2:
if not synthetic:
self.wlen_list_overplot = wlenspf[self.good_order, :]
wlenspfi = np.tile(wlenspf[:, np.newaxis, :], (1, self.n_phases_considered, 1))
self.complete_wavelength = np.array(
np.tile(wlenspf[:, :, np.newaxis], (1, 1, self.n_tot_phases))
)
else:
wlenspfi = np.array(wlenspf)
if not synthetic:
self.wlen_list_overplot = wlenspfi[self.good_order, 0, :]
self.complete_wavelength = np.transpose(wlenspf, (0, 2, 1))
if not synthetic:
wlenspfi = np.array(wlenspfi[:, :, self.good_order])
self.lambdas = np.transpose(wlenspfi, (0, 2, 1))
except Exception as e:
raise FileNotFoundErrorPers(f"Failed to load wavelength data: {str(e)}") from e
[docs]
def sigma_spectra_calc(self, simulation=False, synthetic=False):
"""
Calculate sigma (uncertainty) arrays for the spectral data.
Args:
simulation: If True, use simulation-specific error handling.
synthetic: If True, skip calculation entirely.
"""
if synthetic:
return
self.sigma_spectra = sigma_calc(self.spectra, simulation)
self.sigma_spectra_full = sigma_calc(self.spectra_full, simulation)
[docs]
def smooth_management(self, synthetic) -> None:
"""
Load smoothing parameters from either smooth_file.pkl or temp_params_tell.pkl.
Args:
synthetic: If True, skip loading smoothing parameters.
Raises:
FileNotFoundErrorPers: If smoothing parameter file is not found
"""
if synthetic:
self.pca_mode = "spatial" # default value for synthetic data
return
smooth_file = self.path_order_tell / "smooth_file.pkl"
try:
if smooth_file.exists():
with open(smooth_file, "rb") as fo:
smooth_data = pickle.load(fo)
#
self.smooth_on = smooth_data.get("smooth_on", False)
self.smooth_size = smooth_data.get("smooth_size", 0)
if "subtraction_of_the_average_spectrum_checkbox" in smooth_data.keys():
self.subtraction_of_the_average_spectrum = smooth_data.get("subtraction_of_the_average_spectrum_checkbox", False)
else: # old compatibility # TODO remove in future
self.subtraction_of_the_average_spectrum = smooth_data.get("meancol_checkbox", False)
apply_standardize_pkl = smooth_data.get("apply_standardize", False)
self.average_spectrum_to_be_added = smooth_data.get("average_spectrum_to_be_added", None)
# PCA mode: "spatial" (loop over orders) or "temporal" (loop over spectral channels)
self.pca_mode = smooth_data.get("pca_mode", "spatial")
#
if self.average_spectrum_to_be_added is None: # old compatibility # TODO remove in future
self.subtraction_of_the_average_spectrum = False
else:
raise FileNotFoundErrorPers(f"No smoothing parameter {smooth_file} files found")
if (apply_standardize_pkl and self.retrieval_standardize == "From pkl") or (self.retrieval_standardize == "True"):
self.apply_standardize = True
else:
self.apply_standardize = False
except Exception as e:
raise FileNotFoundErrorPers(f"Failed to load {smooth_file} smoothing parameters: {str(e)}") from e