Source code for xgc_analysis.field_data

import os
import warnings

import numpy as np
from matplotlib.tri import Triangulation

from xgc_analysis.accessor_mixin import ArrayAccessorMixin
from xgc_analysis.bp_reader_mixin import BPReaderMixin
from xgc_analysis.mesh_data import MeshData
from xgc_analysis.plane_data import PlaneData


[docs] class FieldData(BPReaderMixin, ArrayAccessorMixin): """ Reader for XGC ``xgc.2d.XXXXX.bp`` and ``xgc.3d.XXXXX.bp`` field files. Data are stored in ``self.data`` with the structure:: self.data[var_name][step_index] = PlaneData | MeshData | scalar | np.ndarray ``step_index`` is a sequential internal index over loaded ``(file_index, bp_step)`` pairs. Use ``get_step_info(step_index)`` to recover the source file index and ADIOS step id. ``FieldData`` inherits :class:`ArrayAccessorMixin`, which provides ``get_array(var_name)`` for converting stored values to plain NumPy arrays for plotting and analysis. """ def __init__( self, mesh, work_dir="./", file_indices=None, is_axisymmetric=False, split_n0_turb=False, variables=None, read_all_steps=False, ): """ Initialize the field-data reader. Parameters ---------- mesh : Mesh Mesh object used to construct :class:`PlaneData` / :class:`MeshData` wrappers. work_dir : str, optional Directory containing ``xgc.2d.XXXXX.bp`` / ``xgc.3d.XXXXX.bp`` files. file_indices : list[int], optional List of file indices to read. If omitted, reads ``[0]``. is_axisymmetric : bool, optional If ``True``, reads ``xgc.2d`` files. Otherwise reads ``xgc.3d`` files. split_n0_turb : bool, optional Reserved for future post-processing; currently stored but not used. variables : list[str] | str | None, optional Optional explicit variable names to read. When provided, each requested variable must exist in the first loaded ADIOS step. If a requested variable is missing in later steps, zeros are substituted using the first-seen shape/dtype. If ``None``, a built-in default field-variable list is used. read_all_steps : bool, optional If ``True``, load every ADIOS step in each BP file. If ``False`` (default), load only the last ADIOS step in each file. Returns ------- None Reader state is initialized and requested files are loaded into ``self.data``. """ self.mesh = mesh self.work_dir = work_dir self.file_indices = file_indices if file_indices is not None else [0] self.is_axisymmetric = is_axisymmetric self.split_n0_turb = split_n0_turb self._init_bp_reader_state(variables=variables, read_all_steps=read_all_steps) self.data = {} if is_axisymmetric: default_vars = [ "depsidt", "dethetadt", "dpot", "eden", "epsi", "etheta", "iden", "pot0", "time", ] self.required_vars = ( self.requested_vars if self.requested_vars is not None else default_vars ) self._read_2d_files(mesh) else: default_vars = [ "aparh", "apars", "dBphi", "dBpsi", "dBtheta", "dpot", "eden", "ejpar", "epara", "epara2", "epsi", "etheta", "iden", "ijpar", "phi_right", "pot0", "time", ] self.required_vars = ( self.requested_vars if self.requested_vars is not None else default_vars ) self._read_3d_files(mesh) def _read_file_steps(self, fname): """ Read file steps using field-reader variable selection rules. Parameters ---------- fname : str Full path to one ``xgc.2d`` or ``xgc.3d`` BP file. Returns ------- dict[int, dict[str, object]] Mapping from BP step id to variable/value dictionary. """ read_vars = self.required_vars if self.requested_vars is not None else None return super()._read_file_steps(fname, read_vars=read_vars) def _read_2d_files(self, mesh): """ Read ``xgc.2d.XXXXX.bp`` files into ``self.data``. Parameters ---------- mesh : Mesh Mesh object used to build :class:`PlaneData` wrappers. Returns ------- None Populates ``self.data`` and ``self.step_index_info`` in place. 2D field arrays are wrapped as :class:`PlaneData` objects. Scalar values (notably ``time``) are stored directly. """ for idx in self.file_indices: fname = os.path.join(self.work_dir, f"xgc.2d.{idx:05d}.bp") file_data = self._read_file_steps(fname) for bp_step in sorted(file_data.keys()): variables = file_data[bp_step] step_index = self._register_step(idx, bp_step) for var, val in self._iter_step_items( variables, fname, bp_step, ordered_vars=self.required_vars, default_value=0.0, ): if var == "time": self.data.setdefault(var, {})[step_index] = val continue if isinstance(val, np.ndarray): if val.ndim == 2 and val.shape[0] == 1: val = val[0] val = PlaneData( plane=mesh.get_plane(0), data_array=val, n_components=1, dtype=val.dtype, ) self.data.setdefault(var, {})[step_index] = val def _read_3d_files(self, mesh): """ Read ``xgc.3d.XXXXX.bp`` files into ``self.data``. Parameters ---------- mesh : Mesh Mesh object used to build :class:`MeshData` / :class:`PlaneData` wrappers. Returns ------- None Populates ``self.data`` and ``self.step_index_info`` in place. 3D mesh fields are wrapped as :class:`MeshData` when truly toroidally varying (shape ``(nphi, n_n)``). Axisymmetric arrays (``(n_n,)`` or ``(1, n_n)``) are stored as :class:`PlaneData` to avoid duplicating data across planes. Scalars and special arrays (e.g. ``phi_right``) are stored directly. """ for idx in self.file_indices: fname = os.path.join(self.work_dir, f"xgc.3d.{idx:05d}.bp") file_data = self._read_file_steps(fname) for bp_step in sorted(file_data.keys()): variables = file_data[bp_step] step_index = self._register_step(idx, bp_step) for var, val in self._iter_step_items( variables, fname, bp_step, ordered_vars=self.required_vars, default_value=0.0, ): if var == "time": self.data.setdefault(var, {})[step_index] = val continue if var == "phi_right": self.data.setdefault(var, {})[step_index] = np.squeeze(val) continue if isinstance(val, np.ndarray): if val.ndim == 2 and val.shape[0] == mesh.nphi: # val shape is (nphi, n_n) val = MeshData( mesh, data_array=val, n_components=1, dtype=val.dtype, mesh_is_axisym=mesh.is_axisymmetric, ) elif val.ndim == 2 and val.shape[0] == 1: val = PlaneData( plane=mesh.get_plane(0), data_array=val[0], n_components=1, dtype=val.dtype, ) elif val.ndim == 1: val = PlaneData( plane=mesh.get_plane(0), data_array=val, n_components=1, dtype=val.dtype, ) self.data.setdefault(var, {})[step_index] = val # --- Typed accessors ---
[docs] def get_field(self, var_name, step_index=0): """Return the raw stored field item for a variable/time selection.""" return self.get_item(var_name, step_index)
[docs] def get_plane_data(self, var_name, step_index=0): """Return ``var_name`` at ``step_index`` as :class:`PlaneData`.""" return self.get_as(var_name, step_index, PlaneData)
[docs] def get_mesh_data(self, var_name, step_index=0): """Return ``var_name`` at ``step_index`` as :class:`MeshData`.""" return self.get_as(var_name, step_index, MeshData)
[docs] def get_scalar(self, var_name, step_index=0): """Return ``var_name`` at ``step_index`` as a scalar numeric value.""" return self.get_as(var_name, step_index, (int, float, np.integer, np.floating))
[docs] def export_vtu(self, mesh): """ Export loaded field data to ``.vtu`` files for visualization. Parameters ---------- mesh : Mesh Mesh object providing geometry/connectivity for the exported grid. Returns ------- None Writes one ``.vtu`` file per loaded internal step to ``<work_dir>/vtus``. """ import pyvista as pv # Ensure the output directory for VTK files exists vtu_dir = os.path.join(self.work_dir, "vtus") if not os.path.exists(vtu_dir): os.makedirs(vtu_dir) step_indices = self.list_step_indices() if not step_indices: warnings.warn("No field data loaded; skipping VTK export.", UserWarning) return # Warn if multiple file indices are used, as this may use significant memory if len(self.file_indices) > 1: warnings.warn( "`export_vtu()` is called with multiple file_indices. " "This will work, but may require a large amount of memory if you plan to generate many VTK files. " "Consider instantiating separate FieldData objects for individual VTK file indices.", UserWarning, ) # Generate triangle connectivity (could be replaced with TriangulationData from plane.py) tri_obj = Triangulation(mesh.rz[:, 0], mesh.rz[:, 1], mesh.nd_connect_list) # Prepare 3D points for VTK: 2D poloidal points with a zero Z component (VTK expects 3D coordinates) points_vtu = np.column_stack([mesh.rz[:, 0], mesh.rz[:, 1], np.zeros_like(mesh.rz[:, 0])]) # Prepare cells for VTK num_cells = tri_obj.triangles.shape[0] cell_types = np.full(len(tri_obj.triangles), pv.CellType.TRIANGLE, dtype=np.uint8) cell_array = np.hstack([ np.full((num_cells, 1), 3), tri_obj.triangles, ]).astype(np.int64) # Choose filename format based on dimensionality dim = "2d" if self.is_axisymmetric else "3d" steps_per_file = {} for info in self.step_index_info.values(): file_idx = info["file_index"] steps_per_file[file_idx] = steps_per_file.get(file_idx, 0) + 1 has_multi_step_file = any(count > 1 for count in steps_per_file.values()) for step_index in step_indices: info = self.step_index_info.get(step_index, { "file_index": step_index, "bp_step": step_index, }) file_idx = int(info["file_index"]) bp_step = int(info["bp_step"]) if has_multi_step_file: fname = os.path.join(vtu_dir, f"xgc.{dim}.{file_idx:05d}.s{bp_step:05d}.vtu") else: fname = os.path.join(vtu_dir, f"xgc.{dim}.{file_idx:05d}.vtu") # Create unstructured grid grid = pv.UnstructuredGrid(cell_array, cell_types, points_vtu) # ----------------------------------------------------- # Dataset 1: Point data on XGC poloidal grid # expected shape: [nindices, nphi, nnode] # ----------------------------------------------------- for var in self.required_vars: if var in ("time", "phi_right"): continue if var not in self.data or step_index not in self.data[var]: continue datum = self.data[var][step_index] if isinstance(datum, MeshData): data_array = datum.get_data()[0] elif isinstance(datum, PlaneData): data_array = datum.get_data() else: continue grid.point_data[var] = data_array # ----------------------------------------------------- # Dataset 2: Global field data # expected shape: [nindices] # ----------------------------------------------------- if "time" in self.data and step_index in self.data["time"]: time_value = float(np.asarray(self.data["time"][step_index]).squeeze()) grid.field_data["time"] = np.array([time_value], dtype=np.float64) # ----------------------------------------------------- # Dataset 3: Additional metadata # ----------------------------------------------------- grid.field_data["step"] = np.array([bp_step], dtype=np.int32) grid.field_data["file_index"] = np.array([file_idx], dtype=np.int32) # Save to .vtu file grid.save(fname)