Source code for xgc_analysis.DiffusionCoefficients

from pathlib import Path
from typing import Any, Dict, Optional
import numpy as np
import adios2
from adios2 import bindings, FileReader, Stream, Adios

[docs] class DiffusionCoefficients: """ Stores the profiles of diffusion model coefficients defined as functions of the poloidal magnetic flux ψ. The four diffusion coefficients are: - ptl_diffusivity (m^2/s) - momentum_diffusivity (m^2/s) - heat_conductivity (m^2/s) - ptl_pinch_velocity (m/s) The data layout of each coefficient is (nspecies, npsi) where the second dimension (the ψ-grid) is contiguous in memory. Also the ψ–grid is stored as a 1D NumPy array of length npsi. Parameters: nspecies: Number of particle species (default: 2). npsi: Number of points in the ψ–grid. """ def __init__(self, adios: adios2.Adios, *, timeout=10.0, nspecies=2, npsi=100): self.bp_path = Path("xgc.diffusion_coeff.bp") self.adios = adios self._stream: Optional["Stream"] = None if self.bp_path.exists(): # ---------------------------------------------------------------- # Read LAST step with the ADIOS2 v2.10 *FileReader* API ---------- # ---------------------------------------------------------------- with FileReader(str(self.bp_path)) as reader: n_steps = reader.num_steps() # total number of steps if n_steps == 0: raise RuntimeError( f"{bp_path} contains no data steps – cannot initialise “DiffusionCoefficients”.") last_step = n_steps - 1 steps = [last_step,1] # Scalar / grid variables ---------------------------------- n_species = int(reader.read("n_species", step_selection=steps)) psi_grid = reader.read("psi", step_selection=steps).astype(np.float64) npsi_file = psi_grid.size # Coefficient variables ------------------------------------ coeff_names = [ "ptl_diffusivity", "momentum_diffusivity", "heat_conductivity", "ptl_pinch_velocity", ] species_suffixes = [ "_elec", "_ion", "_imp1", "_imp2", "_imp3", "_imp4", "_imp5" ][:n_species] coeff_data = { name: np.zeros((n_species, npsi_file), dtype=np.float64) for name in coeff_names } for s_idx, suffix in enumerate(species_suffixes): for name in coeff_names: var_name = name + suffix coeff_data[name][s_idx, :] = reader.read(var_name, step_selection=steps) # Expose as attributes ------------------------------------------ self.nspecies = n_species self.npsi = npsi_file self.psi = psi_grid self.initialized = True self.ptl_diffusivity = coeff_data["ptl_diffusivity"] self.momentum_diffusivity = coeff_data["momentum_diffusivity"] self.heat_conductivity = coeff_data["heat_conductivity"] self.ptl_pinch_velocity = coeff_data["ptl_pinch_velocity"] else: # ---------------------------------------------------------------- # No existing file – allocate zero‑filled arrays ----------------- # ---------------------------------------------------------------- self.nspecies = int(nspecies) self.npsi = int(npsi) self.psi = np.zeros(self.npsi, dtype=np.float64) self.initialized = False self.ptl_diffusivity = np.zeros((self.nspecies, self.npsi)) self.momentum_diffusivity = np.zeros((self.nspecies, self.npsi)) self.heat_conductivity = np.zeros((self.nspecies, self.npsi)) self.ptl_pinch_velocity = np.zeros((self.nspecies, self.npsi)) self._open_stream(timeout)
[docs] def write_to_file(self, filename="xgc.diffusion_coeff.bp", gstep=-1): """ Writes the diffusion coefficient profiles and the ψ–grid to an Adios BP file as a new step. For each diffusion coefficient, the data for each species is split into an individual variable using a species suffix. The species suffixes (for species indices 0 through 6) are: 0: _elec 1: _ion 2: _imp1 3: _imp2 4: _imp3 5: _imp4 6: _imp5 In addition, the file will store: - n_species: a scalar integer (int64). - psi: a 1D array (of length npsi). The method opens the specified file (filename) using the Adios2 FileWriter in append mode, writes all the individual diffusion profiles, and then ends the step. """ # Define species suffixes. species_suffixes = ["_elec", "_ion", "_imp1", "_imp2", "_imp3", "_imp4", "_imp5"] #IOName = "input.diffusion_coefficients" #io = self.adios.declare_io(IOName) #if io.engine_type().upper() == "FILE": # print("Setting default engine FileStream for {0}".format(filename)) # io.set_engine("FileStream") # Open the file for writing (appending a new step) #with Stream(io, filename, "a") as writer: self._stream.begin_step() # Define a list of tuples: (coefficient base name, corresponding array) coeffs = [ ("ptl_diffusivity", self.ptl_diffusivity), ("momentum_diffusivity", self.momentum_diffusivity), ("heat_conductivity", self.heat_conductivity), ("ptl_pinch_velocity", self.ptl_pinch_velocity) ] # Loop over coefficients and species. for coeff_name, coeff_array in coeffs: for i in range(self.nspecies): # Form the variable name using the base name and species suffix. var_name = coeff_name + species_suffixes[i] # Extract the data for this species (a 1D array of length npsi). data = coeff_array[i, :] self._stream.write(var_name, data, shape=data.shape, start=(0,), count=(self.npsi,)) # Write out the number of species as an int64 scalar. self._stream.write("n_species", self.nspecies) # Write out the psi–grid as a 1D double array. self._stream.write("psi", self.psi, shape=data.shape, start=(0,), count=(self.npsi,)) # Write the time step index to which this data belongs self._stream.write("gstep", gstep) self._stream.end_step()
#self.adios.remove_io(IOName)
[docs] def close(self) -> None: """Close the underlying stream.""" if self._stream is not None: self._stream.close() self._stream = None
def _open_stream(self, timeout: Optional[float]) -> None: """Open the BP directory in *Stream* mode (read‑only).""" # NB: we keep a dedicated ADIOS2 Stream object – no context manager on # purpose so that the caller controls the lifetime via `close()`. io = self.adios.declare_io("input.diffusion_coefficients") if io.engine_type().upper() == "FILE": io.set_engine("FileStream") print("Setting default engine FileStream for {0}".format(self.bp_path)) if "OpenTimeoutSecs" not in io.parameters(): io.set_parameter("OpenTimeoutSecs", f"{timeout}") print("Setting default timeout = {0} seconds to open {1}".format(timeout, self.bp_path)) print(f"Waiting for file {self.bp_path}", flush=True) if self.bp_path.exists(): self._stream = Stream(io, str(self.bp_path), "a") else: self._stream = Stream(io, str(self.bp_path), "w") print(f"File opened", flush=True)