from pathlib import Path
from typing import Any, Dict, Optional
import numpy as np
import adios2
from adios2 import bindings, FileReader, Stream, Adios
[docs]
class DiffusionCoefficients:
"""
Stores the profiles of diffusion model coefficients defined as functions
of the poloidal magnetic flux ψ.
The four diffusion coefficients are:
- ptl_diffusivity (m^2/s)
- momentum_diffusivity (m^2/s)
- heat_conductivity (m^2/s)
- ptl_pinch_velocity (m/s)
The data layout of each coefficient is (nspecies, npsi) where the second dimension
(the ψ-grid) is contiguous in memory. Also the ψ–grid is stored as a 1D NumPy array of length npsi.
Parameters:
nspecies: Number of particle species (default: 2).
npsi: Number of points in the ψ–grid.
"""
def __init__(self, adios: adios2.Adios, *, timeout=10.0, nspecies=2, npsi=100):
self.bp_path = Path("xgc.diffusion_coeff.bp")
self.adios = adios
self._stream: Optional["Stream"] = None
if self.bp_path.exists():
# ----------------------------------------------------------------
# Read LAST step with the ADIOS2 v2.10 *FileReader* API ----------
# ----------------------------------------------------------------
with FileReader(str(self.bp_path)) as reader:
n_steps = reader.num_steps() # total number of steps
if n_steps == 0:
raise RuntimeError(
f"{bp_path} contains no data steps – cannot initialise “DiffusionCoefficients”.")
last_step = n_steps - 1
steps = [last_step,1]
# Scalar / grid variables ----------------------------------
n_species = int(reader.read("n_species", step_selection=steps))
psi_grid = reader.read("psi", step_selection=steps).astype(np.float64)
npsi_file = psi_grid.size
# Coefficient variables ------------------------------------
coeff_names = [
"ptl_diffusivity",
"momentum_diffusivity",
"heat_conductivity",
"ptl_pinch_velocity",
]
species_suffixes = [
"_elec", "_ion", "_imp1", "_imp2", "_imp3", "_imp4", "_imp5"
][:n_species]
coeff_data = {
name: np.zeros((n_species, npsi_file), dtype=np.float64)
for name in coeff_names
}
for s_idx, suffix in enumerate(species_suffixes):
for name in coeff_names:
var_name = name + suffix
coeff_data[name][s_idx, :] = reader.read(var_name, step_selection=steps)
# Expose as attributes ------------------------------------------
self.nspecies = n_species
self.npsi = npsi_file
self.psi = psi_grid
self.initialized = True
self.ptl_diffusivity = coeff_data["ptl_diffusivity"]
self.momentum_diffusivity = coeff_data["momentum_diffusivity"]
self.heat_conductivity = coeff_data["heat_conductivity"]
self.ptl_pinch_velocity = coeff_data["ptl_pinch_velocity"]
else:
# ----------------------------------------------------------------
# No existing file – allocate zero‑filled arrays -----------------
# ----------------------------------------------------------------
self.nspecies = int(nspecies)
self.npsi = int(npsi)
self.psi = np.zeros(self.npsi, dtype=np.float64)
self.initialized = False
self.ptl_diffusivity = np.zeros((self.nspecies, self.npsi))
self.momentum_diffusivity = np.zeros((self.nspecies, self.npsi))
self.heat_conductivity = np.zeros((self.nspecies, self.npsi))
self.ptl_pinch_velocity = np.zeros((self.nspecies, self.npsi))
self._open_stream(timeout)
[docs]
def write_to_file(self, filename="xgc.diffusion_coeff.bp", gstep=-1):
"""
Writes the diffusion coefficient profiles and the ψ–grid to an Adios BP file as a new step.
For each diffusion coefficient, the data for each species is split into an individual variable
using a species suffix. The species suffixes (for species indices 0 through 6) are:
0: _elec
1: _ion
2: _imp1
3: _imp2
4: _imp3
5: _imp4
6: _imp5
In addition, the file will store:
- n_species: a scalar integer (int64).
- psi: a 1D array (of length npsi).
The method opens the specified file (filename) using the Adios2 FileWriter in append mode,
writes all the individual diffusion profiles, and then ends the step.
"""
# Define species suffixes.
species_suffixes = ["_elec", "_ion", "_imp1", "_imp2", "_imp3", "_imp4", "_imp5"]
#IOName = "input.diffusion_coefficients"
#io = self.adios.declare_io(IOName)
#if io.engine_type().upper() == "FILE":
# print("Setting default engine FileStream for {0}".format(filename))
# io.set_engine("FileStream")
# Open the file for writing (appending a new step)
#with Stream(io, filename, "a") as writer:
self._stream.begin_step()
# Define a list of tuples: (coefficient base name, corresponding array)
coeffs = [
("ptl_diffusivity", self.ptl_diffusivity),
("momentum_diffusivity", self.momentum_diffusivity),
("heat_conductivity", self.heat_conductivity),
("ptl_pinch_velocity", self.ptl_pinch_velocity)
]
# Loop over coefficients and species.
for coeff_name, coeff_array in coeffs:
for i in range(self.nspecies):
# Form the variable name using the base name and species suffix.
var_name = coeff_name + species_suffixes[i]
# Extract the data for this species (a 1D array of length npsi).
data = coeff_array[i, :]
self._stream.write(var_name, data, shape=data.shape, start=(0,), count=(self.npsi,))
# Write out the number of species as an int64 scalar.
self._stream.write("n_species", self.nspecies)
# Write out the psi–grid as a 1D double array.
self._stream.write("psi", self.psi, shape=data.shape, start=(0,), count=(self.npsi,))
# Write the time step index to which this data belongs
self._stream.write("gstep", gstep)
self._stream.end_step()
#self.adios.remove_io(IOName)
[docs]
def close(self) -> None:
"""Close the underlying stream."""
if self._stream is not None:
self._stream.close()
self._stream = None
def _open_stream(self, timeout: Optional[float]) -> None:
"""Open the BP directory in *Stream* mode (read‑only)."""
# NB: we keep a dedicated ADIOS2 Stream object – no context manager on
# purpose so that the caller controls the lifetime via `close()`.
io = self.adios.declare_io("input.diffusion_coefficients")
if io.engine_type().upper() == "FILE":
io.set_engine("FileStream")
print("Setting default engine FileStream for {0}".format(self.bp_path))
if "OpenTimeoutSecs" not in io.parameters():
io.set_parameter("OpenTimeoutSecs", f"{timeout}")
print("Setting default timeout = {0} seconds to open {1}".format(timeout, self.bp_path))
print(f"Waiting for file {self.bp_path}", flush=True)
if self.bp_path.exists():
self._stream = Stream(io, str(self.bp_path), "a")
else:
self._stream = Stream(io, str(self.bp_path), "w")
print(f"File opened", flush=True)