# Source code for xgc_analysis.read_bp_file

import adios2.stream
from adios2 import Adios, Stream
import numpy as np


def _iter_steps(stream, step_timeout_secs=None):
    """
    Yield every ADIOS step produced by ``stream``.

    When ``step_timeout_secs`` is ``None`` the stream is iterated directly;
    otherwise ``stream.steps(timeout=...)`` is iterated, with the timeout
    coerced to ``float``. A ``RuntimeError`` whose message contains
    "no new step within the time limit" (compared case-insensitively) ends
    the iteration quietly; any other ``RuntimeError`` is re-raised.
    """
    timeout_marker = "no new step within the time limit"
    try:
        # Choose the step source inside the ``try`` so that errors raised
        # while creating it are filtered exactly like errors raised while
        # iterating it.
        step_source = (
            stream
            if step_timeout_secs is None
            else stream.steps(timeout=float(step_timeout_secs))
        )
        for current in step_source:
            yield current
    except RuntimeError as err:
        if timeout_marker not in str(err).lower():
            raise
        # Timeout at end of data: fall through and finish the generator.


def _open_stream(filename, open_timeout_secs=None):
    """
    Return a read-mode ADIOS ``Stream`` for ``filename``.

    Without ``open_timeout_secs`` the stream is opened directly from the file
    name. Otherwise a fresh ``Adios`` object declares an IO that uses the
    ``FileStream`` engine with ``OpenTimeoutSecs`` set to
    ``str(open_timeout_secs)``, and the stream is created from that IO.
    """
    if open_timeout_secs is not None:
        adios_ctx = Adios()
        # The IO name is derived from the identity of the ``filename`` object.
        io_name = f"read_bp_{id(filename)}"
        reader_io = adios_ctx.declare_io(io_name)
        reader_io.set_engine("FileStream")
        reader_io.set_parameter("OpenTimeoutSecs", str(open_timeout_secs))
        return Stream(reader_io, filename, "r")
    return adios2.stream.Stream(filename, "r")


def _read_step_variables(step, variables, step_idx):
    """Read the requested variables of the current step into a dict.

    Parameters
    ----------
    step : object
        Step handle providing ``available_variables()`` and ``read(name)``.
    variables : iterable of str or None
        Names to read. If None, every variable available in the step is read.
    step_idx : int
        Index of the step; used only in the warning message.

    Returns
    -------
    dict
        ``{variable_name: value returned by step.read(variable_name)}`` for
        each requested name that is available in this step.
    """
    available = step.available_variables()
    wanted = list(available) if variables is None else variables
    values = {}
    for name in wanted:
        if name in available:
            values[name] = step.read(name)
        else:
            # A missing variable is reported and skipped, not treated as fatal.
            print(f"Warning: Variable '{name}' not found in step {step_idx}")
    return values


def ReadBPFile(filename, variables=None, step_range=None, open_timeout_secs=None, step_timeout_secs=0.1):
    """
    Reads data from an ADIOS2 BP file using the Stream API (ADIOS2 v2.10+).

    Parameters
    ----------
    filename : str
        Path to the ADIOS2 BP file.
    variables : list of str or str, optional
        Variable names to read. A single string is treated as one variable
        name. If None, all variables available in each step are read.
        Requested names missing from a step are skipped with a printed
        warning.
    step_range : tuple of int, optional
        Tuple ``(start, end)`` selecting steps with ``start <= step < end``.
        Reading stops at the first step whose index is ``>= end``.
        If None, only the last step encountered is returned.
    open_timeout_secs : int | float | None, optional
        If provided, configure ADIOS ``FileStream`` ``OpenTimeoutSecs``.
        Set to ``0`` for offline files to avoid waiting for new steps near EOF.
    step_timeout_secs : int | float | None, optional
        Timeout passed to ``Stream.steps(timeout=...)``. If the timeout
        expires, the resulting ADIOS runtime error is treated as
        end-of-stream. If None, the stream is iterated without a timeout.
        Defaults to ``0.1`` seconds.

    Returns
    -------
    data : dict
        Dictionary with structure ``{step_idx: {variable_name: np.ndarray}}``.
        Empty if no step was read.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(variables, str):
        variables = [variables]

    # Resolve the bounds once instead of indexing the tuple on every step.
    if step_range is not None:
        first_step, stop_step = step_range[0], step_range[1]

    data = {}
    with _open_stream(filename, open_timeout_secs=open_timeout_secs) as stream:
        for step in _iter_steps(stream, step_timeout_secs=step_timeout_secs):
            step_idx = step.current_step()

            if step_range is None:
                # The index of the last step is unknown until the stream
                # ends, so read each step and keep only the most recent one
                # (single pass over the file).
                data = {step_idx: _read_step_variables(step, variables, step_idx)}
                continue

            if step_idx < first_step:
                continue
            if step_idx >= stop_step:
                break
            data[step_idx] = _read_step_variables(step, variables, step_idx)

    return data