Source code for GUIBRUSHR.GUI.Input_Output_Panels.Output_Panels.PanelButtonRetrieval
"""
Panel for managing process control buttons in the GUI.
This panel provides functionality to kill processes, update process information,
and close the GUI application.
"""
import os
import signal
import subprocess
from tkinter import messagebox

from GUIBRUSHR.GUI.LAYOUT.MyPanel import MyPanel
from GUIBRUSHR.GUI.WIDGET.MyButton import MyButton
from GUIBRUSHR.GUI.WIDGET.MyLabel import MyLabel
from GUIBRUSHR.General_Constants.Classes.HelpButton import HelpButton
[docs]
class PanelButtonRetrieval(MyPanel):
    """
    A panel containing buttons for process management operations.

    This panel provides three main functionalities:

    1. Kill selected process
    2. Update process information
    3. Close the GUI application

    Attributes:
        parent: The parent widget
        help_button: Help button describing the table and the three actions
        button_kill: Button to terminate selected process
        button_update: Button to refresh process information
        close_all: Button to close the GUI
        label: Label created with a single-space text
        global_exc: Initialised to 1; not otherwise used within this class
    """

    def __init__(self, parent, color, row, column, **kwargs):
        """
        Initialize the PanelButtonRetrieval.

        Args:
            parent: Parent widget
            color: Background color for the panel
            row: Row position in the grid
            column: Column position in the grid
            **kwargs: Additional keyword arguments for the panel
        """
        super().__init__(parent, color, row, column, **kwargs)
        self.parent = parent

        # Help text dictionary
        help_text = {
            "Process Table": "The table displays all active atmospheric retrieval processes running in the background. Each row represents one retrieval run with the following columns: PID (Process ID), Target (exoplanet name), Instruments (spectroscopic instruments used), ID (unique run identifier), and Output (current status and log information from the retrieval process).",
            "KILL SELECTED PROCESS": "Terminates the currently selected retrieval process from the table. When clicked, it prompts for confirmation, then kills both the main process and all its child processes using SIGKILL. This action is irreversible and will stop the MCMC sampling immediately. The process is then removed from the tracking lists and the table. Use this if a retrieval is stuck or needs to be canceled.",
            "UPDATE PROCESSES": "Refreshes the process information displayed in the table. This button reads the latest output from each running retrieval's log file and updates the 'Output' column with the current status, including the latest log messages showing MCMC progress, convergence diagnostics, and any errors. Click this periodically to monitor retrieval progress without restarting the GUI.",
            "CLOSE GUI": "Closes the entire GUIBRUSHR application window. After confirmation, all running retrieval processes will be gracefully terminated (SIGTERM), and force-killed (SIGKILL) if they don't stop within 0.5 seconds. This ensures no background processes are left running after the GUI exits. The cleanup process iterates through all tracked PIDs and terminates both parent and child processes.",
        }

        # Create help button on the right side
        self.help_button = HelpButton(
            self, 0, 1,
            "Process Management Help",
            help_text,
            columnspan=1,
        )
        self.help_button.button.grid(rowspan=4)  # Span across all button rows

        # Initialize buttons and label
        self.button_kill = MyButton(
            self, 0, 0,
            "KILL SELECTED\nPROCESS",
            "#FF0000",
            self.kill,
            color_panel=color,
        )
        self.button_update = MyButton(
            self, 1, 0,
            "UPDATE\nPROCESSES",
            "#0000FF",
            self.update,
            fg="white",
            color_panel=color,
        )
        self.close_all = MyButton(
            self, 2, 0,
            "CLOSE GUI",
            "#000000",
            self.parent.parent.on_closing,
            fg="white",
            color_panel=color,
        )
        self.label = MyLabel(self, 3, 0, color, " ")
        self.global_exc = 1

    def kill(self):
        """
        Kill the selected process and update the process lists.

        This method:

        1. Prompts for confirmation
        2. Kills the selected process and its children
        3. Updates the process tracking lists
        4. Removes the process from the table

        If no row is selected (or the focused row does not carry the PID
        and ID columns) a message is printed and nothing is killed.
        """
        if not messagebox.askokcancel(
            "Delete process",
            "Do you want to delete the selected process?",
        ):
            return

        table = self.parent.panel_table.table
        selected = table.focus()
        values = table.item(selected).get("values") if selected else None
        # Columns 0 and 3 hold the PID and the run ID; without a focused
        # row they are absent and indexing would raise inside the callback.
        if not values or len(values) < 4:
            print("No process selected")
            return

        id_p = values[3]
        self._kill_process_tree(values[0], id_p)

        try:
            # Update process tracking lists
            self._update_process_lists(id_p)
            # Remove the row the user actually selected; using the focused
            # item id avoids depending on how the ID column value was
            # converted when read back from the table.
            table.delete(selected)
        except Exception:
            # Best-effort bookkeeping at the GUI callback boundary.
            print("Error updating process information")

    def _kill_process_tree(self, raw_pid, id_p):
        """
        Signal the children of a process, then SIGKILL the process itself.

        Args:
            raw_pid: PID as read from the table (converted with ``int``)
            id_p: Run identifier, used only in the printed messages
        """
        try:
            pid = int(raw_pid)
        except (TypeError, ValueError):
            print(f"Process {id_p} has an invalid PID: {raw_pid!r}")
            return
        if pid <= 0:
            # os.kill treats 0 and negative values as process groups, which
            # could take down the GUI itself.
            print(f"Process {id_p} has an invalid PID: {raw_pid!r}")
            return

        # Kill child processes first, then the main process.
        # NOTE(review): pkill sends SIGTERM unless told otherwise, while the
        # help text says children are stopped with SIGKILL - confirm intent.
        try:
            subprocess.run(["pkill", "-P", str(pid)], check=False)
        except OSError as exc:
            # pkill missing or not executable: still kill the main process.
            print(f"Could not signal children of process {id_p}: {exc}")

        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            print(f"Process {id_p} already killed")
        except OSError as exc:
            print(f"Process {id_p} could not be killed: {exc}")
        else:
            print(f"Process {id_p} killed by os")

    def _update_process_lists(self, id_p):
        """
        Update the process tracking lists by removing the specified process ID.

        Both ``list_pid`` and ``list_file`` on the start panel are rebound to
        new dicts that omit ``id_p``; the previous objects are not mutated.

        Args:
            id_p: The process ID to remove from tracking
        """
        panel_start = self.parent.parent.frame_input_master.frame_parameter.panel_start

        tracked_pids = panel_start.list_pid
        panel_start.list_pid = {
            key: tracked_pids[key] for key in tracked_pids if key != id_p
        }

        tracked_files = panel_start.list_file
        panel_start.list_file = {
            key: tracked_files[key] for key in tracked_files if key != id_p
        }

    def update(self):
        """
        Update the process information in the table.

        This method:

        1. Iterates through all tracked processes
        2. Reads the process output file if it exists
        3. Updates the table with current process information

        Tracked ids that have no row in the table are skipped so that one
        stale entry does not abort the refresh of the remaining rows.

        NOTE(review): if MyPanel derives from a tkinter widget, this method
        shadows the widget's own ``update()`` - confirm that is intended.
        """
        panel_start = self.parent.parent.frame_input_master.frame_parameter.panel_start
        # assumes the table is a ttk.Treeview (focus/item/delete are used
        # elsewhere in this class) - TODO confirm
        table = self.parent.panel_table.table

        for elem in panel_start.list_pid:
            if not table.exists(elem):
                continue
            arr_values = table.item(elem).get("values")
            # Read process output file
            output = self._read_process_output(elem)
            # Update table with current information
            table.item(
                elem,
                text="",
                values=(
                    arr_values[0],
                    arr_values[1],
                    arr_values[2],
                    arr_values[3],
                    output,
                ),
            )

    def _read_process_output(self, elem):
        """
        Read the output file for a specific process.

        Args:
            elem: Process identifier

        Returns:
            str: Process output or "File not created" if file doesn't exist
        """
        panel_start = self.parent.parent.frame_input_master.frame_parameter.panel_start
        file_path = panel_start.list_file[elem]
        try:
            # The file may be mid-write by the retrieval process; replace
            # undecodable bytes instead of aborting the whole refresh.
            with open(file_path, "r", errors="replace") as f:
                return f.read()
        except FileNotFoundError:
            return "File not created"