import os
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Any
from xgc_analysis.constants import echarge, atomic_mass_unit
from xgc_analysis.periodic_table import get_element_name_by_mass # You’ll need to implement or import this.
[docs]
class Species:
"""
Class representing a plasma species with basic physical properties.
"""
def __init__(self, sim, index):
"""
Initialize a Species object.
Parameters:
- sim: Simulation object containing the input parameters.
- index: Index of the species in the parameter lists.
"""
ptl_param = sim.input_params.get("ptl_param", {})
# Extract parameters from Simulation input
mass_au = ptl_param["ptl_mass_au"][index] # mass in atomic units
charge_eu = ptl_param["ptl_charge_eu"][index] # charge in units of e
marker_type = ptl_param["ptl_marker_type"][index]
dynamic_f0 = ptl_param["ptl_dynamic_f0"][index]
# Derived quantities
self.mass_au = mass_au
self.mass_kg = mass_au * atomic_mass_unit
self.charge_eu = charge_eu
self.charge_C = charge_eu * echarge
self.name = get_element_name_by_mass(mass_au)
self.species_type = "None" if marker_type == 0 else marker_type
self.dynamic_f0 = dynamic_f0
# ----------------------------------------------------------------------
# 0. A tiny container that keeps one profile together
# ----------------------------------------------------------------------
@dataclass
class InitialProfile:
psi_n: np.ndarray
value: np.ndarray
type: int | None = None # used only for flow
# ----------------------------------------------------------------------
# 1. Utility for reading the ASCII profile files
# ----------------------------------------------------------------------
def _read_profile_file(path: str) -> tuple[np.ndarray, np.ndarray]:
"""Read an XGC‑style 1‑D profile file (see user spec)."""
with open(path, "r", encoding="utf‑8") as fp:
n_lines = int(fp.readline().strip())
psi, val = np.loadtxt(
[next(fp) for _ in range(n_lines)], # read exactly `n_lines` rows
unpack=True)
# consume the trailing “-1” sentinel so that the file pointer is
# at EOF even if somebody decides to reuse it.
fp.readline()
return psi, val
# ----------------------------------------------------------------------
# 2. Scan the eq_param namelist once, then build per‑species dictionaries
# ----------------------------------------------------------------------
eq_param: Dict[str, Any] = sim.input_params.get("eq_param", {})
sml_param: Dict[str, Any] = sim.input_params.get("sml_param", {})
# Every key we care about in one place
SHAPE_KEYS = [
"eq_dens_shape", "eq_temp_shape", "eq_flow_shape",
"eq_dens_diff_shape", "eq_pinch_v_shape",
"eq_flow_diff_shape", "eq_t_diff_shape",
]
FILE_KEY_TO_LABEL = {
"eq_dens_file": "density",
"eq_temp_file": "temperature",
"eq_flow_file": "flow",
"eq_dens_diff_file": "ptl_diffusivity",
"eq_pinch_v_file": "ptl_pinch_velocity",
"eq_flow_diff_file": "momentum_diffusivity",
"eq_t_diff_file": "heat_conductivity",
}
eq_param = sim.input_params.get("eq_param", {})
sml_param = sim.input_params.get("sml_param", {})
# ----------------------------------------------------------------------
# Compute the directory that holds the profile files
# ----------------------------------------------------------------------
# • simulation.data_directory is guaranteed to be a usable base.
# • sml_input_dir may be:
# - missing → use only data_directory
# - a plain string (e.g. "./input_dir")
# - a list/tuple/ndarray (because namelist arrays are parsed that way)
# We normalise all of those into one clean Path.
# ----------------------------------------------------------------------
input_dir_raw = sml_param.get("sml_input_file_dir", "")
# If it’s a list/tuple/array pick the FIRST entry; namelists are 1‑based,
# but here the directory is *global*, not per species.
if isinstance(input_dir_raw, (list, tuple)):
input_dir_raw = input_dir_raw[0] if input_dir_raw else ""
# Cast to str, strip quotes/whitespace, then make a Path.
input_dir = Path(str(input_dir_raw).strip().strip('"').strip("'"))
base_dir = Path(sim.data_directory).expanduser().resolve()
if input_dir != Path(".") and str(input_dir) != "":
base_dir = (base_dir / input_dir).resolve()
i_sp = index # for readability
flow_type = eq_param.get("eq_flow_type", []) # may be absent/short
self.initial_profiles: Dict[str, InitialProfile] = {}
for shape_key in SHAPE_KEYS:
shapes = eq_param.get(shape_key)
if not shapes or i_sp >= len(shapes):
continue # parameter missing or too short
if shapes[i_sp] != -1:
continue # profile not file‑based
file_key = shape_key.replace("_shape", "_file")
label = FILE_KEY_TO_LABEL[file_key]
file_paths = eq_param.get(file_key, [])
f_name = str(file_paths[i_sp]).strip() if i_sp < len(file_paths) else ""
if not f_name:
# "_shape == -1" but filename missing → silently skip, per spec
continue
f_path = os.path.join(base_dir, f_name)
try:
psi_n, vals = _read_profile_file(f_path)
except FileNotFoundError as err:
raise FileNotFoundError(
f"Initial‑profile file '{f_path}' not found for species #{i_sp}."
) from err
self.initial_profiles[label] = InitialProfile(
psi_n=psi_n,
value=vals,
type=int(flow_type[i_sp]) if label == "flow" and i_sp < len(flow_type) else None
)
def __repr__(self):
return (f"Species(name={self.name}, type={self.species_type}, mass={self.mass_au:.2f} amu, "
f"{self.mass_kg:.2e} kg, charge={self.charge_eu} e, {self.charge_C:.2e} C, "
f"dynamic_f0={self.dynamic_f0})")
# --------------------------------------------------------------
# Vectorised interpolation of an initial profile (simplified)
# --------------------------------------------------------------
[docs]
def interpolate_profile(
self,
name: str,
psi_new: np.ndarray,
*,
fill: str | float | None = "edge",
) -> np.ndarray:
"""
Interpolate one of this species’ initial profiles onto a new ψ grid.
Parameters
----------
name : str
Profile key in ``self.initial_profiles``
(“density”, “temperature”, “flow”, …).
psi_new : ndarray
Target ψ‑grid (1‑D NumPy array).
fill : {"edge", None, float}, default "edge"
How to handle ψ outside the original range:
• "edge" – use nearest edge value (default)
• None – raise ValueError if extrapolation needed
• float – fill with that scalar
Returns
-------
ndarray
Interpolated profile values at ``psi_new``.
"""
# ---------- fetch the stored profile ----------
# ---------- return zeros if the profile is absent ----------
if name not in self.initial_profiles:
print(f"Unknown profile '{name}'. ")
print(f"Available: {list(self.initial_profiles)}")
return np.zeros_like(psi_new, dtype=float)
# ----------------------------------------------------------------
prof = self.initial_profiles[name]
x, y = prof.psi_n, prof.value
if x[0] > x[-1]: # ensure ascending order for np.interp
x, y = x[::-1], y[::-1]
# ---------- decide on fill strategy ----------
if fill == "edge":
left, right = y[0], y[-1]
elif fill is None:
if (psi_new < x[0]).any() or (psi_new > x[-1]).any():
raise ValueError(
f"psi_new extends outside [{x[0]}, {x[-1]}] and "
f"fill=None was requested."
)
left = right = np.nan # never used because no extrapolation
else: # float
left = right = float(fill)
# ---------- vectorised interpolation ----------
return np.interp(psi_new, x, y, left=left, right=right)