import adios2.stream
from adios2 import Adios, Stream
import numpy as np
def _iter_steps(stream, step_timeout_secs=None):
"""
Iterate ADIOS steps, optionally with timeout, and suppress the expected
timeout-at-EOF exception for offline files.
"""
try:
if step_timeout_secs is None:
for step in stream:
yield step
else:
for step in stream.steps(timeout=float(step_timeout_secs)):
yield step
except RuntimeError as exc:
msg = str(exc)
if "no new step within the time limit" in msg.lower():
return
raise
def _open_stream(filename, open_timeout_secs=None):
"""Open an ADIOS Stream, optionally setting ``OpenTimeoutSecs``."""
if open_timeout_secs is None:
return adios2.stream.Stream(filename, "r")
adios = Adios()
io = adios.declare_io(f"read_bp_{id(filename)}")
io.set_engine("FileStream")
io.set_parameter("OpenTimeoutSecs", str(open_timeout_secs))
return Stream(io, filename, "r")
[docs]
def ReadBPFile(filename, variables=None, step_range=None, open_timeout_secs=None, step_timeout_secs=0.1):
"""
Reads data from an ADIOS2 BP file using the Stream API (ADIOS2 v2.10+).
Parameters
----------
filename : str
Path to the ADIOS2 BP file.
variables : list of str, optional
List of variable names to read. If None, all variables will be read.
step_range : tuple of int, optional
Tuple (start, end) indicating the range of steps to read.
If None, only the last step is read.
open_timeout_secs : int | float | None, optional
If provided, configure ADIOS ``FileStream`` ``OpenTimeoutSecs``. Set to
``0`` for offline files to avoid waiting for new steps near EOF.
step_timeout_secs : int | float | None, optional
Timeout passed to ``Stream.steps(timeout=...)``. If the timeout expires,
the resulting ADIOS runtime error is treated as end-of-stream.
Defaults to ``0.1`` seconds, which works well for offline files while
avoiding long waits near EOF.
Returns
-------
data : dict
Dictionary with structure: {step_idx: {variable_name: np.ndarray}}.
"""
data = {}
# Fast path: when step_range is omitted, preserve the existing behavior
# ("read only the last step") but do it in a single pass instead of scanning
# once to count and a second time to read.
if step_range is None:
last_step_idx = None
last_step_data = None
with _open_stream(filename, open_timeout_secs=open_timeout_secs) as s:
for step in _iter_steps(s, step_timeout_secs=step_timeout_secs):
current_step = step.current_step()
available_vars = step.available_variables()
vars_to_read = variables if variables is not None else list(available_vars.keys())
step_data = {}
for var in vars_to_read:
if var in available_vars:
step_data[var] = step.read(var)
else:
print(f"Warning: Variable '{var}' not found in step {current_step}")
last_step_idx = current_step
last_step_data = step_data
if last_step_idx is not None:
data[last_step_idx] = last_step_data
return data
# Range-read path (unchanged semantics)
with _open_stream(filename, open_timeout_secs=open_timeout_secs) as s:
for step in _iter_steps(s, step_timeout_secs=step_timeout_secs):
current_step = step.current_step()
if current_step < step_range[0]:
continue
if current_step >= step_range[1]:
break
available_vars = step.available_variables()
vars_to_read = variables if variables is not None else list(available_vars.keys())
step_data = {}
for var in vars_to_read:
if var in available_vars:
step_data[var] = step.read(var)
else:
print(f"Warning: Variable '{var}' not found in step {current_step}")
data[current_step] = step_data
return data