# Source code for xgc_analysis.DiffusionProfiles

from __future__ import annotations

"""DiffusionProfiles – Stream reader for live XGC 1‑D profile output
====================================================================

This helper wraps the **ADIOS‑2** Python bindings (⩾ v2.10) in *Stream* mode so
that client code can monitor a growing ``*.bp`` directory and always access the
**latest** diffusion‑profile snapshot without rereading the entire file.

The class blocks during construction until the requested BP directory becomes
available (configurable *timeout*).  It then opens an :class:`adios2.Stream`
reader that remains *open* until :py:meth:`DiffusionProfiles.close` is called or
the object is deleted, allowing incremental calls to
:py:meth:`DiffusionProfiles.read_next_step` as new simulation output appears.

Only **one** time step is kept resident in memory—the most recent one—so the
object’s footprint stays small even for long runs.

Time‑related parameters (``timeout``, ``wait_interval`` and the ``poll_delay``
argument of :py:meth:`read_next_step`) are **always expressed in *seconds***.
Pass ``value_in_minutes * 60`` if you prefer to think in minutes.
"""

import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
import numpy as np
import sys
from adios2 import Stream, bindings, Adios

# Public API of this module: only the stream reader class is exported.
__all__ = ["DiffusionProfiles"]

class DiffusionProfiles:
    """Live reader for ``xgc.diffusion_profiles.bp``.

    Reads diffusion profile data from an ADIOS BP file (directory)
    ``xgc.diffusion_profiles.bp``. The constructor opens it as a persistent
    :class:`adios2.Stream` (the ADIOS engine waits for the file to appear,
    subject to *timeout*) and drains all steps that already exist, retaining
    only the final one. The stream is left open until :py:meth:`close` is
    called. A blocking request for a new step can be posted with
    :py:meth:`read_next_step`.

    The file ``xgc.diffusion_profiles.bp`` has the following structure::

        double   density    n_steps*{n_species, n_samples, n_surf}
        double   flow       n_steps*{n_species, n_samples, n_surf}
        double   temp       n_steps*{n_species, n_samples, n_surf}
        int32_t  n_samples  n_steps*scalar
        int32_t  n_species  n_steps*scalar
        int32_t  n_surf     n_steps*scalar
        double   psi        n_steps*{n_surf}
        int32_t  steps      n_steps*{n_samples}
        double   time       n_steps*{n_samples}

    That is, there are (for example) ``n_steps`` steps recorded in the BP
    file. The constructor keeps only the last (most recent) one.

    The property :py:attr:`latest_step_data` returns a dictionary with one
    entry per variable available in the current step, i.e. the keys:

    density : 3D array (shape (n_species, n_samples, n_surf)) of the
        diffusion density.
    flow : 3D array (shape (n_species, n_samples, n_surf)) of the parallel
        mean flow.
    temp : 3D array (shape (n_species, n_samples, n_surf)) of temperature
        (in eV).
    n_samples : integer, the number of time samples.
    n_species : integer (default 2), the number of particle species.
    n_surf : integer, the number of psi-surfaces.
    psi : 1D array (shape (n_surf,)) of poloidal magnetic flux values.
    steps : 1D array (shape (n_samples,)) of time step indices.
    time : 1D array (shape (n_samples,)) of simulation times (in seconds).

    Parameters
    ----------
    adios : adios2.Adios
        ADIOS2 instance used to declare the ``diagnosis.diffusion_profiles``
        IO.
    bp_dir : str | os.PathLike
        Path to the BP directory written by XGC.
    wait_interval : float, optional
        Currently unused: explicit directory polling is disabled and the open
        wait is delegated to the ADIOS engine. Kept for backward
        compatibility. Defaults to **1.0 s**.
    timeout : float | None, optional
        Seconds to wait for the file to open and for the first step. ``None``
        (default) waits for the first step without a time limit and leaves
        the engine's own ``OpenTimeoutSecs`` default in place.

    Notes
    -----
    *The object keeps exactly one snapshot in memory.* Callers needing to
    store a history must copy out the data returned by
    :py:attr:`latest_step_data`.
    """

    def __init__(
        self,
        adios: "Adios",
        *,
        bp_dir: "os.PathLike | str" = "./xgc.diffusion_profiles.bp",
        wait_interval: float = 1.0,
        timeout: Optional[float] = None,
    ) -> None:
        self.path = Path(bp_dir)
        self._stream: Optional["Stream"] = None
        self._step_index: Optional[int] = None
        self._data: Optional[Dict[str, np.ndarray]] = None
        # NOTE: _wait_for_file(wait_interval, timeout) is intentionally not
        # called; the ADIOS engine's OpenTimeoutSecs handles the open wait.
        self.adios = adios
        self._open_stream(timeout)
        print("Skipping to last time step", flush=True)
        self._drain_initial_steps(timeout)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @property
    def latest_step_data(self) -> Dict[str, np.ndarray]:
        """Return a **shallow copy** of the dictionary holding the current step.

        Raises
        ------
        RuntimeError
            If no step has been read yet.
        """
        if self._data is None:
            raise RuntimeError("No step has been read yet.")
        # Shallow copy so callers cannot rebind entries of the internal dict.
        return dict(self._data)

    def read_next_step(self, *, my_timeout: float = 10.0, poll_delay: float = 0.5) -> bool:
        """Advance the stream by **one** time step.

        Examples
        --------
        >>> prof = DiffusionProfiles(adios, bp_dir="xgc.diffusion_profiles.bp")
        >>> prof.read_next_step(my_timeout=0.0)
        False  # no new data yet
        >>> # ... later
        >>> prof.read_next_step()
        True  # newest snapshot now loaded

        Parameters
        ----------
        my_timeout
            Maximum wait time in seconds for a new step; if no new step is
            found within *my_timeout* seconds, return ``False``. ``None``
            waits without a time limit.
        poll_delay
            Currently unused (the wait happens inside the ADIOS
            ``begin_step`` call); kept for backward compatibility.

        Returns
        -------
        bool
            ``True`` if a new step was successfully read, ``False`` otherwise
            (timeout, end of stream or stream error; each is reported on
            stdout/stderr).

        Raises
        ------
        RuntimeError
            If the stream has already been closed.
        """
        if self._stream is None:
            raise RuntimeError("Stream already closed.")

        print("Waiting for next step in xgc.diffusion_profiles.bp", flush=True)
        status = self._stream.begin_step(timeout=self._as_adios_timeout(my_timeout))
        # Errors are reported rather than raised so the caller can still
        # close() the stream cleanly.
        if status == bindings.StepStatus.OtherError:
            print("Stream error reading profiles", file=sys.stderr)
        elif status == bindings.StepStatus.NotReady:
            print("Timed out waiting for profiles step", file=sys.stderr)
        elif status == bindings.StepStatus.EndOfStream:
            print("EndofStream found")
        elif status == bindings.StepStatus.OK:
            print("Found next diffusion profiles step", flush=True)
            try:
                self._read_current_step()
            finally:
                # Always release the step, even if reading it failed.
                self._stream.end_step()
            return True
        return False

    def close(self) -> None:
        """Close the underlying stream (safe to call more than once)."""
        if self._stream is not None:
            self._stream.close()
            self._stream = None

    # ------------------------------------------------------------------
    # Convenience properties – keep backward compatibility
    # ------------------------------------------------------------------
    @property
    def n_species(self) -> Optional[int]:
        """Number of species in the **current** snapshot (or ``None``)."""
        return self._data.get("n_species") if self._data else None

    @property
    def n_samples(self) -> Optional[int]:
        """Number of (time) samples per species in snapshot, else ``None``."""
        return self._data.get("n_samples") if self._data else None

    @property
    def n_surf(self) -> Optional[int]:
        """Radial surface count (psi grid size) in snapshot, else ``None``."""
        return self._data.get("n_surf") if self._data else None

    # ------------------------------------------------------------------
    # Representation
    # ------------------------------------------------------------------
    def __repr__(self) -> str:
        """Unambiguous, one-liner representation for debugging/logging."""
        if self._data is None:
            return f"{self.__class__.__name__}(waiting for data)"

        def _shape(name: str) -> str:
            arr = self._data.get(name)
            return str(arr.shape) if isinstance(arr, np.ndarray) else "N/A"

        pieces = [
            f"step={self._step_index}",
            f"n_species={self.n_species}",
            f"n_samples={self.n_samples}",
            f"n_surf={self.n_surf}",
            f"density.shape={_shape('density')}",
            f"psi.shape={_shape('psi')}",
        ]
        return f"{self.__class__.__name__}({', '.join(pieces)})"

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    @staticmethod
    def _as_adios_timeout(timeout: Optional[float]) -> float:
        """Convert a user timeout to the float ``begin_step`` expects.

        ``None`` (no time limit) maps to ``-1.0``, the ADIOS2 ``begin_step``
        default meaning "wait without limit"; other values become floats.
        """
        return -1.0 if timeout is None else float(timeout)

    def _wait_for_file(self, interval: float, timeout: Optional[float]) -> None:
        """Block until *self.path* exists, polling every *interval* seconds.

        Not called by the constructor at present; kept as a helper.

        Raises
        ------
        FileNotFoundError
            If *timeout* (seconds) is not ``None`` and elapses first.
        """
        start = time.monotonic()
        while not self.path.exists():
            if timeout is not None and (time.monotonic() - start) > timeout:
                raise FileNotFoundError(
                    f"Timed out after {timeout} s waiting for '{self.path}'."
                )
            print(f"Waiting for file {self.path}", flush=True)
            time.sleep(interval)

    def _open_stream(self, timeout: Optional[float]) -> None:
        """Open the BP directory in *Stream* mode (read-only).

        If the IO still uses the plain ``File`` engine it is switched to
        ``FileStream``. If ``OpenTimeoutSecs`` is not already among the IO's
        parameters and *timeout* is not ``None``, it is set to *timeout*;
        with ``None`` the engine default applies.
        """
        # NB: no context manager on purpose so that the caller controls the
        # lifetime via `close()`.
        io = self.adios.declare_io("diagnosis.diffusion_profiles")
        if io.engine_type().upper() == "FILE":
            io.set_engine("FileStream")
            print("Setting default engine FileStream for {0}".format(self.path))
        # Writing "None" would not be a valid numeric parameter value.
        if timeout is not None and "OpenTimeoutSecs" not in io.parameters():
            io.set_parameter("OpenTimeoutSecs", f"{timeout}")
            print("Setting default timeout = {0} seconds to open {1}".format(timeout, self.path))
        print(f"Waiting for file {self.path}", flush=True)
        self._stream = Stream(io, str(self.path), "r")
        print("File opened", flush=True)

    def _drain_initial_steps(self, timeout: Optional[float] = 10.0) -> None:
        """Consume all steps currently on disk – retain only the last one.

        Waits up to *timeout* seconds (``None``: no limit) for the first
        step, then reads every step that is immediately available; each read
        overwrites the previous snapshot.

        Raises
        ------
        RuntimeError
            If the first step does not arrive, or the stream reports an
            error.
        """
        assert self._stream is not None

        def wait_for_next_step(wait: Optional[float]) -> bool:
            """Begin the next step; True only if it is ready to read."""
            status = self._stream.begin_step(timeout=self._as_adios_timeout(wait))
            if status == bindings.StepStatus.OK:
                return True
            if status == bindings.StepStatus.EndOfStream:
                print("EndofStream found")
            elif status == bindings.StepStatus.OtherError:
                raise RuntimeError("Stream error")
            return False

        if not wait_for_next_step(timeout):
            raise RuntimeError("Timed out waiting for the first step in diffusion profiles")

        while True:
            print("Reading step (draining)", flush=True)
            try:
                self._read_current_step()
            finally:
                self._stream.end_step()
            # Zero timeout: only take steps that are already available.
            if not wait_for_next_step(0.0):
                print("Last available step retained", flush=True)
                return

    def _read_current_step(self) -> None:
        """Read every variable of the current step into :py:attr:`_data`."""
        assert self._stream is not None  # for mypy
        variables = self._stream.available_variables()
        # Replace the previous snapshot wholesale (only one step is kept).
        self._data = {name: self._stream.read(name) for name in variables}
        try:
            self._step_index = self._stream.current_step()
        except AttributeError:  # pragma: no cover
            # Fallback for Stream objects without current_step(): keep a
            # 0-based counter, matching current_step() numbering.
            self._step_index = 0 if self._step_index is None else self._step_index + 1

    # ------------------------------------------------------------------
    # Context-manager helpers (optional)
    # ------------------------------------------------------------------
    def __enter__(self):
        """Enter context manager → returns *self* (stream already open)."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Ensure the stream is closed when leaving a *with* block."""
        self.close()
# ------------------------------------------------------------------------------
# Example usage:
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    # DiffusionProfiles needs an ADIOS2 instance; the BP path is keyword-only.
    adios = Adios()
    # The context manager closes the stream when the block exits.
    with DiffusionProfiles(adios, bp_dir="xgc.diffusion_profiles.bp") as diff_profiles:
        # Print some details.
        print("DiffusionProfiles read from file:")
        print(diff_profiles)
        # Variables are exposed through the latest snapshot dictionary.
        snapshot = diff_profiles.latest_step_data
        print("Temperature data shape:", snapshot["temp"].shape)
        print("Time steps:", snapshot["time"])