Skip to content

API Reference

Complete API documentation for GOLIAT, organized by module category.

Core Modules

Core functionality for configuration, logging, and utilities.

Antenna

goliat.antenna.Antenna

Antenna(config: Config, frequency_mhz: int)

Manages antenna-specific properties and configurations.

Parameters:

Name Type Description Default
config Config

The configuration object containing antenna settings.

required
frequency_mhz int

The operating frequency in MHz.

required
Source code in goliat/antenna.py
def __init__(self, config: "Config", frequency_mhz: int):
    """Creates an antenna manager for the given config and frequency.

    Args:
        config: The configuration object containing antenna settings.
        frequency_mhz: The operating frequency in MHz.
    """
    self.config = config
    self.frequency_mhz = frequency_mhz
    # Fall back to an empty dict when the config entry is missing/falsy.
    self.antenna_config = config["antenna_config"] or {}

Functions

get_config_for_frequency

get_config_for_frequency() -> dict

Gets the antenna configuration for the current frequency.

Raises:

Type Description
ValueError

If no configuration is defined for the frequency.

Returns:

Type Description
dict

The antenna configuration dictionary.

Source code in goliat/antenna.py
def get_config_for_frequency(self) -> dict:
    """Looks up the antenna configuration for the current frequency.

    Returns:
        The antenna configuration dictionary.

    Raises:
        ValueError: If no configuration is defined for the frequency.
    """
    key = str(self.frequency_mhz)
    if key in self.antenna_config:
        return self.antenna_config[key]
    raise ValueError(f"Antenna configuration not defined for frequency: {self.frequency_mhz} MHz")

get_model_type

get_model_type() -> str

Returns the antenna model type string.

Source code in goliat/antenna.py
def get_model_type(self) -> str:
    """Returns the antenna model type string for the current frequency."""
    frequency_config = self.get_config_for_frequency()
    return str(frequency_config.get("model_type"))

get_source_entity_name

get_source_entity_name() -> str

Returns the source entity name from the antenna config.

Source code in goliat/antenna.py
def get_source_entity_name(self) -> str:
    """Returns the source entity name from the antenna config."""
    frequency_config = self.get_config_for_frequency()
    return str(frequency_config.get("source_name"))

get_centered_antenna_path

get_centered_antenna_path(centered_antennas_dir: str) -> str

Constructs the path to the centered .sab antenna file.

If the exact frequency file doesn't exist, finds the nearest available frequency and shows a warning.

Parameters:

Name Type Description Default
centered_antennas_dir str

The directory for centered antenna files.

required

Returns:

Type Description
str

The absolute path to the centered antenna model file.

Raises:

Type Description
FileNotFoundError

If no antenna files are found in the directory.

Source code in goliat/antenna.py
def get_centered_antenna_path(self, centered_antennas_dir: str) -> str:
    """Builds the path to the centered .sab antenna file for this frequency.

    Falls back to the nearest available frequency (with a warning) when the
    exact file is missing.

    Args:
        centered_antennas_dir: The directory for centered antenna files.

    Returns:
        The absolute path to the centered antenna model file.

    Raises:
        FileNotFoundError: If no antenna files are found in the directory.
    """
    exact_path = os.path.join(centered_antennas_dir, f"{self.frequency_mhz}MHz_centered.sab")

    # Fast path: the exact frequency file is present.
    if os.path.exists(exact_path) and os.path.isfile(exact_path):
        return exact_path

    logger.warning(f"Antenna file for {self.frequency_mhz} MHz not found: {exact_path}. Searching for nearest available frequency...")

    if not os.path.exists(centered_antennas_dir):
        raise FileNotFoundError(f"Antenna directory does not exist: {centered_antennas_dir}")

    # Collect every (frequency, path) candidate in the directory.
    freq_pattern = re.compile(r"(\d+)MHz_centered\.sab$")
    candidates = []
    try:
        for entry in os.listdir(centered_antennas_dir):
            hit = freq_pattern.match(entry)
            if not hit:
                continue
            candidate_path = os.path.join(centered_antennas_dir, entry)
            if os.path.isfile(candidate_path):
                candidates.append((int(hit.group(1)), candidate_path))
    except (OSError, PermissionError) as e:
        raise FileNotFoundError(f"Could not read antenna directory {centered_antennas_dir}: {e}") from e

    if not candidates:
        raise FileNotFoundError(f"No antenna files found in directory: {centered_antennas_dir}")

    # Sort by frequency so ties in distance resolve deterministically.
    candidates.sort(key=lambda item: item[0])
    nearest_freq, nearest_path = min(candidates, key=lambda item: abs(item[0] - self.frequency_mhz))

    logger.warning(f"Using antenna file for {nearest_freq} MHz instead of {self.frequency_mhz} MHz. File: {nearest_path}")

    return nearest_path

Colors

goliat.colors

Functions

init_colorama

init_colorama()

Initialize colorama with appropriate settings for the current environment.

Preserves ANSI codes when stdout is piped (e.g., in Jupyter notebooks) by checking for JUPYTER_NOTEBOOK or COLORAMA_STRIP environment variables.

Source code in goliat/colors.py
def init_colorama():
    """Initialize colorama with appropriate settings for the current environment.

    ANSI codes are preserved when stdout is piped (e.g., in Jupyter notebooks),
    detected via the COLORAMA_STRIP or JUPYTER_NOTEBOOK environment variables.
    """
    env = os.environ
    # Either env var signals a piped/notebook stdout where codes must survive.
    keep_ansi = env.get("COLORAMA_STRIP", "").lower() == "0" or env.get("JUPYTER_NOTEBOOK", "").lower() == "1"
    init(autoreset=True, strip=not keep_ansi, convert=not keep_ansi)

get_color

get_color(log_type: str) -> str

Returns the colorama color code for a log type, or white if not found.

Parameters:

Name Type Description Default
log_type str

Log type key (e.g., 'info', 'warning', 'error').

required

Returns:

Type Description
str

Colorama color code string.

Source code in goliat/colors.py
def get_color(log_type: str) -> str:
    """Looks up the colorama color code for a log type.

    Args:
        log_type: Log type key (e.g., 'info', 'warning', 'error').

    Returns:
        Colorama color code string; white when the type is unknown.
    """
    try:
        return COLOR_MAP[log_type]
    except KeyError:
        return Fore.WHITE

Data Management

goliat.data_extractor

Functions

get_parameter_from_json

get_parameter_from_json(file_path: str, json_path: str) -> Any

Extracts a nested value from a JSON file using dot notation.

Parameters:

Name Type Description Default
file_path str

Path to the JSON file.

required
json_path str

Dot-separated path like 'section.subsection.key'.

required

Returns:

Type Description
Any

The value at the path, or None if not found.

Source code in goliat/data_extractor.py
def get_parameter_from_json(file_path: str, json_path: str) -> Any:
    """Extracts a nested value from a JSON file using dot notation.

    Args:
        file_path: Path to the JSON file.
        json_path: Dot-separated path like 'section.subsection.key'.

    Returns:
        The value at the path, or None if the file is missing, unreadable,
        not valid JSON, or the path does not resolve.
    """
    # EAFP: open directly instead of an os.path.exists() pre-check, which
    # is racy (the file could vanish between check and open) and misses
    # permission errors entirely.
    try:
        with open(file_path, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers FileNotFoundError/PermissionError; ValueError
        # covers json.JSONDecodeError (its subclass).
        return None

    value = data
    for key in json_path.split("."):
        if isinstance(value, dict) and key in value:
            value = value[key]
        else:
            return None
    return value

get_parameter

get_parameter(source_config: Dict[str, Any], context: Dict[str, Any]) -> Any

Retrieves a parameter from a data source using a config-driven approach.

Supports JSON sources currently. The source_config defines where to look, and context provides values for formatting paths (e.g., project_root).

Parameters:

Name Type Description Default
source_config Dict[str, Any]

Dict with 'source_type', 'file_path_template', 'json_path'.

required
context Dict[str, Any]

Values for formatting file paths.

required

Returns:

Type Description
Any

The retrieved value, or None on error.

Source code in goliat/data_extractor.py
def get_parameter(source_config: Dict[str, Any], context: Dict[str, Any]) -> Any:
    """Retrieves a parameter from a data source using a config-driven approach.

    Currently only JSON sources are supported. The source_config describes
    where to look; context supplies values used to format file paths
    (e.g., project_root).

    Args:
        source_config: Dict with 'source_type', 'file_path_template', 'json_path'.
        context: Values for formatting file paths.

    Returns:
        The retrieved value, or None on error.
    """
    source_type = source_config.get("source_type")

    if source_type != "json":
        # Future extension point for other data source types
        # (e.g. source_type == 'simulation' extracting from results).
        import logging

        logging.getLogger("verbose").error(f"Error: Unsupported source type '{source_type}'")
        return None

    template = source_config.get("file_path_template")
    if not template:
        return None

    try:
        relative_path = template.format(**context)
    except KeyError as e:
        import logging

        logging.getLogger("verbose").error(f"Error: Missing context for placeholder in file_path_template: {e}")
        return None

    json_path = source_config.get("json_path")
    if not json_path:
        return None

    full_path = os.path.join(context.get("project_root", ""), relative_path)
    return get_parameter_from_json(full_path, json_path)

Logging

goliat.logging_manager

Classes

ColorFormatter

Bases: Formatter

Custom formatter that colorizes log messages based on log_type.

Applies colorama color codes to messages and caller info based on the log_type attribute (info, warning, error, success, etc.).

Functions
format
format(record: LogRecord) -> str

Adds color codes to log messages based on log_type.

Source code in goliat/logging_manager.py
def format(self, record: logging.LogRecord) -> str:
    """Wraps the message and caller info in colorama codes chosen by log_type."""
    log_type = getattr(record, "log_type", "default")
    caller_info = getattr(record, "caller_info", "")
    colored_message = f"{get_color(log_type)}{record.getMessage()}{Style.RESET_ALL}"
    colored_caller = f"{get_color('caller')}{caller_info}{Style.RESET_ALL}"
    return f"{colored_message} {colored_caller}"

CustomFormatter

Bases: Formatter

Formatter that safely handles optional caller_info attribute.

Functions
format
format(record: LogRecord) -> str

Formats the record, safely handling the 'caller_info' attribute.

Source code in goliat/logging_manager.py
def format(self, record: logging.LogRecord) -> str:
    """Formats the record, appending 'caller_info' only when present."""
    formatted = super().format(record)
    caller_info = getattr(record, "caller_info", "")
    # Skip the trailing space entirely when there is no caller info.
    return f"{formatted} {caller_info}" if caller_info else formatted

LoggingMixin

A mixin class that provides a standardized logging interface.

Provides a _log method that directs messages to the appropriate logger ('progress' or 'verbose') and, if available, to the GUI.

Functions

setup_loggers

setup_loggers(process_id: Optional[str] = None) -> tuple[logging.Logger, logging.Logger, str]

Sets up dual logging system with rotation.

Creates 'progress' and 'verbose' loggers with file and console handlers. Rotates old logs when more than 30 exist. Uses lock file for thread-safe rotation.

Parameters:

Name Type Description Default
process_id Optional[str]

Optional ID to make log filenames unique for parallel runs.

None

Returns:

Type Description
tuple[Logger, Logger, str]

Tuple of (progress_logger, verbose_logger, session_timestamp).

Source code in goliat/logging_manager.py
def setup_loggers(process_id: Optional[str] = None) -> tuple[logging.Logger, logging.Logger, str]:
    """Sets up dual logging system with rotation.

    Creates 'progress' and 'verbose' loggers with file and console handlers.
    Rotates old logs when more than 30 exist. Uses lock file for thread-safe
    rotation.

    Args:
        process_id: Optional ID to make log filenames unique for parallel runs.

    Returns:
        Tuple of (progress_logger, verbose_logger, session_timestamp).
    """
    # Initialize colorama with appropriate settings for current environment
    init_colorama()
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # The timestamp doubles as the log-file base name; the process_id suffix
    # keeps parallel runs from clobbering each other's files.
    session_timestamp = datetime.now().strftime("%d-%m_%H-%M-%S")
    if process_id:
        session_timestamp = f"{session_timestamp}_{process_id}"

    lock_file_path = os.path.join(log_dir, "log_rotation.lock")

    # Acquire an exclusive lock via atomic create ("x" mode). NOTE(review):
    # this busy-waits with no timeout — a stale lock file left by a crashed
    # process would make every new process spin here forever.
    while True:
        try:
            with open(lock_file_path, "x"):
                break
        except FileExistsError:
            time.sleep(0.1)

    try:
        # Rotation: delete oldest files until fewer than 30 remain. Note the
        # .endswith(".log") filter also matches the *.progress.log companions.
        log_files = [os.path.join(log_dir, f) for f in os.listdir(log_dir) if f.endswith(".log")]
        log_files.sort(key=os.path.getctime)
        while len(log_files) >= 30:
            try:
                old_log = log_files.pop(0)
                base, _ = os.path.splitext(old_log)
                progress_log = base + ".progress.log"

                # Remove the log and its progress companion together.
                if os.path.exists(old_log):
                    os.remove(old_log)
                if os.path.exists(progress_log):
                    os.remove(progress_log)
            except OSError:
                pass  # Best-effort: a locked/vanished file shouldn't abort setup.
    finally:
        # Always release the lock, even if rotation failed.
        if os.path.exists(lock_file_path):
            os.remove(lock_file_path)

    progress_log_filename = os.path.join(log_dir, f"{session_timestamp}.progress.log")
    main_log_filename = os.path.join(log_dir, f"{session_timestamp}.log")

    file_formatter = CustomFormatter("%(asctime)s - %(levelname)s - %(message)s")
    console_formatter = ColorFormatter()

    # The main log receives records from BOTH loggers via this shared handler.
    main_file_handler = logging.FileHandler(main_log_filename, mode="a")
    main_file_handler.setFormatter(file_formatter)

    progress_logger = logging.getLogger("progress")
    progress_logger.setLevel(logging.INFO)
    # Drop stale handlers from any previous setup_loggers() call.
    for handler in progress_logger.handlers[:]:
        progress_logger.removeHandler(handler)

    progress_file_handler = logging.FileHandler(progress_log_filename, mode="a")
    progress_file_handler.setFormatter(file_formatter)
    progress_logger.addHandler(progress_file_handler)

    progress_logger.addHandler(main_file_handler)

    progress_stream_handler = logging.StreamHandler()
    progress_stream_handler.setFormatter(console_formatter)
    progress_logger.addHandler(progress_stream_handler)
    progress_logger.propagate = False  # Keep records out of the root logger.

    verbose_logger = logging.getLogger("verbose")
    verbose_logger.setLevel(logging.INFO)
    # Same stale-handler cleanup for the verbose logger.
    for handler in verbose_logger.handlers[:]:
        verbose_logger.removeHandler(handler)

    verbose_logger.addHandler(main_file_handler)

    verbose_stream_handler = logging.StreamHandler()
    verbose_stream_handler.setFormatter(console_formatter)
    verbose_logger.addHandler(verbose_stream_handler)
    verbose_logger.propagate = False

    progress_logger.info(f"--- Progress logging started for file: {os.path.abspath(progress_log_filename)} ---")
    verbose_logger.info(f"--- Main logging started for file: {os.path.abspath(main_log_filename)} ---")

    return progress_logger, verbose_logger, session_timestamp

shutdown_loggers

shutdown_loggers()

Safely shuts down all logging handlers to release file locks.

Source code in goliat/logging_manager.py
def shutdown_loggers():
    """Safely shuts down all logging handlers to release file locks."""
    for logger_name in ("progress", "verbose"):
        active_logger = logging.getLogger(logger_name)
        active_logger.info("--- Logging shutdown ---")
        # Iterate over a copy: removeHandler mutates the handlers list.
        for handler in list(active_logger.handlers):
            handler.close()
            active_logger.removeHandler(handler)

add_simulation_log_handlers

add_simulation_log_handlers(simulation_dir: str) -> list[logging.Handler]

Adds file handlers for progress and verbose logs to a simulation-specific directory.

Creates log files in the simulation directory while keeping the main logs/ directory handlers intact. Both progress and verbose logs are written to the simulation directory.

Parameters:

Name Type Description Default
simulation_dir str

Directory path where simulation-specific logs should be written.

required

Returns:

Type Description
list[Handler]

List of handlers that were added (for later removal via remove_simulation_log_handlers).

Source code in goliat/logging_manager.py
def add_simulation_log_handlers(simulation_dir: str) -> list[logging.Handler]:
    """Adds per-simulation file handlers for the progress and verbose logs.

    The main logs/ directory handlers stay intact; this only adds extra
    handlers that mirror each log into the simulation directory.

    Args:
        simulation_dir: Directory path where simulation-specific logs should be written.

    Returns:
        List of handlers that were added (for later removal via
        remove_simulation_log_handlers).
    """
    if not os.path.exists(simulation_dir):
        os.makedirs(simulation_dir)

    formatter = CustomFormatter("%(asctime)s - %(levelname)s - %(message)s")
    added_handlers = []

    # One (logger name, file name) pair per log stream.
    for logger_name, file_name in (("progress", "progress.log"), ("verbose", "verbose.log")):
        log_path = os.path.join(simulation_dir, file_name)

        handler = logging.FileHandler(log_path, mode="a")
        handler.setFormatter(formatter)
        setattr(handler, "_is_simulation_handler", True)  # Mark for later removal

        target_logger = logging.getLogger(logger_name)
        target_logger.addHandler(handler)
        # Announce where this stream's simulation-specific log lives.
        target_logger.info(f"--- Simulation-specific {logger_name} logging started: {log_path} ---")

        added_handlers.append(handler)

    return added_handlers

remove_simulation_log_handlers

remove_simulation_log_handlers(handlers: list[Handler])

Removes simulation-specific log handlers and closes their files.

Parameters:

Name Type Description Default
handlers list[Handler]

List of handlers to remove (typically returned from add_simulation_log_handlers).

required
Source code in goliat/logging_manager.py
def remove_simulation_log_handlers(handlers: list[logging.Handler]):
    """Removes simulation-specific log handlers and closes their files.

    Args:
        handlers: List of handlers to remove (typically returned from
            add_simulation_log_handlers).
    """
    # Hoist the loop-invariant logger lookups out of the loop.
    progress_logger = logging.getLogger("progress")
    verbose_logger = logging.getLogger("verbose")

    for handler in handlers:
        # getattr with a default already covers a missing attribute, so the
        # original hasattr() pre-check was redundant.
        if not getattr(handler, "_is_simulation_handler", False):
            continue

        if handler in progress_logger.handlers:
            progress_logger.removeHandler(handler)
        if handler in verbose_logger.handlers:
            verbose_logger.removeHandler(handler)

        handler.close()

Profiling

goliat.profiler.Profiler

Profiler(execution_control: dict, profiling_config: dict, study_type: str, config_path: str)

Manages execution time tracking, ETA estimation, and study phase management.

This class divides a study into phases (setup, run, extract), calculates weighted progress, and estimates the time remaining. It also saves updated time estimates to a configuration file after each run, making it self-improving.

Parameters:

Name Type Description Default
execution_control dict

Dict indicating which phases are enabled.

required
profiling_config dict

Historical timing data for estimates.

required
study_type str

Study type ('near_field' or 'far_field').

required
config_path str

Path where profiling config is saved.

required
Source code in goliat/profiler.py
def __init__(
    self,
    execution_control: dict,
    profiling_config: dict,
    study_type: str,
    config_path: str,
):
    """Sets up the profiler with phase tracking and ETA estimation.

    Args:
        execution_control: Dict indicating which phases are enabled.
        profiling_config: Historical timing data for estimates.
        study_type: Study type ('near_field' or 'far_field').
        config_path: Path where profiling config is saved.
    """
    # Raw inputs.
    self.execution_control = execution_control
    self.profiling_config = profiling_config
    self.study_type = study_type
    self.config_path = config_path

    # Derived phase weights and per-subtask timing samples; the stack
    # supports nested subtask() context managers.
    self.phase_weights = self._calculate_phase_weights()
    self.subtask_times = defaultdict(list)
    self.subtask_stack = []

    # Progress counters for simulations and projects.
    self.total_simulations = 0
    self.completed_simulations = 0
    self.total_projects = 0
    self.current_project = 0
    self.completed_phases = set()

    # Current-phase state; monotonic clock avoids wall-clock jumps.
    self.start_time = time.monotonic()
    self.current_phase = None
    self.phase_start_time = None
    self.phase_skipped = False
    self.run_phase_total_duration = 0

Functions

set_total_simulations

set_total_simulations(total: int)

Sets total simulation count for progress tracking.

Source code in goliat/profiler.py
def set_total_simulations(self, total: int):
    """Records how many simulations the study will run in total.

    Args:
        total: Total number of simulations expected.
    """
    self.total_simulations = total

set_project_scope

set_project_scope(total_projects: int)

Sets total project count for progress tracking.

Source code in goliat/profiler.py
def set_project_scope(self, total_projects: int):
    """Records how many projects the study spans.

    Args:
        total_projects: Total number of projects in the study.
    """
    self.total_projects = total_projects

set_current_project

set_current_project(project_index: int)

Sets the current project index.

Source code in goliat/profiler.py
def set_current_project(self, project_index: int):
    """Records which project is currently being processed.

    Args:
        project_index: Index of the current project (stored as-is,
            in the caller's indexing convention).
    """
    self.current_project = project_index

simulation_completed

simulation_completed()

Marks one simulation as completed.

Source code in goliat/profiler.py
def simulation_completed(self):
    """Increments the count of completed simulations by one."""
    self.completed_simulations += 1

start_stage

start_stage(phase_name: str, total_stages: int = 1)

Starts tracking a new phase (setup/run/extract).

Parameters:

Name Type Description Default
phase_name str

Phase name like 'setup', 'run', or 'extract'.

required
total_stages int

Number of stages within this phase.

1
Source code in goliat/profiler.py
def start_stage(self, phase_name: str, total_stages: int = 1):
    """Begins tracking a new phase (setup/run/extract).

    Args:
        phase_name: Phase name like 'setup', 'run', or 'extract'.
        total_stages: Number of stages within this phase.
    """
    self.current_phase = phase_name
    self.phase_start_time = time.monotonic()
    # Fresh per-phase state: nothing skipped, no stages completed yet.
    self.phase_skipped = False
    self.completed_stages_in_phase = 0
    self.total_stages_in_phase = total_stages

end_stage

end_stage()

Ends current phase and records its duration for future estimates.

Source code in goliat/profiler.py
def end_stage(self):
    """Ends the current phase and records its duration for future estimates."""
    if self.phase_start_time:
        elapsed = time.monotonic() - self.phase_start_time

        # A cached/skipped setup would pollute the real timing statistics,
        # so leave avg_setup_time untouched in that case (it keeps the
        # previous real measurements).
        skipped_setup = self.current_phase == "setup" and self.phase_skipped
        if not skipped_setup:
            # Record the real measurement and refresh the simple average
            # used for pie charts, the timings table, etc.
            samples = self.subtask_times[self.current_phase]
            samples.append(elapsed)
            self.profiling_config[f"avg_{self.current_phase}_time"] = sum(samples) / len(samples)

    self.current_phase = None
    self.phase_skipped = False  # Reset for next phase

complete_run_phase

complete_run_phase()

Stores the total duration of the 'run' phase from its subtasks.

Source code in goliat/profiler.py
def complete_run_phase(self):
    """Stores the total duration of the 'run' phase from its subtasks."""
    run_samples = self.subtask_times.get("run_simulation_total", [0])
    self.run_phase_total_duration = sum(run_samples)

get_weighted_progress

get_weighted_progress(phase_name: str, phase_progress_ratio: float) -> float

Calculates overall study progress using phase weights and simulation count.

This method handles the complexity that different phases (setup, run, extract) take different amounts of time. For example, if setup takes 10 minutes, run takes 2 hours, and extract takes 5 minutes, then the run phase should account for roughly 85% of the progress bar, not 33%.

The calculation works in two parts: 1. Progress within current simulation: Sums weights of completed phases, plus partial weight for the current phase based on its progress ratio. 2. Overall progress: Divides (completed_simulations + current_sim_progress) by total_simulations to get the overall percentage.

Parameters:

Name Type Description Default
phase_name str

The name of the current phase ('setup', 'run', or 'extract').

required
phase_progress_ratio float

Progress within current phase (0.0 = not started, 1.0 = fully complete).

required

Returns:

Type Description
float

Overall progress percentage (0.0 to 100.0).

Source code in goliat/profiler.py
def get_weighted_progress(self, phase_name: str, phase_progress_ratio: float) -> float:
    """Calculates overall study progress using phase weights and simulation count.

    Phases take very different amounts of time (e.g. a 2-hour run phase vs a
    10-minute setup), so the progress bar weights each phase by its share of
    a simulation's total time instead of treating the phases equally.

    Within the current simulation, phases before the current one contribute
    their full weight and the current phase contributes a fraction of its
    weight given by phase_progress_ratio. That per-simulation fraction is
    then combined with the completed-simulation count to get the overall
    percentage: (completed + current_sim_fraction) / total.

    Args:
        phase_name: The name of the current phase ('setup', 'run', or 'extract').
        phase_progress_ratio: Progress within the current phase
            (0.0 = not started, 1.0 = fully complete).

    Returns:
        Overall progress percentage (0.0 to 100.0).
    """
    if self.total_simulations == 0:
        return 0.0

    # Weighted progress through the current simulation.
    current_sim_fraction = 0
    for candidate_phase, weight in self.phase_weights.items():
        if candidate_phase == phase_name:
            current_sim_fraction += weight * phase_progress_ratio
            break
        current_sim_fraction += weight

    return (self.completed_simulations + current_sim_fraction) / self.total_simulations * 100

get_subtask_estimate

get_subtask_estimate(task_name: str) -> float

Retrieves the estimated time for a specific subtask, identified by its name. Returns the estimated duration in seconds.

Source code in goliat/profiler.py
def get_subtask_estimate(self, task_name: str) -> float:
    """Retrieves the estimated time for a specific subtask.

    Args:
        task_name: The name of the subtask.

    Returns:
        The estimated duration in seconds (1.0 when no history exists).
    """
    return self.profiling_config.get(f"avg_{task_name}", 1.0)

get_phase_subtasks

get_phase_subtasks(phase_name: str) -> list

Gets a list of subtasks for a given phase, identified by the phase name. Returns a list of subtask names.

Source code in goliat/profiler.py
def get_phase_subtasks(self, phase_name: str) -> list:
    """Gets a list of subtasks for a given phase.

    Args:
        phase_name: The name of the phase.

    Returns:
        A list of subtask names (keys like 'avg_<phase>_<task>' with the
        leading 'avg_' prefix stripped).
    """
    prefix = f"avg_{phase_name}_"
    # Strip only the leading 'avg_': str.replace would also mangle any
    # later 'avg_' occurrence inside the task name itself.
    return [key[len("avg_"):] for key in self.profiling_config.keys() if key.startswith(prefix)]

get_time_remaining

get_time_remaining(current_stage_progress: float = 0.0) -> float

Estimates total time remaining for the entire study.

Uses historical timing data to predict how long each phase will take, then calculates remaining time by subtracting elapsed time from total estimated time. This gives a realistic ETA that accounts for the fact that different phases take different amounts of time.

The calculation considers: - Time already spent on fully completed simulations - Time spent on phases within the current simulation that are done - Estimated time remaining in the current phase (based on progress ratio) - Estimated time for all future simulations

Parameters:

Name Type Description Default
current_stage_progress float

Progress within current stage (0.0 to 1.0).

0.0

Returns:

Type Description
float

Estimated time remaining in seconds.

Source code in goliat/profiler.py
def get_time_remaining(self, current_stage_progress: float = 0.0) -> float:
    """Estimates total time remaining for the entire study.

    Historical timing data predicts how long each phase takes; the ETA is
    the predicted time left in the current simulation plus the full
    predicted cost of every simulation not yet started. This accounts for
    phases having very different durations.

    The calculation considers:
    - Estimated time remaining in the current phase (based on progress ratio)
    - Phases of the current simulation that haven't started yet
    - Estimated time for all future simulations

    Args:
        current_stage_progress: Progress within current stage (0.0 to 1.0).

    Returns:
        Estimated time remaining in seconds.
    """
    if not self.current_phase or self.total_simulations == 0:
        return 0.0

    # Phases actually enabled for this study, in execution order.
    enabled_phases = [p for p in ("setup", "run", "extract") if self.execution_control.get(f"do_{p}", False)]

    # Predicted cost of one full simulation.
    time_per_sim = sum(self._get_smart_phase_estimate(p) for p in enabled_phases)

    try:
        phase_index = enabled_phases.index(self.current_phase)
    except ValueError:
        phase_index = 0

    # Remaining time in the current phase, with progress clamped to [0, 1]
    # to handle edge cases.
    progress = max(0.0, min(1.0, current_stage_progress))
    remaining_current_sim = self._get_smart_phase_estimate(self.current_phase) * (1.0 - progress)

    # Plus the phases of this simulation that haven't started yet.
    for later_phase in enabled_phases[phase_index + 1:]:
        remaining_current_sim += self._get_smart_phase_estimate(later_phase)

    # Simulations after the current one.
    future_sims = self.total_simulations - self.completed_simulations - 1

    return max(0, remaining_current_sim + future_sims * time_per_sim)

subtask

subtask(task_name: str)

A context manager to time a subtask.

Source code in goliat/profiler.py
@contextlib.contextmanager
def subtask(self, task_name: str):
    """Context manager that times a named subtask.

    Pushes a frame onto the subtask stack on entry; on exit (even when the
    body raises) pops it, records the elapsed wall-clock time under the
    task's name, and persists refreshed estimates.
    """
    frame = {"name": task_name, "start_time": time.monotonic()}
    self.subtask_stack.append(frame)
    try:
        yield
    finally:
        finished = self.subtask_stack.pop()
        duration = time.monotonic() - finished["start_time"]
        self.subtask_times[finished["name"]].append(duration)
        self.update_and_save_estimates()

update_and_save_estimates

update_and_save_estimates()

Updates the profiling configuration with the latest average times and saves it.

This makes the profiler's estimates self-improving over time.

Source code in goliat/profiler.py
def update_and_save_estimates(self):
    """Merges the latest average timings into the profiling config file.

    Reads the existing config from disk (tolerating a missing or corrupt
    file), overlays the in-memory `avg_*` entries plus freshly computed
    per-subtask averages, and writes the result back. This makes the
    profiler's estimates self-improving over time.
    """
    try:
        with open(self.config_path, "r") as f:
            full_config = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        full_config = {}

    study_section = full_config.setdefault(self.study_type, {})

    # Carry over every existing in-memory average, rounded for readability.
    for key, value in self.profiling_config.items():
        if key.startswith("avg_"):
            study_section[key] = round(value, 2)

    # Fold in an average for each timed subtask observed so far.
    for task_name, times in self.subtask_times.items():
        if not times:
            continue
        rounded_avg = round(sum(times) / len(times), 2)
        avg_key = f"avg_{task_name}"
        study_section[avg_key] = rounded_avg
        # Keep the in-memory copy current so the GUI sees fresh numbers.
        self.profiling_config[avg_key] = rounded_avg

    with open(self.config_path, "w") as f:
        json.dump(full_config, f, indent=4)

save_estimates

save_estimates()

Saves the final profiling estimates at the end of the study.

Source code in goliat/profiler.py
def save_estimates(self):
    """Saves the final profiling estimates at the end of the study.

    Delegates to update_and_save_estimates(), which merges the current
    averages into the profiling config file on disk.
    """
    self.update_and_save_estimates()

Project Management

goliat.project_manager

Attributes

Classes

ProjectCorruptionError

Bases: Exception

Raised when a project file is corrupted, locked, or inaccessible.

ProjectManager

ProjectManager(config: Config, verbose_logger: Logger, progress_logger: Logger, gui: Optional[QueueGUI] = None, no_cache: bool = False)

Bases: LoggingMixin

Manages the lifecycle of Sim4Life (.smash) project files.

Handles creation, opening, saving, and validation of project files, ensuring robustness against file corruption and locks.

Parameters:

Name Type Description Default
config Config

The main configuration object.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for high-level progress updates.

required
gui Optional[QueueGUI]

The GUI proxy for inter-process communication.

None
no_cache bool

If True, bypasses metadata verification.

False
Source code in goliat/project_manager.py
def __init__(
    self,
    config: "Config",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    gui: Optional["QueueGUI"] = None,
    no_cache: bool = False,
):
    """Initializes the ProjectManager.

    Args:
        config: The main configuration object.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for high-level progress updates.
        gui: The GUI proxy for inter-process communication.
        no_cache: If True, bypasses metadata verification.
    """
    self.config = config
    self.verbose_logger = verbose_logger
    self.progress_logger = progress_logger
    self.gui = gui
    self.no_cache = no_cache

    # Imported here rather than at module level; presumably so the module
    # can be imported without a running Sim4Life — TODO confirm.
    import s4l_v1.document

    self.document = s4l_v1.document
    self.project_path: Optional[str] = None

    # Default to running every phase when no execution_control is configured.
    all_phases = {"do_setup": True, "do_run": True, "do_extract": True}
    self.execution_control = self.config["execution_control"] or all_phases
Functions
write_simulation_metadata
write_simulation_metadata(meta_path: str, surgical_config: dict, update_setup_timestamp: bool = False)

Writes config metadata and hash to disk for verification/resume.

Creates a metadata file that tracks the config hash and completion status of each phase (setup/run/extract). Used by the verify-and-resume feature.

Parameters:

Name Type Description Default
meta_path str

Full path where metadata should be saved.

required
surgical_config dict

The minimal config snapshot for this simulation.

required
update_setup_timestamp bool

If True, updates setup_timestamp to now (use when setup was done). If False, preserves existing timestamp if metadata exists.

False
Source code in goliat/project_manager.py
def write_simulation_metadata(self, meta_path: str, surgical_config: dict, update_setup_timestamp: bool = False):
    """Writes config metadata and hash to disk for verification/resume.

    Produces a JSON sidecar holding the config hash, a config snapshot,
    the setup timestamp, and per-phase completion flags (run/extract reset
    to False). The verify-and-resume feature reads this file later to
    decide which phases can be skipped.

    Args:
        meta_path: Full path where metadata should be saved.
        surgical_config: The minimal config snapshot for this simulation.
        update_setup_timestamp: If True, stamps the current time (use when
            setup was just performed). If False, a timestamp already on
            disk is preserved.
    """
    config_hash = self._generate_config_hash(surgical_config)

    setup_timestamp = datetime.now().isoformat()
    # Reuse the previous timestamp unless the caller says setup re-ran.
    if not update_setup_timestamp and os.path.exists(meta_path):
        try:
            with open(meta_path, "r") as f:
                previous = json.load(f)
            if "setup_timestamp" in previous:
                setup_timestamp = previous["setup_timestamp"]
        except (json.JSONDecodeError, KeyError, ValueError):
            pass  # Unreadable metadata: fall back to the fresh timestamp.

    os.makedirs(os.path.dirname(meta_path), exist_ok=True)
    with open(meta_path, "w") as f:
        json.dump(
            {
                "config_hash": config_hash,
                "config_snapshot": surgical_config,
                "setup_timestamp": setup_timestamp,
                "run_done": False,
                "extract_done": False,
            },
            f,
            indent=4,
        )
    self._log(
        f"  - Saved configuration metadata to {os.path.basename(meta_path)}",
        log_type="info",
    )
update_simulation_metadata
update_simulation_metadata(meta_path: str, run_done: Optional[bool] = None, extract_done: Optional[bool] = None)

Updates phase completion flags in the metadata file.

Parameters:

Name Type Description Default
meta_path str

Path to the metadata file.

required
run_done Optional[bool]

New run phase status.

None
extract_done Optional[bool]

New extract phase status.

None
Source code in goliat/project_manager.py
def update_simulation_metadata(self, meta_path: str, run_done: Optional[bool] = None, extract_done: Optional[bool] = None):
    """Updates phase completion flags in the metadata file.

    Only flags passed as non-None are touched; the rest of the file is
    rewritten unchanged. A missing file is logged and ignored.

    Args:
        meta_path: Path to the metadata file.
        run_done: New run phase status.
        extract_done: New extract phase status.
    """
    if not os.path.exists(meta_path):
        self._log(f"Cannot update metadata, file not found: {meta_path}", log_type="warning")
        return

    with open(meta_path, "r+") as f:
        metadata = json.load(f)
        for flag, value in (("run_done", run_done), ("extract_done", extract_done)):
            if value is not None:
                metadata[flag] = value
        # Rewrite in place: rewind, dump, then truncate leftover bytes.
        f.seek(0)
        json.dump(metadata, f, indent=4)
        f.truncate()
    self._log(f"Updated metadata in {os.path.basename(meta_path)}", log_type="info")
verify_simulation_metadata
verify_simulation_metadata(meta_path: str, surgical_config: dict, smash_path: Optional[str] = None) -> dict

Verifies if an existing simulation can be reused to skip completed phases.

This method implements a three-step verification process to determine if a previously run simulation can be reused:

  1. Config hash check: Compares the stored config hash with the current config. If they don't match, the simulation is outdated and must be rerun.

  2. Project file validation: Checks if the .smash file exists, is not locked, and has valid HDF5 structure. If invalid, setup must be rerun.

  3. Deliverable freshness check: Verifies that output files (H5 results) and extracted files (JSON/PKL/HTML) exist and are newer than the setup timestamp. This ensures we don't skip phases if files are missing or outdated.

The method returns a dict indicating which phases are complete, allowing the study to skip setup/run/extract as appropriate. Note that extract completion always requires run completion - if extract is done but run isn't, both are marked incomplete to prevent inconsistent states.

Parameters:

Name Type Description Default
meta_path str

Path to the metadata file containing config hash and timestamps.

required
surgical_config dict

Current config snapshot to compare against stored hash.

required
smash_path Optional[str]

Optional override for project file path (used for verification).

None

Returns:

Type Description
dict

Dict with boolean flags: 'setup_done', 'run_done', 'extract_done'.

dict

All False if verification fails at any step.

Source code in goliat/project_manager.py
def verify_simulation_metadata(self, meta_path: str, surgical_config: dict, smash_path: Optional[str] = None) -> dict:
    """Checks whether a previous simulation's outputs can be reused.

    Verification proceeds in three steps, each of which can short-circuit
    to "nothing is done":

    1. Config hash: the stored hash must match the current config,
       otherwise the simulation is stale and must be rerun.
    2. Project file: the .smash file must exist, be unlocked, and be a
       structurally valid HDF5 file, otherwise setup must be redone.
    3. Deliverables: output files (H5 results) and extracted files
       (JSON/PKL/HTML) must exist and be newer than the recorded setup
       timestamp, otherwise the corresponding phase is incomplete.

    The flags are normalized afterwards so extract is never marked done
    without run also being done, preventing inconsistent states.

    Args:
        meta_path: Path to the metadata file containing config hash and timestamps.
        surgical_config: Current config snapshot to compare against stored hash.
        smash_path: Optional override for project file path (used for verification).

    Returns:
        Dict with boolean flags: 'setup_done', 'run_done', 'extract_done'.
        All False if verification fails at any step.
    """
    status = {"setup_done": False, "run_done": False, "extract_done": False}

    if not os.path.exists(meta_path):
        self._log(f"No metadata file found at {os.path.basename(meta_path)}.", log_type="info")
        return status

    try:
        with open(meta_path, "r") as f:
            metadata = json.load(f)

        # Step 1: config must hash to the stored value.
        if not self._verify_config_hash(metadata, surgical_config, meta_path):
            return status

        # Step 2: the project file itself must be valid and accessible.
        is_valid, project_file = self._verify_project_file(smash_path)
        if not is_valid or project_file is None:
            return status

        status["setup_done"] = True

        # Step 3: deliverables must postdate the recorded setup time.
        setup_timestamp = self._parse_setup_timestamp(metadata)
        if setup_timestamp is None:
            return status

        deliverables = self._verify_deliverables(project_file, setup_timestamp)
        status["run_done"] = deliverables["run_done"]
        status["extract_done"] = deliverables["extract_done"]

        status = self._normalize_status(status)
        self._log_status_summary(status)
        return status

    except (json.JSONDecodeError, KeyError):
        self._log(f"Metadata file {os.path.basename(meta_path)} is corrupted.", log_type="error")
        return status
get_setup_timestamp_from_metadata
get_setup_timestamp_from_metadata(meta_path: str) -> Optional[float]

Retrieves the setup timestamp from the metadata file.

Parameters:

Name Type Description Default
meta_path str

Path to the metadata file.

required

Returns:

Type Description
Optional[float]

Setup timestamp as a float (seconds since epoch), or None if not found or file doesn't exist.

Source code in goliat/project_manager.py
def get_setup_timestamp_from_metadata(self, meta_path: str) -> Optional[float]:
    """Reads the setup timestamp out of a metadata file.

    Accepts either an ISO 8601 string (the current format) or a bare
    number (legacy format) and returns it as seconds since the epoch.

    Args:
        meta_path: Path to the metadata file.

    Returns:
        Setup timestamp as a float, or None when the file is missing,
        unreadable, or holds no timestamp.
    """
    if not os.path.exists(meta_path):
        return None

    try:
        with open(meta_path, "r") as f:
            raw = json.load(f).get("setup_timestamp")
        if not raw:
            return None
        if isinstance(raw, str):
            # Current format: ISO 8601 string.
            return datetime.fromisoformat(raw).timestamp()
        # Legacy format: numeric seconds since epoch.
        return float(raw)
    except (json.JSONDecodeError, KeyError, ValueError):
        return None
create_or_open_project
create_or_open_project(phantom_name: str, frequency_mhz: int, scenario_name: Optional[str] = None, position_name: Optional[str] = None, orientation_name: Optional[str] = None, **kwargs) -> dict

Creates a new project or opens an existing one based on the 'do_setup' flag.

Parameters:

Name Type Description Default
phantom_name str

The name of the phantom model.

required
frequency_mhz int

The simulation frequency in MHz.

required
scenario_name Optional[str]

The base name of the placement scenario.

None
position_name Optional[str]

The name of the position within the scenario.

None
orientation_name Optional[str]

The name of the orientation within the scenario.

None

Raises:

Type Description
ValueError

If required parameters are missing or study_type is unknown.

FileNotFoundError

If do_setup is false and the project file does not exist.

ProjectCorruptionError

If the project file is corrupted.

Source code in goliat/project_manager.py
def create_or_open_project(
    self,
    phantom_name: str,
    frequency_mhz: int,
    scenario_name: Optional[str] = None,
    position_name: Optional[str] = None,
    orientation_name: Optional[str] = None,
    **kwargs,
) -> dict:
    """Creates a new project or opens an existing one based on the 'do_setup' flag.

    When 'do_setup' is false the existing project is opened without
    verification. Otherwise the project's metadata is verified (unless
    --no-cache is active) to decide which phases can be skipped.

    Args:
        phantom_name: The name of the phantom model.
        frequency_mhz: The simulation frequency in MHz.
        scenario_name: The base name of the placement scenario.
        position_name: The name of the position within the scenario.
        orientation_name: The name of the orientation within the scenario.
        **kwargs: Accepted for interface compatibility; unused here.

    Returns:
        Dict with boolean flags 'setup_done', 'run_done', 'extract_done'
        describing which phases are already complete.

    Raises:
        ValueError: If required parameters are missing or `study_type` is unknown.
        FileNotFoundError: If `do_setup` is false and the project file does not exist.
        ProjectCorruptionError: If the project file is corrupted.
    """
    study_type = self.config["study_type"]
    if not study_type or not isinstance(study_type, str):
        raise ValueError("'study_type' not found in the configuration file.")

    # Validate placement parameters
    self._validate_placement_params(study_type, phantom_name, frequency_mhz, scenario_name, position_name, orientation_name)

    # Build placement name and project paths
    placement_name = f"{scenario_name}_{position_name}_{orientation_name}"
    project_dir, project_filename = self._build_project_path(study_type, phantom_name, frequency_mhz, placement_name)

    os.makedirs(project_dir, exist_ok=True)
    self.project_path = os.path.join(project_dir, project_filename).replace("\\", "/")
    self._log(f"Project path set to: {self.project_path}", log_type="info")

    if self.execution_control is None:
        self.execution_control = {}
    do_setup = self.execution_control.get("do_setup", True)

    # For far-field, direction and polarization are part of the unique signature,
    # but they are not used in the file path, so we retrieve them from the setup logic if needed.
    # This is a bit of a workaround but keeps the project manager's interface clean.
    direction_name = None
    polarization_name = None
    if study_type == "far_field":
        # For far-field, the orientation and position names map to direction and polarization
        direction_name = orientation_name
        polarization_name = position_name

    surgical_config = self.config.build_simulation_config(
        phantom_name=phantom_name,
        frequency_mhz=frequency_mhz,
        scenario_name=scenario_name,
        position_name=position_name,
        orientation_name=orientation_name,
        direction_name=direction_name,
        polarization_name=polarization_name,
    )

    if not do_setup:
        self._log(
            "Execution control: 'do_setup' is false. Opening existing project without verification.",
            log_type="info",
        )
        if not self.project_path or not os.path.exists(self.project_path):
            error_msg = f"ERROR: 'do_setup' is false, but project file not found at {self.project_path}. Cannot proceed."
            self._log(error_msg, log_type="fatal")
            raise FileNotFoundError(error_msg)
        self.open()
        # Return a status dict indicating setup is done, but we don't know about the other phases.
        return {"setup_done": True, "run_done": False, "extract_done": False}

    # If do_setup is true, we verify the project unless --no-cache is used.
    if self.no_cache:
        self._log("`--no-cache` flag is active. Forcing a new setup by skipping verification.", log_type="warning")
        return {"setup_done": False, "run_done": False, "extract_done": False}

    verification_status = self.verify_simulation_metadata(os.path.join(project_dir, "config.json"), surgical_config)

    if verification_status["setup_done"]:
        # Only open the file if we need to run or extract phases
        # If everything is done, skip opening to avoid unnecessary file access
        if not verification_status["run_done"] or not verification_status["extract_done"]:
            self._log("Verified existing project. Opening.", log_type="info")
            self.open()
        else:
            self._log("Project completely done, skipping file open.", log_type="info")
        return verification_status

    # Fix: the previous check tested project_dir, which os.makedirs above
    # guarantees to exist, so the "invalid project" message fired even on a
    # first-time setup. Only log it when a stale project file actually exists.
    if os.path.exists(self.project_path):
        self._log("Existing project is invalid or out of date. A new setup is required.", log_type="info")
    return {"setup_done": False, "run_done": False, "extract_done": False}
create_new
create_new()

Creates a new empty project in memory.

Closes any open document, deletes existing project file and cache files, then creates a fresh unsaved project. Also initializes the model by creating/deleting a dummy block to ensure Sim4Life is ready.

Source code in goliat/project_manager.py
def create_new(self):
    """Creates a fresh, unsaved project in memory.

    Any open document is closed first so its file lock is released, then
    the old project file and its sidecar cache files are removed. Finally
    a new document is created and the modeler is warmed up by creating and
    deleting a throwaway block, which ensures Sim4Life is ready.
    """
    doc = self.document
    if doc and hasattr(doc, "IsOpen") and doc.IsOpen():  # type: ignore
        self._log(
            "Closing existing document before creating a new one to release file lock.",
            log_type="info",
        )
        doc.Close()

    if self.project_path and os.path.exists(self.project_path):
        self._log(
            f"Deleting existing project file at {self.project_path}",
            log_type="warning",
        )
        os.remove(self.project_path)

        project_dir = os.path.dirname(self.project_path)
        base_name = os.path.basename(self.project_path)
        for entry in os.listdir(project_dir):
            # Sidecar cache files are named either ".<project>..." or
            # "<project><suffix>"; sweep both patterns, keep the project itself.
            looks_like_cache = entry.startswith(f".{base_name}") or (
                entry.startswith(base_name) and entry != base_name
            )
            if not looks_like_cache:
                continue
            entry_path = os.path.join(project_dir, entry)
            if os.path.isfile(entry_path):
                self._log(f"Deleting cache file: {entry_path}", log_type="info")
                try:
                    os.remove(entry_path)
                except OSError as e:
                    self._log(
                        f"Error deleting cache file {entry_path}: {e}",
                        log_type="error",
                    )

    self._log("Creating a new empty project in memory.", log_type="info")
    self.document.New()

    self._log(
        "Initializing model by creating and deleting a dummy block...",
        log_type="verbose",
    )
    import s4l_v1.model as s4l_model

    dummy_block = s4l_model.CreateSolidBlock(s4l_model.Vec3(0, 0, 0), s4l_model.Vec3(1, 1, 1))
    dummy_block.Delete()
    self._log("Model initialized, ready for population.", log_type="verbose")
open
open()

Opens an existing project after validation checks.

Raises:

Type Description
ProjectCorruptionError

If project file is invalid, locked, or Sim4Life can't open it.

Source code in goliat/project_manager.py
def open(self):
    """Opens an existing project after validation checks.

    First verifies the .smash file is a readable, unlocked HDF5 file via
    _is_valid_smash_file(), then asks Sim4Life to open it. If Sim4Life
    itself fails, any half-opened document is closed to release the file
    lock before re-raising as a corruption error.

    Raises:
        ProjectCorruptionError: If project file is invalid, locked, or
                                Sim4Life can't open it.
    """
    self._log(f"Validating project file: {self.project_path}", log_type="info")
    if not self._is_valid_smash_file():
        self._log(
            f"ERROR: Project file {self.project_path} is corrupted or locked.",
            log_type="fatal",
        )
        raise ProjectCorruptionError(f"File is not a valid or accessible HDF5 file: {self.project_path}")

    self._log(f"Opening project with Sim4Life: {self.project_path}", log_type="info")
    try:
        if self.project_path:
            open_project(self.project_path)
    except Exception as e:
        self._log(
            f"ERROR: Sim4Life failed to open project file, it is likely corrupted: {e}",
            log_type="fatal",
        )
        # Close any partially opened document so its file lock is released.
        if self.document and hasattr(self.document, "IsOpen") and self.document.IsOpen():  # type: ignore
            self.document.Close()
        raise ProjectCorruptionError(f"Sim4Life could not open corrupted file: {self.project_path}")
save
save()

Saves the current project to its file path.

Retries the save operation up to N times (configurable via save_retry_count) if Sim4Life fails intermittently, logging a warning for each retry attempt.

Raises:

Type Description
ValueError

If project_path hasn't been set.

Exception

If all retry attempts fail, the last exception is raised.

Source code in goliat/project_manager.py
def save(self):
    """Saves the current project, retrying on transient Sim4Life errors.

    The number of attempts comes from the 'save_retry_count' config entry
    (defaulting to 4 when missing or non-integer). Uses Save() when the
    document is already bound to the target path — SaveAs() there can
    trigger the ARES error about connections to running jobs — and
    SaveAs() otherwise.

    Raises:
        ValueError: If project_path hasn't been set.
        Exception: The last save error, if every attempt fails.
    """
    if not self.project_path:
        raise ValueError("Project path is not set. Cannot save.")

    configured = self.config["save_retry_count"]
    # Non-integer or missing values fall back to the default of 4 attempts.
    retry_count = configured if isinstance(configured, int) else 4
    last_exception = None

    self._log(f"Saving project to {self.project_path}...", log_type="info")

    target = os.path.normpath(self.project_path)
    for attempt in range(1, retry_count + 1):
        try:
            current_path = self.document.FilePath
            if current_path and os.path.normpath(current_path) == target:
                self.document.Save()
            else:
                self.document.SaveAs(self.project_path)
            if attempt == 1:
                self._log("Project saved.", log_type="success")
            else:
                self._log(f"Project saved successfully on attempt {attempt}.", log_type="success")
            return
        except Exception as e:
            last_exception = e
            if attempt < retry_count:
                self._log(
                    f"WARNING: Save attempt {attempt} failed: {e}. Retrying ({attempt + 1}/{retry_count})...",
                    log_type="warning",
                )
            else:
                self._log(
                    f"ERROR: All {retry_count} save attempts failed. Last error: {e}",
                    log_type="error",
                )

    # Every attempt failed; surface the captured error.
    if last_exception is not None:
        raise last_exception
    raise RuntimeError("Save failed but no exception was captured")
close
close()

Closes the active Sim4Life document.

Source code in goliat/project_manager.py
def close(self):
    """Closes the active Sim4Life document, releasing its file lock."""
    self._log("Closing project document...", log_type="info")
    self.document.Close()
cleanup
cleanup()

Closes any open project.

Source code in goliat/project_manager.py
def cleanup(self):
    """Closes the project document if one is currently open."""
    doc = self.document
    if doc and hasattr(doc, "IsOpen") and doc.IsOpen():  # type: ignore
        self.close()
reload_project
reload_project()

Saves, closes, and reopens the project to load simulation results.

Needed because Sim4Life sometimes requires a reload to see new results files. This ensures results are available for extraction.

Source code in goliat/project_manager.py
def reload_project(self):
    """Round-trips the project through disk so new results become visible.

    Sim4Life sometimes only picks up freshly written results files after
    a project is reopened, so the document is saved, closed, and opened
    again before extraction.
    """
    doc = self.document
    if doc and hasattr(doc, "IsOpen") and doc.IsOpen():  # type: ignore
        self._log("Saving and reloading project to load results...", log_type="info")
        self.save()
        self.close()

    self.open()
    self._log("Project reloaded.", log_type="success")

Functions


Study Orchestration

Study classes that orchestrate simulation workflows.

Base Study

goliat.studies.base_study

Classes

BaseStudy

BaseStudy(study_type: str, config_filename: Optional[str] = None, gui: Optional[QueueGUI] = None, profiler=None, no_cache: bool = False)

Bases: LoggingMixin

Base class for simulation studies.

Handles common setup like config loading, profiling, project management, and GUI coordination. Subclasses implement _run_study() for specific logic.

Source code in goliat/studies/base_study.py
def __init__(
    self,
    study_type: str,
    config_filename: Optional[str] = None,
    gui: Optional["QueueGUI"] = None,
    profiler=None,
    no_cache: bool = False,
):
    """Wires up config, profiler, and project manager for a study.

    Args:
        study_type: Identifier of the study (e.g. 'near_field').
        config_filename: Config file to load; defaults to
            '<study_type>_config.json'.
        gui: Optional GUI proxy for progress reporting.
        profiler: Accepted for interface compatibility; not used here.
        no_cache: If True, skip metadata-based verification caching.
    """
    self.study_type = study_type
    self.gui = gui
    self.no_cache = no_cache
    self.verbose_logger = logging.getLogger("verbose")
    self.progress_logger = logging.getLogger("progress")

    # Prefer the working directory when it looks like a project checkout
    # (has a configs/ folder); otherwise fall back to the package root
    # three levels up from this file (backwards compatibility).
    cwd = os.getcwd()
    if os.path.isdir(os.path.join(cwd, "configs")):
        self.base_dir = cwd
    else:
        self.base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

    config_name = config_filename if config_filename else f"{self.study_type}_config.json"
    self.config = Config(self.base_dir, config_name)

    # Get study-specific profiling config; default to running every phase.
    profiling_config = self.config.get_profiling_config(self.study_type)
    execution_control = self.config["execution_control"] or {"do_setup": True, "do_run": True, "do_extract": True}

    self.profiler = Profiler(
        execution_control,  # type: ignore
        profiling_config,
        self.study_type,
        self.config.profiling_config_path,
    )
    self.line_profiler = None

    self.project_manager = ProjectManager(
        self.config,
        self.verbose_logger,
        self.progress_logger,
        self.gui,
        no_cache=self.no_cache,
    )
Functions
subtask
subtask(task_name: str, instance_to_profile=None)

A context manager for a 'subtask' within a phase.

Source code in goliat/studies/base_study.py
@contextlib.contextmanager
def subtask(self, task_name: str, instance_to_profile=None):
    """Context manager that times and reports a subtask within a phase.

    Top-level subtasks (nothing already on the profiler's subtask stack)
    are logged at progress level and mirrored to the GUI's stage-progress
    display; nested ones are only timed. Optionally attaches a line
    profiler for the given instance and logs its stats afterwards.
    """
    top_level = not self.profiler.subtask_stack
    display_name = task_name.replace("_", " ").capitalize()

    if top_level:
        self._log(f"  - {display_name}...", level="progress", log_type="progress")
        if self.gui and self.profiler.current_phase:
            self.gui.update_stage_progress(self.profiler.current_phase.capitalize(), 0, 1, sub_stage=display_name)
            self.start_stage_animation(task_name, 100)

    lp, wrapper = self._setup_line_profiler_if_needed(task_name, instance_to_profile)

    try:
        with self.profiler.subtask(task_name):
            # Hand the line-profiler wrapper to the caller when one exists.
            yield wrapper if (lp and wrapper) else None
    finally:
        elapsed = self.profiler.subtask_times[task_name][-1]
        self._log(f"    - Subtask '{task_name}' done in {elapsed:.2f}s", log_type="verbose")

        if top_level:
            self._log(f"    - Done in {elapsed:.2f}s", level="progress", log_type="success")
            if self.gui:
                self.end_stage_animation()
                if self.profiler.current_phase:
                    self.gui.update_stage_progress(self.profiler.current_phase.capitalize(), 1, 1)

        if lp:
            self._log_line_profiler_stats(task_name, lp)
start_stage_animation
start_stage_animation(task_name: str, end_value: int)

Starts progress bar animation for the current stage.

Source code in goliat/studies/base_study.py
def start_stage_animation(self, task_name: str, end_value: int):
    """Kicks off the GUI progress-bar animation for the current stage."""
    if not self.gui:
        return
    self.gui.start_stage_animation(task_name, end_value)
end_stage_animation
end_stage_animation()

Stops the current stage animation.

Source code in goliat/studies/base_study.py
def end_stage_animation(self):
    """Stops the GUI's stage progress animation, if a GUI is attached."""
    if not self.gui:
        return
    self.gui.end_stage_animation()
run
run()

Main entry point to run the study.

Ensures Sim4Life is running, calls _run_study(), and handles cleanup and error reporting. Catches StudyCancelledError for graceful shutdown.

Source code in goliat/studies/base_study.py
def run(self):
    """Main entry point to run the study.

    Ensures Sim4Life is running, calls _run_study(), and handles cleanup
    and error reporting. Catches StudyCancelledError for graceful shutdown.
    """
    ensure_s4l_running()
    try:
        self._run_study()
    except StudyCancelledError:
        # User-requested cancellation is expected flow: log a warning,
        # no traceback.
        self._log(
            "--- Study execution cancelled by user. ---",
            level="progress",
            log_type="warning",
        )
    except Exception as e:
        # Any other error is fatal for this study; the full traceback goes
        # to the verbose log only, the progress log gets a one-liner.
        self._log(f"--- FATAL ERROR in study: {e} ---", level="progress", log_type="fatal")
        self.verbose_logger.error(traceback.format_exc())
    finally:
        # Cleanup always runs, even after a fatal error: persist timing
        # estimates, release the project, and push the final profiler
        # state to the GUI (if any).
        self._log(
            f"\n--- {self.__class__.__name__} Finished ---",
            level="progress",
            log_type="success",
        )
        self.profiler.save_estimates()
        self.project_manager.cleanup()
        if self.gui:
            self.gui.update_profiler()  # Send final profiler state

Functions

Far-Field Study

goliat.studies.far_field_study

Classes

FarFieldStudy

FarFieldStudy(study_type: str, config_filename: Optional[str] = None, gui: Optional[QueueGUI] = None, profiler=None, no_cache: bool = False)

Bases: BaseStudy

Manages far-field simulation campaigns.

Runs plane wave simulations across phantoms, frequencies, directions, and polarizations. Handles setup, run, and extraction phases with progress tracking.

Source code in goliat/studies/base_study.py
def __init__(
    self,
    study_type: str,
    config_filename: Optional[str] = None,
    gui: Optional["QueueGUI"] = None,
    profiler=None,
    no_cache: bool = False,
):
    """Builds the study: loggers, config, profiler, and project manager.

    Args:
        study_type: Identifier used to locate config and profiling data.
        config_filename: Explicit config file name; defaults to
            '<study_type>_config.json' when omitted.
        gui: Optional GUI proxy for progress updates.
        profiler: Accepted for interface compatibility; a fresh Profiler
            is always constructed here.
        no_cache: When True, the project manager bypasses cached results.
    """
    self.study_type = study_type
    self.gui = gui
    self.verbose_logger = logging.getLogger("verbose")
    self.progress_logger = logging.getLogger("progress")
    self.no_cache = no_cache

    # Prefer a configs/ folder in the current working directory; otherwise
    # fall back to the package root (three levels above this module).
    cwd = os.getcwd()
    if os.path.isdir(os.path.join(cwd, "configs")):
        self.base_dir = cwd
    else:
        here = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(os.path.dirname(here)))

    config_name = config_filename or f"{self.study_type}_config.json"
    self.config = Config(self.base_dir, config_name)

    # Study-specific profiling setup; execution phases default to all-on.
    profiling_config = self.config.get_profiling_config(self.study_type)
    execution_control = self.config["execution_control"]
    if not execution_control:
        execution_control = {"do_setup": True, "do_run": True, "do_extract": True}

    self.profiler = Profiler(
        execution_control,  # type: ignore
        profiling_config,
        self.study_type,
        self.config.profiling_config_path,
    )
    self.line_profiler = None

    self.project_manager = ProjectManager(
        self.config,
        self.verbose_logger,
        self.progress_logger,
        self.gui,
        no_cache=self.no_cache,
    )

Functions

Near-Field Study

goliat.studies.near_field_study

Classes

NearFieldStudy

NearFieldStudy(study_type: str, config_filename: Optional[str] = None, gui: Optional[QueueGUI] = None, profiler=None, no_cache: bool = False)

Bases: BaseStudy

Manages near-field simulation campaigns.

Runs simulations across phantoms, frequencies, placements, positions, and orientations. Handles setup, run, and extraction phases with progress tracking and metadata verification.

Source code in goliat/studies/base_study.py
def __init__(
    self,
    study_type: str,
    config_filename: Optional[str] = None,
    gui: Optional["QueueGUI"] = None,
    profiler=None,
    no_cache: bool = False,
):
    """Initializes the study scaffolding shared by all campaign types.

    Args:
        study_type: Study identifier (e.g. 'near_field').
        config_filename: Optional config file name; when None, the file
            '<study_type>_config.json' is used.
        gui: Optional GUI proxy receiving progress updates.
        profiler: Kept for signature compatibility; a new Profiler is
            built from the config regardless.
        no_cache: If True, cached project data is ignored.
    """
    self.study_type = study_type
    self.gui = gui
    self.verbose_logger = logging.getLogger("verbose")
    self.progress_logger = logging.getLogger("progress")
    self.no_cache = no_cache

    # Resolve the base directory: a configs/ folder in the working
    # directory wins; otherwise walk three levels up from this module
    # (backwards-compatible package layout).
    cwd = os.getcwd()
    local_configs = os.path.join(cwd, "configs")
    if os.path.isdir(local_configs):
        self.base_dir = cwd
    else:
        module_path = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))

    chosen_config = config_filename if config_filename else f"{self.study_type}_config.json"
    self.config = Config(self.base_dir, chosen_config)

    # Profiler is configured per study type; missing execution_control
    # means "run every phase".
    profiling_config = self.config.get_profiling_config(self.study_type)
    default_phases = {"do_setup": True, "do_run": True, "do_extract": True}
    execution_control = self.config["execution_control"] or default_phases

    self.profiler = Profiler(
        execution_control,  # type: ignore
        profiling_config,
        self.study_type,
        self.config.profiling_config_path,
    )
    self.line_profiler = None

    self.project_manager = ProjectManager(
        self.config,
        self.verbose_logger,
        self.progress_logger,
        self.gui,
        no_cache=self.no_cache,
    )

Functions


Setup Modules

Classes responsible for building the Sim4Life simulation scene.

Base Setup

goliat.setups.base_setup

Classes

BaseSetup

BaseSetup(config: Config, verbose_logger: Logger, progress_logger: Logger, gui: Optional[QueueGUI] = None)

Bases: LoggingMixin

Base class for simulation setup modules.

Provides common functionality like solver configuration, time/termination setup, point sensor creation, and final voxelization. Subclasses implement run_full_setup() for specific setup logic.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for progress updates.

required
gui Optional[QueueGUI]

Optional GUI instance for forwarding progress messages.

None
Source code in goliat/setups/base_setup.py
def __init__(self, config: "Config", verbose_logger: "Logger", progress_logger: "Logger", gui: Optional["QueueGUI"] = None):
    """Stores shared setup state and binds the Sim4Life scripting modules.

    Args:
        config: Configuration object.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        gui: Optional GUI instance for forwarding progress messages.
    """
    self.config = config
    self.verbose_logger = verbose_logger
    self.progress_logger = progress_logger
    self.gui = gui

    # Imported at construction time (not module import) so this module can
    # be loaded without a live Sim4Life session.
    import s4l_v1

    self.s4l_v1 = s4l_v1
    self.emfdtd = s4l_v1.simulation.emfdtd
    self.model = s4l_v1.model
Functions
run_full_setup
run_full_setup(project_manager: ProjectManager)

Prepares the simulation scene. Must be implemented by subclasses.

Returns:

Type Description

The configured simulation object.

Raises:

Type Description
NotImplementedError

If not overridden by subclass.

Source code in goliat/setups/base_setup.py
def run_full_setup(self, project_manager: "ProjectManager"):
    """Prepares the simulation scene. Must be implemented by subclasses.

    Args:
        project_manager: Project manager handling save/load operations.

    Returns:
        The configured simulation object.

    Raises:
        NotImplementedError: If not overridden by subclass.
    """
    message = "The 'run_full_setup' method must be implemented by a subclass."
    raise NotImplementedError(message)

Boundary Setup

goliat.setups.boundary_setup

Classes

BoundarySetup

BoundarySetup(config: Config, simulation: Simulation, verbose_logger: Logger, progress_logger: Logger)

Bases: BaseSetup

Configures the boundary conditions for the simulation.

Source code in goliat/setups/boundary_setup.py
def __init__(
    self,
    config: "Config",
    simulation: "emfdtd.Simulation",
    verbose_logger: "Logger",
    progress_logger: "Logger",
):
    """Binds the target simulation and initializes the shared base state.

    Args:
        config: Configuration object.
        simulation: Simulation whose boundary conditions will be configured.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
    """
    self.simulation = simulation
    super().__init__(config, verbose_logger, progress_logger)
Functions
setup_boundary_conditions
setup_boundary_conditions()

Configures PML boundary conditions from the solver settings.

Sets the global boundary type (e.g., UPML/CPML) and PML strength (Low/Medium/High) based on the config.

Source code in goliat/setups/boundary_setup.py
def setup_boundary_conditions(self):
    """Configures PML boundary conditions from the solver settings.

    Sets the global boundary type (e.g., UPML/CPML) and PML strength
    (Low/Medium/High) based on the config.
    """
    self._log("Setting up boundary conditions...", log_type="progress")
    solver_settings = self.config["solver_settings"] or {}
    boundary_config = solver_settings.get("boundary_conditions", {})

    # Set Boundary Type (e.g., UpmlCpml)
    bc_type = boundary_config.get("type", "UpmlCpml")
    self._log(f"  - Setting global boundary conditions to: {bc_type}", log_type="info")

    global_boundaries = self.simulation.GlobalBoundarySettings
    if global_boundaries:
        # The boundary type is an enum attribute; validate the configured
        # name with hasattr so a typo falls back to the solver default
        # instead of raising.
        bc_enum = global_boundaries.GlobalBoundaryType.enum
        if hasattr(bc_enum, bc_type):
            global_boundaries.GlobalBoundaryType = getattr(bc_enum, bc_type)
            self._log(
                f"    - Successfully set GlobalBoundaryType to {bc_type}",
                log_type="verbose",
            )
        else:
            self._log(
                f"    - Warning: Invalid boundary condition type '{bc_type}'. Using default.",
                log_type="warning",
            )
    else:
        self._log(
            "    - Warning: 'GlobalBoundarySettings' not found on simulation object.",
            log_type="warning",
        )

    # Set PML Strength
    # Config value is case-insensitive: capitalize() maps e.g. "medium" -> "Medium".
    strength = boundary_config.get("strength", "Medium").capitalize()
    self._log(f"  - Setting PML strength to: {strength}", log_type="info")

    boundary_settings_list = [x for x in self.simulation.AllSettings if isinstance(x, self.emfdtd.BoundarySettings)]
    if not boundary_settings_list:
        self._log(
            "  - No BoundarySettings found in simulation. Cannot set PML strength.",
            log_type="warning",
        )
        return

    # Only the first BoundarySettings entry is configured here.
    boundary_settings = boundary_settings_list[0]

    strength_enum = boundary_settings.PmlStrength.enum
    if hasattr(strength_enum, strength):
        boundary_settings.PmlStrength = getattr(strength_enum, strength)
        self._log(f"    - Successfully set PmlStrength to {strength}", log_type="verbose")
    else:
        # Unlike the boundary type above, an invalid strength is forced to
        # Medium rather than left at the solver default.
        self._log(
            f"    - Warning: Invalid PML strength '{strength}'. Using default (Medium).",
            log_type="warning",
        )
        boundary_settings.PmlStrength = strength_enum.Medium

Far-Field Setup

goliat.setups.far_field_setup

Classes

FarFieldSetup

FarFieldSetup(config: Config, phantom_name: str, frequency_mhz: int, direction_name: str, polarization_name: str, project_manager: ProjectManager, verbose_logger: Logger, progress_logger: Logger, profiler: Profiler, gui=None)

Bases: BaseSetup

Configures a far-field simulation for a specific direction and polarization.

Source code in goliat/setups/far_field_setup.py
def __init__(
    self,
    config: "Config",
    phantom_name: str,
    frequency_mhz: int,
    direction_name: str,
    polarization_name: str,
    project_manager: "ProjectManager",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    profiler: "Profiler",
    gui=None,
):
    """Captures the parameters of one direction/polarization far-field run.

    Args:
        config: Configuration object.
        phantom_name: Name of the phantom model.
        frequency_mhz: Operating frequency in MHz.
        direction_name: Incident plane-wave direction label.
        polarization_name: Plane-wave polarization label.
        project_manager: Project manager for save operations.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        profiler: Profiler recording subtask timings.
        gui: Optional GUI proxy for progress updates.
    """
    super().__init__(config, verbose_logger, progress_logger, gui)
    self.phantom_name = phantom_name
    self.frequency_mhz = frequency_mhz
    self.direction_name = direction_name
    self.polarization_name = polarization_name
    self.project_manager = project_manager
    self.profiler = profiler
    self.gui = gui
    # Fall back to an environmental study when the config omits the type.
    configured_type = self.config["far_field_setup.type"]
    self.simulation_type = "environmental" if configured_type is None else configured_type
    self.document = self.s4l_v1.document
Functions
run_full_setup
run_full_setup(project_manager: ProjectManager) -> emfdtd.Simulation

Executes the full setup sequence for a single far-field simulation with granular timing.

Source code in goliat/setups/far_field_setup.py
def run_full_setup(self, project_manager: "ProjectManager") -> "emfdtd.Simulation":
    """Executes the full setup sequence for a single far-field simulation with granular timing.

    Runs six profiled subtasks in order: load phantom, configure scene,
    assign materials, configure solver, voxelize, and save. After each
    subtask the elapsed time is read back from the profiler and logged.

    Args:
        project_manager: Project manager used for the final save.

    Returns:
        The configured simulation object.
    """
    self._log("--- Setting up single Far-Field sim ---", log_type="header")

    # Subtask 1: Load phantom
    self._log("    - Load phantom...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_load_phantom"):
        phantom_setup = PhantomSetup(
            self.config,
            self.phantom_name,
            self.verbose_logger,
            self.progress_logger,
        )
        phantom_setup.ensure_phantom_is_loaded()
    # The profiler appends each subtask duration; [-1] is the run just finished.
    elapsed = self.profiler.subtask_times["setup_load_phantom"][-1]
    self._log(f"      - Subtask 'setup_load_phantom' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 2: Configure scene
    self._log("    - Configure scene (bbox, plane wave)...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_configure_scene"):
        bbox_entity = self._create_or_get_simulation_bbox()
        simulation = self._create_simulation_entity(bbox_entity)
    elapsed = self.profiler.subtask_times["setup_configure_scene"][-1]
    self._log(f"      - Subtask 'setup_configure_scene' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 3: Assign materials
    # No antenna in a far-field setup: the antenna argument is None and
    # materials are assigned in phantom_only mode.
    self._log("    - Assign materials...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_materials"):
        material_setup = MaterialSetup(
            self.config,
            simulation,
            None,  # type: ignore
            self.phantom_name,
            self.verbose_logger,
            self.progress_logger,
            free_space=False,
        )
        material_setup.assign_materials(phantom_only=True)
    elapsed = self.profiler.subtask_times["setup_materials"][-1]
    self._log(f"      - Subtask 'setup_materials' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 4: Configure solver
    # Gridding likewise gets no placement/antenna (None args) for far field.
    self._log("    - Configure solver (gridding, boundaries, sensors)...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_solver"):
        gridding_setup = GriddingSetup(
            self.config,
            simulation,
            None,  # type: ignore
            None,  # type: ignore
            self.verbose_logger,
            self.progress_logger,
            frequency_mhz=self.frequency_mhz,
        )
        gridding_setup.setup_gridding()

        boundary_setup = BoundarySetup(self.config, simulation, self.verbose_logger, self.progress_logger)
        boundary_setup.setup_boundary_conditions()

        self._add_point_sensors(simulation, "far_field_simulation_bbox")
        self._setup_solver_settings(simulation)
    elapsed = self.profiler.subtask_times["setup_solver"][-1]
    self._log(f"      - Subtask 'setup_solver' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 5: Voxelize
    self._log("    - Voxelize simulation...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_voxelize"):
        # Re-resolve the bbox by name: the scene may have been rebuilt since
        # subtask 2.
        bbox_entity = next(
            (e for e in self.model.AllEntities() if hasattr(e, "Name") and e.Name == "far_field_simulation_bbox"),
            None,
        )
        if not bbox_entity:
            raise RuntimeError("Could not find 'far_field_simulation_bbox' for voxelization.")

        import XCoreModeling

        # Everything voxelized together: phantom meshes, bbox, point sensors.
        phantom_entities = [e for e in self.model.AllEntities() if isinstance(e, XCoreModeling.TriangleMesh)]
        point_sensor_entities = [e for e in self.model.AllEntities() if "Point Sensor Entity" in e.Name]

        all_simulation_parts = phantom_entities + [bbox_entity] + point_sensor_entities

        super()._finalize_setup(self.project_manager, simulation, all_simulation_parts, self.frequency_mhz)
    elapsed = self.profiler.subtask_times["setup_voxelize"][-1]
    self._log(f"      - Subtask 'setup_voxelize' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 6: Save project
    self._log("    - Save project...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_save_project"):
        project_manager.save()
    elapsed = self.profiler.subtask_times["setup_save_project"][-1]
    self._log(f"      - Subtask 'setup_save_project' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    self._log("Common settings applied.", log_type="success")
    return simulation

Gridding Setup

goliat.setups.gridding_setup

Classes

GriddingSetup

GriddingSetup(config: Config, simulation: Simulation, placement_name: str, antenna: Antenna, verbose_logger: Logger, progress_logger: Logger, frequency_mhz: int | None = None)

Bases: BaseSetup

Configures simulation grid resolution and subgridding.

Sets up main grid (automatic or manual) with padding, and optional antenna-specific subgrids for fine details.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
simulation Simulation

The simulation object to configure gridding for.

required
placement_name str

Name of the placement scenario.

required
antenna Antenna

Antenna object.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for progress updates.

required
frequency_mhz int | None

Simulation frequency in MHz (optional).

None
Source code in goliat/setups/gridding_setup.py
def __init__(
    self,
    config: "Config",
    simulation: "emfdtd.Simulation",
    placement_name: str,
    antenna: "Antenna",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    frequency_mhz: int | None = None,
):
    """Prepares gridding configuration for one simulation.

    Args:
        config: Configuration object.
        simulation: The simulation object to configure gridding for.
        placement_name: Name of the placement scenario.
        antenna: Antenna object.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        frequency_mhz: Simulation frequency in MHz (optional).
    """
    super().__init__(config, verbose_logger, progress_logger)
    self.simulation = simulation
    self.placement_name = placement_name
    self.antenna = antenna
    self.frequency_mhz = frequency_mhz

    # Units module supplies grid-spacing units; imported lazily like the
    # other S4L bindings.
    import s4l_v1.units as s4l_units

    self.units = s4l_units
Functions
setup_gridding
setup_gridding(antenna_components: dict | None = None)

Sets up main grid and optional antenna subgrids.

Parameters:

Name Type Description Default
antenna_components dict | None

Dict mapping component names to entities.

None
Source code in goliat/setups/gridding_setup.py
def setup_gridding(self, antenna_components: dict | None = None):
    """Sets up main grid and optional antenna subgrids.

    Args:
        antenna_components: Dict mapping component names to entities.
    """
    self._log("Setting up gridding...", log_type="progress")
    self._setup_main_grid()
    if antenna_components:
        self._setup_subgrids(antenna_components)
    else:
        self._log(
            "  - No antenna components provided, skipping subgridding.",
            log_type="info",
        )

Material Setup

goliat.setups.material_setup

Classes

MaterialSetup

MaterialSetup(config: Config, simulation: Simulation, antenna: Antenna, phantom_name: str, verbose_logger: Logger, progress_logger: Logger, free_space: bool = False)

Bases: BaseSetup

Assigns materials to phantom tissues and antenna components.

Maps tissue names to IT'IS database materials and assigns antenna materials from config. Uses file locking for thread-safe database access.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
simulation Simulation

The simulation object to assign materials to.

required
antenna Antenna

Antenna object.

required
phantom_name str

Name of the phantom model.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for progress updates.

required
free_space bool

Whether this is a free-space simulation.

False
Source code in goliat/setups/material_setup.py
def __init__(
    self,
    config: "Config",
    simulation: "emfdtd.Simulation",
    antenna: "Antenna",
    phantom_name: str,
    verbose_logger: "Logger",
    progress_logger: "Logger",
    free_space: bool = False,
):
    """Prepares material assignment for one simulation.

    Args:
        config: Configuration object.
        simulation: The simulation object to assign materials to.
        antenna: Antenna object.
        phantom_name: Name of the phantom model.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        free_space: Whether this is a free-space simulation.
    """
    super().__init__(config, verbose_logger, progress_logger)
    self.simulation = simulation
    self.antenna = antenna
    self.phantom_name = phantom_name
    self.free_space = free_space

    # Lazy S4L imports: the material database and modeling API need a
    # live Sim4Life session.
    import XCoreModeling
    import s4l_v1.materials.database as material_db

    self.database = material_db
    self.XCoreModeling = XCoreModeling
Functions
assign_materials
assign_materials(antenna_components: dict | None = None, phantom_only: bool = False)

Assigns materials to simulation entities.

Sets background to Air, then assigns phantom materials if not free-space, and antenna materials if not phantom_only mode.

Parameters:

Name Type Description Default
antenna_components dict | None

Dict mapping component names to entities.

None
phantom_only bool

If True, skips antenna material assignment.

False
Source code in goliat/setups/material_setup.py
def assign_materials(self, antenna_components: dict | None = None, phantom_only: bool = False):
    """Assigns materials to simulation entities.

    Sets background to Air, then assigns phantom materials if not free-space,
    and antenna materials if not phantom_only mode.

    Args:
        antenna_components: Dict mapping component names to entities.
        phantom_only: If True, skips antenna material assignment.
    """
    self._log("Assigning materials...", log_type="progress")

    # Background material
    background_settings = self.simulation.raw.BackgroundMaterialSettings()  # type: ignore
    air_material = self.database["Generic 1.1"]["Air"]  # type: ignore
    self.simulation.raw.AssignMaterial(background_settings, air_material)  # type: ignore

    # Phantom materials
    if not self.free_space:
        self._assign_phantom_materials()

    # Antenna materials
    if not phantom_only:
        if not antenna_components:
            raise ValueError("antenna_components must be provided when not in phantom_only mode.")
        self._assign_antenna_materials(antenna_components)

Near-Field Setup

goliat.setups.near_field_setup

Classes

NearFieldSetup

NearFieldSetup(config: Config, phantom_name: str, frequency_mhz: int, scenario_name: str, position_name: str, orientation_name: str, antenna: Antenna, verbose_logger: Logger, progress_logger: Logger, profiler: Profiler, gui=None, free_space: bool = False)

Bases: BaseSetup

Configures the simulation environment by coordinating setup modules.

Source code in goliat/setups/near_field_setup.py
def __init__(
    self,
    config: "Config",
    phantom_name: str,
    frequency_mhz: int,
    scenario_name: str,
    position_name: str,
    orientation_name: str,
    antenna: "Antenna",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    profiler: "Profiler",
    gui=None,
    free_space: bool = False,
):
    """Captures the parameters of one near-field placement run.

    Args:
        config: Configuration object.
        phantom_name: Name of the phantom model.
        frequency_mhz: Operating frequency in MHz.
        scenario_name: Base placement scenario name.
        position_name: Position variant within the scenario.
        orientation_name: Orientation variant within the scenario.
        antenna: Antenna object.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        profiler: Profiler recording subtask timings.
        gui: Optional GUI proxy for progress updates.
        free_space: Whether this is a free-space simulation.
    """
    super().__init__(config, verbose_logger, progress_logger, gui)
    self.phantom_name = phantom_name
    self.frequency_mhz = frequency_mhz
    self.base_placement_name = scenario_name
    self.position_name = position_name
    self.orientation_name = orientation_name
    # Full placement id combines scenario, position, and orientation.
    self.placement_name = "_".join((scenario_name, position_name, orientation_name))
    self.antenna = antenna
    self.profiler = profiler
    self.gui = gui
    self.free_space = free_space
    # Detuning (if enabled) overwrites this later in _setup_simulation_entity.
    self.final_frequency_mhz = frequency_mhz

    # S4L modules
    import XCoreModeling

    self.XCoreModeling = XCoreModeling
    self.document = self.s4l_v1.document
Functions
run_full_setup
run_full_setup(project_manager: ProjectManager, lock=None) -> emfdtd.Simulation

Executes complete setup sequence with detailed timing.

Orchestrates the entire simulation setup process in 6 major subtasks:

  1. Load phantom: Imports phantom model from disk or downloads if missing. Creates head/trunk bounding boxes if needed.

  2. Configure scene: Imports antenna, places it relative to phantom, creates simulation bounding box, sets up simulation entity, adds point sensors. Handles special cases like phantom rotation and phone alignment.

  3. Assign materials: Maps tissue names to IT'IS database materials, assigns antenna component materials from config. Uses file locking for thread safety.

  4. Configure solver: Sets up gridding (automatic or manual with subgrids), configures boundary conditions (PML), and sets up excitation sources.

  5. Voxelize: Runs automatic voxelization on all simulation entities, updates materials and grid, optionally exports material properties.

  6. Save project: Saves the .smash file to disk.

Each subtask is profiled individually for accurate timing estimates. The method returns a fully configured simulation object ready to run.

Parameters:

Name Type Description Default
project_manager ProjectManager

Project manager for saving operations.

required
lock

Optional lock (currently unused, reserved for future use).

None

Returns:

Type Description
Simulation

Fully configured simulation object ready for execution.

Source code in goliat/setups/near_field_setup.py
def run_full_setup(self, project_manager: "ProjectManager", lock=None) -> "emfdtd.Simulation":
    """Executes complete setup sequence with detailed timing.

    Orchestrates the entire simulation setup process in 6 major subtasks:

    1. Load phantom: Imports phantom model from disk or downloads if missing.
       Creates head/trunk bounding boxes if needed.

    2. Configure scene: Imports antenna, places it relative to phantom, creates
       simulation bounding box, sets up simulation entity, adds point sensors.
       Handles special cases like phantom rotation and phone alignment.

    3. Assign materials: Maps tissue names to IT'IS database materials, assigns
       antenna component materials from config. Uses file locking for thread safety.

    4. Configure solver: Sets up gridding (automatic or manual with subgrids),
       configures boundary conditions (PML), and sets up excitation sources.

    5. Voxelize: Runs automatic voxelization on all simulation entities, updates
       materials and grid, optionally exports material properties.

    6. Save project: Saves the .smash file to disk.

    Each subtask is profiled individually for accurate timing estimates. The method
    returns a fully configured simulation object ready to run.

    Args:
        project_manager: Project manager for saving operations.
        lock: Optional lock (currently unused, reserved for future use).

    Returns:
        Fully configured simulation object ready for execution.
    """
    self._log("Running full simulation setup...", log_type="progress")

    # Subtask 1: Load phantom
    # Skipped entirely in free-space mode: there is no phantom to load.
    if not self.free_space:
        self._log("    - Load phantom...", level="progress", log_type="progress")
        with self.profiler.subtask("setup_load_phantom"):
            phantom_setup = PhantomSetup(
                self.config,
                self.phantom_name,
                self.verbose_logger,
                self.progress_logger,
            )
            phantom_setup.ensure_phantom_is_loaded()
        # The profiler appends each subtask duration; [-1] is the run just finished.
        elapsed = self.profiler.subtask_times["setup_load_phantom"][-1]
        self._log(f"      - Subtask 'setup_load_phantom' done in {elapsed:.2f}s", log_type="verbose")
        self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 2: Configure scene
    self._log("    - Configure scene (bboxes, placement, simulation, sensors)...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_configure_scene"):
        if not self.free_space:
            self._setup_bounding_boxes()

        placement_setup = PlacementSetup(
            self.config,
            self.phantom_name,
            self.frequency_mhz,
            self.base_placement_name,
            self.position_name,
            self.orientation_name,
            self.antenna,
            self.verbose_logger,
            self.progress_logger,
            self.free_space,
        )
        placement_setup.place_antenna()

        # Collected once here and reused by the materials, solver, and
        # voxelize subtasks below.
        antenna_components = self._get_antenna_components()
        self._create_simulation_bbox()
        simulation = self._setup_simulation_entity()

        sim_bbox_name = f"{self.placement_name.lower()}_simulation_bbox"
        self._add_point_sensors(simulation, sim_bbox_name)

        self._handle_phantom_rotation(placement_setup)
        self._align_simulation_with_phone()

    elapsed = self.profiler.subtask_times["setup_configure_scene"][-1]
    self._log(f"      - Subtask 'setup_configure_scene' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 3: Assign materials
    self._log("    - Assign materials...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_materials"):
        material_setup = MaterialSetup(
            self.config,
            simulation,
            self.antenna,
            self.phantom_name,
            self.verbose_logger,
            self.progress_logger,
            self.free_space,
        )
        material_setup.assign_materials(antenna_components)

    elapsed = self.profiler.subtask_times["setup_materials"][-1]
    self._log(f"      - Subtask 'setup_materials' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 4: Configure solver
    self._log("    - Configure solver (gridding, boundaries, sources)...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_solver"):
        gridding_setup = GriddingSetup(
            self.config,
            simulation,
            self.placement_name,
            self.antenna,
            self.verbose_logger,
            self.progress_logger,
            frequency_mhz=self.frequency_mhz,
        )
        gridding_setup.setup_gridding(antenna_components)

        boundary_setup = BoundarySetup(self.config, simulation, self.verbose_logger, self.progress_logger)
        boundary_setup.setup_boundary_conditions()

        # Use final frequency (with detuning applied) for source setup
        source_frequency = getattr(self, "final_frequency_mhz", self.frequency_mhz)
        source_setup = SourceSetup(
            self.config,
            simulation,
            source_frequency,
            self.antenna,
            self.verbose_logger,
            self.progress_logger,
            self.free_space,
            self.phantom_name,
            self.placement_name,
        )
        source_setup.setup_source_and_sensors(antenna_components)

    elapsed = self.profiler.subtask_times["setup_solver"][-1]
    self._log(f"      - Subtask 'setup_solver' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 5: Voxelize
    self._log("    - Voxelize simulation...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_voxelize"):
        all_antenna_parts = list(antenna_components.values())
        point_sensor_entities = [e for e in self.model.AllEntities() if "Point Sensor Entity" in e.Name]

        # The bbox naming differs between free-space and phantom runs.
        if self.free_space:
            sim_bbox_name = "freespace_simulation_bbox"
        else:
            sim_bbox_name = f"{self.placement_name.lower()}_simulation_bbox"

        sim_bbox_entity = next(
            (e for e in self.model.AllEntities() if hasattr(e, "Name") and e.Name == sim_bbox_name),
            None,
        )
        if not sim_bbox_entity:
            raise RuntimeError(f"Could not find simulation bounding box: '{sim_bbox_name}' for voxelization.")

        # Everything voxelized together: phantom meshes, antenna parts,
        # point sensors, and the bounding box.
        phantom_entities = [e for e in self.model.AllEntities() if isinstance(e, self.XCoreModeling.TriangleMesh)]
        all_simulation_parts = phantom_entities + all_antenna_parts + point_sensor_entities + [sim_bbox_entity]

        super()._finalize_setup(project_manager, simulation, all_simulation_parts, self.frequency_mhz)

    elapsed = self.profiler.subtask_times["setup_voxelize"][-1]
    self._log(f"      - Subtask 'setup_voxelize' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    # Subtask 6: Save project
    self._log("    - Save project...", level="progress", log_type="progress")
    with self.profiler.subtask("setup_save_project"):
        project_manager.save()
    elapsed = self.profiler.subtask_times["setup_save_project"][-1]
    self._log(f"      - Subtask 'setup_save_project' done in {elapsed:.2f}s", log_type="verbose")
    self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    self._log("Full simulation setup complete.", log_type="success")
    return simulation

Phantom Setup

goliat.setups.phantom_setup

Classes

PhantomSetup

PhantomSetup(config: Config, phantom_name: str, verbose_logger: Logger, progress_logger: Logger)

Bases: BaseSetup

Handles loading and importing phantom models into Sim4Life.

Source code in goliat/setups/phantom_setup.py
def __init__(
    self,
    config: "Config",
    phantom_name: str,
    verbose_logger: "Logger",
    progress_logger: "Logger",
):
    """Initializes the phantom setup and binds the Sim4Life modules it uses.

    Args:
        config: Configuration object.
        phantom_name: Name of the phantom model to manage.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
    """
    super().__init__(config, verbose_logger, progress_logger)
    self.phantom_name = phantom_name

    # Deferred imports: these modules are only importable inside Sim4Life.
    import s4l_v1.data as s4l_data
    import s4l_v1.model as s4l_model
    import XCoreModeling as xcm

    self.model = s4l_model
    self.data = s4l_data
    self.XCoreModeling = xcm
Functions
ensure_phantom_is_loaded
ensure_phantom_is_loaded() -> bool

Checks if phantom is loaded, imports from disk if available, or downloads if missing.

Returns:

Type Description
bool

True if phantom is now loaded, False if download was initiated (requires re-run).

Source code in goliat/setups/phantom_setup.py
def ensure_phantom_is_loaded(self) -> bool:
    """Checks if phantom is loaded, imports from disk if available, or downloads if missing.

    Scans the entities in the current Sim4Life project for a name that
    contains the phantom name (case-insensitive). If the phantom is absent,
    a local ``.sab`` file is imported when one exists; otherwise a download
    is initiated and the script must be re-run to import the new file.

    Returns:
        True if phantom is now loaded, False if download was initiated
        (requires re-run).

    Raises:
        FileNotFoundError: If the phantom is neither on disk nor available
            for download.
        ValueError: If the configured study type is not recognized.
    """
    self._log("--- Running Phantom Check ---", log_type="header")
    all_entities = self.model.AllEntities()
    self._log(f"Found {len(all_entities)} total entities in the project.", log_type="info")

    # Hoist the loop-invariant lowercase phantom name out of the scan.
    phantom_name_lower = self.phantom_name.lower()
    is_loaded = False
    for i, entity in enumerate(all_entities):
        if hasattr(entity, "Name"):
            if phantom_name_lower in entity.Name.lower():
                is_loaded = True
                break
        else:
            self._log(f"  - Entity {i}: (No 'Name' attribute)", log_type="verbose")

    if is_loaded:
        self._log(
            "--- Phantom Check Result: Phantom model is already present. ---",
            log_type="success",
        )
        return True
    else:
        self._log(
            "--- Phantom Check Result: Phantom not found in project. ---",
            log_type="warning",
        )

    study_type = self.config["study_type"]
    if study_type in ("near_field", "far_field"):
        sab_path = os.path.join(self.config.base_dir, "data", "phantoms", f"{self.phantom_name}.sab")
        if os.path.exists(sab_path):
            self._log(
                f"Phantom not found in document. Importing from '{sab_path}'...",
                log_type="info",
            )
            self.XCoreModeling.Import(sab_path)
            self._log("Phantom imported successfully.", log_type="success")
            return True

        self._log(
            f"Local .sab file not found. Attempting to download '{self.phantom_name}'...",
            log_type="info",
        )
        available_downloads = self.data.GetAvailableDownloads()
        phantom_to_download = next(
            (item for item in available_downloads if phantom_name_lower in item.Name.lower()),
            None,
        )

        if not phantom_to_download:
            raise FileNotFoundError(f"Phantom '{self.phantom_name}' not found for download or in local files.")

        self._log(f"Found '{phantom_to_download.Name}'. Downloading...", log_type="info")
        # A (possibly placeholder) e-mail address is required by the download API.
        download_email = self.config["download_email"]
        if download_email is None:
            download_email = "example@example.com"
        self.data.DownloadModel(
            phantom_to_download,
            email=download_email,  # type: ignore
            directory=os.path.join(self.config.base_dir, "data", "phantoms"),
        )
        self._log(
            "Phantom downloaded successfully. Please re-run the script to import the new .sab file.",
            log_type="success",
        )
        return False
    else:
        raise ValueError(f"Unknown study type: {study_type}")

Placement Setup

goliat.setups.placement_setup

Classes

PlacementSetup

PlacementSetup(config: Config, phantom_name: str, frequency_mhz: int, base_placement_name: str, position_name: str, orientation_name: str, antenna: Antenna, verbose_logger: Logger, progress_logger: Logger, free_space: bool = False)

Bases: BaseSetup

Handles antenna placement and orientation relative to phantom.

Imports antenna model, calculates target position based on placement scenario, and applies composed transformation (stand-up rotation, translation, orientation twists) to position antenna correctly.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
phantom_name str

Name of the phantom model.

required
frequency_mhz int

Simulation frequency in MHz.

required
base_placement_name str

Base name of the placement scenario.

required
position_name str

Name of the position within the scenario.

required
orientation_name str

Name of the orientation within the scenario.

required
antenna Antenna

Antenna object to place.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for progress updates.

required
free_space bool

Whether this is a free-space simulation.

False
Source code in goliat/setups/placement_setup.py
def __init__(
    self,
    config: "Config",
    phantom_name: str,
    frequency_mhz: int,
    base_placement_name: str,
    position_name: str,
    orientation_name: str,
    antenna: "Antenna",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    free_space: bool = False,
):
    """Initializes the PlacementSetup.

    Args:
        config: Configuration object.
        phantom_name: Name of the phantom model.
        frequency_mhz: Simulation frequency in MHz.
        base_placement_name: Base name of the placement scenario.
        position_name: Name of the position within the scenario.
        orientation_name: Name of the orientation within the scenario.
        antenna: Antenna object to place.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        free_space: Whether this is a free-space simulation.
    """
    super().__init__(config, verbose_logger, progress_logger)

    # Identity of this placement scenario.
    self.phantom_name = phantom_name
    self.frequency_mhz = frequency_mhz
    self.base_placement_name = base_placement_name
    self.position_name = position_name
    self.orientation_name = orientation_name
    # Canonical composite name, e.g. "by_cheek_top_vertical".
    self.placement_name = "_".join((base_placement_name, position_name, orientation_name))

    self.antenna = antenna
    self.free_space = free_space

    # XCoreMath supplies the rotation/translation primitives used by place_antenna().
    import XCoreMath as xcm

    self.XCoreMath = xcm
Functions
place_antenna
place_antenna()

Places and orients antenna using a single composed transformation.

This method implements a key optimization: instead of applying multiple transforms sequentially (which causes precision loss and can accumulate errors), it composes all transformations into a single matrix and applies it once.

The transformation sequence is: 1. Stand-up rotation: Rotates antenna 90° around X-axis to make it upright 2. Base translation: Moves antenna to a reference point (speaker location) 3. Special rotation: For 'by_cheek', applies -90° Z-rotation to align with YZ plane 4. Orientation twists: Applies any rotations specified in orientation config 5. Final translation: Moves antenna to its target position relative to phantom

The order matters because matrix multiplication is not commutative. Each step builds on the previous transform, so the antenna ends up correctly positioned and oriented relative to the phantom regardless of how many rotations are needed.

Parameters:

None — this method takes no arguments; it operates on instance attributes (antenna, placement_name, etc.).

Raises:

Type Description
RuntimeError

If antenna import fails or required entities aren't found.

Source code in goliat/setups/placement_setup.py
def place_antenna(self):
    """Places and orients antenna using a single composed transformation.

    This method implements a key optimization: instead of applying multiple
    transforms sequentially (which causes precision loss and can accumulate errors),
    it composes all transformations into a single matrix and applies it once.

    The transformation sequence is:
    1. Stand-up rotation: Rotates antenna 90° around X-axis to make it upright
    2. Base translation: Moves antenna to a reference point (speaker location)
    3. Special rotation: For 'by_cheek', applies -90° Z-rotation to align with YZ plane
    4. Orientation twists: Applies any rotations specified in orientation config
    5. Final translation: Moves antenna to its target position relative to phantom

    The order matters because matrix multiplication is not commutative. Each step
    builds on the previous transform, so the antenna ends up correctly positioned
    and oriented relative to the phantom regardless of how many rotations are needed.

    Takes no arguments; operates on instance attributes (antenna,
    placement_name, orientation_rotations, etc.).

    Returns:
        None. Returns early (no-op) if the placement is disabled in the
        configuration and this is not a free-space run.

    Raises:
        RuntimeError: If antenna import fails or required entities aren't found.
    """
    self._log(
        f"--- Starting Placement: {self.base_placement_name} - {self.position_name} - {self.orientation_name} ---",
        log_type="header",
    )

    # Skip silently when the config disables this placement (free-space runs
    # always proceed, since they have no phantom-relative placement toggle).
    phantom_definition = (self.config["phantom_definitions"] or {}).get(self.phantom_name.lower(), {})
    if not self.free_space and not phantom_definition.get("placements", {}).get(f"do_{self.base_placement_name}"):
        self._log(
            f"Placement '{self.base_placement_name}' is disabled in the configuration.",
            log_type="info",
        )
        return

    # NOTE(review): presumably also initializes self.orientation_rotations
    # used below — confirm in _get_placement_details's implementation.
    base_target_point, position_offset = self._get_placement_details()

    # Import antenna model
    antenna_path = self.antenna.get_centered_antenna_path(os.path.join(self.config.base_dir, "data", "antennas", "centered"))
    imported_entities = list(self.model.Import(antenna_path))

    # The import yields the antenna group plus (optionally) its bounding box.
    antenna_group = next(
        (e for e in imported_entities if "Antenna" in e.Name and "bounding box" not in e.Name),
        None,
    )
    bbox_entity = next((e for e in imported_entities if "bounding box" in e.Name), None)

    if not antenna_group:
        raise RuntimeError("Could not find imported antenna group.")

    # Find the "Ground" entity/entities ("PCB" of the phone excl. IFA antenna)
    ground_entities = [e for e in antenna_group.Entities if "Ground" in e.Name or "Substrate" in e.Name]  # type: ignore

    # Normalize antenna group name to use requested frequency (in case fallback file was used)
    # Replace any frequency in the name with the requested frequency
    normalized_name = re.sub(r"\d+\s*MHz", f"{self.antenna.frequency_mhz} MHz", antenna_group.Name)

    # Rename the entities to include the placement name for uniqueness
    antenna_group.Name = f"{normalized_name} ({self.placement_name})"
    if bbox_entity:
        # Also normalize bounding box name if it contains frequency
        normalized_bbox_name = re.sub(r"\d+\s*MHz", f"{self.antenna.frequency_mhz} MHz", bbox_entity.Name)
        bbox_entity.Name = f"{normalized_bbox_name} ({self.placement_name})"

    entities_to_transform = [antenna_group, bbox_entity] if bbox_entity else [antenna_group]

    # --- Transformation Composition ---
    # Each step left-multiplies onto the accumulated transform, so steps
    # compose in the order listed in the docstring.
    self._log("Composing final transformation...", log_type="progress")

    # Start with an identity transform
    final_transform = self.XCoreMath.Transform()

    # 1. Stand-up Rotation
    rot_stand_up = self.XCoreMath.Rotation(self.XCoreMath.Vec3(1, 0, 0), np.deg2rad(90))
    final_transform = rot_stand_up * final_transform

    # 2. Base translation to antenna reference point (speaker output of the mock-up phone)
    reference_target_point = self._get_speaker_reference(ground_entities, upright_transform=final_transform)
    base_translation = self.XCoreMath.Translation(reference_target_point)
    final_transform = base_translation * final_transform

    # Special rotation for 'by_cheek' to align with YZ plane
    if self.base_placement_name.startswith("by_cheek"):
        self._log("Applying 'by_cheek' specific Z-rotation.", log_type="info")
        rot_z_cheek = self.XCoreMath.Rotation(self.XCoreMath.Vec3(0, 0, 1), np.deg2rad(-90))
        final_transform = rot_z_cheek * final_transform

    # 3. Orientation Twist
    # Each configured twist is a dict with "axis" ("X"/"Y"/"Z") and "angle_deg".
    if self.orientation_rotations:
        for rot in self.orientation_rotations:
            axis_map = {
                "X": self.XCoreMath.Vec3(1, 0, 0),
                "Y": self.XCoreMath.Vec3(0, 1, 0),
                "Z": self.XCoreMath.Vec3(0, 0, 1),
            }
            rot_twist = self.XCoreMath.Rotation(axis_map[rot["axis"].upper()], np.deg2rad(rot["angle_deg"]))
            final_transform = rot_twist * final_transform

    # 4. Final Translation
    # NOTE(review): this uses self.model.Vec3 while the rotations above use
    # self.XCoreMath.Vec3 — presumably interchangeable; confirm.
    final_target_point = self.model.Vec3(
        base_target_point[0] + position_offset[0],
        base_target_point[1] + position_offset[1],
        base_target_point[2] + position_offset[2],
    )
    translation_transform = self.XCoreMath.Translation(final_target_point)
    final_transform = translation_transform * final_transform

    # --- Apply the single, composed transform ---
    self._log("Applying final composed transform.", log_type="progress")
    for entity in entities_to_transform:
        entity.ApplyTransform(final_transform)

    self._log("--- Transformation Sequence Complete ---", log_type="success")

Source Setup

goliat.setups.source_setup

Classes

SourceSetup

SourceSetup(config: Config, simulation: Simulation, frequency_mhz: int, antenna: Antenna, verbose_logger: Logger, progress_logger: Logger, free_space: bool = False, phantom_name: Optional[str] = None, placement_name: Optional[str] = None)

Bases: BaseSetup

Configures excitation sources and sensors for the simulation.

Source code in goliat/setups/source_setup.py
def __init__(
    self,
    config: "Config",
    simulation: "emfdtd.Simulation",
    frequency_mhz: int,
    antenna: "Antenna",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    free_space: bool = False,
    phantom_name: Optional[str] = None,
    placement_name: Optional[str] = None,
):
    """Initializes the source/sensor setup for one simulation.

    Args:
        config: Configuration object.
        simulation: The FDTD simulation to attach sources/sensors to.
        frequency_mhz: Operating frequency in MHz.
        antenna: Antenna object providing the source entity name.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for progress updates.
        free_space: Whether this is a free-space simulation.
        phantom_name: Optional phantom name for context.
        placement_name: Optional placement name for context.
    """
    super().__init__(config, verbose_logger, progress_logger)

    self.simulation = simulation
    self.antenna = antenna
    self.frequency_mhz = frequency_mhz
    self.free_space = free_space
    self.phantom_name = phantom_name
    self.placement_name = placement_name

    # Deferred import: only importable inside Sim4Life.
    import s4l_v1.units as s4l_units

    self.units = s4l_units
Functions
setup_source_and_sensors
setup_source_and_sensors(antenna_components: dict)

Sets up the edge source and sensors based on excitation type.

Uses excitation_type from config to determine Harmonic or Gaussian excitation. For free-space simulations, also adds far-field sensors for Gaussian sources.

Parameters:

Name Type Description Default
antenna_components dict

Dict mapping component names to entities.

required
Source code in goliat/setups/source_setup.py
def setup_source_and_sensors(self, antenna_components: dict):
    """Sets up the edge source and sensors based on excitation type.

    Reads ``simulation_parameters.excitation_type`` from config to choose a
    Harmonic or Gaussian excitation. For Gaussian sources, ``k`` (the pulse
    offset factor) selects between the Sim4Life built-in Gaussian (k=5) and
    a user-defined waveform expression (any other k). For free-space
    simulations, a far-field sensor is added; with a Gaussian source its
    extracted frequencies span the configured bandwidth.

    Args:
        antenna_components: Dict mapping component names to entities.

    Raises:
        RuntimeError: If the source entity is missing from the antenna group.
    """
    self._log("Setting up source and sensors...", log_type="progress")

    source_name = self.antenna.get_source_entity_name()
    if source_name not in antenna_components:
        raise RuntimeError(f"Could not find source entity '{source_name}' in antenna group.")
    source_entity = antenna_components[source_name]

    # Source setup
    edge_source_settings = self.emfdtd.EdgeSourceSettings()

    # Get the enum for ExcitationType
    excitation_enum = edge_source_settings.ExcitationType.enum

    # Read excitation type from config (default to Harmonic for backward compatibility)
    excitation_type = self.config["simulation_parameters.excitation_type"] or "Harmonic"
    excitation_type_lower = excitation_type.lower() if isinstance(excitation_type, str) else "harmonic"

    # Parse the bandwidth once: it's needed by both the Gaussian source and
    # the far-field sensor frequency list below (previously this coercion
    # logic was duplicated in the two places).
    bandwidth_mhz_val = self.config["simulation_parameters.bandwidth_mhz"] or 50.0
    bandwidth_mhz = float(bandwidth_mhz_val) if not isinstance(bandwidth_mhz_val, dict) else 50.0

    if excitation_type_lower == "gaussian":
        k_val = self.config["simulation_parameters.gaussian_pulse_k"] or 3
        k = int(k_val) if not isinstance(k_val, dict) else 3

        if k == 5:
            # Use Sim4Life built-in Gaussian (forced k=5)
            self._log(f"  - Using Sim4Life built-in Gaussian source (BW: {bandwidth_mhz} MHz, k=5).", log_type="info")
            edge_source_settings.ExcitationType = excitation_enum.Gaussian
            edge_source_settings.CenterFrequency = self.frequency_mhz, self.units.MHz
            edge_source_settings.Bandwidth = bandwidth_mhz, self.units.MHz
        else:
            # Use custom Gaussian waveform with user-defined k (faster pulse)
            self._log(f"  - Using custom Gaussian source (BW: {bandwidth_mhz} MHz, k={k}).", log_type="info")
            edge_source_settings.ExcitationType = excitation_enum.UserDefined

            # Set up user-defined signal from equation
            user_signal_enum = edge_source_settings.UserSignalType.enum
            edge_source_settings.UserSignalType = user_signal_enum.FromEquation

            # Calculate parameters for Gaussian pulse
            # σ = 0.94/(π·BW), t₀ = k·σ (to start near zero)
            bandwidth_hz = bandwidth_mhz * 1e6
            center_freq_hz = self.frequency_mhz * 1e6
            sigma = 0.94 / (np.pi * bandwidth_hz)
            t0 = float(k) * sigma

            # Create Gaussian-modulated pulse expression: A * exp(-(_t-t₀)²/(2σ²)) * cos(2π·f₀·_t)
            # Using Sim4Life expression syntax with '_t' as time variable
            # Note: Using ** for exponentiation (Python-style) instead of ^
            amplitude = 1.0
            expression = f"{amplitude} * exp(-(_t - {t0})**2 / (2 * {sigma}**2)) * cos(2 * pi * {center_freq_hz} * _t)"
            edge_source_settings.UserExpression = expression

            # Set center frequency for reference (used by post-processing)
            edge_source_settings.CenterFrequency = self.frequency_mhz, self.units.MHz
    else:
        self._log("  - Using Harmonic source.", log_type="info")
        # Frequency already has detuning applied in NearFieldSetup if enabled
        edge_source_settings.ExcitationType = excitation_enum.Harmonic
        edge_source_settings.Frequency = self.frequency_mhz, self.units.MHz
        edge_source_settings.CenterFrequency = self.frequency_mhz, self.units.MHz

    self.simulation.Add(edge_source_settings, [source_entity])

    # Sensor setup
    edge_sensor_settings = self.emfdtd.EdgeSensorSettings()
    self.simulation.Add(edge_sensor_settings, [source_entity])

    # Far-field sensors only for free-space simulations (for radiation patterns)
    if self.free_space:
        far_field_sensor_settings = self.simulation.AddFarFieldSensorSettings()

        # Configure extracted frequencies for Gaussian source
        if excitation_type_lower == "gaussian":
            center_freq_hz = self.frequency_mhz * 1e6
            bandwidth_hz = bandwidth_mhz * 1e6
            start_freq_hz = center_freq_hz - (bandwidth_hz / 2)
            end_freq_hz = center_freq_hz + (bandwidth_hz / 2)

            # Create a list of 21 frequencies, including the center frequency
            num_samples = 21
            extracted_frequencies_hz = [start_freq_hz + i * (bandwidth_hz / (num_samples - 1)) for i in range(num_samples)]

            far_field_sensor_settings.ExtractedFrequencies = (
                extracted_frequencies_hz,
                self.units.Hz,
            )
            self._log(
                f"  - Set extracted frequencies from {start_freq_hz / 1e6} MHz to {end_freq_hz / 1e6} MHz.",
                log_type="info",
            )

Simulation Execution

goliat.simulation_runner.SimulationRunner

SimulationRunner(config: Config, project_path: str, simulation: Simulation, profiler: Profiler, verbose_logger: Logger, progress_logger: Logger, project_manager: ProjectManager, gui: Optional[QueueGUI] = None)

Bases: LoggingMixin

Manages simulation execution via the Sim4Life API or iSolve.exe.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
project_path str

Path to the Sim4Life project file.

required
simulation Simulation

The simulation object to run.

required
profiler Profiler

Profiler for timing subtasks.

required
verbose_logger Logger

Logger for detailed output.

required
progress_logger Logger

Logger for high-level updates.

required
gui Optional[QueueGUI]

Optional GUI proxy for updates.

None
project_manager ProjectManager

ProjectManager instance. Uses its save() method.

required
Source code in goliat/simulation_runner.py
def __init__(
    self,
    config: "Config",
    project_path: str,
    simulation: "s4l_v1.simulation.emfdtd.Simulation",
    profiler: "Profiler",
    verbose_logger: "Logger",
    progress_logger: "Logger",
    project_manager: "ProjectManager",
    gui: "Optional[QueueGUI]" = None,
):
    """Sets up the simulation runner.

    Args:
        config: Configuration object.
        project_path: Path to the Sim4Life project file.
        simulation: The simulation object to run.
        profiler: Profiler for timing subtasks.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for high-level updates.
        project_manager: ProjectManager instance. Uses its save() method.
        gui: Optional GUI proxy for updates.
    """
    # Core collaborators.
    self.config = config
    self.project_path = project_path
    self.simulation = simulation
    self.project_manager = project_manager
    # Instrumentation and UI.
    self.profiler = profiler
    self.verbose_logger = verbose_logger
    self.progress_logger = progress_logger
    self.gui = gui

    # Deferred import: only importable inside Sim4Life.
    import s4l_v1.document as s4l_document

    self.document = s4l_document
    # No strategy is active until run() selects one.
    self.current_strategy: Optional["ExecutionStrategy"] = None
    # Register this instance for global cleanup.
    _active_runners.add(self)

Functions

run

run()

Runs the simulation using the configured execution method.

Writes input file first, then runs via Sim4Life API, manual iSolve, or oSPARC depending on config. Handles errors and provides helpful messages for common issues.

Source code in goliat/simulation_runner.py
def run(self):
    """Runs the simulation using the configured execution method.

    Writes input file first, then runs via Sim4Life API, manual iSolve,
    or oSPARC depending on config. Handles errors and provides helpful
    messages for common issues.

    Returns:
        The simulation object, or None when there is no simulation to run
        or when 'only_write_input_file' short-circuits the run.
    """
    if not self.simulation:
        self._log(
            "ERROR: Simulation object not found. Cannot run simulation.",
            level="progress",
            log_type="error",
        )
        return
    self._log(f"Running simulation: {self.simulation.Name}", log_type="verbose")

    # Read once; reused by the cloud-login hint in the error handler below
    # (previously it was redundantly re-read inside the except block).
    server_name = (self.config["solver_settings"] or {}).get("server")

    try:
        if hasattr(self.simulation, "WriteInputFile"):
            self._log(
                "    - Write input file...",
                level="progress",
                log_type="progress",
            )
            with self.profiler.subtask("run_write_input_file"):
                self.simulation.WriteInputFile()
                # Force a save to flush files
                self.project_manager.save()
            elapsed = self.profiler.subtask_times["run_write_input_file"][-1]
            self._log(f"      - Subtask 'run_write_input_file' done in {elapsed:.2f}s", log_type="verbose")
            self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

        # Stop here if we only want to write the input file
        if self.config.get_only_write_input_file():
            self._log(
                "'only_write_input_file' is true, skipping simulation run.",
                level="progress",
                log_type="info",
            )
            return

        # Select and execute appropriate strategy
        strategy = self._create_execution_strategy(server_name)
        self.current_strategy = strategy
        try:
            strategy.run()
        finally:
            # Always clear the active strategy, even on failure/cancellation.
            self.current_strategy = None

    except Exception as e:
        self._log(
            f"An error occurred during simulation run: {e}",
            level="progress",
            log_type="error",
        )
        # Hint at the likely cause when a cloud server was intended for the run.
        if server_name and server_name != "localhost":
            self._log(
                "If you are running on the cloud, please ensure you are logged into Sim4Life "
                "via the GUI and your API credentials are correct.",
                level="progress",
                log_type="warning",
            )
        self.verbose_logger.error(traceback.format_exc())

    return self.simulation

Execution Strategies

Strategy pattern implementations for different simulation execution methods.

Base Strategy

goliat.runners.execution_strategy

Abstract base class for simulation execution strategies.

Classes

ExecutionStrategy

ExecutionStrategy(config: Config, project_path: str, simulation: Simulation, profiler: Profiler, verbose_logger: LoggingMixin, progress_logger: LoggingMixin, project_manager: ProjectManager, gui: QueueGUI | None = None)

Bases: ABC

Abstract base class for simulation execution strategies.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
project_path str

Path to the Sim4Life project file.

required
simulation Simulation

The simulation object to run.

required
profiler Profiler

Profiler for timing subtasks.

required
verbose_logger LoggingMixin

Logger for detailed output.

required
progress_logger LoggingMixin

Logger for high-level updates.

required
project_manager ProjectManager

ProjectManager instance.

required
gui QueueGUI | None

Optional GUI proxy for updates.

None
Source code in goliat/runners/execution_strategy.py
def __init__(
    self,
    config: "Config",
    project_path: str,
    simulation: "s4l_v1.simulation.emfdtd.Simulation",
    profiler: "Profiler",
    verbose_logger: "LoggingMixin",
    progress_logger: "LoggingMixin",
    project_manager: "ProjectManager",
    gui: "QueueGUI | None" = None,
):
    """Stores the collaborators shared by all execution strategies.

    Args:
        config: Configuration object.
        project_path: Path to the Sim4Life project file.
        simulation: The simulation object to run.
        profiler: Profiler for timing subtasks.
        verbose_logger: Logger for detailed output.
        progress_logger: Logger for high-level updates.
        project_manager: ProjectManager instance.
        gui: Optional GUI proxy for updates.
    """
    # Project context.
    self.config = config
    self.project_path = project_path
    self.project_manager = project_manager
    # Simulation and instrumentation.
    self.simulation = simulation
    self.profiler = profiler
    # Logging / UI collaborators.
    self.verbose_logger = verbose_logger
    self.progress_logger = progress_logger
    self.gui = gui
Functions
run abstractmethod
run() -> None

Execute the simulation using this strategy.

Raises:

Type Description
StudyCancelledError

If execution is cancelled by user.

RuntimeError

If execution fails.

FileNotFoundError

If required files are missing.

Source code in goliat/runners/execution_strategy.py
@abstractmethod
def run(self) -> None:
    """Execute the simulation using this strategy.

    Concrete subclasses implement the actual solver invocation.

    Raises:
        StudyCancelledError: If execution is cancelled by user.
        RuntimeError: If execution fails.
        FileNotFoundError: If required files are missing.
    """
    ...

iSolve Manual Strategy

goliat.runners.isolve_manual_strategy

Execution strategy for manual iSolve subprocess execution.

Classes

ISolveManualStrategy

ISolveManualStrategy(*args, **kwargs)

Bases: ExecutionStrategy, LoggingMixin

Execution strategy for running iSolve.exe directly via subprocess.

Source code in goliat/runners/isolve_manual_strategy.py
def __init__(self, *args, **kwargs):
    """Initialize iSolve manual strategy with no active solver process."""
    super().__init__(*args, **kwargs)
    # Both are populated while a solver subprocess is running; None when idle.
    self.current_isolve_process = None
    self.current_process_manager = None
Functions
run
run() -> None

Runs iSolve.exe directly with real-time output logging.

This method bypasses Sim4Life's API and runs the solver executable directly. This is useful when you need more control over the execution environment or when the API has issues. The key challenge is capturing output in real-time without blocking the main thread.

The solution uses a background thread with a queue: - A daemon thread reads stdout line-by-line and puts lines into a queue - The main thread polls the queue non-blockingly and logs output - After process completion, remaining output is drained from the queue

This approach allows the GUI to remain responsive and users to see progress updates as they happen. Without threading, reading stdout would block until the process finishes, making it impossible to show real-time progress.

Steps:

1. Locate iSolve.exe relative to the Python executable.
2. Spawn the subprocess with stdout/stderr pipes.
3. Start a background thread to read stdout into a queue.
4. Poll the process and queue, logging output without blocking.
5. After completion, reload the project to load results into Sim4Life.

Raises:

Type Description
FileNotFoundError

If iSolve.exe or input file not found.

RuntimeError

If iSolve exits with non-zero code or simulation can't be found after reload.

Source code in goliat/runners/isolve_manual_strategy.py
def run(self) -> None:
    """Runs iSolve.exe directly with real-time output logging.

    This method bypasses Sim4Life's API and runs the solver executable directly.
    This is useful when you need more control over the execution environment or when
    the API has issues. The key challenge is capturing output in real-time without
    blocking the main thread.

    The solution uses a background thread with a queue:
    - A daemon thread reads stdout line-by-line and puts lines into a queue
    - The main thread polls the queue non-blockingly and logs output
    - After process completion, remaining output is drained from the queue

    This approach allows the GUI to remain responsive and users to see progress
    updates as they happen. Without threading, reading stdout would block until
    the process finishes, making it impossible to show real-time progress.

    Steps:
    1. Locate iSolve.exe relative to Python executable
    2. Spawn subprocess with stdout/stderr pipes
    3. Start background thread to read stdout into queue
    4. Poll process and queue, logging output without blocking
    5. After completion, reload project to load results into Sim4Life

    Raises:
        FileNotFoundError: If iSolve.exe or input file not found.
        RuntimeError: If iSolve exits with non-zero code or simulation
                      can't be found after reload.
    """
    # Assemble the iSolve command line up front; missing executables or
    # input files surface here as FileNotFoundError (see Raises).
    command = self._prepare_isolve_command()

    try:
        self._log("    - Execute iSolve...", level="progress", log_type="progress")
        with self.profiler.subtask("run_isolve_execution"):
            output_parser = ISolveOutputParser(self.verbose_logger, self.progress_logger, self.gui)
            keep_awake_handler = KeepAwakeHandler(self.config)
            retry_handler = RetryHandler(self.progress_logger, self.gui)

            # Call keep_awake before first attempt
            keep_awake_handler.trigger_before_retry()

            # Retry loop: each iteration is one complete solver attempt; the
            # failure/exception handlers below decide whether to go again.
            while True:
                # Check for stop signal before starting new subprocess
                self._check_for_stop_signal()

                # Call keep_awake before each retry attempt
                if retry_handler.get_attempt_number() > 0:
                    keep_awake_handler.trigger_before_retry()

                # Track iSolve errors detected in stdout (iSolve writes errors to stdout, not stderr)
                # Initialize outside try block so it's available in exception handler
                detected_errors = []
                process_manager: ISolveProcessManager | None = None

                try:
                    process_manager = ISolveProcessManager(command, self.gui, self.verbose_logger, self.progress_logger)
                    process_manager.start()
                    # Expose the live subprocess so external cancellation can
                    # reach it (see _cleanup in the except/finally paths).
                    self.current_isolve_process = process_manager.process
                    self.current_process_manager = process_manager

                    # Monitor running process and process output in real-time
                    self._monitor_running_process(process_manager, output_parser, keep_awake_handler, detected_errors)

                    # Process has finished, get the return code
                    return_code = process_manager.get_return_code()

                    # Process all remaining output after process finishes
                    self._process_remaining_output(process_manager, output_parser, detected_errors)

                    # Read stderr output from iSolve (as fallback - most errors are in stdout)
                    stderr_output = process_manager.read_stderr()

                    # Check for memory/alloc errors and exit if found
                    self._check_for_memory_error_and_exit(detected_errors, stderr_output)

                    # Clear process tracking since it's finished
                    self.current_isolve_process = None
                    self.current_process_manager = None

                    if return_code == 0:
                        # Success, break out of retry loop
                        break

                    # Handle process failure and check if should retry
                    # Only handle failure if return_code is not None (process finished)
                    if return_code is not None:
                        should_retry = self._handle_process_failure(
                            process_manager,
                            return_code,
                            detected_errors,
                            stderr_output,
                            retry_handler,
                            output_parser,
                            keep_awake_handler,
                        )
                    else:
                        # Process didn't finish properly, don't retry
                        should_retry = False
                    if not should_retry:
                        break

                except StudyCancelledError:
                    # Re-raise cancellation errors immediately
                    if process_manager is not None:
                        process_manager.cleanup()
                    raise
                except Exception as e:
                    # Handle execution exception and check if should retry
                    should_retry = self._handle_execution_exception(
                        e, process_manager, detected_errors, retry_handler, output_parser, keep_awake_handler
                    )
                    if not should_retry:
                        break

        # Most recent timing recorded by the profiler for this subtask.
        elapsed = self.profiler.subtask_times["run_isolve_execution"][-1]
        self._log(f"      - Subtask 'run_isolve_execution' done in {elapsed:.2f}s", log_type="verbose")
        self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

        # Post-simulation steps
        post_handler = PostSimulationHandler(self.project_path, self.profiler, self.verbose_logger, self.progress_logger)
        post_handler.wait_and_reload()

    except StudyCancelledError:
        # Clean up subprocess on cancellation
        self._cleanup()
        raise
    except Exception as e:
        # Clean up subprocess on any exception
        self._cleanup()
        self._log(
            f"An unexpected error occurred while running iSolve.exe: {e}",
            level="progress",
            log_type="error",
        )
        self.verbose_logger.error(traceback.format_exc())
        raise
    finally:
        # Always ensure cleanup, even on successful completion
        self._cleanup()

Functions

oSPARC Direct Strategy

goliat.runners.osparc_direct_strategy

Execution strategy for oSPARC cloud platform execution.

Classes

OSPARCDirectStrategy

OSPARCDirectStrategy(server_name: str, *args, **kwargs)

Bases: ExecutionStrategy, LoggingMixin

Execution strategy for submitting simulations to oSPARC cloud platform.

Parameters:

Name Type Description Default
server_name str

oSPARC resource name to use (e.g., 'local', 'osparc-1').

required
*args

Passed to parent class.

()
**kwargs

Passed to parent class.

{}
Source code in goliat/runners/osparc_direct_strategy.py
def __init__(self, server_name: str, *args, **kwargs):
    """Set up the direct oSPARC submission strategy.

    Args:
        server_name: oSPARC resource name to use (e.g., 'local', 'osparc-1').
        *args: Forwarded to the parent execution strategy.
        **kwargs: Forwarded to the parent execution strategy.
    """
    super().__init__(*args, **kwargs)
    # Which oSPARC resource jobs from this strategy are submitted to.
    self.server_name = server_name
Functions
run
run() -> None

Submits simulation directly to oSPARC cloud platform.

This method handles cloud-based simulation execution through the oSPARC platform. Instead of running locally, it uploads the solver input file to oSPARC, submits a job, and polls for completion.

The process: 1. Initializes oSPARC API client using credentials from config 2. Creates a job submission with input file path and resource name 3. Submits job and waits for completion (polls status periodically) 4. Downloads results when job completes successfully 5. Reloads project to load results into Sim4Life

This requires Sim4Life version 8.2.0 or later for the XOsparcApiClient module. The method handles authentication, job lifecycle, and error reporting.

Raises:

Type Description
RuntimeError

If API client unavailable, job creation fails, job completes with non-success status, or simulation can't be found after reload.

FileNotFoundError

If solver input file not found.

ValueError

If oSPARC credentials are missing from config.

Source code in goliat/runners/osparc_direct_strategy.py
def run(self) -> None:
    """Submits simulation directly to oSPARC cloud platform.

    This method handles cloud-based simulation execution through the oSPARC
    platform. Instead of running locally, it uploads the solver input file
    to oSPARC, submits a job, and polls for completion.

    The process:
    1. Initializes oSPARC API client using credentials from config
    2. Creates a job submission with input file path and resource name
    3. Submits job and waits for completion (polls status periodically)
    4. Downloads results when job completes successfully
    5. Reloads project to load results into Sim4Life

    This requires Sim4Life 8.2.0 or later for the XOsparcApiClient module.
    The method handles authentication, job lifecycle, and error reporting.

    Raises:
        RuntimeError: If API client unavailable, job creation fails, job
                      completes with non-success status, or simulation can't
                      be found after reload.
        FileNotFoundError: If solver input file not found.
        ValueError: If oSPARC credentials are missing from config.
    """
    try:
        import XOsparcApiClient  # type: ignore # Only available in Sim4Life v8.2.0 and later.
    except ImportError as e:
        self._log(
            "Failed to import XOsparcApiClient. This module is required for direct oSPARC integration.",
            level="progress",
            log_type="error",
        )
        self._log(
            "Please ensure you are using Sim4Life version 8.2.0.",
            level="progress",
            log_type="info",
        )
        self._log(f"Original error: {e}", log_type="verbose")
        self.verbose_logger.error(traceback.format_exc())
        # Chain the ImportError explicitly so the root cause is preserved
        # on the raised exception (e.__cause__), not just as context.
        raise RuntimeError("Could not import XOsparcApiClient module, which is necessary for oSPARC runs.") from e

    self._log(
        f"--- Running simulation on oSPARC server: {self.server_name} ---",
        level="progress",
        log_type="header",
    )

    # 1. Get Credentials and Initialize Client
    creds = self.config.get_osparc_credentials()
    if not all(k in creds for k in ["api_key", "api_secret", "api_server"]):
        raise ValueError("Missing oSPARC credentials in configuration.")

    client = XOsparcApiClient.OsparcApiClient(
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
        api_server=creds["api_server"],
        api_version=creds.get("api_version", "v0"),
    )
    self._log("oSPARC client initialized.", log_type="verbose")

    # 2. Prepare Job Submission Data
    input_file_path = os.path.join(os.path.dirname(self.project_path), self.simulation.GetInputFileName())
    if not os.path.exists(input_file_path):
        raise FileNotFoundError(f"Solver input file not found at: {input_file_path}")

    job_data = XOsparcApiClient.JobSubmissionData()
    job_data.InputFilePath = input_file_path
    job_data.ResourceName = self.server_name
    job_data.SolverKey = "sim4life-isolve"
    job_data.SolverVersion = ""  # Let the API choose the default version

    # 3. Create and Start the Job
    self._log(
        f"Creating job for input file: {os.path.basename(input_file_path)}",
        level="progress",
        log_type="info",
    )
    create_response = client.CreateJob(job_data)
    if not create_response.Success:
        raise RuntimeError(f"Failed to create oSPARC job: {create_response.Content}")

    job_id = create_response.Content.get("id")
    if not job_id:
        raise RuntimeError("oSPARC API did not return a job ID after creation.")

    self._log(
        f"Job created with ID: {job_id}. Starting job...",
        level="progress",
        log_type="info",
    )
    start_response = client.StartJob(job_data, job_id)
    if not start_response.Success:
        raise RuntimeError(f"Failed to start oSPARC job {job_id}: {start_response.Content}")

    # 4. Poll for Job Completion
    # Named cadences instead of magic numbers: retry quickly when a status
    # query itself fails, poll slowly while the job is running normally.
    status_retry_delay_s = 10
    poll_interval_s = 30
    self._log("Job started. Polling for status...", level="progress", log_type="progress")
    while True:
        # The stop-signal check is the cancellation escape hatch for this loop.
        self._check_for_stop_signal()

        status_response = client.GetJobStatus(job_data.SolverKey, job_data.SolverVersion, job_id)
        if not status_response.Success:
            self._log(
                f"Warning: Could not get job status for {job_id}.",
                level="progress",
                log_type="warning",
            )
            time.sleep(status_retry_delay_s)
            continue

        status = status_response.Content.get("state")
        self._log(f"  - Job '{job_id}' status: {status}", log_type="verbose")

        if status in ["SUCCESS", "FAILED", "ABORTED"]:
            log_type = "success" if status == "SUCCESS" else "error"
            self._log(
                f"Job {job_id} finished with status: {status}",
                level="progress",
                log_type=log_type,
            )
            if status != "SUCCESS":
                raise RuntimeError(f"oSPARC job {job_id} failed with status: {status}")
            break

        time.sleep(poll_interval_s)

    # 5. Post-simulation steps
    post_handler = PostSimulationHandler(self.project_path, self.profiler, self.verbose_logger, self.progress_logger)
    post_handler.wait_and_reload()

Sim4Life API Strategy

goliat.runners.sim4life_api_strategy

Execution strategy for Sim4Life API execution.

Classes

Sim4LifeAPIStrategy

Sim4LifeAPIStrategy(server_id: str | None, *args, **kwargs)

Bases: ExecutionStrategy, LoggingMixin

Execution strategy for running simulations via Sim4Life API.

Parameters:

Name Type Description Default
server_id str | None

Server ID to use (None for localhost).

required
*args

Passed to parent class.

()
**kwargs

Passed to parent class.

{}
Source code in goliat/runners/sim4life_api_strategy.py
def __init__(self, server_id: str | None, *args, **kwargs):
    """Set up the Sim4Life API execution strategy.

    Args:
        server_id: Server ID to use (None for localhost).
        *args: Forwarded to the parent execution strategy.
        **kwargs: Forwarded to the parent execution strategy.
    """
    super().__init__(*args, **kwargs)
    # Target server for RunSimulation; None means run on localhost.
    self.server_id = server_id
Functions
run
run() -> None

Run simulation using Sim4Life API.

Raises:

Type Description
RuntimeError

If simulation execution fails.

Source code in goliat/runners/sim4life_api_strategy.py
def run(self) -> None:
    """Execute the simulation through the Sim4Life API.

    Blocks until the solver finishes on the configured server, then
    logs a success message naming that server.

    Raises:
        RuntimeError: If simulation execution fails.
    """
    self.simulation.RunSimulation(wait=True, server_id=self.server_id)
    solver_settings = self.config["solver_settings"] or {}
    server_name = solver_settings.get("server", "localhost")
    self._log(f"Simulation finished on '{server_name}'.", level="progress", log_type="success")

Results Extraction

Classes for extracting and processing simulation results.

Cleanup

goliat.extraction.cleaner.Cleaner

Cleaner(parent: ResultsExtractor)

Manages deletion of simulation files to free disk space.

Deletes output files, input files, and/or project files based on config. Useful for long-running studies where disk space is limited.

Parameters:

Name Type Description Default
parent ResultsExtractor

Parent ResultsExtractor instance.

required
Source code in goliat/extraction/cleaner.py
def __init__(self, parent: "ResultsExtractor"):
    """Creates a cleaner bound to its owning results extractor.

    Args:
        parent: Parent ResultsExtractor instance.
    """
    # All config, study and logging access goes through the parent.
    self.parent = parent

Functions

cleanup_simulation_files

cleanup_simulation_files()

Deletes simulation files based on auto_cleanup config.

Removes files matching specified patterns (output/input H5 files, project files). Only runs if cleanup is enabled in config.

Source code in goliat/extraction/cleaner.py
def cleanup_simulation_files(self):
    """Deletes simulation files based on auto_cleanup config.

    Removes files matching specified patterns (output/input H5 files,
    project files). Only runs if cleanup is enabled in config.
    """
    cleanup_types = self.parent.config.get_auto_cleanup_previous_results()
    if not cleanup_types:
        # Cleanup disabled in config; nothing to do.
        return

    study = self.parent.study
    if study is None:
        self.parent._log("  - WARNING: Study object is not available. Skipping cleanup.", log_type="warning")
        return

    project_path = study.project_manager.project_path
    if not project_path:
        self.parent._log("  - WARNING: Project path is not set. Skipping cleanup.", log_type="warning")
        return

    project_dir, project_filename = os.path.split(project_path)
    results_dir = os.path.join(project_dir, f"{project_filename}_Results")

    # Each cleanup type maps to (directory, glob pattern, human-readable label).
    file_patterns = {
        "output": (results_dir, "*_Output.h5", "output"),
        "input": (results_dir, "*_Input.h5", "input"),
        "smash": (project_dir, "*.smash", "project"),
    }

    total_deleted = self._delete_files(cleanup_types, file_patterns)
    if total_deleted > 0:
        self.parent._log(
            f"  - Cleaned up {total_deleted} file(s) to save disk space.",
            level="progress",
            log_type="info",
        )

JSON Encoding

goliat.extraction.json_encoder

Classes

NumpyArrayEncoder

Bases: JSONEncoder

JSON encoder that handles NumPy types.

Converts numpy arrays and numeric types to Python built-ins so they can be serialized to JSON.

Functions
default
default(o: Any) -> Any

Converts NumPy types to JSON-serializable Python types.

Source code in goliat/extraction/json_encoder.py
def default(self, o: Any) -> Any:
    """Converts NumPy types to JSON-serializable Python types."""
    # Arrays become nested lists; NumPy scalars become Python builtins.
    conversions = (
        (np.ndarray, lambda v: v.tolist()),
        (np.integer, int),
        (np.floating, float),
    )
    for np_type, convert in conversions:
        if isinstance(o, np_type):
            return convert(o)
    # Anything else falls through to the base encoder (raises TypeError).
    return super().default(o)

Power Extraction

goliat.extraction.power_extractor

Classes

PowerExtractor

PowerExtractor(parent: ResultsExtractor, results_data: dict)

Bases: LoggingMixin

Extracts input power and power balance from simulation results.

For near-field, reads power from port sensors. For far-field, calculates theoretical power from plane wave parameters. Also extracts power balance to verify energy conservation.

Parameters:

Name Type Description Default
parent ResultsExtractor

Parent ResultsExtractor instance.

required
results_data dict

Dict to store extracted power data.

required
Source code in goliat/extraction/power_extractor.py
def __init__(self, parent: "ResultsExtractor", results_data: dict):
    """Sets up the power extractor.

    Args:
        parent: Parent ResultsExtractor instance.
        results_data: Dict to store extracted power data.
    """
    self.parent = parent
    self.results_data = results_data

    # Mirror the parent's context locally so helpers can log and inspect
    # the simulation without reaching back through `parent` each time.
    for attr in (
        "config",
        "simulation",
        "study_type",
        "placement_name",
        "frequency_mhz",
        "verbose_logger",
        "progress_logger",
        "gui",
    ):
        setattr(self, attr, getattr(parent, attr))

    # Imported lazily: the s4l_v1 API only exists inside Sim4Life.
    import s4l_v1.document

    self.document = s4l_v1.document
Functions
extract_input_power
extract_input_power(simulation_extractor: Extractor)

Extracts input power, delegating to study-type specific methods.

Parameters:

Name Type Description Default
simulation_extractor Extractor

Results extractor from the simulation.

required
Source code in goliat/extraction/power_extractor.py
def extract_input_power(self, simulation_extractor: "analysis.Extractor"):  # type: ignore
    """Extracts input power, delegating to study-type specific methods.

    Far-field studies get a theoretical plane-wave power; near-field
    studies read power from the simulation's port sensor.

    Args:
        simulation_extractor: Results extractor from the simulation.
    """
    self._log("    - Extract input power...", level="progress", log_type="progress")
    try:
        elapsed = 0.0
        study = self.parent.study
        if study:
            with study.profiler.subtask("extract_input_power"):  # type: ignore
                # Pick the study-type specific extraction routine.
                extract = (
                    self._extract_far_field_power
                    if self.study_type == "far_field"
                    else self._extract_near_field_power
                )
                extract(simulation_extractor)

            elapsed = study.profiler.subtask_times["extract_input_power"][-1]
        self._log(f"      - Subtask 'extract_input_power' done in {elapsed:.2f}s", log_type="verbose")
        self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")
    except Exception as e:
        self._log(
            f"  - ERROR: An exception occurred during input power extraction: {e}",
            level="progress",
            log_type="error",
        )
        self.verbose_logger.error(traceback.format_exc())
extract_power_balance
extract_power_balance(simulation_extractor: Extractor)

Extracts power balance to verify energy conservation.

Power balance is a sanity check: the power going into the simulation should equal the power coming out (as losses and radiation). This helps catch numerical errors or convergence issues.

The balance is calculated as: balance = (P_out / P_in) × 100%

Where P_out includes: - Dielectric losses (power absorbed by materials) - Radiated power (power escaping the simulation volume)

For far-field studies, uses the theoretical input power (from plane wave calculation) rather than extracted power, since plane waves don't have a traditional "input port" sensor.

A balance close to 100% indicates good energy conservation. Values significantly different suggest convergence issues or numerical errors.

Parameters:

Name Type Description Default
simulation_extractor Extractor

Results extractor from the simulation.

required
Source code in goliat/extraction/power_extractor.py
def extract_power_balance(self, simulation_extractor: "analysis.Extractor"):  # type: ignore
    """Extracts power balance to verify energy conservation.

    Power balance is a sanity check: the power going into the simulation should
    equal the power coming out (as losses and radiation). This helps catch
    numerical errors or convergence issues.

    The balance is calculated as: balance = (P_out / P_in) × 100%

    Where P_out includes:
    - Dielectric losses (power absorbed by materials)
    - Radiated power (power escaping the simulation volume)

    For far-field studies, uses the theoretical input power (from plane wave
    calculation) rather than extracted power, since plane waves don't have a
    traditional "input port" sensor.

    A balance close to 100% indicates good energy conservation. Values significantly
    different suggest convergence issues or numerical errors.

    Args:
        simulation_extractor: Results extractor from the simulation.
    """
    self._log("    - Extract power balance...", level="progress", log_type="progress")
    try:
        elapsed = 0.0
        if self.parent.study:
            with self.parent.study.profiler.subtask("extract_power_balance"):  # type: ignore
                em_sensor_extractor = simulation_extractor["Overall Field"]
                power_balance_extractor = em_sensor_extractor.Outputs["Power Balance"]
                power_balance_extractor.Update()

                # Copy every reported quantity except the solver's own
                # 'Balance' entry, which is recomputed below from Pin and
                # the loss terms.
                power_balance_data = {
                    key: power_balance_extractor.Data.DataSimpleDataCollection.FieldValue(key, 0)
                    for key in power_balance_extractor.Data.DataSimpleDataCollection.Keys()
                    if key != "Balance"
                }

                if self.parent.study_type == "far_field" and "input_power_W" in self.results_data:
                    # Plane-wave studies have no input port sensor, so use the
                    # theoretical input power extracted earlier instead.
                    power_balance_data["Pin"] = self.results_data["input_power_W"]
                    self._log(
                        f"    - Overwriting Pin with theoretical value: {float(power_balance_data['Pin']):.4e} W",
                        log_type="info",
                    )

                # balance = 100 * (DielLoss + RadPower) / Pin; NaN when the
                # input power is effectively zero (avoids division blow-up).
                pin = power_balance_data.get("Pin", 0.0)
                p_out = power_balance_data.get("DielLoss", 0.0) + power_balance_data.get("RadPower", 0.0)
                balance = 100 * (p_out / pin) if pin > 1e-9 else float("nan")

                power_balance_data["Balance"] = balance
                self._log(f"    - Final Balance: {balance:.2f}%", log_type="highlight")
                self.results_data["power_balance"] = power_balance_data

            # Most recent timing recorded by the profiler for this subtask.
            elapsed = self.parent.study.profiler.subtask_times["extract_power_balance"][-1]
        self._log(f"      - Subtask 'extract_power_balance' done in {elapsed:.2f}s", log_type="verbose")
        self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    except Exception as e:
        self._log(f"  - WARNING: Could not extract power balance: {e}", log_type="warning")
        self.verbose_logger.error(traceback.format_exc())

Reporting

goliat.extraction.reporter.Reporter

Reporter(parent: ResultsExtractor)

Generates and saves detailed reports from extraction results.

Creates Pickle files for programmatic access and HTML files for human readability. Includes SAR statistics, tissue groups, and peak SAR details.

Parameters:

Name Type Description Default
parent ResultsExtractor

Parent ResultsExtractor instance.

required
Source code in goliat/extraction/reporter.py
def __init__(self, parent: "ResultsExtractor"):
    """Creates a reporter bound to its owning results extractor.

    Args:
        parent: Parent ResultsExtractor instance.
    """
    # Report paths and serialization helpers are resolved via the parent.
    self.parent = parent

Functions

save_reports

save_reports(df: DataFrame, tissue_groups: dict, group_sar_stats: dict, results_data: dict)

Saves Pickle and HTML reports to the results directory.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with detailed SAR statistics per tissue.

required
tissue_groups dict

Dict mapping group names to tissue lists.

required
group_sar_stats dict

Dict with aggregated SAR stats per group.

required
results_data dict

Dict with summary results and metadata.

required
Source code in goliat/extraction/reporter.py
def save_reports(
    self,
    df: pd.DataFrame,
    tissue_groups: dict,
    group_sar_stats: dict,
    results_data: dict,
):
    """Saves Pickle and HTML reports to the results directory.

    Args:
        df: DataFrame with detailed SAR statistics per tissue.
        tissue_groups: Dict mapping group names to tissue lists.
        group_sar_stats: Dict with aggregated SAR stats per group.
        results_data: Dict with summary results and metadata.
    """
    results_dir = self._get_results_dir()
    # Create the target directory up front; exist_ok tolerates reruns.
    os.makedirs(results_dir, exist_ok=True)

    # Both report writers receive the same payload in the same order.
    payload = (df, tissue_groups, group_sar_stats, results_data)
    self._save_pickle_report(results_dir, *payload)
    self._save_html_report(results_dir, *payload)

SAR Extraction

goliat.extraction.sar_extractor

Classes

SarExtractor

SarExtractor(parent: ResultsExtractor, results_data: dict)

Bases: LoggingMixin

Extracts SAR statistics from simulation results.

Uses Sim4Life's SarStatisticsEvaluator to compute mass-averaged SAR, peak spatial-average SAR (10g), and tissue-specific metrics. Groups tissues into logical groups (eyes, skin, brain) for analysis.

Parameters:

Name Type Description Default
parent ResultsExtractor

Parent ResultsExtractor instance.

required
results_data dict

Dict to store extracted SAR data.

required
Source code in goliat/extraction/sar_extractor.py
def __init__(self, parent: "ResultsExtractor", results_data: dict):
    """Sets up the SAR extractor.

    Args:
        parent: Parent ResultsExtractor instance.
        results_data: Dict to store extracted SAR data.
    """
    self.parent = parent
    self.results_data = results_data

    # Mirror the parent's context locally so the extraction helpers can
    # log and query the simulation without going through `parent`.
    for attr in (
        "config",
        "simulation",
        "phantom_name",
        "placement_name",
        "verbose_logger",
        "progress_logger",
        "gui",
    ):
        setattr(self, attr, getattr(parent, attr))

    # Imported lazily: the s4l_v1 API only exists inside Sim4Life.
    import s4l_v1.analysis
    import s4l_v1.document
    import s4l_v1.units as units

    self.analysis = s4l_v1.analysis
    self.document = s4l_v1.document
    self.units = units

    self.tissue_grouper = TissueGrouper(self.config, self.phantom_name, self)
Functions
extract_sar_statistics
extract_sar_statistics(simulation_extractor: Extractor)

Extracts comprehensive SAR statistics for all tissues.

This is the main SAR extraction method that orchestrates the entire process. It uses Sim4Life's SarStatisticsEvaluator to compute standardized SAR metrics according to IEEE/IEC standards.

The process: 1. Extracts the 'Overall Field' E-field data from simulation results 2. Creates a SarStatisticsEvaluator configured for 10g peak spatial-average SAR 3. Processes the evaluator output into a pandas DataFrame 4. Groups tissues into logical categories (eyes, skin, brain) 5. Calculates weighted-average SAR for each group (mass-weighted) 6. Extracts peak SAR details (location, coordinates, etc.) 7. Stores both per-tissue and group-level results

The results include mass-averaged SAR, peak spatial-average SAR (10g), and tissue-specific metrics. For near-field studies, also extracts head/trunk SAR based on placement scenario. For far-field, extracts whole-body SAR.

Parameters:

Name Type Description Default
simulation_extractor Extractor

Results extractor from the simulation object.

required
Source code in goliat/extraction/sar_extractor.py
def extract_sar_statistics(self, simulation_extractor: "analysis.Extractor"):  # type: ignore
    """Extracts comprehensive SAR statistics for all tissues.

    This is the main SAR extraction method that orchestrates the entire process.
    It uses Sim4Life's SarStatisticsEvaluator to compute standardized SAR metrics
    according to IEEE/IEC standards.

    The process:
    1. Extracts the 'Overall Field' E-field data from simulation results
    2. Creates a SarStatisticsEvaluator configured for 10g peak spatial-average SAR
    3. Processes the evaluator output into a pandas DataFrame
    4. Groups tissues into logical categories (eyes, skin, brain)
    5. Calculates weighted-average SAR for each group (mass-weighted)
    6. Extracts peak SAR details (location, coordinates, etc.)
    7. Stores both per-tissue and group-level results

    The results include mass-averaged SAR, peak spatial-average SAR (10g), and
    tissue-specific metrics. For near-field studies, also extracts head/trunk SAR
    based on placement scenario. For far-field, extracts whole-body SAR.

    Args:
        simulation_extractor: Results extractor from the simulation object.
    """
    self._log("    - Extract SAR statistics...", level="progress", log_type="progress")
    try:
        elapsed = 0.0
        if self.parent.study:
            with self.parent.study.profiler.subtask("extract_sar_statistics"):  # type: ignore
                em_sensor_extractor = self._setup_em_sensor_extractor(simulation_extractor)
                results = self._evaluate_sar_statistics(em_sensor_extractor)

                # The evaluator produced nothing; skip reporting entirely.
                if not results:
                    return

                df = self._create_sar_dataframe(results)
                # Aggregate per-tissue rows into logical groups (eyes, skin, brain).
                tissue_groups = self.tissue_grouper.group_tissues(df["Tissue"].tolist())
                group_sar_stats = self._calculate_group_sar(df, tissue_groups)

                self._store_group_sar_results(group_sar_stats)
                self._store_all_regions_sar(df)
                self.extract_peak_sar_details(em_sensor_extractor)
                self._store_temporary_data(df, tissue_groups, group_sar_stats)

            # Most recent timing recorded by the profiler for this subtask.
            elapsed = self.parent.study.profiler.subtask_times["extract_sar_statistics"][-1]
        self._log(f"      - Subtask 'extract_sar_statistics' done in {elapsed:.2f}s", log_type="verbose")
        self._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    except Exception as e:
        self._log(
            f"  - ERROR: An unexpected error during all-tissue SAR statistics extraction: {e}",
            level="progress",
            log_type="error",
        )
        self.verbose_logger.error(traceback.format_exc())
extract_peak_sar_details
extract_peak_sar_details(em_sensor_extractor: Extractor)

Extracts detailed metadata about the peak spatial-average SAR location.

While the main SAR extraction gives per-tissue statistics, this method provides detailed information about where the absolute peak SAR occurs. This includes 3D coordinates, the tissue/organ containing the peak, mass of the 10g averaging volume, and other metadata.

This information is useful for: - Understanding which anatomical region has the highest exposure - Verifying that peak SAR is in an expected location - Debugging unexpected SAR hotspots - Reporting peak exposure location in compliance documentation

Uses Sim4Life's AverageSarFieldEvaluator configured for 10g spatial averaging to find the peak location according to IEEE/IEC 62704-1 standards.

Parameters:

Name Type Description Default
em_sensor_extractor Extractor

The 'Overall Field' results extractor containing the SAR field data.

required
Source code in goliat/extraction/sar_extractor.py
def extract_peak_sar_details(self, em_sensor_extractor: "analysis.Extractor"):  # type: ignore
    """Extracts detailed metadata about the peak spatial-average SAR location.

    While the main SAR extraction gives per-tissue statistics, this method
    provides detailed information about where the absolute peak SAR occurs.
    This includes 3D coordinates, the tissue/organ containing the peak, mass
    of the 10g averaging volume, and other metadata.

    This information is useful for:
    - Understanding which anatomical region has the highest exposure
    - Verifying that peak SAR is in an expected location
    - Debugging unexpected SAR hotspots
    - Reporting peak exposure location in compliance documentation

    Uses Sim4Life's AverageSarFieldEvaluator configured for 10g spatial averaging
    to find the peak location according to IEEE/IEC 62704-1 standards.

    Args:
        em_sensor_extractor: The 'Overall Field' results extractor containing
                           the SAR field data.
    """
    self._log("  - Extracting peak SAR details...", log_type="progress")
    try:
        inputs = [em_sensor_extractor.Outputs["SAR(x,y,z,f0)"]]
        average_sar_field_evaluator = self.analysis.em_evaluators.AverageSarFieldEvaluator(inputs=inputs)
        average_sar_field_evaluator.TargetMass = 10.0, self.units.Unit("g")
        average_sar_field_evaluator.UpdateAttributes()
        self.document.AllAlgorithms.Add(average_sar_field_evaluator)
        try:
            average_sar_field_evaluator.Update()

            peak_sar_output = average_sar_field_evaluator.Outputs["Peak Spatial SAR (psSAR) Results"]
            peak_sar_output.Update()  # type: ignore

            data_collection = peak_sar_output.Data.DataSimpleDataCollection  # type: ignore
            if data_collection:
                self.results_data["peak_sar_details"] = {key: data_collection.FieldValue(key, 0) for key in data_collection.Keys()}  # type: ignore
            else:
                self._log(
                    "  - WARNING: Could not extract peak SAR details.",
                    log_type="warning",
                )
        finally:
            # Always detach the evaluator: previously a failed Update() left a
            # stale algorithm registered in the document's algorithm list.
            self.document.AllAlgorithms.Remove(average_sar_field_evaluator)

    except Exception as e:
        self._log(
            f"  - ERROR: An exception occurred during peak SAR detail extraction: {e}",
            log_type="error",
        )
        self.verbose_logger.error(traceback.format_exc())

Sensor Extraction

goliat.extraction.sensor_extractor

Point sensor data extraction.

Classes

SensorExtractor

SensorExtractor(parent: ResultsExtractor, results_data: dict)

Extracts time-domain E-field data from point sensors.

Reads E-field measurements from sensors placed at simulation bbox corners, creates plots showing magnitude over time, and stores raw data in results.

Parameters:

Name Type Description Default
parent ResultsExtractor

Parent ResultsExtractor instance.

required
results_data dict

Dict to store extracted data.

required
Source code in goliat/extraction/sensor_extractor.py
def __init__(self, parent: "ResultsExtractor", results_data: dict):
    """Initializes the sensor extractor with its parent context.

    Args:
        parent: Parent ResultsExtractor instance.
        results_data: Dict to store extracted data.
    """
    self.parent = parent
    self.results_data = results_data
    # Reuse the parent's loggers so sensor messages land in the same streams.
    self.progress_logger = parent.progress_logger
    self.verbose_logger = parent.verbose_logger
Functions
extract_point_sensor_data
extract_point_sensor_data(simulation_extractor: Extractor)

Extracts E-field data from all point sensors and generates plots.

Iterates through configured sensors, extracts time-domain E-field components (Ex, Ey, Ez), calculates magnitude, and saves both plot and raw data.

Parameters:

Name Type Description Default
simulation_extractor Extractor

Results extractor from the simulation object.

required
Source code in goliat/extraction/sensor_extractor.py
def extract_point_sensor_data(self, simulation_extractor: "analysis.Extractor"):  # type: ignore
    """Extracts E-field data from all point sensors and generates plots.

    Iterates through configured sensors, extracts time-domain E-field components
    (Ex, Ey, Ez), calculates magnitude, and saves both plot and raw data.

    Side effects: stores per-sensor time series under
    ``results_data["point_sensor_data"]`` and saves one combined figure via
    ``self._save_plot``.

    Args:
        simulation_extractor: Results extractor from the simulation object.
    """
    self.parent._log("    - Extract point sensors...", level="progress", log_type="progress")

    try:
        elapsed = 0.0
        # Timing is only recorded when a study (and its profiler) is attached.
        if self.parent.study:
            with self.parent.study.profiler.subtask("extract_point_sensor_data"):  # type: ignore
                num_sensors = self.parent.config["simulation_parameters.number_of_point_sensors"] or 0
                if num_sensors == 0:
                    return

                # Non-interactive plotting; usetex off so labels render without
                # requiring a LaTeX installation.
                plt.ioff()
                plt.rcParams.update({"text.usetex": False})
                fig, ax = plt.subplots()
                ax.grid(True, which="major", axis="y", linestyle="--")

                # Maps sensor index -> bbox-corner name used in entity names.
                point_source_order = self.parent.config["simulation_parameters.point_source_order"] or []
                point_sensor_results = {}

                for i in range(num_sensors):  # type: ignore
                    if i >= len(point_source_order):  # type: ignore
                        self.parent._log(
                            f"    - WARNING: Not enough entries in 'point_source_order' for sensor {i + 1}. Skipping.",
                            log_type="warning",
                        )
                        continue

                    corner_name = point_source_order[i]  # type: ignore
                    # Sensor entities are named by 1-based index plus corner.
                    full_sensor_name = f"Point Sensor Entity {i + 1} ({corner_name})"

                    try:
                        em_sensor_extractor = simulation_extractor[full_sensor_name]
                        if not em_sensor_extractor:
                            self.parent._log(
                                f"    - WARNING: Could not find sensor extractor for '{full_sensor_name}'",
                                log_type="warning",
                            )
                            continue
                    except Exception as e:
                        self.parent._log(
                            f"    - WARNING: Could not retrieve sensor '{full_sensor_name}'. Error: {e}",
                            log_type="warning",
                        )
                        continue

                    # The extractor must be registered with the document before
                    # its outputs can be updated; removed again below.
                    self.parent.document.AllAlgorithms.Add(em_sensor_extractor)

                    if "EM E(t)" not in em_sensor_extractor.Outputs:
                        self.parent._log(
                            f"    - WARNING: 'EM E(t)' output not found for sensor '{full_sensor_name}'",
                            log_type="warning",
                        )
                        self.parent.document.AllAlgorithms.Remove(em_sensor_extractor)
                        continue

                    em_output = em_sensor_extractor.Outputs["EM E(t)"]
                    em_output.Update()

                    time_axis = em_output.Data.Axis
                    # Components 0..2 are taken as Ex, Ey, Ez (matches the dict
                    # keys below) — presumably the solver's fixed ordering; TODO
                    # confirm. The genexp's `i` is scoped to the genexp and does
                    # not clobber the loop index.
                    ex, ey, ez = (em_output.Data.GetComponent(i) for i in range(3))
                    label = corner_name.replace("_", " ").title()

                    if time_axis is not None and time_axis.size > 0:
                        e_mag = np.sqrt(ex**2 + ey**2 + ez**2)
                        ax.plot(time_axis, e_mag, label=label)
                        point_sensor_results[label] = {
                            "time_s": time_axis.tolist(),
                            "Ex_V_m": ex.tolist(),
                            "Ey_V_m": ey.tolist(),
                            "Ez_V_m": ez.tolist(),
                            "E_mag_V_m": e_mag.tolist(),
                        }
                    else:
                        self.parent._log(
                            f"    - WARNING: No data found for sensor '{full_sensor_name}'",
                            log_type="warning",
                        )

                    self.parent.document.AllAlgorithms.Remove(em_sensor_extractor)

                if point_sensor_results:
                    self.results_data["point_sensor_data"] = point_sensor_results

                self._save_plot(fig, ax)

            # Last recorded duration for this subtask, written by the profiler.
            elapsed = self.parent.study.profiler.subtask_times["extract_point_sensor_data"][-1]
        self.parent._log(f"      - Subtask 'extract_point_sensor_data' done in {elapsed:.2f}s", log_type="verbose")
        self.parent._log(f"      - Done in {elapsed:.2f}s", level="progress", log_type="success")

    except Exception as e:
        self.parent._log(
            f"  - ERROR: An exception occurred during point sensor data extraction: {e}",
            level="progress",
            log_type="error",
        )
        self.verbose_logger.error(traceback.format_exc())

Analysis

Classes for analyzing and visualizing simulation results.

Analyzer

goliat.analysis.analyzer.Analyzer

Analyzer(config: Config, phantom_name: str, strategy: BaseAnalysisStrategy, plot_format: str = 'pdf')

Analyzes simulation results using a strategy pattern.

Delegates to strategy-specific implementations for loading results and generating plots. Handles unit conversion, caching, and report export.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
phantom_name str

Phantom model name being analyzed.

required
strategy BaseAnalysisStrategy

Strategy implementation for analysis logic.

required
plot_format str

Output format for plots ('pdf' or 'png'), default 'pdf'.

'pdf'
Source code in goliat/analysis/analyzer.py
def __init__(self, config: "Config", phantom_name: str, strategy: "BaseAnalysisStrategy", plot_format: str = "pdf"):
    """Sets up the analyzer with a strategy.

    Args:
        config: Configuration object.
        phantom_name: Phantom model name being analyzed.
        strategy: Strategy implementation for analysis logic.
        plot_format: Output format for plots ('pdf' or 'png'), default 'pdf'.
    """
    self.config = config
    self.phantom_name = phantom_name
    self.strategy = strategy
    self.base_dir = config.base_dir
    self.results_base_dir = strategy.get_results_base_dir()
    self.plotter = Plotter(
        strategy.get_plots_dir(),
        phantom_name=phantom_name,
        plot_format=plot_format,
    )
    self.all_results = []
    self.all_organ_results = []
    # Populated from pickle files during extraction; the authoritative mapping
    # of tissue groups to actual tissue names (via material_name_mapping.json).
    self.tissue_group_composition = {}

Functions

run_analysis

run_analysis()

Runs complete analysis pipeline using the selected strategy.

Loads results, converts units, exports reports, and generates plots. Delegates strategy-specific logic to the strategy instance.

Source code in goliat/analysis/analyzer.py
def run_analysis(self):
    """Runs complete analysis pipeline using the selected strategy.

    Loads results, converts units, exports reports, and generates plots.
    Delegates strategy-specific logic to the strategy instance.
    """
    progress = logging.getLogger("progress")
    progress.info(
        f"--- Starting Results Analysis for Phantom: {self.phantom_name} ---",
        extra={"log_type": "header"},
    )

    # 'load_data' toggles between a fresh load and the cached DataFrames.
    if self.strategy.analysis_config.get("load_data", True):
        self.strategy.load_and_process_results(self)

        if not self.all_results:
            progress.info("--- No results found to analyze. ---", extra={"log_type": "warning"})
            return

        results_df = pd.DataFrame(self.all_results)
        if self.all_organ_results:
            all_organ_results_df = pd.DataFrame(self.all_organ_results)
        else:
            all_organ_results_df = pd.DataFrame()

        results_df = self._convert_units_and_cache(results_df, all_organ_results_df)
        self._export_reports(results_df, all_organ_results_df)
    else:
        progress.info(
            "--- Skipping data loading phase, loading from cache ---",
            extra={"log_type": "info"},
        )
        results_df, all_organ_results_df = self._load_from_cache()
        if results_df is None:
            progress.error(
                "--- Failed to load cached results. Run analysis with load_data=True first. ---",
                extra={"log_type": "error"},
            )
            return

    self.strategy.generate_plots(self, self.plotter, results_df, all_organ_results_df)

    progress.info("--- Analysis Finished ---", extra={"log_type": "success"})

Analysis Strategies

goliat.analysis.base_strategy

Classes

BaseAnalysisStrategy

BaseAnalysisStrategy(config: Config, phantom_name: str, analysis_config: dict | None = None)

Bases: ABC

Base class for analysis strategies.

Defines interface for loading results, calculating normalization factors, and generating plots. Subclasses implement study-type specific logic.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
phantom_name str

Phantom model name being analyzed.

required
analysis_config dict | None

Optional dictionary with plot names as keys and boolean values.

None
Source code in goliat/analysis/base_strategy.py
def __init__(self, config: "Config", phantom_name: str, analysis_config: dict | None = None):
    """Sets up the analysis strategy.

    Args:
        config: Configuration object.
        phantom_name: Phantom model name being analyzed.
        analysis_config: Optional dictionary with plot names as keys and boolean values.
    """
    self.config = config
    self.base_dir = config.base_dir
    self.phantom_name = phantom_name
    # Falsy (None or empty) configs normalize to a fresh empty dict.
    self.analysis_config = analysis_config or {}
Functions
should_generate_plot
should_generate_plot(plot_name: str) -> bool

Checks if a plot should be generated based on the analysis config.

Parameters:

Name Type Description Default
plot_name str

Name of the plot to check.

required

Returns:

Type Description
bool

True if the plot should be generated (default if not in config), False otherwise.

Source code in goliat/analysis/base_strategy.py
def should_generate_plot(self, plot_name: str) -> bool:
    """Decides whether a given plot is enabled by the analysis config.

    Args:
        plot_name: Name of the plot to check.

    Returns:
        True if the plot should be generated (default if not in config), False otherwise.
    """
    # An empty/missing config means "generate everything".
    if self.analysis_config:
        return self.analysis_config.get(plot_name, True)
    return True
get_results_base_dir
get_results_base_dir() -> str

Returns base directory path for results. Must be implemented by subclasses.

Source code in goliat/analysis/base_strategy.py
def get_results_base_dir(self) -> str:
    """Abstract hook: subclasses return the base directory for results."""
    raise NotImplementedError
get_plots_dir
get_plots_dir() -> str

Returns directory path for saving plots. Must be implemented by subclasses.

Source code in goliat/analysis/base_strategy.py
def get_plots_dir(self) -> str:
    """Abstract hook: subclasses return the directory for saving plots."""
    raise NotImplementedError
load_and_process_results abstractmethod
load_and_process_results(analyzer: Analyzer)

Loads and processes all simulation results.

Iterates through configured scenarios and calls analyzer._process_single_result() for each one.

Parameters:

Name Type Description Default
analyzer Analyzer

Analyzer instance to process results with.

required
Source code in goliat/analysis/base_strategy.py
@abstractmethod
def load_and_process_results(self, analyzer: "Analyzer"):
    """Loads and processes all simulation results.

    Iterates through configured scenarios and calls
    analyzer._process_single_result() for each one.

    Args:
        analyzer: Analyzer instance to process results with.
    """
get_normalization_factor abstractmethod
get_normalization_factor(frequency_mhz: int, simulated_power_w: float) -> float

Calculates SAR normalization factor from simulated power.

Parameters:

Name Type Description Default
frequency_mhz int

Simulation frequency in MHz.

required
simulated_power_w float

Input power from simulation in Watts.

required

Returns:

Type Description
float

Normalization factor to multiply SAR values by.

Source code in goliat/analysis/base_strategy.py
@abstractmethod
def get_normalization_factor(self, frequency_mhz: int, simulated_power_w: float) -> float:
    """Calculates SAR normalization factor from simulated power.

    Args:
        frequency_mhz: Simulation frequency in MHz.
        simulated_power_w: Input power from simulation in Watts.

    Returns:
        Normalization factor to multiply SAR values by.
    """
extract_data abstractmethod
extract_data(pickle_data: dict, frequency_mhz: int, placement_name: str, scenario_name: str, sim_power: float, norm_factor: float, sar_results: dict | None = None) -> tuple[dict, list]

Extracts and structures data from a single simulation's result files.

Parameters:

Name Type Description Default
pickle_data dict

Data loaded from the .pkl result file.

required
frequency_mhz int

Simulation frequency.

required
placement_name str

Detailed placement name.

required
scenario_name str

General scenario name.

required
sim_power float

Simulated input power in Watts.

required
norm_factor float

Normalization factor to apply.

required
sar_results dict | None

Optional JSON results dict containing additional data like power balance.

None

Returns:

Type Description
tuple[dict, list]

Tuple of (main result entry dict, list of organ-specific entries).

Source code in goliat/analysis/base_strategy.py
@abstractmethod
def extract_data(
    self,
    pickle_data: dict,
    frequency_mhz: int,
    placement_name: str,
    scenario_name: str,
    sim_power: float,
    norm_factor: float,
    sar_results: dict | None = None,
) -> tuple[dict, list]:
    """Extracts and structures data from a single simulation's result files.

    Args:
        pickle_data: Data loaded from the .pkl result file.
        frequency_mhz: Simulation frequency.
        placement_name: Detailed placement name.
        scenario_name: General scenario name.
        sim_power: Simulated input power in Watts.
        norm_factor: Normalization factor to apply.
        sar_results: Optional JSON results dict containing additional data like power balance.

    Returns:
        Tuple of (main result entry dict, list of organ-specific entries).
    """
    pass
apply_bug_fixes abstractmethod
apply_bug_fixes(result_entry: dict) -> dict

Applies workarounds for known data inconsistencies.

Parameters:

Name Type Description Default
result_entry dict

Data entry for a single simulation result.

required

Returns:

Type Description
dict

Corrected result entry.

Source code in goliat/analysis/base_strategy.py
@abstractmethod
def apply_bug_fixes(self, result_entry: dict) -> dict:
    """Applies workarounds for known data inconsistencies.

    The base implementation is an identity pass-through; subclasses override
    it when their data needs corrections.

    Args:
        result_entry: Data entry for a single simulation result.

    Returns:
        Corrected result entry.
    """
    # Default: nothing to fix.
    return result_entry
calculate_summary_stats abstractmethod
calculate_summary_stats(results_df: DataFrame) -> pd.DataFrame

Calculates summary statistics from aggregated results.

Parameters:

Name Type Description Default
results_df DataFrame

DataFrame with all aggregated simulation results.

required

Returns:

Type Description
DataFrame

DataFrame with summary statistics.

Source code in goliat/analysis/base_strategy.py
@abstractmethod
def calculate_summary_stats(self, results_df: pd.DataFrame) -> pd.DataFrame:
    """Calculates summary statistics from aggregated results.

    Args:
        results_df: DataFrame with all aggregated simulation results.

    Returns:
        DataFrame with summary statistics.
    """
generate_plots abstractmethod
generate_plots(analyzer: Analyzer, plotter: Plotter, results_df: DataFrame, all_organ_results_df: DataFrame)

Generates study-type specific plots.

Parameters:

Name Type Description Default
analyzer Analyzer

Analyzer instance with aggregated data.

required
plotter Plotter

Plotter instance for creating figures.

required
results_df DataFrame

DataFrame with summary results.

required
all_organ_results_df DataFrame

DataFrame with organ-level details.

required
Source code in goliat/analysis/base_strategy.py
@abstractmethod
def generate_plots(
    self,
    analyzer: "Analyzer",
    plotter: "Plotter",
    results_df: pd.DataFrame,
    all_organ_results_df: pd.DataFrame,
):
    """Generates study-type specific plots.

    Args:
        analyzer: Analyzer instance with aggregated data.
        plotter: Plotter instance for creating figures.
        results_df: DataFrame with summary results.
        all_organ_results_df: DataFrame with organ-level details.
    """

Far-Field Strategy

goliat.analysis.far_field_strategy

Classes

FarFieldAnalysisStrategy

FarFieldAnalysisStrategy(config: Config, phantom_name: str, analysis_config: dict | None = None)

Bases: BaseAnalysisStrategy

Analysis strategy for far-field simulations.

Handles result loading, normalization, and plot generation for far-field studies with incident directions and polarizations.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
phantom_name str

Phantom model name being analyzed.

required
analysis_config dict | None

Optional dictionary with plot names as keys and boolean values.

None
Source code in goliat/analysis/far_field_strategy.py
def __init__(self, config: "Config", phantom_name: str, analysis_config: dict | None = None):
    """Initializes the far-field analysis strategy.

    All state lives on the base class; this subclass only specializes behavior.

    Args:
        config: Configuration object.
        phantom_name: Phantom model name being analyzed.
        analysis_config: Optional dictionary with plot names as keys and boolean values.
    """
    super().__init__(config, phantom_name, analysis_config)
Functions
get_results_base_dir
get_results_base_dir() -> str

Returns base directory for far-field results.

Source code in goliat/analysis/far_field_strategy.py
def get_results_base_dir(self) -> str:
    """Returns base directory for far-field results."""
    # <base>/results/far_field/<phantom>
    return os.path.join(self.base_dir, "results", "far_field", self.phantom_name)
get_plots_dir
get_plots_dir() -> str

Returns directory for far-field plots.

Source code in goliat/analysis/far_field_strategy.py
def get_plots_dir(self) -> str:
    """Returns directory for far-field plots."""
    # <base>/plots/far_field/<phantom>
    return os.path.join(self.base_dir, "plots", "far_field", self.phantom_name)
load_and_process_results
load_and_process_results(analyzer: Analyzer)

Iterates through far-field results and processes each one.

Source code in goliat/analysis/far_field_strategy.py
def load_and_process_results(self, analyzer: "Analyzer"):
    """Iterates through far-field results and processes each one.

    Walks frequency x incident-direction x polarization and hands each
    combination to analyzer._process_single_result().
    """
    frequencies = self.config["frequencies_mhz"]
    ff_setup = self.config["far_field_setup.environmental"]
    if not ff_setup:
        return
    directions = ff_setup.get("incident_directions", [])
    polarizations = ff_setup.get("polarizations", [])
    # Nothing to do if any axis of the sweep is empty.
    if not frequencies or not directions or not polarizations:
        return

    for freq in frequencies:
        for direction in directions:
            for polarization in polarizations:
                placement = f"environmental_{direction}_{polarization}"
                analyzer._process_single_result(freq, "environmental", placement, "")
get_normalization_factor
get_normalization_factor(frequency_mhz: int, simulated_power_w: float) -> float

Returns normalization factor for far-field (always 1.0).

Far-field simulations are normalized to 1 W/m^2 in the simulation itself, so no additional normalization is needed.

Source code in goliat/analysis/far_field_strategy.py
def get_normalization_factor(self, frequency_mhz: int, simulated_power_w: float) -> float:
    """Returns normalization factor for far-field (always 1.0).

    Far-field simulations are normalized to 1 W/m^2 in the simulation itself,
    so no additional normalization is needed. Both parameters are ignored;
    they exist only for interface parity with the base class.
    """
    return 1.0
apply_bug_fixes
apply_bug_fixes(result_entry: dict) -> dict

No bug fixes needed for far-field data.

Source code in goliat/analysis/far_field_strategy.py
def apply_bug_fixes(self, result_entry: dict) -> dict:
    """No bug fixes needed for far-field data.

    Identity pass-through: the entry is returned unmodified.
    """
    return result_entry
calculate_summary_stats
calculate_summary_stats(results_df: DataFrame) -> pd.DataFrame

Calculates summary statistics for far-field results.

Source code in goliat/analysis/far_field_strategy.py
def calculate_summary_stats(self, results_df: pd.DataFrame) -> pd.DataFrame:
    """Calculates summary statistics for far-field results.

    Averages every numeric column per frequency.
    """
    grouped = results_df.groupby("frequency_mhz")
    return pd.DataFrame(grouped.mean(numeric_only=True))

Near-Field Strategy

goliat.analysis.near_field_strategy

Classes

NearFieldAnalysisStrategy

NearFieldAnalysisStrategy(config: Config, phantom_name: str, analysis_config: dict | None = None)

Bases: BaseAnalysisStrategy

Analysis strategy for near-field simulations.

Handles result loading, normalization, and plot generation for near-field studies with placement scenarios, positions, and orientations.

Parameters:

Name Type Description Default
config Config

Configuration object.

required
phantom_name str

Phantom model name being analyzed.

required
analysis_config dict | None

Optional dictionary with plot names as keys and boolean values.

None
Source code in goliat/analysis/near_field_strategy.py
def __init__(self, config: "Config", phantom_name: str, analysis_config: dict | None = None):
    """Initializes the near-field analysis strategy.

    All state lives on the base class; this subclass only specializes behavior.

    Args:
        config: Configuration object.
        phantom_name: Phantom model name being analyzed.
        analysis_config: Optional dictionary with plot names as keys and boolean values.
    """
    super().__init__(config, phantom_name, analysis_config)
Functions
get_results_base_dir
get_results_base_dir() -> str

Returns base directory for near-field results.

Source code in goliat/analysis/near_field_strategy.py
def get_results_base_dir(self) -> str:
    """Returns base directory for near-field results."""
    # <base>/results/near_field/<phantom>
    return os.path.join(self.base_dir, "results", "near_field", self.phantom_name)
get_plots_dir
get_plots_dir() -> str

Returns directory for near-field plots.

Source code in goliat/analysis/near_field_strategy.py
def get_plots_dir(self) -> str:
    """Returns directory for near-field plots."""
    # <base>/plots/near_field/<phantom>
    return os.path.join(self.base_dir, "plots", "near_field", self.phantom_name)
load_and_process_results
load_and_process_results(analyzer: Analyzer)

Iterates through near-field results and processes each one.

Source code in goliat/analysis/near_field_strategy.py
def load_and_process_results(self, analyzer: "Analyzer"):
    """Iterates through near-field results and processes each one.

    Walks frequency x scenario x position x orientation and hands each
    combination to analyzer._process_single_result().
    """
    antenna_config = self.config["antenna_config"] or {}
    if not antenna_config:
        return
    scenarios = self.config["placement_scenarios"]
    if not scenarios:
        return

    for freq_key in antenna_config:
        # Config keys are frequency strings, e.g. "700".
        freq_mhz = int(freq_key)
        for scenario_name, scenario_def in scenarios.items():
            if not scenario_def:
                continue
            positions = scenario_def.get("positions", {})
            orientations = scenario_def.get("orientations", {})
            if not positions or not orientations:
                continue
            for pos_name in positions:
                for orient_name in orientations:
                    analyzer._process_single_result(freq_mhz, scenario_name, pos_name, orient_name)
get_normalization_factor
get_normalization_factor(frequency_mhz: int, simulated_power_w: float) -> float

Calculates the normalization factor based on the target power.

Parameters:

Name Type Description Default
frequency_mhz int

The simulation frequency in MHz.

required
simulated_power_w float

The input power from the simulation in Watts.

required

Returns:

Type Description
float

The calculated normalization factor, or 1.0 if not possible.

Source code in goliat/analysis/near_field_strategy.py
def get_normalization_factor(self, frequency_mhz: int, simulated_power_w: float) -> float:
    """Calculates the normalization factor based on the target power.

    Args:
        frequency_mhz: The simulation frequency in MHz.
        simulated_power_w: The input power from the simulation in Watts.

    Returns:
        The calculated normalization factor, or 1.0 if not possible.
    """
    freq_config = (self.config["antenna_config"] or {}).get(str(frequency_mhz), {})
    target_power_mw = freq_config.get("target_power_mW")
    # Fall back to 1.0 when the target is unknown or the simulated power is
    # missing/non-positive (a ratio would be meaningless).
    if target_power_mw is None or not pd.notna(simulated_power_w) or simulated_power_w <= 0:
        return 1.0
    return (target_power_mw / 1000.0) / simulated_power_w
extract_data
extract_data(pickle_data: dict, frequency_mhz: int, placement_name: str, scenario_name: str, sim_power: float, norm_factor: float, sar_results: dict | None = None) -> tuple[dict, list]

Extracts and normalizes SAR data from a single near-field result.

Parameters:

Name Type Description Default
pickle_data dict

Data loaded from the .pkl result file.

required
frequency_mhz int

The simulation frequency.

required
placement_name str

The detailed name of the placement.

required
scenario_name str

The general scenario name (e.g., 'by_cheek').

required
sim_power float

The simulated input power in Watts.

required
norm_factor float

The normalization factor to apply to SAR values.

required
sar_results dict | None

Optional JSON results dict containing power balance data.

None

Returns:

Type Description
tuple[dict, list]

A tuple containing the main result entry and a list of organ-specific entries.

Source code in goliat/analysis/near_field_strategy.py
def extract_data(
    self,
    pickle_data: dict,
    frequency_mhz: int,
    placement_name: str,
    scenario_name: str,
    sim_power: float,
    norm_factor: float,
    sar_results: dict | None = None,
) -> tuple[dict, list]:
    """Extracts and normalizes SAR data from a single near-field result.

    Args:
        pickle_data: Data loaded from the .pkl result file.
        frequency_mhz: The simulation frequency.
        placement_name: The detailed name of the placement.
        scenario_name: The general scenario name (e.g., 'by_cheek').
        sim_power: The simulated input power in Watts.
        norm_factor: The normalization factor to apply to SAR values.
        sar_results: Optional JSON results dict containing power balance data.

    Returns:
        A tuple containing the main result entry and a list of organ-specific entries.
    """
    # Pickle payload pieces: scalar summaries, per-tissue-group stats, and an
    # optional per-tissue DataFrame (None when the key is absent).
    summary_results = pickle_data.get("summary_results", {})
    grouped_stats = pickle_data.get("grouped_sar_stats", {})
    detailed_df = pickle_data.get("detailed_sar_stats")

    # Check bounding_box setting from config to determine which SAR fields are valid
    placement_scenarios = self.config["placement_scenarios"] or {}
    scenario_config = placement_scenarios.get(scenario_name, {}) if isinstance(placement_scenarios, dict) else {}
    bounding_box_setting = scenario_config.get("bounding_box", "default")

    # Extract SAR values - prioritize JSON over pickle, and respect bounding_box setting
    sar_head = pd.NA
    sar_trunk = pd.NA
    sar_whole_body = pd.NA

    # If whole_body bounding box, only use whole_body_sar (ignore head/trunk from old pickle files)
    if bounding_box_setting == "whole_body":
        # Prefer JSON, fallback to pickle
        sar_whole_body = sar_results.get("whole_body_sar", pd.NA) if sar_results else pd.NA
        if pd.isna(sar_whole_body):
            sar_whole_body = summary_results.get("whole_body_sar", pd.NA)
    else:
        # For head/trunk/default bounding boxes, extract head/trunk SAR
        # Prefer JSON, fallback to pickle
        if sar_results:
            sar_head = sar_results.get("head_SAR", pd.NA)
            sar_trunk = sar_results.get("trunk_SAR", pd.NA)
        if pd.isna(sar_head):
            sar_head = summary_results.get("head_SAR", pd.NA)
        if pd.isna(sar_trunk):
            sar_trunk = summary_results.get("trunk_SAR", pd.NA)

        # Also check for whole_body_sar (might exist from old data)
        if sar_results:
            sar_whole_body = sar_results.get("whole_body_sar", pd.NA)
        if pd.isna(sar_whole_body):
            sar_whole_body = summary_results.get("whole_body_sar", pd.NA)

    # Normalize only present values; pd.NA is kept as-is so downstream
    # aggregations can distinguish "missing" from zero.
    result_entry = {
        "frequency_mhz": frequency_mhz,
        "placement": placement_name,
        "scenario": scenario_name,
        "input_power_w": sim_power,
        "SAR_head": sar_head * norm_factor if pd.notna(sar_head) else pd.NA,
        "SAR_trunk": sar_trunk * norm_factor if pd.notna(sar_trunk) else pd.NA,
        "SAR_whole_body": sar_whole_body * norm_factor if pd.notna(sar_whole_body) else pd.NA,
    }
    # One psSAR10g_<group> column per tissue group (e.g. "brain_group" -> "psSAR10g_brain").
    # NA propagates through the multiplication if peak_sar is missing.
    for group_name, stats in grouped_stats.items():
        key = f"psSAR10g_{group_name.replace('_group', '')}"
        result_entry[key] = stats.get("peak_sar", pd.NA) * norm_factor

    # Extract power balance data if available
    power_balance = None
    if sar_results and "power_balance" in sar_results:
        power_balance = sar_results["power_balance"]
    elif summary_results and "power_balance" in summary_results:
        power_balance = summary_results["power_balance"]

    if power_balance:
        result_entry["power_balance_pct"] = power_balance.get("Balance", pd.NA)
        result_entry["power_pin_W"] = power_balance.get("Pin", pd.NA)
        result_entry["power_diel_loss_W"] = power_balance.get("DielLoss", pd.NA)
        result_entry["power_rad_W"] = power_balance.get("RadPower", pd.NA)
        result_entry["power_sibc_loss_W"] = power_balance.get("SIBCLoss", pd.NA)

    # Per-tissue rows; SAR columns are converted W/kg -> mW/kg (x1000) and
    # normalized. NOTE(review): mass_avg_sar assumes "Mass-Averaged SAR"
    # always exists in detailed_df — confirm against the pickle writer.
    organ_entries = []
    if detailed_df is not None:
        peak_sar_col = "Peak Spatial-Average SAR[IEEE/IEC62704-1] (10g)"
        for _, row in detailed_df.iterrows():
            organ_entry = {
                "frequency_mhz": frequency_mhz,
                "placement": placement_name,
                "scenario": scenario_name,
                "tissue": _clean_tissue_name(row["Tissue"]),  # Clean tissue name early
                "mass_avg_sar_mw_kg": row["Mass-Averaged SAR"] * norm_factor * 1000,
                "peak_sar_10g_mw_kg": row.get(peak_sar_col, pd.NA) * norm_factor * 1000
                if pd.notna(row.get(peak_sar_col, pd.NA))
                else pd.NA,
                "min_local_sar_mw_kg": row.get("Min. local SAR", pd.NA) * norm_factor * 1000
                if pd.notna(row.get("Min. local SAR", pd.NA))
                else pd.NA,
                "max_local_sar_mw_kg": row.get("Max. local SAR", pd.NA) * norm_factor * 1000
                if pd.notna(row.get("Max. local SAR", pd.NA))
                else pd.NA,
            }
            # Add Total Mass, Total Volume, Total Loss, Max Loss Power Density if available
            if "Total Mass" in row.index:
                organ_entry["Total Mass"] = row["Total Mass"]
            if "Total Volume" in row.index:
                organ_entry["Total Volume"] = row["Total Volume"]
            if "Total Loss" in row.index:
                organ_entry["Total Loss"] = row["Total Loss"]
            if "Max Loss Power Density" in row.index:
                organ_entry["Max Loss Power Density"] = row["Max Loss Power Density"]
            # Add psSAR10g column name for compatibility
            if peak_sar_col in row.index and pd.notna(row[peak_sar_col]):
                organ_entry["psSAR10g"] = row[peak_sar_col] * norm_factor * 1000
            organ_entries.append(organ_entry)
    return result_entry, organ_entries
apply_bug_fixes
apply_bug_fixes(result_entry: dict) -> dict

Applies a workaround for Head SAR being miscategorized as Trunk SAR.

NOTE: This method is deprecated for whole_body bounding box scenarios. For whole_body scenarios, SAR_head and SAR_trunk should remain NA.

Parameters:

Name Type Description Default
result_entry dict

The data entry for a single simulation result.

required

Returns:

Type Description
dict

The corrected result entry.

Source code in goliat/analysis/near_field_strategy.py
def apply_bug_fixes(self, result_entry: dict) -> dict:
    """Applies a workaround for Head SAR being miscategorized as Trunk SAR.

    NOTE: This method is deprecated for whole_body bounding box scenarios.
    For whole_body scenarios, SAR_head and SAR_trunk should remain NA.

    Args:
        result_entry: The data entry for a single simulation result.

    Returns:
        The corrected result entry.
    """
    # A populated whole-body SAR signals a whole_body bounding box, where
    # head/trunk values are not meaningful: blank them and return early.
    if pd.notna(result_entry.get("SAR_whole_body")):
        result_entry["SAR_head"] = pd.NA
        result_entry["SAR_trunk"] = pd.NA
        return result_entry

    # Head-region placements: a trunk-only reading is really a head reading,
    # so move it over and clear the trunk slot.
    placement = result_entry.get("placement", "").lower()
    if placement.startswith(("front_of_eyes", "by_cheek")):
        head_value = result_entry.get("SAR_head")
        trunk_value = result_entry.get("SAR_trunk")
        if bool(pd.isna(head_value)) and bool(pd.notna(trunk_value)):
            result_entry["SAR_head"] = trunk_value
            result_entry["SAR_trunk"] = pd.NA
    return result_entry
calculate_summary_stats
calculate_summary_stats(results_df: DataFrame) -> pd.DataFrame

Calculates summary statistics, including completion progress.

Parameters:

Name Type Description Default
results_df DataFrame

DataFrame with all aggregated simulation results.

required

Returns:

Type Description
DataFrame

A DataFrame with mean SAR values and a 'progress' column.

Source code in goliat/analysis/near_field_strategy.py
def calculate_summary_stats(self, results_df: pd.DataFrame) -> pd.DataFrame:
    """Calculates summary statistics, including completion progress.

    Args:
        results_df: DataFrame with all aggregated simulation results.

    Returns:
        A DataFrame with mean SAR values and a 'progress' column.
    """
    scenario_defs = self.config["placement_scenarios"]
    totals_by_scenario: dict = {}
    progress_log = logging.getLogger("progress")
    progress_log.info(
        "\n--- Calculating Total Possible Placements per Scenario ---",
        extra={"log_type": "header"},
    )
    if scenario_defs:
        for scenario, spec in scenario_defs.items():
            if not spec:
                continue
            # Every position/orientation combination counts as one placement.
            n_placements = len(spec.get("positions", {})) * len(spec.get("orientations", {}))
            totals_by_scenario[scenario] = n_placements
            progress_log.info(f"- Scenario '{scenario}': {n_placements} placements", extra={"log_type": "info"})

    grouped = results_df.groupby(["scenario", "frequency_mhz"])
    summary_stats = grouped.mean(numeric_only=True)
    completion_counts = grouped.size()

    def _format_progress(idx):
        # Index entries are (scenario, frequency) tuples; tolerate a plain key.
        if isinstance(idx, tuple) and len(idx) == 2:
            scenario_key = idx[0]
        else:
            scenario_key = idx
        done = completion_counts.get(idx, 0)
        return f"{done}/{totals_by_scenario.get(scenario_key, 0)}"

    if not summary_stats.empty:
        summary_stats["progress"] = summary_stats.index.map(_format_progress)  # type: ignore
    return pd.DataFrame(summary_stats)
generate_plots
generate_plots(analyzer: Analyzer, plotter: Plotter, results_df: DataFrame, all_organ_results_df: DataFrame)

Generates all plots for the near-field analysis.

Includes bar charts for average SAR, line plots for psSAR, and boxplots for SAR distribution.

Parameters:

Name Type Description Default
analyzer Analyzer

The main analyzer instance.

required
plotter Plotter

The plotter instance for generating plots.

required
results_df DataFrame

DataFrame with main aggregated results.

required
all_organ_results_df DataFrame

DataFrame with detailed organ-level results.

required
Source code in goliat/analysis/near_field_strategy.py
def generate_plots(
    self,
    analyzer: "Analyzer",
    plotter: "Plotter",
    results_df: pd.DataFrame,
    all_organ_results_df: pd.DataFrame,
):
    """Generates all plots for the near-field analysis.

    Includes bar charts for average SAR, line plots for psSAR, and boxplots
    for SAR distribution.

    Args:
        analyzer: The main analyzer instance.
        plotter: The plotter instance for generating plots.
        results_df: DataFrame with main aggregated results.
        all_organ_results_df: DataFrame with detailed organ-level results.
    """
    # Aggregate organ-level SAR by tissue groups and merge into results_df
    # Uses tissue_group_composition from pickle files if available
    results_df = self._add_tissue_group_sar(results_df, all_organ_results_df, analyzer)

    scenarios_with_results = results_df["scenario"].unique()
    summary_stats = self.calculate_summary_stats(results_df)

    for scenario_name in scenarios_with_results:
        logging.getLogger("progress").info(
            f"\n--- Generating plots for scenario: {scenario_name} ---",
            extra={"log_type": "header"},
        )
        scenario_results_df = results_df[results_df["scenario"] == scenario_name]
        if scenario_name in summary_stats.index:
            scenario_summary_stats = summary_stats.loc[scenario_name]
            avg_results = scenario_summary_stats.drop(columns=["progress"])
            progress_info = scenario_summary_stats["progress"]
            if self.should_generate_plot("plot_average_sar_bar"):
                plotter.plot_average_sar_bar(scenario_name, pd.DataFrame(avg_results), pd.Series(progress_info), scenario_results_df)
            if self.should_generate_plot("plot_average_pssar_bar"):
                plotter.plot_average_pssar_bar(scenario_name, pd.DataFrame(avg_results), pd.Series(progress_info), scenario_results_df)
            if self.should_generate_plot("plot_sar_line"):
                plotter.plot_sar_line(scenario_name, pd.DataFrame(avg_results))
            if self.should_generate_plot("plot_pssar_line"):
                plotter.plot_pssar_line(scenario_name, pd.DataFrame(avg_results))
        if self.should_generate_plot("plot_sar_distribution_boxplots"):
            plotter.plot_sar_distribution_boxplots(scenario_name, pd.DataFrame(scenario_results_df))

        # Individual variation line plots (one line per placement/direction/polarization)
        # Include all SAR and psSAR10g metrics for symmetry
        pssar_columns = [col for col in scenario_results_df.columns if col.startswith("psSAR10g")]
        sar_columns_for_lines = ["SAR_head", "SAR_trunk", "SAR_whole_body", "SAR_brain", "SAR_skin", "SAR_eyes", "SAR_genitals"]
        # Plot SAR metrics using SAR-specific function
        if self.should_generate_plot("plot_sar_line_individual_variations"):
            for metric_col in sar_columns_for_lines:
                if metric_col in scenario_results_df.columns:
                    plotter.plot_sar_line_individual_variations(
                        results_df,
                        scenario_name=scenario_name,
                        metric_column=metric_col,
                    )
        # Plot psSAR10g metrics using psSAR10g-specific function
        if self.should_generate_plot("plot_pssar_line_individual_variations"):
            for metric_col in pssar_columns:
                if metric_col in scenario_results_df.columns:
                    plotter.plot_pssar_line_individual_variations(
                        results_df,
                        scenario_name=scenario_name,
                        metric_column=metric_col,
                    )

    # Generate comprehensive heatmap with all tissues (Min/Avg/Max SAR)
    if not all_organ_results_df.empty and analyzer.tissue_group_composition:
        # Check if required columns exist
        required_cols = ["min_local_sar_mw_kg", "mass_avg_sar_mw_kg", "max_local_sar_mw_kg"]
        missing_cols = [col for col in required_cols if col not in all_organ_results_df.columns]

        if missing_cols:
            logging.getLogger("progress").warning(
                f"  - WARNING: Missing columns for SAR heatmap: {missing_cols}. Skipping heatmap.",
                extra={"log_type": "warning"},
            )
        else:
            # Prepare organ-level data with min/avg/max SAR
            # Aggregate across all placements to get mean values per tissue and frequency
            organ_sar_df = (
                all_organ_results_df.groupby(["tissue", "frequency_mhz"])
                .agg(
                    min_sar=("min_local_sar_mw_kg", "mean"),
                    avg_sar=("mass_avg_sar_mw_kg", "mean"),
                    max_sar=("max_local_sar_mw_kg", "mean"),
                )
                .reset_index()
            )

            # Drop rows where all SAR values are NA (pandas mean() returns NaN if all values are NaN)
            organ_sar_df = organ_sar_df.dropna(subset=["min_sar", "avg_sar", "max_sar"], how="all")

            # Prepare group-level summary data
            tissue_groups = {group_name: list(tissues) for group_name, tissues in analyzer.tissue_group_composition.items()}

            group_summary_data = []
            for group_name, tissues in tissue_groups.items():
                if not tissues:
                    continue

                # Filter organs belonging to this group
                group_df = all_organ_results_df[all_organ_results_df["tissue"].isin(tissues)]

                if not group_df.empty:
                    summary = group_df.groupby("frequency_mhz").agg(avg_sar=("mass_avg_sar_mw_kg", "mean")).reset_index()
                    summary["group"] = group_name.replace("_group", "").capitalize()
                    group_summary_data.append(summary)

            group_summary_df = pd.concat(group_summary_data, ignore_index=True) if group_summary_data else pd.DataFrame()

            if not organ_sar_df.empty and not group_summary_df.empty:
                # Use tissue_group_composition for plotter (convert sets to lists)
                plotter_tissue_groups = {group_name: list(tissues) for group_name, tissues in analyzer.tissue_group_composition.items()}

                if self.should_generate_plot("plot_sar_heatmap"):
                    logging.getLogger("progress").info(
                        "\n--- Generating comprehensive SAR heatmap (all tissues) ---",
                        extra={"log_type": "header"},
                    )
                    plotter.plot_sar_heatmap(
                        pd.DataFrame(organ_sar_df),
                        pd.DataFrame(group_summary_df),
                        plotter_tissue_groups,
                    )

                # Also generate psSAR10g heatmap if data is available
                if "peak_sar_10g_mw_kg" in all_organ_results_df.columns and self.should_generate_plot("plot_peak_sar_heatmap"):
                    organ_pssar_df = (
                        all_organ_results_df.groupby(["tissue", "frequency_mhz"])
                        .agg(peak_sar_10g_mw_kg=("peak_sar_10g_mw_kg", "mean"))
                        .reset_index()
                    )
                    organ_pssar_df = plotter._filter_all_regions(organ_pssar_df, tissue_column="tissue")

                    group_pssar_summary_data = []
                    for group_name, tissues in tissue_groups.items():
                        if not tissues:
                            continue
                        group_df = all_organ_results_df[all_organ_results_df["tissue"].isin(tissues)]
                        if not group_df.empty:
                            summary = (
                                group_df.groupby("frequency_mhz").agg(peak_sar_10g_mw_kg=("peak_sar_10g_mw_kg", "mean")).reset_index()
                            )
                            summary["group"] = group_name.replace("_group", "").capitalize()
                            group_pssar_summary_data.append(summary)

                    group_pssar_summary_df = (
                        pd.concat(group_pssar_summary_data, ignore_index=True) if group_pssar_summary_data else pd.DataFrame()
                    )

                    if not organ_pssar_df.empty and not group_pssar_summary_df.empty:
                        if self.should_generate_plot("plot_peak_sar_heatmap"):
                            logging.getLogger("progress").info(
                                "\n--- Generating comprehensive psSAR10g heatmap (all tissues) ---",
                                extra={"log_type": "header"},
                            )
                            plotter.plot_peak_sar_heatmap(
                                pd.DataFrame(organ_pssar_df),
                                pd.DataFrame(group_pssar_summary_df),
                                plotter_tissue_groups,
                                value_col="peak_sar_10g_mw_kg",
                                title="Peak SAR 10g",
                            )
            else:
                logging.getLogger("progress").warning(
                    "  - WARNING: Insufficient data for SAR heatmap (empty organ or group data).",
                    extra={"log_type": "warning"},
                )

    # Generate power balance plots for all results
    if self.should_generate_plot("plot_power_balance_overview"):
        plotter.plot_power_balance_overview(results_df)

    # ============================================================================
    # Generate All New Comprehensive Plots
    # ============================================================================

    logging.getLogger("progress").info(
        "\n--- Generating comprehensive analysis plots ---",
        extra={"log_type": "header"},
    )

    # Collect peak location data from all results
    peak_location_data = []
    for result in analyzer.all_results:
        if "peak_sar_details" in result:
            peak_details = result["peak_sar_details"]
            if peak_details and isinstance(peak_details, dict):
                peak_location_data.append(
                    {
                        "PeakLocation": peak_details.get("PeakLocation", None),
                        "PeakCubeSideLength": peak_details.get("PeakCubeSideLength", None),
                        "PeakValue": peak_details.get("PeakValue", None),
                        "PeakCell": peak_details.get("PeakCell", None),
                        "placement": result.get("placement", ""),
                        "frequency_mhz": result.get("frequency_mhz", None),
                        "scenario": result.get("scenario", ""),
                    }
                )

    peak_location_df = pd.DataFrame(peak_location_data) if peak_location_data else pd.DataFrame()

    # ============================================================================
    # Spatial Plots
    # ============================================================================
    if not peak_location_df.empty:
        if self.should_generate_plot("plot_peak_location_3d_interactive") or self.should_generate_plot(
            "plot_peak_location_2d_projections"
        ):
            logging.getLogger("progress").info(
                "  - Generating spatial plots (3D and 2D peak locations)...",
                extra={"log_type": "info"},
            )
            # First create aggregated plot with all scenarios
            # Calculate axis limits from all data
            axis_limits = plotter.spatial._calculate_axis_limits(peak_location_df)
            # Create aggregated plot
            if self.should_generate_plot("plot_peak_location_3d_interactive"):
                plotter.plot_peak_location_3d_interactive(peak_location_df, scenario_name=None, axis_limits=axis_limits)

            # Per-scenario spatial plots with inherited axis limits
            for scenario in peak_location_df["scenario"].unique():
                scenario_peak_data = peak_location_df[peak_location_df["scenario"] == scenario].copy()
                if not scenario_peak_data.empty and self.should_generate_plot("plot_peak_location_3d_interactive"):
                    plotter.plot_peak_location_3d_interactive(scenario_peak_data, scenario_name=scenario, axis_limits=axis_limits)
                if self.should_generate_plot("plot_peak_location_2d_projections"):
                    plotter.plot_peak_location_2d_projections(peak_location_df, scenario_name=scenario)

    # ============================================================================
    # Correlation Plots
    # ============================================================================
    if self.should_generate_plot("plot_correlation_head_vs_eye_sar") or self.should_generate_plot(
        "plot_tissue_group_correlation_matrix"
    ):
        logging.getLogger("progress").info(
            "  - Generating correlation plots...",
            extra={"log_type": "info"},
        )
        # Head vs Eye SAR correlation (for front_of_eyes scenario)
        if "front_of_eyes" in scenarios_with_results and self.should_generate_plot("plot_correlation_head_vs_eye_sar"):
            plotter.plot_correlation_head_vs_eye_sar(results_df, scenario_name="front_of_eyes")
        # Tissue group correlation matrix (per scenario only - averaging across scenarios doesn't make sense)
        if self.should_generate_plot("plot_tissue_group_correlation_matrix"):
            for scenario in scenarios_with_results:
                plotter.plot_tissue_group_correlation_matrix(results_df, scenario_name=scenario)

    # ============================================================================
    # Bubble Plots (Mass vs SAR)
    # ============================================================================
    if not all_organ_results_df.empty and (
        self.should_generate_plot("plot_bubble_mass_vs_sar") or self.should_generate_plot("plot_bubble_mass_vs_sar_interactive")
    ):
        logging.getLogger("progress").info(
            "  - Generating bubble plots (mass vs SAR)...",
            extra={"log_type": "info"},
        )
        # Note: Total Mass, Total Volume, etc. should be included in organ_entries from extract_data

        # Get unique frequencies for frequency-specific plots
        frequencies = (
            sorted(all_organ_results_df["frequency_mhz"].dropna().unique()) if "frequency_mhz" in all_organ_results_df.columns else []
        )

        # SAR columns to plot
        sar_columns = ["mass_avg_sar_mw_kg"]
        if "psSAR10g" in all_organ_results_df.columns:
            sar_columns.append("psSAR10g")
        elif "peak_sar_10g_mw_kg" in all_organ_results_df.columns:
            all_organ_results_df["psSAR10g"] = all_organ_results_df["peak_sar_10g_mw_kg"]
            sar_columns.append("psSAR10g")
        if "max_local_sar_mw_kg" in all_organ_results_df.columns:
            sar_columns.append("max_local_sar_mw_kg")

        for sar_col in sar_columns:
            # Per-scenario variants only (averaging across scenarios doesn't make sense)
            for scenario in scenarios_with_results:
                # Per-scenario (all frequencies)
                if self.should_generate_plot("plot_bubble_mass_vs_sar"):
                    plotter.plot_bubble_mass_vs_sar(
                        all_organ_results_df,
                        sar_column=sar_col,
                        scenario_name=scenario,
                    )

                    # Per-scenario AND per-frequency variants
                    for freq in frequencies:
                        plotter.plot_bubble_mass_vs_sar(
                            all_organ_results_df,
                            sar_column=sar_col,
                            scenario_name=scenario,
                            frequency_mhz=freq,
                        )

                # Interactive plot per scenario
                if self.should_generate_plot("plot_bubble_mass_vs_sar_interactive"):
                    plotter.plot_bubble_mass_vs_sar_interactive(
                        all_organ_results_df,
                        sar_column=sar_col,
                        scenario_name=scenario,
                    )

    # ============================================================================
    # Ranking Plots
    # ============================================================================
    if not all_organ_results_df.empty and self.should_generate_plot("plot_top20_tissues_ranking"):
        logging.getLogger("progress").info(
            "  - Generating ranking plots (top 20 tissues)...",
            extra={"log_type": "info"},
        )
        # Per-scenario variants only (averaging across scenarios doesn't make sense)
        for scenario in scenarios_with_results:
            if "max_local_sar_mw_kg" in all_organ_results_df.columns:
                plotter.plot_top20_tissues_ranking(
                    all_organ_results_df,
                    metric="max_local_sar_mw_kg",
                    scenario_name=scenario,
                )
            # Top 20 by Mass-Averaged SAR
            plotter.plot_top20_tissues_ranking(
                all_organ_results_df,
                metric="mass_avg_sar_mw_kg",
                scenario_name=scenario,
            )
            # Top 20 by Total Loss (if available)
            if "Total Loss" in all_organ_results_df.columns:
                plotter.plot_top20_tissues_ranking(
                    all_organ_results_df,
                    metric="Total Loss",
                    scenario_name=scenario,
                )

    # ============================================================================
    # Power Plots
    # ============================================================================
    if self.should_generate_plot("plot_power_efficiency_trends") or self.should_generate_plot("plot_power_absorption_distribution"):
        logging.getLogger("progress").info(
            "  - Generating power analysis plots...",
            extra={"log_type": "info"},
        )
        # Power efficiency trends (per scenario only - averaging across scenarios doesn't make sense)
        if self.should_generate_plot("plot_power_efficiency_trends"):
            for scenario in scenarios_with_results:
                plotter.plot_power_efficiency_trends(results_df, scenario_name=scenario)

        # Power absorption distribution (per scenario only - averaging across scenarios doesn't make sense)
        if (
            self.should_generate_plot("plot_power_absorption_distribution")
            and not all_organ_results_df.empty
            and "Total Loss" in all_organ_results_df.columns
        ):
            for scenario in scenarios_with_results:
                plotter.plot_power_absorption_distribution(all_organ_results_df, scenario_name=scenario)

    # ============================================================================
    # Penetration Depth Plot
    # ============================================================================
    if self.should_generate_plot("plot_penetration_depth_ratio"):
        logging.getLogger("progress").info(
            "  - Generating penetration depth plot...",
            extra={"log_type": "info"},
        )
        # Penetration depth ratio (per scenario only - averaging across scenarios doesn't make sense)
        # Generate both psSAR10g and SAR versions for symmetry
        if "psSAR10g_brain" in results_df.columns and "psSAR10g_skin" in results_df.columns:
            for scenario in scenarios_with_results:
                plotter.plot_penetration_depth_ratio(results_df, scenario_name=scenario, metric_type="psSAR10g")
        if "SAR_brain" in results_df.columns and "SAR_skin" in results_df.columns:
            for scenario in scenarios_with_results:
                plotter.plot_penetration_depth_ratio(results_df, scenario_name=scenario, metric_type="SAR")

    # ============================================================================
    # Tissue Analysis Plots
    # ============================================================================
    if not all_organ_results_df.empty and (
        self.should_generate_plot("plot_max_local_vs_pssar10g_scatter")
        or self.should_generate_plot("plot_tissue_mass_volume_distribution")
        or self.should_generate_plot("plot_tissue_frequency_response")
    ):
        logging.getLogger("progress").info(
            "  - Generating tissue analysis plots...",
            extra={"log_type": "info"},
        )
        # Max Local SAR vs psSAR10g scatter (per scenario only - averaging across scenarios doesn't make sense)
        if self.should_generate_plot("plot_max_local_vs_pssar10g_scatter") and "max_local_sar_mw_kg" in all_organ_results_df.columns:
            for scenario in scenarios_with_results:
                plotter.plot_max_local_vs_pssar10g_scatter(all_organ_results_df, scenario_name=scenario)

        # Tissue mass/volume distribution (per scenario only - averaging across scenarios doesn't make sense)
        if (
            self.should_generate_plot("plot_tissue_mass_volume_distribution")
            and "Total Mass" in all_organ_results_df.columns
            and "Total Volume" in all_organ_results_df.columns
        ):
            for scenario in scenarios_with_results:
                plotter.plot_tissue_mass_volume_distribution(all_organ_results_df, scenario_name=scenario)

        # Tissue frequency response for top tissues (per scenario only - averaging across scenarios doesn't make sense)
        if self.should_generate_plot("plot_tissue_frequency_response") and "mass_avg_sar_mw_kg" in all_organ_results_df.columns:
            # Filter out 'All Regions' before selecting top tissues
            filtered_organs = all_organ_results_df[all_organ_results_df["tissue"] != "All Regions"]
            for scenario in scenarios_with_results:
                # Get top tissues per scenario
                scenario_organs = filtered_organs[filtered_organs["scenario"] == scenario]
                top_tissues = scenario_organs.groupby("tissue")["mass_avg_sar_mw_kg"].mean().nlargest(10).index.tolist()
                for tissue in top_tissues:
                    plotter.plot_tissue_frequency_response(all_organ_results_df, tissue_name=tissue, scenario_name=scenario)

    # ============================================================================
    # CDF Plots
    # ============================================================================
    if self.should_generate_plot("plot_cdf"):
        logging.getLogger("progress").info(
            "  - Generating CDF plots...",
            extra={"log_type": "info"},
        )
        # Get all available metrics for CDF plots
        cdf_metrics = []
        sar_metrics = [col for col in results_df.columns if col.startswith("SAR_")]
        pssar_metrics = [col for col in results_df.columns if col.startswith("psSAR10g_")]
        cdf_metrics.extend(sar_metrics)
        cdf_metrics.extend(pssar_metrics)

        # Generate CDF plots with various grouping options
        for metric in cdf_metrics:
            if metric not in results_df.columns:
                continue

            # CDF grouped by scenario (shows all scenarios together for comparison)
            plotter.plot_cdf(results_df, metric, group_by="scenario")

            # Per-scenario CDFs grouped by frequency
            for scenario in scenarios_with_results:
                plotter.plot_cdf(results_df, metric, group_by="frequency_mhz", scenario_name=scenario)

            # Note: Single CDF (all data) and per-frequency CDFs removed - averaging across scenarios doesn't make sense

    # ============================================================================
    # Outlier Identification
    # ============================================================================
    if self.should_generate_plot("identify_outliers"):
        logging.getLogger("progress").info(
            "  - Identifying outliers...",
            extra={"log_type": "info"},
        )
        # Include all SAR and psSAR10g metrics for symmetry
        outlier_metrics = [
            "psSAR10g_brain",
            "psSAR10g_eyes",
            "psSAR10g_skin",
            "psSAR10g_genitals",
            "psSAR10g_whole_body",
            "SAR_head",
            "SAR_trunk",
            "SAR_whole_body",
            "SAR_brain",
            "SAR_skin",
            "SAR_eyes",
            "SAR_genitals",
        ]
        for metric in outlier_metrics:
            if metric in results_df.columns:
                outliers = plotter.identify_outliers(results_df, metric)
                if outliers is not None and not outliers.empty:
                    # Save outliers to CSV
                    subdir = plotter._get_subdir("outliers")
                    filename = f"outliers_{metric}.csv"
                    outliers.to_csv(os.path.join(subdir, filename), index=False)
                    logging.getLogger("progress").info(
                        f"    - Found {len(outliers)} outliers for {metric}",
                        extra={"log_type": "info"},
                    )

    logging.getLogger("progress").info(
        "\n--- Comprehensive analysis plots generation complete ---",
        extra={"log_type": "success"},
    )

Plotting

goliat.analysis.plotter.Plotter

Plotter(plots_dir: str, phantom_name: str | None = None, plot_format: str = 'pdf')

Bases: BasePlotter

Generates publication-ready plots from simulation results.

Creates bar charts, line plots, boxplots, and heatmaps for SAR analysis. All plots are saved to the configured plots directory.

Uses composition to delegate to specialized plot modules for better organization.

Parameters:

Name Type Description Default
plots_dir str

Directory where all plots will be saved.

required
phantom_name str | None

Optional phantom model name for titles.

None
plot_format str

Output format for plots ('pdf' or 'png'), default 'pdf'.

'pdf'
Source code in goliat/analysis/plotter.py
def __init__(self, plots_dir: str, phantom_name: str | None = None, plot_format: str = "pdf"):
    """Sets up the plotter, its output directory, and all specialized plot modules.

    Args:
        plots_dir: Directory where all plots will be saved.
        phantom_name: Optional phantom model name for titles.
        plot_format: Output format for plots ('pdf' or 'png'), default 'pdf'.
    """
    super().__init__(plots_dir, phantom_name, plot_format)
    os.makedirs(self.plots_dir, exist_ok=True)

    # One specialist per plot family; the Plotter facade delegates to these
    # (composition keeps each plot family in its own module).
    specialist_classes = {
        "bar": BarPlotter,
        "line": LinePlotter,
        "boxplot": BoxplotPlotter,
        "heatmap": HeatmapPlotter,
        "spatial": SpatialPlotter,
        "correlation": CorrelationPlotter,
        "bubble": BubblePlotter,
        "ranking": RankingPlotter,
        "power": PowerPlotter,
        "penetration": PenetrationPlotter,
        "tissue_analysis": TissueAnalysisPlotter,
        "cdf": CdfPlotter,
        "outliers": OutliersPlotter,
    }
    for attr_name, plotter_cls in specialist_classes.items():
        setattr(self, attr_name, plotter_cls(plots_dir, phantom_name, plot_format))

    logging.getLogger("progress").info(
        f"--- Plots will be saved to '{self.plots_dir}' directory. ---",
        extra={"log_type": "info"},
    )

Functions

plot_average_sar_bar

plot_average_sar_bar(*args, **kwargs)

Creates a bar chart of average SAR values by frequency.

Source code in goliat/analysis/plotter.py
def plot_average_sar_bar(self, *args, **kwargs):
    """Bar chart of average SAR values per frequency (delegated to the bar module)."""
    delegate = self.bar.plot_average_sar_bar
    return delegate(*args, **kwargs)

plot_average_pssar_bar

plot_average_pssar_bar(*args, **kwargs)

Creates a bar chart of average psSAR10g values by frequency.

Source code in goliat/analysis/plotter.py
def plot_average_pssar_bar(self, *args, **kwargs):
    """Bar chart of average psSAR10g values per frequency (delegated to the bar module)."""
    delegate = self.bar.plot_average_pssar_bar
    return delegate(*args, **kwargs)

plot_whole_body_sar_bar

plot_whole_body_sar_bar(*args, **kwargs)

Creates a bar chart of average whole-body SAR by frequency.

Source code in goliat/analysis/plotter.py
def plot_whole_body_sar_bar(self, *args, **kwargs):
    """Bar chart of average whole-body SAR per frequency (delegated to the bar module)."""
    delegate = self.bar.plot_whole_body_sar_bar
    return delegate(*args, **kwargs)

plot_peak_sar_line

plot_peak_sar_line(*args, **kwargs)

Plots peak SAR trend across frequencies.

Source code in goliat/analysis/plotter.py
def plot_peak_sar_line(self, *args, **kwargs):
    """Line plot of the peak-SAR trend across frequencies (delegated to the line module)."""
    delegate = self.line.plot_peak_sar_line
    return delegate(*args, **kwargs)

plot_pssar_line

plot_pssar_line(*args, **kwargs)

Plots average psSAR10g trends for tissue groups by frequency.

Source code in goliat/analysis/plotter.py
def plot_pssar_line(self, *args, **kwargs):
    """Line plot of average psSAR10g trends per tissue group by frequency (delegated)."""
    delegate = self.line.plot_pssar_line
    return delegate(*args, **kwargs)

plot_sar_line

plot_sar_line(*args, **kwargs)

Plots average SAR trends for tissue groups by frequency.

Source code in goliat/analysis/plotter.py
def plot_sar_line(self, *args, **kwargs):
    """Line plot of average SAR trends per tissue group by frequency (delegated)."""
    delegate = self.line.plot_sar_line
    return delegate(*args, **kwargs)

plot_pssar_line_individual_variations

plot_pssar_line_individual_variations(*args, **kwargs)

Plots individual variation lines for each placement variation.

Source code in goliat/analysis/plotter.py
def plot_pssar_line_individual_variations(self, *args, **kwargs):
    """Per-placement-variation psSAR10g line plots (delegated to the line module)."""
    delegate = self.line.plot_pssar_line_individual_variations
    return delegate(*args, **kwargs)

plot_sar_line_individual_variations

plot_sar_line_individual_variations(*args, **kwargs)

Plots individual variation lines for SAR metrics.

Source code in goliat/analysis/plotter.py
def plot_sar_line_individual_variations(self, *args, **kwargs):
    """Per-placement-variation SAR line plots (delegated to the line module)."""
    delegate = self.line.plot_sar_line_individual_variations
    return delegate(*args, **kwargs)

plot_sar_distribution_boxplots

plot_sar_distribution_boxplots(*args, **kwargs)

Creates boxplots showing SAR value distributions across placements.

Source code in goliat/analysis/plotter.py
def plot_sar_distribution_boxplots(self, *args, **kwargs):
    """Boxplots of SAR value distributions across placements (delegated)."""
    delegate = self.boxplot.plot_sar_distribution_boxplots
    return delegate(*args, **kwargs)

plot_far_field_distribution_boxplot

plot_far_field_distribution_boxplot(*args, **kwargs)

Creates a boxplot showing distribution of a metric across directions/polarizations.

Source code in goliat/analysis/plotter.py
def plot_far_field_distribution_boxplot(self, *args, **kwargs):
    """Boxplot of a metric's distribution across directions/polarizations (delegated)."""
    delegate = self.boxplot.plot_far_field_distribution_boxplot
    return delegate(*args, **kwargs)

plot_sar_heatmap

plot_sar_heatmap(*args, **kwargs)

Creates a combined heatmap showing Min/Avg/Max SAR per tissue and frequency.

Source code in goliat/analysis/plotter.py
def plot_sar_heatmap(self, *args, **kwargs):
    """Combined Min/Avg/Max SAR heatmap per tissue and frequency (delegated)."""
    delegate = self.heatmap.plot_sar_heatmap
    return delegate(*args, **kwargs)

plot_peak_sar_heatmap

plot_peak_sar_heatmap(*args, **kwargs)

Creates a heatmap for peak SAR values across tissues and frequencies.

Source code in goliat/analysis/plotter.py
def plot_peak_sar_heatmap(self, *args, **kwargs):
    """Heatmap of peak SAR values across tissues and frequencies (delegated)."""
    delegate = self.heatmap.plot_peak_sar_heatmap
    return delegate(*args, **kwargs)

plot_peak_location_3d_interactive

plot_peak_location_3d_interactive(*args, **kwargs)

Creates an interactive 3D plot of peak SAR locations.

Source code in goliat/analysis/plotter.py
def plot_peak_location_3d_interactive(self, *args, **kwargs):
    """Interactive 3D scatter of peak SAR locations (delegated to the spatial module)."""
    delegate = self.spatial.plot_peak_location_3d_interactive
    return delegate(*args, **kwargs)

plot_peak_location_2d_projections

plot_peak_location_2d_projections(*args, **kwargs)

Creates 2D scatter plots showing peak locations projected onto XY, XZ, YZ planes.

Source code in goliat/analysis/plotter.py
def plot_peak_location_2d_projections(self, *args, **kwargs):
    """2D scatter of peak locations projected onto XY, XZ, YZ planes (delegated)."""
    delegate = self.spatial.plot_peak_location_2d_projections
    return delegate(*args, **kwargs)

plot_correlation_head_vs_eye_sar

plot_correlation_head_vs_eye_sar(*args, **kwargs)

Creates scatter plot showing correlation between Head SAR and Eye psSAR10g.

Source code in goliat/analysis/plotter.py
def plot_correlation_head_vs_eye_sar(self, *args, **kwargs):
    """Scatter plot correlating Head SAR with Eye psSAR10g (delegated)."""
    delegate = self.correlation.plot_correlation_head_vs_eye_sar
    return delegate(*args, **kwargs)

plot_tissue_group_correlation_matrix

plot_tissue_group_correlation_matrix(*args, **kwargs)

Creates heatmap showing correlation coefficients between tissue group SAR values.

Source code in goliat/analysis/plotter.py
def plot_tissue_group_correlation_matrix(self, *args, **kwargs):
    """Heatmap of correlation coefficients between tissue-group SAR values (delegated)."""
    delegate = self.correlation.plot_tissue_group_correlation_matrix
    return delegate(*args, **kwargs)

plot_bubble_mass_vs_sar

plot_bubble_mass_vs_sar(*args, **kwargs)

Creates bubble plot showing how tissue mass affects SAR values.

Source code in goliat/analysis/plotter.py
def plot_bubble_mass_vs_sar(self, *args, **kwargs):
    """Bubble plot of tissue mass versus SAR (delegated to the bubble module)."""
    delegate = self.bubble.plot_bubble_mass_vs_sar
    return delegate(*args, **kwargs)

plot_bubble_mass_vs_sar_interactive

plot_bubble_mass_vs_sar_interactive(*args, **kwargs)

Creates an interactive bubble plot with common axis limits across frequencies.

Source code in goliat/analysis/plotter.py
def plot_bubble_mass_vs_sar_interactive(self, *args, **kwargs):
    """Interactive bubble plot with shared axis limits across frequencies (delegated)."""
    delegate = self.bubble.plot_bubble_mass_vs_sar_interactive
    return delegate(*args, **kwargs)

plot_top20_tissues_ranking

plot_top20_tissues_ranking(*args, **kwargs)

Creates horizontal bar chart showing top 20 tissues ranked by various metrics.

Source code in goliat/analysis/plotter.py
def plot_top20_tissues_ranking(self, *args, **kwargs):
    """Horizontal bar chart ranking the top 20 tissues by various metrics (delegated)."""
    delegate = self.ranking.plot_top20_tissues_ranking
    return delegate(*args, **kwargs)
plot_power_efficiency_trends

plot_power_efficiency_trends(*args, **kwargs)

Creates line plot showing antenna efficiency and power component percentages.

Source code in goliat/analysis/plotter.py
def plot_power_efficiency_trends(self, *args, **kwargs):
    """Line plot of antenna efficiency and power-component percentages (delegated)."""
    delegate = self.power.plot_power_efficiency_trends
    return delegate(*args, **kwargs)

plot_power_absorption_distribution

plot_power_absorption_distribution(*args, **kwargs)

Creates pie chart or stacked bar chart showing power distribution across tissue groups.

Source code in goliat/analysis/plotter.py
def plot_power_absorption_distribution(self, *args, **kwargs):
    """Pie/stacked-bar chart of power distribution across tissue groups (delegated)."""
    delegate = self.power.plot_power_absorption_distribution
    return delegate(*args, **kwargs)

plot_power_balance_overview

plot_power_balance_overview(*args, **kwargs)

Creates comprehensive power balance overview heatmap.

Source code in goliat/analysis/plotter.py
def plot_power_balance_overview(self, *args, **kwargs):
    """Comprehensive power-balance overview heatmap (delegated to the power module)."""
    delegate = self.power.plot_power_balance_overview
    return delegate(*args, **kwargs)

plot_penetration_depth_ratio

plot_penetration_depth_ratio(*args, **kwargs)

Creates line plot showing SAR penetration depth ratio (Brain/Skin) vs frequency.

Source code in goliat/analysis/plotter.py
def plot_penetration_depth_ratio(self, *args, **kwargs):
    """Line plot of the SAR penetration-depth ratio (Brain/Skin) vs frequency (delegated)."""
    delegate = self.penetration.plot_penetration_depth_ratio
    return delegate(*args, **kwargs)

plot_max_local_vs_pssar10g_scatter

plot_max_local_vs_pssar10g_scatter(*args, **kwargs)

Creates scatter plot showing relationship between Max Local SAR and psSAR10g.

Source code in goliat/analysis/plotter.py
def plot_max_local_vs_pssar10g_scatter(self, *args, **kwargs):
    """Scatter plot relating Max Local SAR and psSAR10g (delegated)."""
    delegate = self.tissue_analysis.plot_max_local_vs_pssar10g_scatter
    return delegate(*args, **kwargs)

plot_tissue_frequency_response

plot_tissue_frequency_response(*args, **kwargs)

Creates line plot showing how a specific tissue responds across frequencies.

Source code in goliat/analysis/plotter.py
def plot_tissue_frequency_response(self, *args, **kwargs):
    """Line plot of a single tissue's response across frequencies (delegated)."""
    delegate = self.tissue_analysis.plot_tissue_frequency_response
    return delegate(*args, **kwargs)

plot_tissue_mass_volume_distribution

plot_tissue_mass_volume_distribution(*args, **kwargs)

Creates histograms and scatter plot showing tissue mass and volume distributions.

Source code in goliat/analysis/plotter.py
def plot_tissue_mass_volume_distribution(self, *args, **kwargs):
    """Histograms and scatter plot of tissue mass/volume distributions (delegated)."""
    delegate = self.tissue_analysis.plot_tissue_mass_volume_distribution
    return delegate(*args, **kwargs)

plot_cdf

plot_cdf(*args, **kwargs)

Creates CDF plot for a metric with optional aggregation by independent variables.

Source code in goliat/analysis/plotter.py
def plot_cdf(self, *args, **kwargs):
    """CDF plot for a metric, optionally aggregated by independent variables (delegated)."""
    delegate = self.cdf.plot_cdf
    return delegate(*args, **kwargs)

identify_outliers

identify_outliers(*args, **kwargs)

Identifies and visualizes outliers in SAR metrics.

Source code in goliat/analysis/plotter.py
def identify_outliers(self, *args, **kwargs):
    """Identifies and visualizes outliers in SAR metrics (delegated to the outliers module)."""
    delegate = self.outliers.identify_outliers
    return delegate(*args, **kwargs)

GUI Components

Graphical user interface for monitoring simulation progress.

Main GUI

goliat.gui.progress_gui

ProgressGUI main window component.

Classes

ProgressGUI

ProgressGUI(queue: Queue, stop_event: Event, process: Process, init_window_title: str = '')

Bases: QWidget

Main GUI window for monitoring simulation progress.

Provides real-time progress tracking via progress bars, ETA estimation, and status logs. Runs in the main process and communicates with worker process through a multiprocessing queue.

The GUI architecture:

- Main window runs in main process, worker runs in separate process
- Communication via multiprocessing.Queue for thread-safe message passing
- QueueHandler polls queue every 100ms and updates UI accordingly
- Multiple timers handle different update frequencies (queue, clock, graphs)

Features:

- Overall progress bar (weighted across all simulations)
- Stage progress bar (current phase: setup/run/extract)
- Real-time ETA calculation based on profiler estimates
- Status log with color-coded messages
- Timings table showing execution statistics
- Pie charts showing phase/subtask breakdowns
- Time series plots for progress and ETA trends
- System tray integration for background operation

Initializes data manager, status manager, UI builder, timers, and queue handler. Sets up Qt timers for periodic updates (queue polling, clock updates, graph refreshes).

Parameters:

Name Type Description Default
queue Queue

Queue for receiving messages from worker process.

required
stop_event Event

Event to signal termination to worker process.

required
process Process

Worker process running the study.

required
init_window_title str

Initial window title.

''
Source code in goliat/gui/progress_gui.py
def __init__(
    self,
    queue: Queue,
    stop_event: Event,
    process: Process,
    init_window_title: str = "",
) -> None:
    """Sets up the GUI window and all components.

    Initializes data manager, status manager, UI builder, timers, and
    queue handler. Sets up Qt timers for periodic updates (queue polling,
    clock updates, graph refreshes).

    Args:
        queue: Queue for receiving messages from worker process.
        stop_event: Event to signal termination to worker process.
        process: Worker process running the study.
        init_window_title: Initial window title.
    """
    super().__init__()
    self.queue: Queue = queue
    self.stop_event: Event = stop_event
    self.process: Process = process
    # monotonic() is immune to wall-clock adjustments, so elapsed time stays correct.
    self.start_time: float = time.monotonic()
    self.progress_logger: logging.Logger = logging.getLogger("progress")
    self.verbose_logger: logging.Logger = logging.getLogger("verbose")
    self.init_window_title: str = init_window_title
    self.DEBUG: bool = False
    # Completion flags consulted by timers/close handling elsewhere in the class.
    self.study_is_finished: bool = False
    self.study_had_errors: bool = False

    # Simulation counters shown in the "x of y" progress display.
    self.total_simulations: int = 0
    self.current_simulation_count: int = 0

    # Initialize components
    self._initialize_components()

    # Auto-detect machine ID (used to identify this host to the monitor server).
    self.machine_id = MachineIdDetector.detect(self.verbose_logger)
    self.server_url = "https://monitor.goliat.waves-ugent.be"

    # Build UI (must precede the managers below, which reference UI widgets).
    UIBuilder.build(self, self.status_manager)

    # Initialize managers
    self.web_bridge_manager = WebBridgeManager(self, self.server_url, self.machine_id)
    self.progress_manager = ProgressManager(self)
    self.clock_manager = ClockManager(self)
    self.utilization_manager = UtilizationManager(self)
    self.graph_manager = GraphManager(self)

    # Initialize web GUI bridge after UI is built (so we can set callback)
    self.web_bridge_manager.initialize()

    # Initialize animation and other components; timers start last so every
    # callback target exists before the first tick fires.
    self._initialize_animation()
    self._initialize_managers()
    self._setup_timers()
    self._initialize_system_monitoring()
Functions
update_overall_progress
update_overall_progress(current_step: float, total_steps: int) -> None

Updates overall progress bar across all simulations.

Source code in goliat/gui/progress_gui.py
def update_overall_progress(self, current_step: float, total_steps: int) -> None:
    """Forwards overall-progress updates (across all simulations) to the progress manager."""
    handler = self.progress_manager.update_overall
    handler(current_step, total_steps)
update_stage_progress
update_stage_progress(stage_name: str, current_step: int, total_steps: int, sub_stage: str = '') -> None

Updates stage-specific progress bar and label.

Source code in goliat/gui/progress_gui.py
def update_stage_progress(self, stage_name: str, current_step: int, total_steps: int, sub_stage: str = "") -> None:
    """Forwards stage-specific progress-bar and label updates to the progress manager."""
    handler = self.progress_manager.update_stage
    handler(stage_name, current_step, total_steps, sub_stage)
start_stage_animation
start_stage_animation(estimated_duration: float, end_step: int) -> None

Starts smooth animated progress bar for a stage.

Instead of jumping to discrete progress values, animates smoothly over the estimated duration. This provides visual feedback during long-running tasks where progress updates are infrequent.

The animation uses linear interpolation between current value and target (always 100% = 1000). Updates every 50ms via Qt timer.

Parameters:

Name Type Description Default
estimated_duration float

Estimated task duration in seconds (from profiler).

required
end_step int

Target step value (unused, always animates to 100%).

required
Source code in goliat/gui/progress_gui.py
def start_stage_animation(self, estimated_duration: float, end_step: int) -> None:
    """Starts smooth animated progress bar for a stage.

    Instead of jumping to discrete progress values, animates smoothly over
    the estimated duration. This provides visual feedback during long-running
    tasks where progress updates are infrequent.

    The animation uses linear interpolation between current value and target
    (always 100% = 1000). Updates every 50ms via Qt timer.

    Args:
        estimated_duration: Estimated task duration in seconds (from profiler).
        end_step: Target step value (unused, always animates to 100%).
    """
    if self.DEBUG:
        self.update_status(f"DEBUG: start_stage_animation received: duration={estimated_duration:.2f}s, end_step={end_step}")
    self.progress_animation.start(estimated_duration, end_step)
end_stage_animation
end_stage_animation() -> None

Stops stage progress bar animation.

Source code in goliat/gui/progress_gui.py
def end_stage_animation(self) -> None:
    """Halts the smooth stage progress-bar animation."""
    stopper = self.progress_animation.stop
    stopper()
update_animation
update_animation() -> None

Updates progress bar animation frame and syncs overall progress.

Called every 50ms by Qt timer when animation is active. Calculates current progress based on elapsed time and estimated duration, then updates stage progress bar. Also syncs overall progress bar using weighted progress calculation from profiler.

Source code in goliat/gui/progress_gui.py
def update_animation(self) -> None:
    """Advances the animation one frame and keeps the overall bar in sync.

    Invoked every 50ms by a Qt timer while an animation is active. After the
    stage bar advances, the overall bar is recomputed from the profiler's
    weighted progress for the current phase.
    """
    self.progress_animation.update()

    # Mirror the stage animation onto the overall progress bar.
    if self.profiler and self.profiler.current_phase:
        bar_value = self.stage_progress_bar.value()
        # Stage bar range is 0..1000; convert to percent before weighting.
        percent = (bar_value / 1000) * 100
        weighted = self.profiler.get_weighted_progress(self.profiler.current_phase, percent / 100.0)
        self.progress_manager.update_overall(weighted, 100)
update_simulation_details
update_simulation_details(sim_count: int, total_sims: int, details: str) -> None

Updates simulation counter and details labels.

Source code in goliat/gui/progress_gui.py
def update_simulation_details(self, sim_count: int, total_sims: int, details: str) -> None:
    """Forwards simulation counter and detail-label updates to the progress manager."""
    handler = self.progress_manager.update_simulation_details
    handler(sim_count, total_sims, details)
update_status
update_status(message: str, log_type: str = 'default') -> None

Appends message to status log with color formatting.

Parameters:

Name Type Description Default
message str

Message text.

required
log_type str

Log type for color coding.

'default'
Source code in goliat/gui/progress_gui.py
def update_status(self, message: str, log_type: str = "default") -> None:
    """Appends message to status log with color formatting.

    Args:
        message: Message text.
        log_type: Log type for color coding.
    """
    self.status_manager.record_log(log_type)
    # Update error counter with current web status
    web_connected = False
    if (
        hasattr(self, "web_bridge_manager")
        and self.web_bridge_manager.web_bridge
        and hasattr(self.web_bridge_manager.web_bridge, "is_connected")
    ):
        web_connected = self.web_bridge_manager.web_bridge.is_connected
    self.error_counter_label.setText(self.status_manager.get_error_summary(web_connected=web_connected))
    formatted_message = self.status_manager.format_message(message, log_type)
    self.status_text.append(formatted_message)
update_utilization
update_utilization() -> None

Updates CPU, RAM, and GPU utilization displays.

Source code in goliat/gui/progress_gui.py
def update_utilization(self) -> None:
    """Refreshes the CPU, RAM, and GPU utilization displays."""
    refresher = self.utilization_manager.update
    refresher()
update_utilization_plot
update_utilization_plot() -> None

Updates the system utilization plot with current values.

Source code in goliat/gui/progress_gui.py
def update_utilization_plot(self) -> None:
    """Refreshes the system-utilization plot with current values."""
    refresher = self.utilization_manager.update_plot
    refresher()
update_clock
update_clock() -> None

Updates elapsed time, ETA labels, and window title.

Source code in goliat/gui/progress_gui.py
def update_clock(self) -> None:
    """Refreshes elapsed time, ETA labels, and the window title."""
    refresher = self.clock_manager.update
    refresher()
update_graphs
update_graphs() -> None

Updates time remaining and overall progress graphs.

Source code in goliat/gui/progress_gui.py
def update_graphs(self) -> None:
    """Refreshes the time-remaining and overall-progress graphs."""
    refresher = self.graph_manager.update
    refresher()
hide_to_tray
hide_to_tray() -> None

Hides main window and shows system tray icon.

Source code in goliat/gui/progress_gui.py
def hide_to_tray(self) -> None:
    """Minimises the main window into the system tray."""
    # Hide the window first, then surface the tray icon.
    self.hide()
    tray = self.tray_manager
    tray.show()
show_from_tray
show_from_tray() -> None

Shows main window from system tray.

Source code in goliat/gui/progress_gui.py
def show_from_tray(self) -> None:
    """Restores the main window from the system tray."""
    # Show the window first, then retire the tray icon.
    self.show()
    tray = self.tray_manager
    tray.hide()
stop_study
stop_study() -> None

Sends stop signal to worker process.

Source code in goliat/gui/progress_gui.py
def stop_study(self) -> None:
    """Signals the worker process to stop and disables the control buttons."""
    message = "--- Sending stop signal to study process ---"
    # Mirror the notice to both log channels, then the GUI status pane.
    for channel in (self.progress_logger, self.verbose_logger):
        channel.info(message, extra={"log_type": "warning"})
    self.update_status(message, log_type="warning")
    # Prevent repeated stop/tray actions while shutdown is in flight.
    for button in (self.stop_button, self.tray_button):
        button.setEnabled(False)
    self.stop_event.set()
study_finished
study_finished(error: bool = False) -> None

Handles study completion, stopping timers and updating UI.

Called when worker process signals completion. Stops all timers, updates final progress to 100%, sets stage label, and schedules window auto-close after 3 seconds (if no errors).

Parameters:

Name Type Description Default
error bool

Whether study finished with errors (affects UI styling).

False
Source code in goliat/gui/progress_gui.py
def study_finished(self, error: bool = False) -> None:
    """Handles study completion, stopping timers and updating UI.

    Called when worker process signals completion. Stops all timers,
    updates final progress to 100%, sets stage label, and schedules
    window auto-close after 3 seconds (if no errors).

    Args:
        error: Whether study finished with errors (affects UI styling).
    """
    self.study_is_finished = True
    self.study_had_errors = error
    self.clock_timer.stop()
    self.queue_timer.stop()
    self.graph_timer.stop()
    self.utilization_timer.stop()
    self.utilization_plot_timer.stop()
    self.progress_sync_timer.stop()
    self.progress_animation.stop()
    if not error:
        self.update_status("--- Study Finished ---", log_type="success")
        self.overall_progress_bar.setValue(self.overall_progress_bar.maximum())
        self.stage_label.setText("Finished")
    else:
        self.update_status("--- Study Finished with Errors ---", log_type="fatal")
        self.stage_label.setText("Error")

    self.stop_button.setEnabled(False)
    self.tray_button.setEnabled(False)

    # Send final status update to web before stopping bridge
    self.web_bridge_manager.send_finished(error)

    self.update_clock()  # Final title update

    # Instead of auto-closing, show a message that user can close the window
    if not error:
        self.update_status("\n✓ All done! You may close this window now.", log_type="success")
    else:
        self.update_status("\n✓ Finished with errors. You may close this window now.", log_type="warning")
closeEvent
closeEvent(event: Any) -> None

Handles window close event, ensuring worker process termination.

Parameters:

Name Type Description Default
event Any

Close event.

required
Source code in goliat/gui/progress_gui.py
def closeEvent(self, event: Any) -> None:
    """Handles window close event, ensuring worker process termination.

    Args:
        event: Close event.
    """
    if self.tray_manager.is_visible():
        self.tray_manager.hide()

    if self.process.is_alive():
        self.progress_logger.info("Terminating study process...", extra={"log_type": "warning"})
        self.process.terminate()
        self.process.join(timeout=5)

    # Stop web bridge if enabled
    self.web_bridge_manager.stop()

    shutdown_loggers()
    event.accept()

Functions

GUI Communication

goliat.gui.queue_gui

QueueGUI proxy for worker process communication.

Classes

QueueGUI

QueueGUI(queue: Queue, stop_event: Event, profiler: Profiler, progress_logger: Logger, verbose_logger: Logger)

Bases: LoggingMixin

Proxy for ProgressGUI that operates in a separate process.

Mimics the ProgressGUI interface but routes all calls through a multiprocessing queue, enabling thread-safe communication between worker and GUI processes. All methods serialize their arguments and send them via queue for the GUI process to handle.

Parameters:

Name Type Description Default
queue Queue

Multiprocessing queue for IPC.

required
stop_event Event

Event flagging user cancellation.

required
profiler Profiler

Profiler for ETA calculations.

required
progress_logger Logger

Logger for progress-level messages.

required
verbose_logger Logger

Logger for detailed messages.

required
Source code in goliat/gui/queue_gui.py
def __init__(
    self,
    queue: Queue,
    stop_event: Event,
    profiler: "Profiler",
    progress_logger: Logger,
    verbose_logger: Logger,
) -> None:
    """Sets up the queue GUI proxy.

    Only stores the IPC handles and loggers; no work beyond attribute
    assignment happens here, so construction has no side effects.

    Args:
        queue: Multiprocessing queue for IPC.
        stop_event: Event flagging user cancellation.
        profiler: Profiler for ETA calculations.
        progress_logger: Logger for progress-level messages.
        verbose_logger: Logger for detailed messages.
    """
    self.queue: Queue = queue
    self.stop_event: Event = stop_event
    self.profiler: "Profiler" = profiler
    self.progress_logger: Logger = progress_logger
    self.verbose_logger: Logger = verbose_logger
Functions
log
log(message: str, level: str = 'verbose', log_type: str = 'default') -> None

Sends a log message to the GUI via queue.

Only 'progress' level messages are forwarded to reduce queue traffic.

Parameters:

Name Type Description Default
message str

Log message text.

required
level str

Log level (only 'progress' forwarded).

'verbose'
log_type str

Type for color coding in GUI.

'default'
Source code in goliat/gui/queue_gui.py
def log(self, message: str, level: str = "verbose", log_type: str = "default") -> None:
    """Forwards a log line to the GUI process via the queue.

    Anything below 'progress' level is dropped here to keep queue
    traffic low.

    Args:
        message: Log message text.
        level: Log level (only 'progress' forwarded).
        log_type: Type for color coding in GUI.
    """
    if level != "progress":
        return

    import time

    # Timestamps must be strictly increasing so the GUI can order
    # messages sent in rapid succession, even if the wall clock steps
    # backwards between calls.
    previous = getattr(self, "_last_timestamp", 0.0)
    now = time.time()
    self._last_timestamp = now if now > previous else previous + 0.000001

    self.queue.put(
        {
            "type": "status",
            "message": message,
            "log_type": log_type,
            "timestamp": self._last_timestamp,
        }
    )
update_simulation_details
update_simulation_details(sim_count: int, total_sims: int, details: str) -> None

Sends current simulation case details to GUI.

Parameters:

Name Type Description Default
sim_count int

Current simulation number (1-indexed).

required
total_sims int

Total simulations in study.

required
details str

Human-readable description of current case.

required
Source code in goliat/gui/queue_gui.py
def update_simulation_details(self, sim_count: int, total_sims: int, details: str) -> None:
    """Reports which simulation case is currently active to the GUI.

    Args:
        sim_count: Current simulation number (1-indexed).
        total_sims: Total simulations in study.
        details: Human-readable description of current case.
    """
    payload = {
        "type": "sim_details",
        "count": sim_count,
        "total": total_sims,
        "details": details,
    }
    self.queue.put(payload)
update_overall_progress
update_overall_progress(current_step: float, total_steps: int) -> None

Updates overall study progress bar.

Parameters:

Name Type Description Default
current_step float

Current step number or percentage (0-100).

required
total_steps int

Total steps in study.

required
Source code in goliat/gui/queue_gui.py
def update_overall_progress(self, current_step: float, total_steps: int) -> None:
    """Reports overall study progress to the GUI.

    Args:
        current_step: Current step number or percentage (0-100).
        total_steps: Total steps in study.
    """
    payload = {"type": "overall_progress", "current": current_step, "total": total_steps}
    self.queue.put(payload)
update_stage_progress
update_stage_progress(stage_name: str, current_step: int, total_steps: int, sub_stage: str = '') -> None

Updates progress for a specific stage (setup/run/extract).

Parameters:

Name Type Description Default
stage_name str

Stage name like 'Setup' or 'Running Simulation'.

required
current_step int

Current step within stage.

required
total_steps int

Total steps for stage.

required
sub_stage str

Optional sub-stage description.

''
Source code in goliat/gui/queue_gui.py
def update_stage_progress(self, stage_name: str, current_step: int, total_steps: int, sub_stage: str = "") -> None:
    """Reports per-stage progress (setup/run/extract) to the GUI.

    Args:
        stage_name: Stage name like 'Setup' or 'Running Simulation'.
        current_step: Current step within stage.
        total_steps: Total steps for stage.
        sub_stage: Optional sub-stage description.
    """
    payload = {
        "type": "stage_progress",
        "name": stage_name,
        "current": current_step,
        "total": total_steps,
        "sub_stage": sub_stage,
    }
    self.queue.put(payload)
start_stage_animation
start_stage_animation(task_name: str, end_value: int) -> None

Starts animated progress bar for a stage.

Looks up time estimate from profiler and starts animation that progresses toward end_value over that duration.

Parameters:

Name Type Description Default
task_name str

Task name ('setup', 'run', 'extract', or subtask name).

required
end_value int

Target progress value (typically 100).

required
Source code in goliat/gui/queue_gui.py
def start_stage_animation(self, task_name: str, end_value: int) -> None:
    """Kicks off an animated progress bar for a stage.

    The animation duration comes from the profiler: the three main
    phases use their recorded average phase time (60 s when unknown),
    anything else is treated as a subtask and uses its estimate.

    Args:
        task_name: Task name ('setup', 'run', 'extract', or subtask name).
        end_value: Target progress value (typically 100).
    """
    is_main_phase = task_name in ("setup", "run", "extract")
    estimate = (
        self.profiler.profiling_config.get(f"avg_{task_name}_time", 60)
        if is_main_phase
        else self.profiler.get_subtask_estimate(task_name)
    )
    self.queue.put({"type": "start_animation", "estimate": estimate, "end_value": end_value})
end_stage_animation
end_stage_animation() -> None

Stops the current animated progress bar.

Source code in goliat/gui/queue_gui.py
def end_stage_animation(self) -> None:
    """Tells the GUI process to stop the running progress-bar animation."""
    self.queue.put({"type": "end_animation"})
update_profiler
update_profiler() -> None

Sends profiler state to GUI for ETA display.

Source code in goliat/gui/queue_gui.py
def update_profiler(self) -> None:
    """Ships the whole profiler object to the GUI for ETA display."""
    self.queue.put({"type": "profiler_update", "profiler": self.profiler})
process_events
process_events() -> None

No-op for interface compatibility with ProgressGUI.

Source code in goliat/gui/queue_gui.py
def process_events(self) -> None:
    """No-op kept so callers can treat this proxy like ProgressGUI."""
    # Qt event pumping only makes sense in the GUI process.
    return None
is_stopped
is_stopped() -> bool

Checks if user requested cancellation via GUI.

Source code in goliat/gui/queue_gui.py
def is_stopped(self) -> bool:
    """Returns True once the GUI side has flagged a user cancellation."""
    return self.stop_event.is_set()

GUI Components

goliat.gui.components.clock_manager

Clock and ETA management component.

Classes

ClockManager

ClockManager(gui: ProgressGUI)

Manages elapsed time, ETA, and window title updates.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance.

required
Source code in goliat/gui/components/clock_manager.py
def __init__(self, gui: "ProgressGUI") -> None:
    """Initializes clock manager.

    Args:
        gui: ProgressGUI instance whose labels, progress bars, and
            window title this component updates.
    """
    # Back-reference only; all mutable state lives on the GUI itself.
    self.gui = gui
Functions
update
update() -> None

Updates elapsed time, ETA labels, and window title.

Called every second by Qt timer. Calculates elapsed time from start, gets ETA from profiler (if available), and updates window title with current status and progress percentage.

The window title shows: [progress%] GOLIAT | Sim X/Y | Status where Status is 'Booting...', 'Running...', or 'Finished'.

Source code in goliat/gui/components/clock_manager.py
def update(self) -> None:
    """Updates elapsed time, ETA labels, and window title.

    Called every second by Qt timer. Calculates elapsed time from start,
    gets ETA from profiler (if available), and updates window title with
    current status and progress percentage.

    The window title shows: [progress%] GOLIAT | Sim X/Y | Status
    where Status is 'Booting...', 'Running...', or 'Finished'.
    """
    # monotonic() is immune to wall-clock adjustments, so elapsed time
    # never jumps backwards.
    elapsed_sec = time.monotonic() - self.gui.start_time
    self.gui.elapsed_label.setText(f"Elapsed: {format_time(elapsed_sec)}")

    eta_sec: Optional[float] = None
    if self.gui.profiler and self.gui.profiler.current_phase:
        # Dividing by 1000 assumes the stage bar's range is 0-1000
        # (tenths of a percent) — TODO confirm against ProgressGUI setup.
        current_stage_progress_ratio = self.gui.stage_progress_bar.value() / 1000.0
        eta_sec = self.gui.profiler.get_time_remaining(current_stage_progress=current_stage_progress_ratio)

        if eta_sec is not None:
            time_remaining_str = format_time(eta_sec)
            self.gui.eta_label.setText(f"Time Remaining: {time_remaining_str}")
        else:
            self.gui.eta_label.setText("Time Remaining: N/A")
    else:
        self.gui.eta_label.setText("Time Remaining: N/A")

    # Update window title with status
    # value()/100 yields a percentage — presumably the overall bar
    # stores hundredths of a percent; verify against ProgressGUI.
    progress_percent = max(0, self.gui.overall_progress_bar.value() / 100.0)
    title = self.gui.init_window_title
    if title:
        title += " | "
    title += f"[{progress_percent:.2f}%] GOLIAT"
    if self.gui.total_simulations > 0:
        title += f" | Sim {self.gui.current_simulation_count}/{self.gui.total_simulations}"

    # Determine status based on actual activity
    if self.gui.study_is_finished:
        status = "Finished" if not self.gui.study_had_errors else "Finished with Errors"
    elif progress_percent > 0 or self.gui.current_simulation_count > 0:
        status = "Running..."
    else:
        status = "Booting..."

    title += f" | {status}"
    self.gui.setWindowTitle(title)

    # Update web connection status indicator periodically
    # Layered hasattr guards: the web bridge is optional and may not be
    # fully constructed when this timer first fires.
    if hasattr(self.gui, "web_bridge_manager") and self.gui.web_bridge_manager and self.gui.web_bridge_manager.web_bridge:
        if hasattr(self.gui.web_bridge_manager.web_bridge, "is_connected"):
            if hasattr(self.gui, "error_counter_label") and hasattr(self.gui, "status_manager"):
                self.gui._update_web_status(self.gui.web_bridge_manager.web_bridge.is_connected)

Functions

goliat.gui.components.data_manager

Data management for GUI: CSV file handling and cleanup.

Classes

DataManager

DataManager(data_dir: str, verbose_logger: Logger)

Manages CSV data files for time remaining and overall progress tracking.

Writes timestamped data points to CSV files for plotting and analysis. Automatically cleans up old files (keeps last 50) to prevent disk bloat. Creates unique session files using timestamp and process hash.

Parameters:

Name Type Description Default
data_dir str

Directory where data files will be stored.

required
verbose_logger Logger

Logger for verbose messages.

required
Source code in goliat/gui/components/data_manager.py
def __init__(self, data_dir: str, verbose_logger: Logger) -> None:
    """Sets up data manager with session-specific CSV files.

    Builds unique per-session file names from a timestamp plus a short
    md5 hash of (wall time, pid), prunes stale CSV files, then creates
    this session's files.

    Args:
        data_dir: Directory where data files will be stored.
        verbose_logger: Logger for verbose messages.
    """
    self.data_dir: str = data_dir
    self.verbose_logger: Logger = verbose_logger

    # Hashing time+pid keeps concurrent sessions from colliding even if
    # they start within the same second.
    seed = f"{time.time()}_{os.getpid()}".encode()
    self.session_hash: str = hashlib.md5(seed).hexdigest()[:8]
    session_timestamp = datetime.now().strftime("%d-%m_%H-%M-%S")

    # Cleanup old CSV files before creating new ones
    self._cleanup_old_data_files()

    def _session_path(prefix: str) -> str:
        # All three files share the same timestamp/hash suffix.
        return os.path.join(self.data_dir, f"{prefix}_{session_timestamp}_{self.session_hash}.csv")

    self.time_remaining_file: str = _session_path("time_remaining")
    self.overall_progress_file: str = _session_path("overall_progress")
    self.system_utilization_file: str = _session_path("system_utilization")

    # Initialize data files
    self._initialize_files()
Functions
write_time_remaining
write_time_remaining(hours_remaining: float) -> None

Appends a time remaining data point to CSV.

Writes timestamp and hours remaining to session-specific CSV file. Used for plotting ETA trends over time.

Parameters:

Name Type Description Default
hours_remaining float

Estimated hours remaining as float.

required
Source code in goliat/gui/components/data_manager.py
def write_time_remaining(self, hours_remaining: float) -> None:
    """Appends a time-remaining sample to the session CSV.

    Each row pairs a timestamp with the estimated hours remaining;
    used for plotting ETA trends over time.

    Args:
        hours_remaining: Estimated hours remaining as float.
    """
    self._write_csv_row(self.time_remaining_file, hours_remaining, "time remaining")
write_overall_progress
write_overall_progress(progress_percent: float) -> None

Appends an overall progress data point to CSV.

Writes timestamp and progress percentage to session-specific CSV file. Used for plotting progress trends over time.

Parameters:

Name Type Description Default
progress_percent float

Overall progress percentage (0-100).

required
Source code in goliat/gui/components/data_manager.py
def write_overall_progress(self, progress_percent: float) -> None:
    """Appends an overall-progress sample to the session CSV.

    Each row pairs a timestamp with the progress percentage; used for
    plotting progress trends over time.

    Args:
        progress_percent: Overall progress percentage (0-100).
    """
    self._write_csv_row(self.overall_progress_file, progress_percent, "overall progress")
write_system_utilization
write_system_utilization(cpu_percent: float, ram_percent: float, gpu_percent: Optional[float] = None, gpu_vram_percent: Optional[float] = None) -> None

Appends a system utilization data point to CSV.

Writes timestamp and CPU, RAM, GPU utilization, and GPU VRAM percentages to session-specific CSV file. Used for plotting utilization trends over time.

Parameters:

Name Type Description Default
cpu_percent float

CPU utilization percentage (0-100).

required
ram_percent float

RAM utilization percentage (0-100).

required
gpu_percent Optional[float]

GPU utilization percentage (0-100), or None if unavailable.

None
gpu_vram_percent Optional[float]

GPU VRAM utilization percentage (0-100), or None if unavailable.

None
Source code in goliat/gui/components/data_manager.py
def write_system_utilization(
    self,
    cpu_percent: float,
    ram_percent: float,
    gpu_percent: Optional[float] = None,
    gpu_vram_percent: Optional[float] = None,
) -> None:
    """Appends one system-utilization sample to the session CSV.

    Row layout: ISO timestamp, CPU %, RAM %, GPU %, GPU VRAM % — the
    GPU columns are written as empty strings when unavailable. Failures
    are logged rather than raised so a transient disk problem cannot
    take down the GUI.

    Args:
        cpu_percent: CPU utilization percentage (0-100).
        ram_percent: RAM utilization percentage (0-100).
        gpu_percent: GPU utilization percentage (0-100), or None if unavailable.
        gpu_vram_percent: GPU VRAM utilization percentage (0-100), or None if unavailable.
    """
    try:
        # NTP-based clock sidesteps hosts with a skewed system clock.
        current_time = get_ntp_utc_time()
        row = [
            current_time.isoformat(),
            cpu_percent,
            ram_percent,
            "" if gpu_percent is None else gpu_percent,
            "" if gpu_vram_percent is None else gpu_vram_percent,
        ]
        with open(self.system_utilization_file, "a", newline="") as f:
            csv.writer(f).writerow(row)
    except Exception as e:
        self.verbose_logger.error(f"Failed to write system utilization data: {e}")

Functions

goliat.gui.components.graph_manager

Graph update management component.

Classes

GraphManager

GraphManager(gui: ProgressGUI)

Manages time remaining and overall progress graph updates.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance.

required
Source code in goliat/gui/components/graph_manager.py
def __init__(self, gui: "ProgressGUI") -> None:
    """Initializes graph manager.

    Args:
        gui: ProgressGUI instance whose plots and data manager this
            component drives.
    """
    # Back-reference only; plots, bars, and the data manager live on the GUI.
    self.gui = gui
Functions
update
update() -> None

Updates time remaining, overall progress, and system utilization graphs (called every 5 seconds).

Gets current ETA and progress values, writes them to CSV files (via DataManager), and adds data points to matplotlib plots. The plots show trends over time, helping users see if ETA is converging or progress is steady.

This runs less frequently than clock updates (5s vs 1s) because plotting is more expensive and the trends don't need millisecond precision.

Source code in goliat/gui/components/graph_manager.py
def update(self) -> None:
    """Refreshes the ETA and overall-progress graphs (5-second cadence).

    Samples the current ETA (only while the profiler is tracking a
    phase) and the overall progress percentage, persists both to CSV
    via the DataManager, and appends the samples to their matplotlib
    plots so trends are visible over time. Runs less often than the
    1-second clock tick because plotting is comparatively expensive and
    the trends do not need that resolution.
    """
    gui = self.gui

    # ETA is only defined while a phase is active in the profiler.
    eta_sec: Optional[float] = None
    profiler = gui.profiler
    if profiler and profiler.current_phase:
        # /1000 assumes the stage bar range is 0-1000 — mirrors ClockManager.
        stage_ratio = gui.stage_progress_bar.value() / 1000.0
        eta_sec = profiler.get_time_remaining(current_stage_progress=stage_ratio)

    # /100 yields a percentage; clamped so it never goes negative.
    progress_percent = max(0, gui.overall_progress_bar.value() / 100.0)

    if eta_sec is not None:
        # NTP time avoids artifacts from a skewed system clock.
        current_time = get_ntp_utc_time()
        hours_remaining = eta_sec / 3600.0
        gui.data_manager.write_time_remaining(hours_remaining)
        gui.time_remaining_plot.add_data_point(current_time, hours_remaining)

    current_time = get_ntp_utc_time()
    gui.data_manager.write_overall_progress(progress_percent)
    gui.overall_progress_plot.add_data_point(current_time, progress_percent)

Functions

goliat.gui.components.machine_id_detector

Machine ID detection utility.

Classes

MachineIdDetector

Detects machine ID (public IP or local IP) for web monitoring.

Tries external service first with retries, then falls back to local IP. Matches the logic in run_worker.py to ensure consistency.

Functions
detect staticmethod
detect(verbose_logger: Logger) -> Optional[str]

Auto-detects machine ID (public IP or local IP).

Parameters:

Name Type Description Default
verbose_logger Logger

Logger for verbose messages.

required

Returns:

Type Description
Optional[str]

Machine ID string, or None if detection failed.

Source code in goliat/gui/components/machine_id_detector.py
@staticmethod
def detect(verbose_logger: Logger) -> Optional[str]:
    """Auto-detects machine ID (public IP or local IP).

    Tries api.ipify.org up to three times for the public IP, then falls
    back to the local IP learned from a connected UDP socket. Matches
    the logic in run_worker.py for consistency.

    Args:
        verbose_logger: Logger for verbose messages.

    Returns:
        Machine ID string, or None if detection failed.
    """
    try:
        import requests

        # Try external service first with retries (matches run_worker.py)
        public_ip = None
        for attempt in range(3):  # Try up to 3 times
            try:
                response = requests.get("https://api.ipify.org", timeout=10)
                if response.status_code == 200:
                    public_ip = response.text.strip()
                    if public_ip:
                        break
            except Exception:
                if attempt < 2:  # Not the last attempt
                    continue
                # Last attempt failed, will fall through to local IP

        if public_ip:
            verbose_logger.info(f"Auto-detected public IP: {public_ip}")
            return public_ip

        # Fallback: a connected UDP socket reveals the outbound local IP
        # without sending any packets. The context manager guarantees the
        # socket is closed even if connect() raises (the original leaked
        # the socket on that failure path).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
        verbose_logger.info(f"Auto-detected local IP: {local_ip}")
        return local_ip
    except Exception as e:
        verbose_logger.warning(f"Could not auto-detect machine ID: {e}")
        return None

goliat.gui.components.plots._matplotlib_imports

Matplotlib imports with fallback handling for plotting components.

goliat.gui.components.plots.overall_progress_plot

Overall progress plot component for GUI.

Classes

OverallProgressPlot

OverallProgressPlot()

Manages overall progress plot with real-time updates.

Creates a matplotlib line plot showing progress percentage trends over time. Updates dynamically as new data points arrive. Uses green color scheme to distinguish from time remaining plot. Y-axis fixed at 0-100%.

Source code in goliat/gui/components/plots/overall_progress_plot.py
def __init__(self) -> None:
    """Sets up matplotlib figure and axes with dark theme."""
    # Module-level Figure/FigureCanvas are None when matplotlib failed
    # to import; fail loudly here instead of at first draw.
    if Figure is None or FigureCanvas is None:
        raise ImportError("matplotlib is required for plotting")
    from matplotlib.axes import Axes as _Axes
    from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as _FigureCanvas
    from matplotlib.figure import Figure as _Figure

    self.figure: _Figure = _Figure(figsize=(10, 6), facecolor="#2b2b2b")
    self.canvas: _FigureCanvas = _FigureCanvas(self.figure)
    self.ax: _Axes = self.figure.add_subplot(111)
    # (timestamp, progress %) samples in arrival order.
    self.data: List[Tuple[datetime, float]] = []
    # High-water mark of all progress values seen so far.
    self.max_progress_seen: float = 0.0
    self._setup()
Functions
add_data_point
add_data_point(timestamp: datetime, progress_percent: float) -> None

Adds data point and refreshes plot.

Parameters:

Name Type Description Default
timestamp datetime

Timestamp for the data point.

required
progress_percent float

Progress percentage as float.

required
Source code in goliat/gui/components/plots/overall_progress_plot.py
def add_data_point(self, timestamp: datetime, progress_percent: float) -> None:
    """Records a progress sample and redraws the plot.

    Args:
        timestamp: Timestamp for the data point.
        progress_percent: Progress percentage as float.
    """
    # Track the high-water mark of progress seen so far.
    self.max_progress_seen = max(self.max_progress_seen, progress_percent)
    # Plot axis is displayed in UTC+1, so convert on the way in.
    self.data.append((convert_to_utc_plus_one(timestamp), progress_percent))
    self._refresh()

Functions

goliat.gui.components.plots.pie_charts_manager

Pie charts manager component for GUI.

Classes

PieChartsManager

PieChartsManager()

Manages four pie charts displaying timing breakdowns by phase and subtask.

Shows visual breakdown of execution time:

- Top-left: Phase weights (setup/run/extract relative durations)
- Top-right: Setup subtasks breakdown
- Bottom-left: Run subtasks breakdown
- Bottom-right: Extract subtasks breakdown

Updates automatically when profiler state changes. Filters out fake aggregated entries. Uses color palette for visual distinction.

Source code in goliat/gui/components/plots/pie_charts_manager.py
def __init__(self) -> None:
    """Sets up matplotlib figure with 2x2 subplot grid."""
    # Module-level Figure/FigureCanvas are None when matplotlib failed
    # to import; fail loudly here instead of at first draw.
    if Figure is None or FigureCanvas is None:
        raise ImportError("matplotlib is required for plotting")
    from matplotlib.axes import Axes as _Axes
    from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as _FigureCanvas
    from matplotlib.figure import Figure as _Figure

    self.figure: _Figure = _Figure(figsize=(12, 10), facecolor="#2b2b2b")
    self.canvas: _FigureCanvas = _FigureCanvas(self.figure)
    # 2x2 grid, row-major: phase weights, then setup/run/extract subtasks.
    grid_positions = (221, 222, 223, 224)
    self.axes: List[_Axes] = [self.figure.add_subplot(pos) for pos in grid_positions]
    self._setup()
Functions
update
update(profiler: Profiler) -> None

Updates pie charts with timing data from profiler.

Collects phase weights and subtask timing data, filters out fake aggregated entries, and renders pie charts with percentages. Charts show relative time spent in each phase/subtask, helping identify bottlenecks.

Parameters:

Name Type Description Default
profiler Profiler

Profiler instance containing timing data.

required
Source code in goliat/gui/components/plots/pie_charts_manager.py
def update(self, profiler: "Profiler") -> None:
    """Updates pie charts with timing data from profiler.

    Collects phase weights and subtask timing data, filters out fake
    aggregated entries, and renders pie charts with percentages. Charts
    show relative time spent in each phase/subtask, helping identify
    bottlenecks.

    Args:
        profiler: Profiler instance containing timing data.
    """
    if not profiler:
        return

    # Shared palette for the subtask charts; sliced to label count below.
    colors = ["#ff6b6b", "#4ecdc4", "#45b7d1", "#f9ca24", "#6c5ce7", "#00b894", "#fdcb6e", "#e17055"]

    # Chart 0 (Top-left): Phase Weights
    ax0 = self.axes[0]
    ax0.clear()
    ax0.set_facecolor("#2b2b2b")

    # Get phase weights/times
    # Each phase's "weight" is its recorded average duration.
    phase_weights: Dict[str, float] = {}
    for phase in ["setup", "run", "extract"]:
        avg_time = profiler.profiling_config.get(f"avg_{phase}_time")
        if avg_time is not None:
            phase_weights[phase.capitalize()] = avg_time

    if phase_weights:
        labels = list(phase_weights.keys())
        sizes = list(phase_weights.values())

        # Group small slices into "Others"
        labels, sizes = self._group_small_slices(labels, sizes, threshold_percent=3.0)

        pie_result = ax0.pie(
            sizes,
            labels=labels,
            autopct="%1.1f%%",
            startangle=90,
            colors=["#ff6b6b", "#4ecdc4", "#45b7d1"],
            textprops={"color": "#f0f0f0", "fontsize": 10},
        )

        # With autopct set, pie() returns (wedges, texts, autotexts);
        # guard the unpack anyway in case autotexts are absent.
        autotexts = pie_result[2] if len(pie_result) > 2 else []
        for autotext in autotexts:
            autotext.set_color("#2b2b2b")
            autotext.set_fontweight("bold")
            autotext.set_fontsize(9)

        ax0.set_title("Phase Weights", fontsize=12, color="#f0f0f0", pad=10)
    else:
        ax0.text(0.5, 0.5, "No data", ha="center", va="center", fontsize=12, color="#f0f0f0", transform=ax0.transAxes)
        ax0.set_title("Phase Weights", fontsize=12, color="#f0f0f0", pad=10)
        # Hide axes when showing "No data"
        ax0.set_xticks([])
        ax0.set_yticks([])
        ax0.spines["top"].set_visible(False)
        ax0.spines["right"].set_visible(False)
        ax0.spines["bottom"].set_visible(False)
        ax0.spines["left"].set_visible(False)

    # Charts 1-3: Subtasks for each phase
    phases = ["setup", "run", "extract"]
    phase_titles = ["Setup Subtasks", "Run Subtasks", "Extract Subtasks"]

    for i, (phase, title) in enumerate(zip(phases, phase_titles), start=1):
        ax = self.axes[i]
        ax.clear()
        ax.set_facecolor("#2b2b2b")

        # Collect subtask data for this phase
        # Filter out fake aggregated entries
        fake_entries = ["simulation", "simulation_total", "results_total"]

        subtask_data: Dict[str, float] = {}
        for key, value in profiler.profiling_config.items():
            if key.startswith(f"avg_{phase}_") and key != f"avg_{phase}_time":
                # Extract the subtask name (everything after "avg_{phase}_")
                subtask_key = key.replace(f"avg_{phase}_", "")

                # Skip fake aggregated entries
                if subtask_key in fake_entries:
                    continue

                task_name = self._format_task_label(subtask_key)
                subtask_data[task_name] = value

        if subtask_data:
            labels = list(subtask_data.keys())
            sizes = list(subtask_data.values())

            # Group small slices into "Others"
            labels, sizes = self._group_small_slices(labels, sizes, threshold_percent=3.0)

            # Create pie chart
            pie_result = ax.pie(
                sizes,
                labels=labels,
                autopct="%1.1f%%",
                startangle=90,
                colors=colors[: len(labels)],
                textprops={"color": "#f0f0f0", "fontsize": 9},
            )

            # Unpack result safely
            autotexts = pie_result[2] if len(pie_result) > 2 else []

            # Enhance text visibility
            for autotext in autotexts:
                autotext.set_color("#2b2b2b")
                autotext.set_fontweight("bold")
                autotext.set_fontsize(8)

            ax.set_title(title, fontsize=12, color="#f0f0f0", pad=10)
        else:
            ax.text(0.5, 0.5, "No data", ha="center", va="center", fontsize=12, color="#f0f0f0", transform=ax.transAxes)
            ax.set_title(title, fontsize=12, color="#f0f0f0", pad=10)
            # Hide axes when showing "No data"
            ax.set_xticks([])
            ax.set_yticks([])
            ax.spines["top"].set_visible(False)
            ax.spines["right"].set_visible(False)
            ax.spines["bottom"].set_visible(False)
            ax.spines["left"].set_visible(False)

    self.figure.tight_layout()
    self.canvas.draw()

goliat.gui.components.plots.system_utilization_plot

System utilization plot component for GUI.

Classes

SystemUtilizationPlot

SystemUtilizationPlot()

Manages system utilization plot with real-time updates.

Creates a matplotlib line plot showing CPU, RAM, GPU utilization, and GPU VRAM utilization percentages over time. Updates dynamically as new data points arrive. Y-axis extends to 105% (with ticks at 0, 20, 40, 60, 80, 100) to prevent clipping of lines at 100%. GPU lines only shown if GPU is available.

Source code in goliat/gui/components/plots/system_utilization_plot.py
def __init__(self) -> None:
    """Sets up matplotlib figure and axes with dark theme.

    Raises:
        ImportError: If matplotlib is not installed (the module-level
            Figure/FigureCanvas sentinels are None in that case).
    """
    if Figure is None or FigureCanvas is None:
        raise ImportError("matplotlib is required for plotting")
    # Real matplotlib types are imported locally, only after availability
    # has been confirmed by the sentinel check above.
    from matplotlib.figure import Figure as _Figure
    from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as _FigureCanvas
    from matplotlib.axes import Axes as _Axes

    self.figure: _Figure = _Figure(figsize=(10, 6), facecolor="#2b2b2b")  # dark background
    self.canvas: _FigureCanvas = _FigureCanvas(self.figure)
    self.ax: _Axes = self.figure.add_subplot(111)
    # Per-metric time series; GPU entries may be None when no GPU is present.
    self.cpu_data: List[Tuple[datetime, float]] = []
    self.ram_data: List[Tuple[datetime, float]] = []
    self.gpu_data: List[Tuple[datetime, Optional[float]]] = []
    self.gpu_vram_data: List[Tuple[datetime, Optional[float]]] = []
    self.gpu_available: bool = False

    # System info for legend (will be populated when first data point is added)
    self.cpu_cores: int = 0
    self.total_ram_gb: float = 0.0
    self.gpu_name: Optional[str] = None
    self.total_gpu_vram_gb: float = 0.0

    self._setup()
Functions
add_data_point
add_data_point(timestamp: datetime, cpu_percent: float, ram_percent: float, gpu_percent: Optional[float] = None, gpu_vram_percent: Optional[float] = None, cpu_cores: int = 0, total_ram_gb: float = 0.0, gpu_name: Optional[str] = None, total_gpu_vram_gb: float = 0.0) -> None

Adds data point and refreshes plot.

Parameters:

Name Type Description Default
timestamp datetime

Timestamp for the data point.

required
cpu_percent float

CPU utilization percentage (0-100).

required
ram_percent float

RAM utilization percentage (0-100).

required
gpu_percent Optional[float]

GPU utilization percentage (0-100), or None if unavailable.

None
gpu_vram_percent Optional[float]

GPU VRAM utilization percentage (0-100), or None if unavailable.

None
cpu_cores int

Number of CPU cores (for legend).

0
total_ram_gb float

Total RAM in GB (for legend).

0.0
gpu_name Optional[str]

GPU model name (for legend).

None
total_gpu_vram_gb float

Total GPU VRAM in GB (for legend).

0.0
Source code in goliat/gui/components/plots/system_utilization_plot.py
def add_data_point(
    self,
    timestamp: datetime,
    cpu_percent: float,
    ram_percent: float,
    gpu_percent: Optional[float] = None,
    gpu_vram_percent: Optional[float] = None,
    cpu_cores: int = 0,
    total_ram_gb: float = 0.0,
    gpu_name: Optional[str] = None,
    total_gpu_vram_gb: float = 0.0,
) -> None:
    """Appends one sample to every series and redraws the plot.

    Args:
        timestamp: Timestamp for the data point.
        cpu_percent: CPU utilization percentage (0-100).
        ram_percent: RAM utilization percentage (0-100).
        gpu_percent: GPU utilization percentage (0-100), or None if unavailable.
        gpu_vram_percent: GPU VRAM utilization percentage (0-100), or None if unavailable.
        cpu_cores: Number of CPU cores (for legend).
        total_ram_gb: Total RAM in GB (for legend).
        gpu_name: GPU model name (for legend).
        total_gpu_vram_gb: Total GPU VRAM in GB (for legend).
    """
    # The very first sample also carries the static system info for the legend.
    if not self.cpu_data:
        self.cpu_cores = cpu_cores
        self.total_ram_gb = total_ram_gb
        self.gpu_name = gpu_name
        self.total_gpu_vram_gb = total_gpu_vram_gb

    # All four series share one UTC+1-normalized timestamp.
    ts = convert_to_utc_plus_one(timestamp)

    self.cpu_data.append((ts, cpu_percent))
    self.ram_data.append((ts, ram_percent))
    self.gpu_data.append((ts, gpu_percent))
    self.gpu_vram_data.append((ts, gpu_vram_percent))

    # Any non-None GPU reading means the GPU lines should be drawn.
    if gpu_percent is not None or gpu_vram_percent is not None:
        self.gpu_available = True

    self._refresh()

Functions

goliat.gui.components.plots.time_remaining_plot

Time remaining plot component for GUI.

Attributes

Classes

TimeRemainingPlot

TimeRemainingPlot()

Manages time remaining plot with real-time updates.

Creates a matplotlib line plot showing ETA trends over time. Updates dynamically as new data points arrive, maintaining dark theme styling consistent with GUI. Tracks maximum time seen to set appropriate Y-axis limits.

Source code in goliat/gui/components/plots/time_remaining_plot.py
def __init__(self) -> None:
    """Sets up matplotlib figure and axes with dark theme.

    Raises:
        ImportError: If matplotlib is not installed (the module-level
            Figure/FigureCanvas sentinels are None in that case).
    """
    if Figure is None or FigureCanvas is None:
        raise ImportError("matplotlib is required for plotting")
    # Imported locally, only after availability has been confirmed above.
    from matplotlib.figure import Figure as _Figure
    from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as _FigureCanvas
    from matplotlib.axes import Axes as _Axes

    self.figure: _Figure = _Figure(figsize=(10, 6), facecolor="#2b2b2b")  # dark background
    self.canvas: _FigureCanvas = _FigureCanvas(self.figure)
    self.ax: _Axes = self.figure.add_subplot(111)
    self.data: List[Tuple[datetime, float]] = []  # (timestamp, hours-remaining) samples
    self.max_time_remaining_seen: float = 0.0  # running maximum, drives the Y-axis limit
    self._setup()
Functions
add_data_point
add_data_point(timestamp: datetime, hours_remaining: float) -> None

Adds data point and refreshes plot.

Parameters:

Name Type Description Default
timestamp datetime

Timestamp for the data point.

required
hours_remaining float

Hours remaining as float.

required
Source code in goliat/gui/components/plots/time_remaining_plot.py
def add_data_point(self, timestamp: datetime, hours_remaining: float) -> None:
    """Records one ETA sample and redraws the plot.

    Args:
        timestamp: Timestamp for the data point.
        hours_remaining: Hours remaining as float.
    """
    # Track the largest ETA ever seen so the Y-axis never shrinks below it.
    self.max_time_remaining_seen = max(self.max_time_remaining_seen, hours_remaining)
    # Normalize to UTC+1 before storing, matching the other plots.
    self.data.append((convert_to_utc_plus_one(timestamp), hours_remaining))
    self._refresh()

Functions

goliat.gui.components.plots.utils

Common utilities for plotting components.

Functions

get_ntp_utc_time

get_ntp_utc_time() -> datetime

Get current UTC time from NTP server (bypasses system clock).

Uses NTP to get accurate time independent of system clock issues. Caches the result for 30 seconds to minimize performance impact. Falls back to system time if NTP query fails.

Returns:

Type Description
datetime

Current UTC time as timezone-aware datetime.

Source code in goliat/gui/components/plots/utils.py
def get_ntp_utc_time() -> datetime:
    """Get current UTC time from NTP server (bypasses system clock).

    Uses NTP to get accurate time independent of system clock issues.
    Caches the result for 30 seconds to minimize performance impact.
    Falls back to system time if NTP query fails.

    Returns:
        Current UTC time as timezone-aware datetime.
    """
    global _ntp_cache

    # Serve from cache while fresh, advancing the cached time by the
    # wall-clock seconds elapsed since it was stored.
    current_system_time = time.time()
    if _ntp_cache is not None:
        cached_time, cache_timestamp = _ntp_cache
        elapsed = current_system_time - cache_timestamp
        if elapsed < _NTP_CACHE_DURATION:
            return cached_time + timedelta(seconds=elapsed)

    # Query NTP
    try:
        ntp_query = bytearray(48)
        ntp_query[0] = 0x1B  # NTP version 3, client mode

        # Context manager guarantees the socket is closed even if sendto/
        # recvfrom raises; the original only closed it on the success path,
        # leaking the fd whenever the 2s timeout fired.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(2.0)  # 2 second timeout
            s.sendto(ntp_query, ("pool.ntp.org", 123))
            data, _ = s.recvfrom(48)

        # Transmit-timestamp seconds live at bytes 40-44 of the response;
        # 2208988800 shifts from the NTP epoch (1900) to the Unix epoch (1970).
        ntp_timestamp = struct.unpack("!I", data[40:44])[0] - 2208988800
        utc_time = datetime.fromtimestamp(ntp_timestamp, tz=timezone.utc)

        # Cache the result
        _ntp_cache = (utc_time, current_system_time)
        return utc_time
    except Exception:
        # Fall back to system time if NTP fails. The fallback is cached too,
        # so a dead network doesn't cost a 2-second timeout on every call.
        fallback_time = datetime.now(timezone.utc)
        _ntp_cache = (fallback_time, current_system_time)
        return fallback_time

convert_to_utc_plus_one

convert_to_utc_plus_one(timestamp: datetime) -> datetime

Convert a datetime to UTC+1 timezone.

Handles both naive (assumed UTC) and timezone-aware datetimes. Works reliably across VMs worldwide by always normalizing to UTC first.

Parameters:

Name Type Description Default
timestamp datetime

Datetime to convert (can be naive or timezone-aware). If naive, assumes it's already in UTC (recommended usage).

required

Returns:

Type Description
datetime

Datetime in UTC+1 timezone (timezone-aware).

Source code in goliat/gui/components/plots/utils.py
def convert_to_utc_plus_one(timestamp: datetime) -> datetime:
    """Convert a datetime to UTC+1 timezone.

    Handles both naive (assumed UTC) and timezone-aware datetimes.
    Works reliably across VMs worldwide by always normalizing to UTC first.

    Args:
        timestamp: Datetime to convert (can be naive or timezone-aware).
                  If naive, assumes it's already in UTC (recommended usage).

    Returns:
        Datetime in UTC+1 timezone (timezone-aware).
    """
    # Normalize to aware-UTC first: naive values are *declared* UTC,
    # aware values are *converted* to UTC.
    if timestamp.tzinfo is None:
        as_utc = timestamp.replace(tzinfo=timezone.utc)
    else:
        as_utc = timestamp.astimezone(timezone.utc)

    # Shift into the fixed UTC+1 offset (deliberately no DST handling).
    return as_utc.astimezone(timezone(timedelta(hours=1)))

goliat.gui.components.progress_animation

Progress bar animation logic.

Classes

ProgressAnimation

ProgressAnimation(progress_bar: QProgressBar, timer: QTimer, debug: bool = False)

Manages smooth progress bar animations based on estimated durations.

Provides linear interpolation animation for progress bars when explicit progress updates aren't available. Animates from current value to target (100%) over estimated duration, giving visual feedback during long tasks.

Updates every 50ms via Qt timer, calculating progress ratio from elapsed time and duration. Stops automatically when target is reached or stopped explicitly.

Parameters:

Name Type Description Default
progress_bar QProgressBar

Progress bar widget to animate (0-1000 range).

required
timer QTimer

QTimer instance for animation updates (50ms interval).

required
debug bool

Enable debug logging (currently unused).

False
Source code in goliat/gui/components/progress_animation.py
def __init__(self, progress_bar: "QProgressBar", timer: "QTimer", debug: bool = False) -> None:
    """Sets up the animation handler.

    Args:
        progress_bar: Progress bar widget to animate (0-1000 range).
        timer: QTimer instance for animation updates (50ms interval).
        debug: Enable debug logging (currently unused).
    """
    # Imported locally (for the attribute annotations below) so this module
    # does not hard-require PySide6 at import time.
    from PySide6.QtWidgets import QProgressBar as _QProgressBar
    from PySide6.QtCore import QTimer as _QTimer

    self.progress_bar: _QProgressBar = progress_bar
    self.timer: _QTimer = timer
    self.debug: bool = debug
    # Animation state: active flag plus linear-interpolation endpoints.
    self.active: bool = False
    self.start_time: float = 0.0  # monotonic clock reading taken in start()
    self.duration: float = 0.0  # estimated duration in seconds
    self.start_value: int = 0
    self.end_value: int = 0
Functions
start
start(estimated_duration: float, end_step: int) -> None

Starts smooth animation for progress bar.

Begins linear interpolation from current value to 100% over estimated duration. If already at 100%, skips animation. Starts Qt timer if not already active.

Parameters:

Name Type Description Default
estimated_duration float

Estimated task duration in seconds (from profiler).

required
end_step int

Target step value (unused, always animates to 100%).

required
Source code in goliat/gui/components/progress_animation.py
def start(self, estimated_duration: float, end_step: int) -> None:
    """Starts smooth animation for progress bar.

    Begins linear interpolation from the bar's current value to 100% over
    the estimated duration. Skips animation when the bar is already full.
    Starts the Qt timer if it is not already running.

    Args:
        estimated_duration: Estimated task duration in seconds (from profiler).
        end_step: Target step value (unused, always animates to 100%).
    """
    if self.debug:
        self._log(f"start_animation received: duration={estimated_duration:.2f}s, end_step={end_step}")

    # Record the interpolation endpoints; the target is always full scale.
    self.start_time = time.monotonic()
    self.duration = estimated_duration
    self.end_value = 1000  # Progress bar range is 0-1000
    self.start_value = self.progress_bar.value()

    already_done = self.start_value >= self.end_value
    if already_done:
        if self.debug:
            self._log("Animation skipped, start_value >= end_value.")
        return

    self.active = True
    if not self.timer.isActive():
        self.timer.start(50)
    if self.debug:
        self._log("Animation started.")
stop
stop() -> None

Stops the progress bar animation.

Source code in goliat/gui/components/progress_animation.py
def stop(self) -> None:
    """Stops the progress bar animation and halts the Qt timer."""
    # Only log when an animation was actually running.
    if self.debug and self.active:
        self._log("end_animation called.")
    self.active = False
    if self.timer.isActive():
        self.timer.stop()
update
update() -> None

Updates progress bar animation frame by frame.

Calculates current progress ratio based on elapsed time and duration, then interpolates between start and end values. Updates progress bar value and format string. Called every 50ms by Qt timer when active.

Source code in goliat/gui/components/progress_animation.py
def update(self) -> None:
    """Advances the animation by one frame.

    Interpolates linearly between the recorded start value and the target
    based on elapsed wall time, then writes the value and the percent label
    to the bar. Invoked every 50ms by the Qt timer while active.
    """
    if not self.active:
        return

    elapsed = time.monotonic() - self.start_time
    # A non-positive duration means "jump straight to the end".
    ratio = min(elapsed / self.duration, 1.0) if self.duration > 0 else 1.0

    span = self.end_value - self.start_value
    value = min(self.start_value + int(span * ratio), self.end_value)

    self.progress_bar.setValue(value)
    # Bar range is 0-1000, so value/10 is the displayed percentage.
    self.progress_bar.setFormat(f"{(value / 1000) * 100:.0f}%")

goliat.gui.components.progress_manager

Progress bar management component.

Classes

ProgressManager

ProgressManager(gui: ProgressGUI)

Manages progress bar updates for overall and stage progress.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance.

required
Source code in goliat/gui/components/progress_manager.py
def __init__(self, gui: "ProgressGUI") -> None:
    """Initializes progress manager.

    Args:
        gui: ProgressGUI instance whose progress bars and labels this
            manager updates.
    """
    # Only a reference is kept; all widget access goes through self.gui.
    self.gui = gui
Functions
update_overall
update_overall(current_step: float, total_steps: int) -> None

Updates overall progress bar across all simulations.

The progress bar uses a 0-10000 range internally (for finer granularity), but displays as percentage. Overall progress accounts for completed simulations plus progress within current simulation.

Parameters:

Name Type Description Default
current_step float

Current step number (0-100 range) or percentage (0-100).

required
total_steps int

Total number of steps (typically 100).

required
Source code in goliat/gui/components/progress_manager.py
def update_overall(self, current_step: float, total_steps: int) -> None:
    """Updates overall progress bar across all simulations.

    The progress bar uses a 0-10000 range internally (for finer granularity),
    but displays as percentage. Overall progress accounts for completed
    simulations plus progress within current simulation.

    Args:
        current_step: Current step number (0-100 range) or percentage (0-100).
        total_steps: Total number of steps (typically 100).
    """
    if self.gui.DEBUG:
        self.gui.update_status(f"DEBUG: update_overall_progress received: current={current_step}, total={total_steps}")
    if total_steps <= 0:
        # Nothing to display without a positive total (also avoids div-by-zero).
        return
    percent = (current_step / total_steps) * 100
    # Bar range is 0-10000, so the percentage is scaled by 100.
    self.gui.overall_progress_bar.setValue(int(percent * 100))
    self.gui.overall_progress_bar.setFormat(f"{percent:.2f}%")
    if self.gui.DEBUG:
        self.gui.update_status(f"DEBUG: Overall progress set to: {percent:.2f}%")
update_stage
update_stage(stage_name: str, current_step: int, total_steps: int, sub_stage: str = '') -> None

Updates stage-specific progress bar and label.

Shows progress within current phase (setup/run/extract). Stops any active animation when explicit progress is set. Uses 0-1000 range internally for finer granularity.

Parameters:

Name Type Description Default
stage_name str

Name of current stage (e.g., 'Setup', 'Running Simulation').

required
current_step int

Current step within stage.

required
total_steps int

Total steps for the stage.

required
sub_stage str

Optional sub-stage description (currently unused).

''
Source code in goliat/gui/components/progress_manager.py
def update_stage(self, stage_name: str, current_step: int, total_steps: int, sub_stage: str = "") -> None:
    """Updates stage-specific progress bar and label.

    Shows progress within current phase (setup/run/extract). Stops any
    active animation when explicit progress is set. Uses 0-1000 range
    internally for finer granularity.

    Args:
        stage_name: Name of current stage (e.g., 'Setup', 'Running Simulation').
        current_step: Current step within stage.
        total_steps: Total steps for the stage.
        sub_stage: Optional sub-stage description (currently unused).
    """
    if self.gui.DEBUG:
        self.gui.update_status(
            f"DEBUG: update_stage_progress received: name='{stage_name}', current={current_step}, total={total_steps}, sub_stage='{sub_stage}'"
        )

    self.gui.stage_label.setText(f"Current Stage: {stage_name}")
    self.gui.total_steps_for_stage = total_steps
    # Explicit progress always wins over the estimated-duration animation.
    self.gui.progress_animation.stop()

    ratio = (current_step / total_steps) if total_steps > 0 else 0
    # Bar range is 0-1000.
    self.gui.stage_progress_bar.setValue(int(ratio * 1000))
    self.gui.stage_progress_bar.setFormat(f"{ratio * 100:.0f}%")
    if self.gui.DEBUG:
        self.gui.update_status(f"DEBUG: Stage '{stage_name}' progress set to: {ratio * 100:.0f}%")
update_simulation_details
update_simulation_details(sim_count: int, total_sims: int, details: str) -> None

Updates simulation counter and details labels.

Parameters:

Name Type Description Default
sim_count int

Current simulation number.

required
total_sims int

Total number of simulations.

required
details str

Description of current simulation case.

required
Source code in goliat/gui/components/progress_manager.py
def update_simulation_details(self, sim_count: int, total_sims: int, details: str) -> None:
    """Updates simulation counter and details labels.

    Args:
        sim_count: Current simulation number.
        total_sims: Total number of simulations.
        details: Description of current simulation case.
    """
    gui = self.gui
    gui.current_simulation_count = sim_count
    gui.total_simulations = total_sims
    gui.sim_counter_label.setText(f"Simulation: {sim_count} / {total_sims}")
    gui.sim_details_label.setText(f"Current Case: {details}")

    # Mirror the details to the web dashboard when the bridge is up.
    bridge_manager = getattr(gui, "web_bridge_manager", None)
    if bridge_manager is not None and bridge_manager.web_bridge is not None:
        bridge_manager.web_bridge.enqueue(
            {"type": "simulation_details", "simulation_count": sim_count, "total_simulations": total_sims, "current_case": details}
        )

goliat.gui.components.queue_handler

Queue message handler for processing messages from worker process.

Classes

QueueHandler

QueueHandler(gui_instance: ProgressGUI)

Handles processing of messages from the worker process queue.

Polls the multiprocessing queue and dispatches messages to appropriate GUI update methods. This decouples message handling from queue polling, making the code cleaner and easier to test.

Message types: - 'status': Log message with color coding - 'overall_progress': Update overall progress bar - 'stage_progress': Update stage progress bar - 'start_animation': Start animated progress bar - 'end_animation': Stop animation - 'profiler_update': Update profiler state and refresh timing displays - 'sim_details': Update simulation counter and details - 'finished': Study completed successfully - 'fatal_error': Study failed with fatal error

Parameters:

Name Type Description Default
gui_instance ProgressGUI

ProgressGUI instance to update with messages.

required
Source code in goliat/gui/components/queue_handler.py
def __init__(self, gui_instance: "ProgressGUI") -> None:
    """Sets up the queue handler.

    Args:
        gui_instance: ProgressGUI instance to update with messages.
    """
    self.gui: "ProgressGUI" = gui_instance
    # Dispatch table: message 'type' field -> bound handler method.
    # process_queue() looks messages up here instead of an if/elif chain.
    self._MESSAGE_HANDLERS = {
        "status": self._handle_status,
        "overall_progress": self._handle_overall_progress,
        "stage_progress": self._handle_stage_progress,
        "start_animation": self._handle_start_animation,
        "end_animation": self._handle_end_animation,
        "profiler_update": self._handle_profiler_update,
        "sim_details": self._handle_sim_details,
        "finished": self._handle_finished,
        "fatal_error": self._handle_fatal_error,
    }
Functions
process_queue
process_queue() -> None

Processes messages from worker process queue and updates UI accordingly.

Polls queue non-blockingly and processes all available messages in one call. Handles different message types by calling appropriate GUI methods. Catches and logs exceptions to prevent one bad message from crashing GUI.

This method is called every 100ms by Qt timer to keep UI responsive.

After processing each message for the GUI, forwards a copy to WebGUIBridge if it exists (for web dashboard monitoring).

Source code in goliat/gui/components/queue_handler.py
def process_queue(self) -> None:
    """Processes messages from worker process queue and updates UI accordingly.

    Polls queue non-blockingly and processes all available messages in one
    call. Handles different message types by calling appropriate GUI methods.
    Catches and logs exceptions to prevent one bad message from crashing GUI.

    This method is called every 100ms by Qt timer to keep UI responsive.

    After processing each message for the GUI, forwards a copy to WebGUIBridge
    if it exists (for web dashboard monitoring).
    """
    while not self.gui.queue.empty():
        try:
            msg: Dict[str, Any] = self.gui.queue.get_nowait()
            msg_type: Optional[str] = msg.get("type")

            # Dispatch message to appropriate handler
            if msg_type:
                handler = self._MESSAGE_HANDLERS.get(msg_type)
                if handler:
                    handler(msg)

            # Forward message to web bridge if enabled. Also guard against
            # web_bridge_manager being None (matching the check used in
            # ProgressManager.update_simulation_details); the original only
            # checked hasattr and dereferenced a None manager, raising
            # AttributeError into the generic handler below.
            web_bridge_manager = getattr(self.gui, "web_bridge_manager", None)
            if web_bridge_manager is not None and web_bridge_manager.web_bridge is not None:
                try:
                    # Sanitize profiler_update messages before forwarding:
                    # the live profiler object is not serializable, so only
                    # a computed ETA (in seconds) is sent.
                    if msg_type == "profiler_update" and "profiler" in msg:
                        profiler = msg.get("profiler")
                        eta_seconds = None
                        if profiler and hasattr(profiler, "get_time_remaining"):
                            try:
                                # Use the stage bar's current ratio so the ETA
                                # reflects partial progress within the stage.
                                current_stage_progress = 0.0
                                if hasattr(self.gui, "stage_progress_bar"):
                                    stage_value = self.gui.stage_progress_bar.value()
                                    stage_max = self.gui.stage_progress_bar.maximum()
                                    if stage_max > 0:
                                        current_stage_progress = stage_value / stage_max
                                eta_seconds = profiler.get_time_remaining(current_stage_progress=current_stage_progress)
                            except Exception as e:
                                # If calculation fails, log but don't crash
                                if hasattr(self.gui, "verbose_logger"):
                                    self.gui.verbose_logger.debug(f"Failed to calculate ETA: {e}")
                        sanitized_msg = {
                            "type": "profiler_update",
                            "eta_seconds": eta_seconds,
                        }
                        web_bridge_manager.web_bridge.enqueue(sanitized_msg)
                    else:
                        web_bridge_manager.web_bridge.enqueue(msg)
                except Exception as e:
                    # Don't let web bridge errors crash the GUI
                    self.gui.verbose_logger.warning(f"Failed to forward message to web bridge: {e}")

        except Empty:
            break
        except Exception as e:
            self.gui.verbose_logger.error(f"Error processing GUI queue: {e}\n{traceback.format_exc()}")

goliat.gui.components.screenshot_capture

Screenshot capture component for GUI tabs.

Classes

ScreenshotCapture

ScreenshotCapture(gui: ProgressGUI)

Captures screenshots of GUI tabs for web monitoring.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance with tabs to capture.

required
Source code in goliat/gui/components/screenshot_capture.py
def __init__(self, gui: "ProgressGUI") -> None:
    """Initialize screenshot capture component.

    Args:
        gui: ProgressGUI instance with tabs to capture.
    """
    self.gui = gui
    # Dedicated logger so capture failures are traceable to this component.
    self.verbose_logger = logging.getLogger("screenshot_capture")
Functions
capture_all_tabs
capture_all_tabs() -> Dict[str, bytes]

Capture all GUI tabs as JPEG bytes.

Captures each tab widget individually without switching tabs, so it doesn't interfere with the user's current view.

Returns:

Type Description
Dict[str, bytes]

Dictionary mapping tab names to JPEG bytes.

Dict[str, bytes]

Empty dict if capture fails or PySide6 not available.

Source code in goliat/gui/components/screenshot_capture.py
def capture_all_tabs(self) -> Dict[str, bytes]:
    """Capture all GUI tabs as JPEG bytes.

    Captures each tab widget individually without switching tabs,
    so it doesn't interfere with the user's current view. Failures on a
    single tab are logged and skipped so the remaining tabs still capture.

    Returns:
        Dictionary mapping tab names to JPEG bytes.
        Empty dict if capture fails or PySide6 not available.
    """
    # Module-level Qt names are None when PySide6 failed to import.
    if QBuffer is None or QWidget is None or QPixmap is None:
        return {}

    screenshots: Dict[str, bytes] = {}

    try:
        if not hasattr(self.gui, "tabs"):
            self.verbose_logger.warning("GUI has no tabs attribute")
            return {}

        tabs = self.gui.tabs
        tab_count = tabs.count()

        for i in range(tab_count):
            try:
                tab_widget = tabs.widget(i)
                tab_name = tabs.tabText(i)

                if tab_widget is None:
                    self.verbose_logger.warning(f"Tab {i} ({tab_name}) has no widget")
                    continue

                # Skip the main "Progress" tab - its data is already sent via other mechanisms
                if tab_name == "Progress":
                    continue

                # Get the size of the tab widget's parent (QTabWidget) to know the proper size
                # Non-visible tabs might have zero size, so we use parent size as reference
                parent_size = tabs.size()
                widget_width = tab_widget.width() if tab_widget.width() > 0 else parent_size.width()
                widget_height = tab_widget.height() if tab_widget.height() > 0 else parent_size.height()

                # Fallback to reasonable defaults if sizes are still zero
                if widget_width == 0:
                    widget_width = 800
                if widget_height == 0:
                    widget_height = 600

                # Process events to ensure all widgets are painted
                if QApplication is not None:
                    QApplication.processEvents()

                # Render the tab into an offscreen pixmap (pre-filled so
                # unpainted regions are not uninitialized memory).
                pixmap = QPixmap(widget_width, widget_height)
                pixmap.fill()
                tab_widget.render(pixmap)

                # Process events after rendering
                if QApplication is not None:
                    QApplication.processEvents()

                if pixmap.isNull():
                    self.verbose_logger.warning(f"Failed to grab pixmap for tab {tab_name}")
                    continue

                # Verify pixmap has content (not just empty/white)
                if pixmap.width() == 0 or pixmap.height() == 0:
                    self.verbose_logger.warning(f"Pixmap for tab {tab_name} has zero size")
                    continue

                # Convert to JPEG bytes
                jpeg_bytes = self._compress_to_jpeg(pixmap)

                if jpeg_bytes:
                    screenshots[tab_name] = jpeg_bytes
                    self.verbose_logger.debug(f"Captured screenshot for tab '{tab_name}' ({len(jpeg_bytes)} bytes)")

            except Exception as e:
                # Log error but continue capturing other tabs
                self.verbose_logger.warning(f"Failed to capture tab {i}: {e}", exc_info=True)
                continue

    except Exception as e:
        self.verbose_logger.error(f"Failed to capture screenshots: {e}", exc_info=True)

    return screenshots

goliat.gui.components.status_manager

Status management for GUI: colors, counting, and formatting.

Classes

StatusManager

StatusManager()

Manages status colors, counting, and message formatting for GUI.

Handles color mapping for different log types (info, warning, error, etc.), counts warnings and errors for display in error summary, and formats messages with HTML color styling for the QTextEdit widget.

Note: Uses white for 'progress' messages in GUI (unlike terminal colors) because all messages shown here are progress updates. This improves readability in the dark-themed GUI.

Source code in goliat/gui/components/status_manager.py
def __init__(self) -> None:
    """Initializes status manager with default counters and color map.

    warning_count / error_count feed the error-summary display; the color
    map translates log types to hex colors for HTML-formatted messages.
    """
    self.warning_count: int = 0
    self.error_count: int = 0

    # Color mapping - NOTE: Intentionally using white for "progress" in GUI
    # since all messages shown here are progress updates. This deviates from
    # the terminal color scheme defined in goliat/colors.py for better readability.
    self.color_map: dict[str, str] = {
        "default": "#f0f0f0",  # WHITE
        "progress": "#f0f0f0",  # WHITE (GUI-specific override)
        "info": "#17a2b8",  # CYAN
        "verbose": "#007acc",  # BLUE
        "warning": "#ffc107",  # YELLOW
        "error": "#dc3545",  # RED
        "fatal": "#d63384",  # MAGENTA
        "success": "#5cb85c",  # BRIGHT GREEN
        "header": "#e83e8c",  # BRIGHT MAGENTA
        "highlight": "#ffd700",  # BRIGHT YELLOW
        "caller": "#6c757d",  # DIM (gray)
    }
Functions
get_color
get_color(log_type: str) -> str

Gets HTML color code for a log type.

Parameters:

Name Type Description Default
log_type str

Type of log message.

required

Returns:

Type Description
str

HTML color code (hex).

Source code in goliat/gui/components/status_manager.py
def get_color(self, log_type: str) -> str:
    """Looks up the HTML hex color for a log type.

    Args:
        log_type: Type of log message.

    Returns:
        Hex color string; unknown types fall back to white (#f0f0f0).
    """
    try:
        return self.color_map[log_type]
    except KeyError:
        return "#f0f0f0"
format_message
format_message(message: str, log_type: str = 'default') -> str

Formats message with HTML color styling.

Preserves leading spaces by converting them to `&nbsp;` entities, then wraps the message in a `<span>` tag with the appropriate color style.

Parameters:

Name Type Description Default
message str

Message text to format.

required
log_type str

Log type for color selection.

'default'

Returns:

Type Description
str

HTML-formatted message string ready for QTextEdit.

Source code in goliat/gui/components/status_manager.py
def format_message(self, message: str, log_type: str = "default") -> str:
    """Formats message with HTML color styling.

    HTML-escapes the message so literal '<', '>' and '&' render as text
    instead of being interpreted as markup by the QTextEdit, converts
    spaces to &nbsp; entities (preserving indentation), then wraps the
    result in a <span> tag colored for the log type.

    Args:
        message: Message text to format.
        log_type: Log type for color selection.

    Returns:
        HTML-formatted message string ready for QTextEdit.
    """
    import html

    # Escape first so message text can't inject/break markup; the entities
    # produced (&lt; etc.) contain no spaces, so the space -> &nbsp; pass
    # below cannot corrupt them.
    escaped = html.escape(message)
    preserved_message = escaped.replace(" ", "&nbsp;")
    color = self.get_color(log_type)
    return f'<span style="color:{color};">{preserved_message}</span>'
record_log
record_log(log_type: str) -> None

Records log entry and updates warning/error counters.

Parameters:

Name Type Description Default
log_type str

Type of log message.

required
Source code in goliat/gui/components/status_manager.py
def record_log(self, log_type: str) -> None:
    """Bumps the warning or error counter for a new log entry.

    Args:
        log_type: Type of log message.
    """
    # Only "warning" and the error-like types affect the summary counts;
    # all other log types are ignored here.
    if log_type == "warning":
        self.warning_count = self.warning_count + 1
    elif log_type == "error" or log_type == "fatal":
        self.error_count = self.error_count + 1
get_error_summary
get_error_summary(web_connected: bool = False) -> str

Gets formatted summary of warnings and errors with optional web status.

Parameters:

Name Type Description Default
web_connected bool

Whether web dashboard is connected (optional).

False

Returns:

Type Description
str

Formatted string with emoji indicators and counts.

Source code in goliat/gui/components/status_manager.py
def get_error_summary(self, web_connected: bool = False) -> str:
    """Builds the warnings/errors summary line shown in the GUI footer.

    Args:
        web_connected: Whether web dashboard is connected (optional).

    Returns:
        Formatted string with emoji indicators and counts.
    """
    if web_connected:
        web_status = "🟢"
    else:
        web_status = "🔴"
    return f"⚠️ Warnings: {self.warning_count} | ❌ Errors: {self.error_count} | {web_status} Web"

goliat.gui.components.system_monitor

System resource monitoring component for GUI.

Classes

SystemMonitor

Monitors system resource utilization (CPU, RAM, GPU).

Provides methods to get current CPU usage percentage, RAM usage in GB, and GPU utilization percentage (via nvidia-smi). Gracefully handles missing dependencies (psutil) and unavailable GPU.

Functions
get_cpu_utilization staticmethod
get_cpu_utilization() -> float

Gets current CPU utilization percentage.

Uses non-blocking approach by calling cpu_percent() without interval, which returns utilization since last call. For accurate measurement, ensure this is called at consistent intervals (e.g., every 1 second).

Returns:

Type Description
float

CPU usage percentage (0-100), or 0.0 if psutil unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_cpu_utilization() -> float:
    """Returns CPU utilization as a percentage.

    cpu_percent() is called without an interval, so the reading reflects
    usage since the previous call (non-blocking). Call at a steady cadence
    (e.g. once per second) for meaningful numbers.

    Returns:
        CPU usage in [0, 100], or 0.0 when psutil is missing or the query fails.
    """
    if not PSUTIL_AVAILABLE:
        return 0.0
    try:
        # Guarded by PSUTIL_AVAILABLE, so psutil is importable here.
        raw = psutil.cpu_percent(interval=None)  # type: ignore[possibly-unbound]
    except Exception:
        return 0.0
    # psutil can report slightly out-of-range values; clamp to [0, 100].
    return min(100.0, max(0.0, raw))
get_ram_utilization staticmethod
get_ram_utilization() -> Tuple[float, float]

Gets current RAM usage and total RAM.

Returns:

Type Description
Tuple[float, float]

Tuple of (used_GB, total_GB), or (0.0, 0.0) if psutil unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_ram_utilization() -> Tuple[float, float]:
    """Returns current RAM usage and capacity.

    Returns:
        Tuple of (used_GB, total_GB), or (0.0, 0.0) when psutil is
        unavailable or the query fails.
    """
    if not PSUTIL_AVAILABLE:
        return (0.0, 0.0)
    try:
        # psutil is guaranteed importable here (PSUTIL_AVAILABLE check above).
        vm = psutil.virtual_memory()  # type: ignore[possibly-unbound]
        gib = 1024**3
        return (vm.used / gib, vm.total / gib)
    except Exception:
        return (0.0, 0.0)
get_ram_utilization_detailed staticmethod
get_ram_utilization_detailed() -> Tuple[float, float, float]

Gets RAM utilization with and without cacheable memory.

Returns:

Type Description
Tuple[float, float, float]

Tuple of (percent_with_cache, percent_without_cache, total_GB), or (0.0, 0.0, 0.0) if psutil unavailable.
  • percent_with_cache: (used / total) * 100 (includes cacheable memory)
  • percent_without_cache: ((total - available) / total) * 100 (excludes cacheable memory)
  • total_GB: Total RAM in GB
Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_ram_utilization_detailed() -> Tuple[float, float, float]:
    """Returns RAM utilization with and without cacheable memory.

    Returns:
        Tuple of (percent_with_cache, percent_without_cache, total_GB), or
        (0.0, 0.0, 0.0) if psutil unavailable.
        - percent_with_cache: (used / total) * 100 (includes cacheable memory)
        - percent_without_cache: ((total - available) / total) * 100,
          i.e. memory the OS could reclaim from caches is treated as free
        - total_GB: Total RAM in GB
    """
    if not PSUTIL_AVAILABLE:
        return (0.0, 0.0, 0.0)
    try:
        # psutil is guaranteed importable here (PSUTIL_AVAILABLE check above).
        vm = psutil.virtual_memory()  # type: ignore[possibly-unbound]
        pct_no_cache = (vm.total - vm.available) / vm.total * 100
        return (vm.percent, pct_no_cache, vm.total / (1024**3))
    except Exception:
        return (0.0, 0.0, 0.0)
get_gpu_vram_utilization staticmethod
get_gpu_vram_utilization() -> Optional[Tuple[float, float]]

Gets current GPU VRAM usage and total VRAM.

Returns:

Type Description
Optional[Tuple[float, float]]

Tuple of (used_GB, total_GB), or None if nvidia-smi unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_gpu_vram_utilization() -> Optional[Tuple[float, float]]:
    """Queries nvidia-smi for VRAM usage of the first GPU.

    Returns:
        Tuple of (used_GB, total_GB), or None when nvidia-smi is missing,
        times out, or produces unparseable output.
    """
    try:
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=memory.used,memory.total", "--format=csv,noheader,nounits"],
            capture_output=True,
            text=True,
            timeout=2,
            check=False,
        )
        output = proc.stdout.strip()
        if proc.returncode != 0 or not output:
            return None
        # Only the first GPU's line is considered.
        fields = output.split("\n")[0].strip().split(", ")
        if len(fields) != 2:
            return None
        used_mb = float(fields[0].strip())
        total_mb = float(fields[1].strip())
        # nvidia-smi reports MiB; convert to GB for display.
        return (used_mb / 1024.0, total_mb / 1024.0)
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError):
        return None
get_gpu_utilization staticmethod
get_gpu_utilization() -> Optional[float]

Gets current GPU utilization percentage via nvidia-smi.

Returns:

Type Description
Optional[float]

GPU usage percentage (0-100), or None if nvidia-smi unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_gpu_utilization() -> Optional[float]:
    """Queries nvidia-smi for utilization of the first GPU.

    Returns:
        GPU usage percentage (0-100), or None if nvidia-smi is unavailable
        or its output cannot be parsed.
    """
    try:
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"],
            capture_output=True,
            text=True,
            timeout=2,
            check=False,
        )
        output = proc.stdout.strip()
        if proc.returncode == 0 and output:
            # First line corresponds to the first GPU.
            return float(output.split("\n")[0].strip())
        return None
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError, ValueError):
        return None
get_gpu_name staticmethod
get_gpu_name() -> Optional[str]

Gets GPU name via nvidia-smi.

Returns:

Type Description
Optional[str]

GPU name (e.g., "RTX 4090"), or None if nvidia-smi unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_gpu_name() -> Optional[str]:
    """Queries nvidia-smi for the first GPU's model name.

    Returns:
        GPU name (e.g., "RTX 4090") with the "NVIDIA " vendor prefix
        removed, or None if nvidia-smi is unavailable.
    """
    try:
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
            capture_output=True,
            text=True,
            timeout=2,
            check=False,
        )
        output = proc.stdout.strip()
        if not (proc.returncode == 0 and output):
            return None
        # First line = first GPU; strip the vendor prefix for brevity.
        name = output.split("\n")[0].strip()
        return name.replace("NVIDIA ", "").strip()
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
        return None
get_cpu_cores staticmethod
get_cpu_cores() -> int

Gets number of CPU cores.

Returns:

Type Description
int

Number of CPU cores, or 0 if psutil unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_cpu_cores() -> int:
    """Returns the number of logical CPU cores.

    Returns:
        Logical core count, or 0 if psutil is unavailable or errors out.
    """
    if not PSUTIL_AVAILABLE:
        return 0
    try:
        # cpu_count can return None on exotic platforms; normalize to 0.
        cores = psutil.cpu_count(logical=True)  # type: ignore[possibly-unbound]
    except Exception:
        return 0
    return cores or 0
get_total_ram_gb staticmethod
get_total_ram_gb() -> float

Gets total RAM in GB.

Returns:

Type Description
float

Total RAM in GB, or 0.0 if psutil unavailable.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def get_total_ram_gb() -> float:
    """Returns total physical RAM in GB.

    Returns:
        Total RAM in GB, or 0.0 if psutil is unavailable or errors out.
    """
    if not PSUTIL_AVAILABLE:
        return 0.0
    try:
        # psutil is guaranteed importable here (PSUTIL_AVAILABLE check above).
        return psutil.virtual_memory().total / (1024**3)  # type: ignore[possibly-unbound]
    except Exception:
        return 0.0
is_gpu_available staticmethod
is_gpu_available() -> bool

Checks if GPU is available via nvidia-smi.

Returns:

Type Description
bool

True if nvidia-smi is available and returns successfully, False otherwise.

Source code in goliat/gui/components/system_monitor.py
@staticmethod
def is_gpu_available() -> bool:
    """Reports whether a GPU is reachable through nvidia-smi.

    Returns:
        True when a utilization query succeeds, False otherwise.
    """
    utilization = SystemMonitor.get_gpu_utilization()
    return utilization is not None

goliat.gui.components.timings_table

Timings table component for displaying profiling statistics.

Classes

TimingsTable

TimingsTable(table_widget: QTableWidget)

Manages timings table displaying profiling statistics.

Shows execution time statistics (mean, median, min, max, percentiles) for all phases and subtasks. Filters out fake aggregated entries and organizes data by phase for easy inspection. Updates automatically when profiler state changes via queue messages.

Parameters:

Name Type Description Default
table_widget QTableWidget

QTableWidget instance to populate with timing data.

required
Source code in goliat/gui/components/timings_table.py
def __init__(self, table_widget: QTableWidget) -> None:
    """Sets up the timings table widget.

    Args:
        table_widget: QTableWidget instance to populate with timing data.
    """
    # Keep a reference to the widget; rows are (re)filled by update().
    self.table: QTableWidget = table_widget
    # One-time table setup (see _setup_table).
    self._setup_table()
Functions
update
update(profiler: Profiler) -> None

Populates table with timing statistics from profiler.

Collects all phase and subtask timing data, computes statistics (mean, median, percentiles), and displays in table. Filters out fake aggregated entries that shouldn't be shown.

Statistics computed: - Mean, median, min, max - 10th, 25th, 75th, 90th percentiles

Parameters:

Name Type Description Default
profiler Profiler

Profiler instance containing timing data.

required
Source code in goliat/gui/components/timings_table.py
def update(self, profiler: "Profiler") -> None:
    """Populates table with timing statistics from profiler.

    Collects all phase and subtask timing data, computes statistics
    (mean, median, percentiles), and displays in table. Filters out
    fake aggregated entries that shouldn't be shown.

    Statistics computed:
    - Mean, median, min, max
    - 10th, 25th, 75th, 90th percentiles

    Args:
        profiler: Profiler instance containing timing data.
    """
    if not profiler:
        return

    self.table.setRowCount(0)

    # Collect all tasks with their raw timing data
    all_tasks: Dict[str, Dict[str, Any]] = {}
    for phase in ["setup", "run", "extract"]:
        avg_time = profiler.profiling_config.get(f"avg_{phase}_time")
        if avg_time is not None:
            raw_times = profiler.subtask_times.get(phase, [])
            all_tasks[f"{phase}_total"] = {
                "phase": phase,
                "subtask": "---",
                # Fall back to the single average when no raw samples exist.
                "raw_times": raw_times if raw_times else [avg_time],
            }

    # Filter out fake aggregated entries that shouldn't be displayed
    fake_entries = ["setup_simulation", "run_simulation_total", "extract_results_total"]

    for key, value in profiler.profiling_config.items():
        if key.startswith("avg_") and "_time" not in key:
            task_name = key.replace("avg_", "")

            # Skip fake aggregated entries
            if task_name in fake_entries:
                continue

            # Keys look like "<phase>_<subtask>"; a bare phase has no subtask.
            parts = task_name.split("_", 1)
            phase = parts[0]
            subtask_name = parts[1] if len(parts) > 1 else phase
            raw_times = profiler.subtask_times.get(task_name, [])
            all_tasks[key] = {
                "phase": phase,
                "subtask": subtask_name,
                "raw_times": raw_times if raw_times else [value],
            }

    # Populate table with statistics
    for task_info in all_tasks.values():
        row_position = self.table.rowCount()
        self.table.insertRow(row_position)

        times: List[float] = task_info.get("raw_times", [])
        if times:
            times_array = np.array(times)
            mean_val = float(np.mean(times_array))
            median_val = float(np.median(times_array))
            min_val = float(np.min(times_array))
            max_val = float(np.max(times_array))
            p10, p25, p75, p90 = (float(np.percentile(times_array, q)) for q in (10, 25, 75, 90))
        else:
            mean_val = median_val = min_val = max_val = p10 = p25 = p75 = p90 = 0.0

        # Create items and set text color to ensure visibility in both light and dark modes
        light_text_color = QColor("#f0f0f0")
        cell_texts = [
            task_info.get("phase", "N/A"),
            task_info.get("subtask", "---"),
            f"{mean_val:.2f}",
            f"{median_val:.2f}",
            f"{min_val:.2f}",
            f"{max_val:.2f}",
            f"{p10:.2f}",
            f"{p25:.2f}",
            f"{p75:.2f}",
            f"{p90:.2f}",
        ]
        # One loop replaces the former ten copy-pasted setItem blocks;
        # column order matches the header layout (phase, subtask, stats).
        for column, text in enumerate(cell_texts):
            item = QTableWidgetItem(text)
            item.setForeground(light_text_color)
            self.table.setItem(row_position, column, item)

goliat.gui.components.tray_manager

Tray icon management component.

Classes

TrayManager

TrayManager(parent_widget: QWidget, show_callback: Callable[[], None], close_callback: Callable[[], None])

Manages system tray icon and menu.

Handles system tray integration for background operation. Shows tray icon with favicon, provides context menu (Show/Exit), and handles click events to restore window. Allows users to minimize GUI to tray and continue monitoring via icon.

Parameters:

Name Type Description Default
parent_widget QWidget

Parent widget (ProgressGUI window).

required
show_callback Callable[[], None]

Function to call when restoring window.

required
close_callback Callable[[], None]

Function to call when exiting application.

required
Source code in goliat/gui/components/tray_manager.py
def __init__(self, parent_widget: QWidget, show_callback: Callable[[], None], close_callback: Callable[[], None]) -> None:
    """Sets up tray icon with menu.

    Args:
        parent_widget: Parent widget (ProgressGUI window).
        show_callback: Function to call when restoring window.
        close_callback: Function to call when exiting application.
    """
    self.parent: QWidget = parent_widget
    self.tray_icon: QSystemTrayIcon = QSystemTrayIcon(parent_widget)

    # Prefer the bundled favicon; fall back to a stock computer icon.
    assets_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    icon_path = os.path.join(assets_root, "assets", "favicon.svg")
    if os.path.exists(icon_path):
        self.tray_icon.setIcon(QIcon(icon_path))
    else:
        style = parent_widget.style()
        self.tray_icon.setIcon(style.standardIcon(style.StandardPixmap.SP_ComputerIcon))
    self.tray_icon.setToolTip("Simulation is running...")

    # Context menu: restore the window or quit the application.
    tray_menu = QMenu(parent_widget)
    for label, handler in (("Show", show_callback), ("Exit", close_callback)):
        action = QAction(label, parent_widget)
        action.triggered.connect(handler)
        tray_menu.addAction(action)

    self.tray_icon.setContextMenu(tray_menu)
    # Clicking the icon itself can also restore the window.
    self.tray_icon.activated.connect(lambda reason: self._tray_icon_activated(reason, show_callback))
Functions
show
show() -> None

Shows the tray icon.

Source code in goliat/gui/components/tray_manager.py
def show(self) -> None:
    """Makes the tray icon visible in the system tray."""
    self.tray_icon.show()
hide
hide() -> None

Hides the tray icon.

Source code in goliat/gui/components/tray_manager.py
def hide(self) -> None:
    """Removes the tray icon from the system tray."""
    self.tray_icon.hide()
is_visible
is_visible() -> bool

Checks if tray icon is visible.

Returns:

Type Description
bool

True if visible, False otherwise.

Source code in goliat/gui/components/tray_manager.py
def is_visible(self) -> bool:
    """Checks if tray icon is visible.

    Returns:
        True if the tray icon is currently shown, False otherwise.
    """
    return self.tray_icon.isVisible()

goliat.gui.components.ui_builder

UI builder component for constructing the ProgressGUI interface.

Classes

UIBuilder

Builds UI components for ProgressGUI.

Provides static methods to construct the complete GUI layout, including tabs, progress bars, plots, tables, and buttons. Handles styling via Qt stylesheets for dark theme appearance.

Functions
get_icon_path staticmethod
get_icon_path() -> str

Gets path to window icon.

Returns:

Type Description
str

Absolute path to favicon.svg.

Source code in goliat/gui/components/ui_builder.py
@staticmethod
def get_icon_path() -> str:
    """Gets path to window icon.

    Returns:
        Absolute path to favicon.svg.
    """
    # assets/ lives one directory above this components/ directory.
    components_dir = os.path.dirname(os.path.abspath(__file__))
    gui_dir = os.path.dirname(components_dir)
    return os.path.join(gui_dir, "assets", "favicon.svg")
build staticmethod
build(gui_instance: ProgressGUI, status_manager: StatusManager) -> None

Builds complete UI for the GUI instance.

Sets up window properties, applies stylesheet, creates tabs (Progress, Timings, Piecharts, Time Remaining, Overall Progress), and adds control buttons. Attaches components to gui_instance for later access.

Parameters:

Name Type Description Default
gui_instance ProgressGUI

ProgressGUI instance to build UI for.

required
status_manager StatusManager

StatusManager instance for error summary display.

required
Source code in goliat/gui/components/ui_builder.py
@staticmethod
def build(gui_instance: "ProgressGUI", status_manager: "StatusManager") -> None:
    """Builds complete UI for the GUI instance.

    Sets up window properties, applies stylesheet, creates tabs (Progress,
    Timings, Piecharts, Time Remaining, Overall Progress), and adds
    control buttons. Attaches components to gui_instance for later access.

    Args:
        gui_instance: ProgressGUI instance to build UI for.
        status_manager: StatusManager instance for error summary display.
    """
    gui_instance.setWindowTitle(gui_instance.init_window_title)
    gui_instance.resize(800, 900)  # default window size (width x height)

    # Set window icon (silently skipped if the asset is missing)
    icon_path = UIBuilder.get_icon_path()
    if os.path.exists(icon_path):
        gui_instance.setWindowIcon(QIcon(icon_path))

    # Shared dark-theme stylesheet for all child widgets
    gui_instance.setStyleSheet(UIBuilder.STYLESHEET)

    main_layout = QVBoxLayout(gui_instance)
    gui_instance.tabs = QTabWidget()
    main_layout.addWidget(gui_instance.tabs)

    # Build tabs (call order here determines the tab order in the GUI)
    UIBuilder._build_progress_tab(gui_instance, status_manager)
    UIBuilder._build_timings_tab(gui_instance)
    UIBuilder._build_piecharts_tab(gui_instance)
    UIBuilder._build_time_remaining_tab(gui_instance)
    UIBuilder._build_overall_progress_tab(gui_instance)
    UIBuilder._build_system_utilization_tab(gui_instance)

    # Build buttons
    UIBuilder._build_buttons(gui_instance, main_layout)

goliat.gui.components.utilization_manager

System utilization management component.

Classes

UtilizationManager

UtilizationManager(gui: ProgressGUI)

Manages CPU, RAM, and GPU utilization displays.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance.

required
Source code in goliat/gui/components/utilization_manager.py
def __init__(self, gui: "ProgressGUI") -> None:
    """Initializes utilization manager.

    Args:
        gui: ProgressGUI instance.
    """
    self.gui = gui
    # Initialize last values to avoid issues before first update
    # (update_plot() reads these even if update() hasn't run yet).
    self._last_cpu_percent: float = 0.0
    self._last_ram_percent: float = 0.0
    self._last_gpu_percent: Optional[float] = None
    self._last_gpu_vram_percent: Optional[float] = None
Functions
update
update() -> None

Updates CPU, RAM, and GPU utilization displays.

Called every second by Qt timer. Gets current utilization values from SystemMonitor and updates the progress bars and labels.

Source code in goliat/gui/components/utilization_manager.py
def update(self) -> None:
    """Updates CPU, RAM, and GPU utilization displays.

    Called every second by Qt timer. Gets current utilization values
    from SystemMonitor and updates the progress bars and labels.
    """
    # --- CPU ---
    cpu_percent = SystemMonitor.get_cpu_utilization()
    self.gui.cpu_bar.setValue(int(cpu_percent))
    self.gui.cpu_bar.setFormat(f"{cpu_percent:.0f}%")

    # --- RAM ---
    ram_percent_with_cache, _, total_gb = SystemMonitor.get_ram_utilization_detailed()
    used_gb, _ = SystemMonitor.get_ram_utilization()

    ram_percent = ram_percent_with_cache if total_gb > 0 else 0.0
    if total_gb > 0:
        self.gui.ram_bar.setValue(int(ram_percent))
        self.gui.ram_bar.setFormat(f"{used_gb:.1f}/{total_gb:.1f} GB")
    else:
        self.gui.ram_bar.setValue(0)
        self.gui.ram_bar.setFormat("N/A")

    # --- GPU ---
    # Query unconditionally (not only when gpu_available) so the display
    # can recover after a transient nvidia-smi failure.
    gpu_percent = SystemMonitor.get_gpu_utilization()
    if gpu_percent is None:
        self.gui.gpu_bar.setValue(0)
        self.gui.gpu_bar.setFormat("N/A")
        # Leave gpu_available untouched so a later success can restore it.
    else:
        self.gui.gpu_bar.setValue(int(gpu_percent))
        self.gui.gpu_bar.setFormat(f"{gpu_percent:.0f}%")
        self.gui.gpu_available = True  # we got data, so the GPU is back

    # Cache readings for the (less frequent) plot update via graph_manager.
    self._last_cpu_percent = cpu_percent
    self._last_ram_percent = ram_percent
    self._last_gpu_percent = gpu_percent

    # GPU VRAM feeds the plot only; again query unconditionally so the
    # reading recovers after temporary failures.
    self._last_gpu_vram_percent = None
    vram_info = SystemMonitor.get_gpu_vram_utilization()
    if vram_info is not None:
        used_vram_gb, total_vram_gb = vram_info
        if total_vram_gb > 0:
            self._last_gpu_vram_percent = (used_vram_gb / total_vram_gb) * 100
            self.gui.gpu_available = True  # VRAM data implies GPU is present
update_plot
update_plot() -> None

Updates the system utilization plot with current values.

Called less frequently (every 2 seconds) to avoid excessive plot updates. Writes to CSV and adds data point to plot.

Source code in goliat/gui/components/utilization_manager.py
def update_plot(self) -> None:
    """Updates the system utilization plot with current values.

    Called less frequently (every 2 seconds) to avoid excessive plot updates.
    Writes to CSV and adds data point to plot.
    """
    # NTP-based timestamp sidesteps a skewed local system clock.
    current_time = get_ntp_utc_time()

    # Persist the latest readings (CPU, RAM, GPU, GPU VRAM) to CSV.
    try:
        self.gui.data_manager.write_system_utilization(
            self._last_cpu_percent, self._last_ram_percent, self._last_gpu_percent, self._last_gpu_vram_percent
        )
    except Exception as e:
        self.gui.verbose_logger.error(f"[UtilizationPlot] CSV write failed: {e}")

    # Without the plot widget there is nothing further to update.
    if not hasattr(self.gui, "system_utilization_plot"):
        self.gui.verbose_logger.warning("[UtilizationPlot] system_utilization_plot attribute not found on GUI")
        return

    # Static system facts shown in the plot legend.
    from goliat.gui.components.system_monitor import SystemMonitor

    cpu_cores = SystemMonitor.get_cpu_cores()
    total_ram_gb = SystemMonitor.get_total_ram_gb()
    gpu_name = SystemMonitor.get_gpu_name()

    # Query VRAM capacity unconditionally (not only when gpu_available is
    # True) so the legend recovers after a transient nvidia-smi failure.
    total_gpu_vram_gb = 0.0
    vram_info = SystemMonitor.get_gpu_vram_utilization()
    if vram_info is not None:
        _, total_gpu_vram_gb = vram_info

    try:
        self.gui.system_utilization_plot.add_data_point(
            timestamp=current_time,
            cpu_percent=self._last_cpu_percent,
            ram_percent=self._last_ram_percent,
            gpu_percent=self._last_gpu_percent,
            gpu_vram_percent=self._last_gpu_vram_percent,
            cpu_cores=cpu_cores,
            total_ram_gb=total_ram_gb,
            gpu_name=gpu_name,
            total_gpu_vram_gb=total_gpu_vram_gb,
        )
    except Exception as e:
        self.gui.verbose_logger.error(f"[UtilizationPlot] Failed to add plot data point: {e}")

Functions

goliat.gui.components.web_bridge_manager

Web bridge manager component for remote monitoring.

Classes

WebBridgeManager

WebBridgeManager(gui: ProgressGUI, server_url: str, machine_id: Optional[str])

Manages web GUI bridge initialization and status updates.

Handles connection to web dashboard, collects system info, and manages bridge lifecycle. Keeps web bridge code exactly as is per requirements.

Parameters:

Name Type Description Default
gui ProgressGUI

ProgressGUI instance.

required
server_url str

Web dashboard server URL.

required
machine_id Optional[str]

Machine ID for identification.

required
Source code in goliat/gui/components/web_bridge_manager.py
def __init__(self, gui: "ProgressGUI", server_url: str, machine_id: Optional[str]) -> None:
    """Initializes web bridge manager.

    Args:
        gui: ProgressGUI instance.
        server_url: Web dashboard server URL.
        machine_id: Machine ID for identification.
    """
    self.gui = gui
    self.server_url = server_url
    self.machine_id = machine_id
    # All three are created lazily in initialize(); None means the web
    # monitoring feature is inactive.
    self.web_bridge: Optional[Any] = None
    self.screenshot_timer: Optional[Any] = None
    self.screenshot_capture: Optional[Any] = None
Functions
initialize
initialize() -> None

Initializes web GUI bridge for remote monitoring.

Sets up connection to web dashboard, collects system info, and starts the bridge. Handles errors gracefully to allow GUI to continue without web monitoring.

Source code in goliat/gui/components/web_bridge_manager.py
def initialize(self) -> None:
    """Initializes web GUI bridge for remote monitoring.

    Sets up connection to web dashboard, collects system info, and starts
    the bridge. Handles errors gracefully to allow GUI to continue without web monitoring.
    """
    if not self.machine_id:
        # No machine id -> web monitoring disabled; show "disconnected".
        if hasattr(self.gui, "error_counter_label") and hasattr(self.gui, "status_manager"):
            self.gui._update_web_status(False)
        return

    try:
        from goliat.utils.gui_bridge import WebGUIBridge
        from goliat.gui.components.system_monitor import SystemMonitor

        self.web_bridge = WebGUIBridge(self.server_url, self.machine_id)

        # Collect system info for the dashboard's machine card.
        gpu_name = SystemMonitor.get_gpu_name()
        cpu_cores = SystemMonitor.get_cpu_cores()
        total_ram_gb = SystemMonitor.get_total_ram_gb()
        hostname = socket.gethostname()

        system_info = {"gpuName": gpu_name or "N/A", "cpuCores": cpu_cores, "totalRamGB": total_ram_gb, "hostname": hostname}
        self.web_bridge.set_system_info(system_info)

        # Register the GUI indicator callback BEFORE starting the bridge.
        self.web_bridge.set_connection_callback(self.gui._update_web_status)
        # start() already sends initial heartbeat, no need to send again
        self.web_bridge.start()

        # Begin periodic screenshot uploads.
        self._initialize_screenshot_capture()

        self.gui.verbose_logger.info(f"Web GUI bridge enabled: {self.server_url}, machine_id={self.machine_id}")
        self.gui.verbose_logger.info(
            f"System info: GPU={gpu_name or 'N/A'}, CPU={cpu_cores} cores, RAM={total_ram_gb:.1f} GB, Hostname={hostname}"
        )
    except Exception as e:
        self.gui.verbose_logger.warning(f"Failed to initialize web GUI bridge: {e}. Continuing without web monitoring.")
        if hasattr(self.gui, "error_counter_label") and hasattr(self.gui, "status_manager"):
            self.gui._update_web_status(False)
sync_progress
sync_progress() -> None

Periodically sync actual GUI progress bar values to web dashboard.

Sends the current progress bar values to the web bridge so the dashboard always shows the actual progress, even if progress messages aren't sent.

Source code in goliat/gui/components/web_bridge_manager.py
def sync_progress(self) -> None:
    """Push the GUI progress bars' current values to the web dashboard.

    Reads the overall and stage progress bars directly and forwards their
    percentages to the web bridge, so the dashboard mirrors the real GUI
    state even when no explicit progress messages are emitted.
    """
    if self.web_bridge is None:
        return

    try:
        # Overall bar -> percentage (guard against a zero maximum).
        overall_bar = self.gui.overall_progress_bar
        overall_cap = overall_bar.maximum()
        overall_pct = overall_bar.value() / overall_cap * 100 if overall_cap > 0 else 0

        # Stage bar -> percentage, same guard.
        stage_bar = self.gui.stage_progress_bar
        stage_cap = stage_bar.maximum()
        stage_pct = stage_bar.value() / stage_cap * 100 if stage_cap > 0 else 0

        # Forward overall progress first.
        if overall_pct > 0:
            self.web_bridge.enqueue({"type": "overall_progress", "current": overall_pct, "total": 100})

        # Then stage progress, but only when a meaningful stage name is shown.
        if stage_pct > 0 and hasattr(self.gui, "stage_label"):
            stage_name = self.gui.stage_label.text().replace("Current Stage: ", "")
            if stage_name and stage_name != "Current Stage:":
                self.web_bridge.enqueue({"type": "stage_progress", "name": stage_name, "current": stage_pct, "total": 100})
    except Exception as e:
        # Progress sync is best-effort; never let it take down the GUI.
        if hasattr(self.gui, "verbose_logger"):
            self.gui.verbose_logger.debug(f"Failed to sync progress to web: {e}")
stop
stop() -> None

Stops the web bridge and screenshot capture.

Source code in goliat/gui/components/web_bridge_manager.py
def stop(self) -> None:
    """Shuts down screenshot capture and the web bridge.

    Each shutdown step is wrapped independently so a failure in one
    does not prevent the other from running; errors are only logged.
    """
    # Screenshot timer goes down first -- it feeds the bridge.
    timer = self.screenshot_timer
    if timer is not None:
        try:
            timer.stop()
        except Exception as e:
            if hasattr(self.gui, "verbose_logger"):
                self.gui.verbose_logger.warning(f"Error stopping screenshot timer: {e}")

    # Then the bridge itself.
    bridge = self.web_bridge
    if bridge is not None:
        try:
            bridge.stop()
        except Exception as e:
            if hasattr(self.gui, "verbose_logger"):
                self.gui.verbose_logger.warning(f"Error stopping web bridge: {e}")
send_finished
send_finished(error: bool = False) -> None

Sends final status update to web before stopping bridge.

Sends multiple 100% progress updates with delays to ensure the cloud receives them even if it's lagging behind. This prevents tasks from appearing stuck at 99% on the web interface.

Parameters:

Name Type Description Default
error bool

Whether study finished with errors.

False
Source code in goliat/gui/components/web_bridge_manager.py
def send_finished(self, error: bool = False) -> None:
    """Sends final status update to web before stopping bridge.

    Pushes several redundant 100% progress updates (with short pauses
    between them) so a lagging cloud backend still receives at least one,
    then a final "finished" message, and lastly stops the bridge. This
    keeps tasks from appearing stuck just short of 100% on the dashboard.

    Args:
        error: Whether study finished with errors.
    """
    # Halt screenshot capture first so nothing new gets queued while we flush.
    timer = self.screenshot_timer
    if timer is not None:
        try:
            timer.stop()
        except Exception as e:
            if hasattr(self.gui, "verbose_logger"):
                self.gui.verbose_logger.warning(f"Error stopping screenshot timer: {e}")

    if self.web_bridge is None:
        return

    try:
        import time

        # Bridge already shut down -> nothing left to flush.
        if not self.web_bridge.running:
            return

        # Redundant 100% updates: the cloud is often lagging, so repeat
        # a handful of times with delays to make sure one lands.
        for attempt in range(5):
            # Re-check liveness before every send.
            if not self.web_bridge.running:
                break
            self.web_bridge.enqueue({"type": "overall_progress", "current": 100, "total": 100})
            # Mirror the 100% on the stage bar when a stage name is displayed.
            if hasattr(self.gui, "stage_label"):
                stage_name = self.gui.stage_label.text().replace("Current Stage: ", "")
                if stage_name and stage_name != "Current Stage:":
                    self.web_bridge.enqueue({"type": "stage_progress", "name": stage_name, "current": 100, "total": 100})
            # Pause between retries, but not after the final one.
            if attempt < 4:
                time.sleep(0.5)

        if self.web_bridge.running:
            final_text = "Study finished successfully" if not error else "Study finished with errors"
            self.web_bridge.enqueue({"type": "finished", "message": final_text})
            # Give the (possibly backlogged) cloud extra time to drain.
            time.sleep(3)

        # Safe to call even if the bridge stopped in the meantime.
        self.web_bridge.stop()
    except Exception as e:
        if hasattr(self.gui, "verbose_logger"):
            self.gui.verbose_logger.warning(f"Error stopping web bridge: {e}")

Scripts

Entry point scripts for running studies and analysis.

Scripts

These are top-level scripts for running studies. They are not part of the core API but are included for reference.

  • goliat study - Main entry point for running studies
  • goliat analyze - Entry point for post-processing analysis
  • goliat parallel - Script for running parallel study batches
  • goliat free-space - Script for free-space validation runs
  • goliat init - Initialize GOLIAT environment (install dependencies, setup)
  • goliat status - Show setup status and environment information
  • goliat validate - Validate configuration files
  • goliat version - Show GOLIAT version information