import os
import warnings
import numpy as np
from matplotlib.tri import Triangulation
from xgc_analysis.accessor_mixin import ArrayAccessorMixin
from xgc_analysis.bp_reader_mixin import BPReaderMixin
from xgc_analysis.mesh_data import MeshData
from xgc_analysis.plane_data import PlaneData
[docs]
class FieldData(BPReaderMixin, ArrayAccessorMixin):
"""
Reader for XGC ``xgc.2d.XXXXX.bp`` and ``xgc.3d.XXXXX.bp`` field files.
Data are stored in ``self.data`` with the structure::
self.data[var_name][step_index] = PlaneData | MeshData | scalar | np.ndarray
``step_index`` is a sequential internal index over loaded ``(file_index,
bp_step)`` pairs. Use ``get_step_info(step_index)`` to recover the source
file index and ADIOS step id.
``FieldData`` inherits :class:`ArrayAccessorMixin`, which provides
``get_array(var_name)`` for converting stored values to plain NumPy arrays
for plotting and analysis.
"""
def __init__(
self,
mesh,
work_dir="./",
file_indices=None,
is_axisymmetric=False,
split_n0_turb=False,
variables=None,
read_all_steps=False,
):
"""
Initialize the field-data reader.
Parameters
----------
mesh : Mesh
Mesh object used to construct :class:`PlaneData` / :class:`MeshData`
wrappers.
work_dir : str, optional
Directory containing ``xgc.2d.XXXXX.bp`` / ``xgc.3d.XXXXX.bp`` files.
file_indices : list[int], optional
List of file indices to read. If omitted, reads ``[0]``.
is_axisymmetric : bool, optional
If ``True``, reads ``xgc.2d`` files. Otherwise reads ``xgc.3d`` files.
split_n0_turb : bool, optional
Reserved for future post-processing; currently stored but not used.
variables : list[str] | str | None, optional
Optional explicit variable names to read. When provided, each
requested variable must exist in the first loaded ADIOS step.
If a requested variable is missing in later steps, zeros are
substituted using the first-seen shape/dtype. If ``None``, a
built-in default field-variable list is used.
read_all_steps : bool, optional
If ``True``, load every ADIOS step in each BP file. If ``False``
(default), load only the last ADIOS step in each file.
Returns
-------
None
Reader state is initialized and requested files are loaded into
``self.data``.
"""
self.mesh = mesh
self.work_dir = work_dir
self.file_indices = file_indices if file_indices is not None else [0]
self.is_axisymmetric = is_axisymmetric
self.split_n0_turb = split_n0_turb
self._init_bp_reader_state(variables=variables, read_all_steps=read_all_steps)
self.data = {}
if is_axisymmetric:
default_vars = [
"depsidt",
"dethetadt",
"dpot",
"eden",
"epsi",
"etheta",
"iden",
"pot0",
"time",
]
self.required_vars = (
self.requested_vars if self.requested_vars is not None else default_vars
)
self._read_2d_files(mesh)
else:
default_vars = [
"aparh",
"apars",
"dBphi",
"dBpsi",
"dBtheta",
"dpot",
"eden",
"ejpar",
"epara",
"epara2",
"epsi",
"etheta",
"iden",
"ijpar",
"phi_right",
"pot0",
"time",
]
self.required_vars = (
self.requested_vars if self.requested_vars is not None else default_vars
)
self._read_3d_files(mesh)
def _read_file_steps(self, fname):
"""
Read file steps using field-reader variable selection rules.
Parameters
----------
fname : str
Full path to one ``xgc.2d`` or ``xgc.3d`` BP file.
Returns
-------
dict[int, dict[str, object]]
Mapping from BP step id to variable/value dictionary.
"""
read_vars = self.required_vars if self.requested_vars is not None else None
return super()._read_file_steps(fname, read_vars=read_vars)
def _read_2d_files(self, mesh):
"""
Read ``xgc.2d.XXXXX.bp`` files into ``self.data``.
Parameters
----------
mesh : Mesh
Mesh object used to build :class:`PlaneData` wrappers.
Returns
-------
None
Populates ``self.data`` and ``self.step_index_info`` in place.
2D field arrays are wrapped as :class:`PlaneData` objects.
Scalar values (notably ``time``) are stored directly.
"""
for idx in self.file_indices:
fname = os.path.join(self.work_dir, f"xgc.2d.{idx:05d}.bp")
file_data = self._read_file_steps(fname)
for bp_step in sorted(file_data.keys()):
variables = file_data[bp_step]
step_index = self._register_step(idx, bp_step)
for var, val in self._iter_step_items(
variables,
fname,
bp_step,
ordered_vars=self.required_vars,
default_value=0.0,
):
if var == "time":
self.data.setdefault(var, {})[step_index] = val
continue
if isinstance(val, np.ndarray):
if val.ndim == 2 and val.shape[0] == 1:
val = val[0]
val = PlaneData(
plane=mesh.get_plane(0),
data_array=val,
n_components=1,
dtype=val.dtype,
)
self.data.setdefault(var, {})[step_index] = val
def _read_3d_files(self, mesh):
"""
Read ``xgc.3d.XXXXX.bp`` files into ``self.data``.
Parameters
----------
mesh : Mesh
Mesh object used to build :class:`MeshData` / :class:`PlaneData`
wrappers.
Returns
-------
None
Populates ``self.data`` and ``self.step_index_info`` in place.
3D mesh fields are wrapped as :class:`MeshData` when truly toroidally
varying (shape ``(nphi, n_n)``). Axisymmetric arrays (``(n_n,)`` or
``(1, n_n)``) are stored as :class:`PlaneData` to avoid duplicating data
across planes. Scalars and special arrays (e.g. ``phi_right``) are stored
directly.
"""
for idx in self.file_indices:
fname = os.path.join(self.work_dir, f"xgc.3d.{idx:05d}.bp")
file_data = self._read_file_steps(fname)
for bp_step in sorted(file_data.keys()):
variables = file_data[bp_step]
step_index = self._register_step(idx, bp_step)
for var, val in self._iter_step_items(
variables,
fname,
bp_step,
ordered_vars=self.required_vars,
default_value=0.0,
):
if var == "time":
self.data.setdefault(var, {})[step_index] = val
continue
if var == "phi_right":
self.data.setdefault(var, {})[step_index] = np.squeeze(val)
continue
if isinstance(val, np.ndarray):
if val.ndim == 2 and val.shape[0] == mesh.nphi:
# val shape is (nphi, n_n)
val = MeshData(
mesh,
data_array=val,
n_components=1,
dtype=val.dtype,
mesh_is_axisym=mesh.is_axisymmetric,
)
elif val.ndim == 2 and val.shape[0] == 1:
val = PlaneData(
plane=mesh.get_plane(0),
data_array=val[0],
n_components=1,
dtype=val.dtype,
)
elif val.ndim == 1:
val = PlaneData(
plane=mesh.get_plane(0),
data_array=val,
n_components=1,
dtype=val.dtype,
)
self.data.setdefault(var, {})[step_index] = val
# --- Typed accessors ---
[docs]
def get_field(self, var_name, step_index=0):
"""Return the raw stored field item for a variable/time selection."""
return self.get_item(var_name, step_index)
[docs]
def get_plane_data(self, var_name, step_index=0):
"""Return ``var_name`` at ``step_index`` as :class:`PlaneData`."""
return self.get_as(var_name, step_index, PlaneData)
[docs]
def get_mesh_data(self, var_name, step_index=0):
"""Return ``var_name`` at ``step_index`` as :class:`MeshData`."""
return self.get_as(var_name, step_index, MeshData)
[docs]
def get_scalar(self, var_name, step_index=0):
"""Return ``var_name`` at ``step_index`` as a scalar numeric value."""
return self.get_as(var_name, step_index, (int, float, np.integer, np.floating))
[docs]
def export_vtu(self, mesh):
"""
Export loaded field data to ``.vtu`` files for visualization.
Parameters
----------
mesh : Mesh
Mesh object providing geometry/connectivity for the exported grid.
Returns
-------
None
Writes one ``.vtu`` file per loaded internal step to
``<work_dir>/vtus``.
"""
import pyvista as pv
# Ensure the output directory for VTK files exists
vtu_dir = os.path.join(self.work_dir, "vtus")
if not os.path.exists(vtu_dir):
os.makedirs(vtu_dir)
step_indices = self.list_step_indices()
if not step_indices:
warnings.warn("No field data loaded; skipping VTK export.", UserWarning)
return
# Warn if multiple file indices are used, as this may use significant memory
if len(self.file_indices) > 1:
warnings.warn(
"`export_vtu()` is called with multiple file_indices. "
"This will work, but may require a large amount of memory if you plan to generate many VTK files. "
"Consider instantiating separate FieldData objects for individual VTK file indices.",
UserWarning,
)
# Generate triangle connectivity (could be replaced with TriangulationData from plane.py)
tri_obj = Triangulation(mesh.rz[:, 0], mesh.rz[:, 1], mesh.nd_connect_list)
# Prepare 3D points for VTK: 2D poloidal points with a zero Z component (VTK expects 3D coordinates)
points_vtu = np.column_stack([mesh.rz[:, 0], mesh.rz[:, 1], np.zeros_like(mesh.rz[:, 0])])
# Prepare cells for VTK
num_cells = tri_obj.triangles.shape[0]
cell_types = np.full(len(tri_obj.triangles), pv.CellType.TRIANGLE, dtype=np.uint8)
cell_array = np.hstack([
np.full((num_cells, 1), 3),
tri_obj.triangles,
]).astype(np.int64)
# Choose filename format based on dimensionality
dim = "2d" if self.is_axisymmetric else "3d"
steps_per_file = {}
for info in self.step_index_info.values():
file_idx = info["file_index"]
steps_per_file[file_idx] = steps_per_file.get(file_idx, 0) + 1
has_multi_step_file = any(count > 1 for count in steps_per_file.values())
for step_index in step_indices:
info = self.step_index_info.get(step_index, {
"file_index": step_index,
"bp_step": step_index,
})
file_idx = int(info["file_index"])
bp_step = int(info["bp_step"])
if has_multi_step_file:
fname = os.path.join(vtu_dir, f"xgc.{dim}.{file_idx:05d}.s{bp_step:05d}.vtu")
else:
fname = os.path.join(vtu_dir, f"xgc.{dim}.{file_idx:05d}.vtu")
# Create unstructured grid
grid = pv.UnstructuredGrid(cell_array, cell_types, points_vtu)
# -----------------------------------------------------
# Dataset 1: Point data on XGC poloidal grid
# expected shape: [nindices, nphi, nnode]
# -----------------------------------------------------
for var in self.required_vars:
if var in ("time", "phi_right"):
continue
if var not in self.data or step_index not in self.data[var]:
continue
datum = self.data[var][step_index]
if isinstance(datum, MeshData):
data_array = datum.get_data()[0]
elif isinstance(datum, PlaneData):
data_array = datum.get_data()
else:
continue
grid.point_data[var] = data_array
# -----------------------------------------------------
# Dataset 2: Global field data
# expected shape: [nindices]
# -----------------------------------------------------
if "time" in self.data and step_index in self.data["time"]:
time_value = float(np.asarray(self.data["time"][step_index]).squeeze())
grid.field_data["time"] = np.array([time_value], dtype=np.float64)
# -----------------------------------------------------
# Dataset 3: Additional metadata
# -----------------------------------------------------
grid.field_data["step"] = np.array([bp_step], dtype=np.int32)
grid.field_data["file_index"] = np.array([file_idx], dtype=np.int32)
# Save to .vtu file
grid.save(fname)