import os
import warnings
import numpy as np
from matplotlib.tri import Triangulation
from xgc_analysis.mesh_data import MeshData
from xgc_analysis.read_bp_file import ReadBPFile
[docs]
class FieldData:
def __init__(self, mesh, work_dir="./", file_indices=None, is_axisymmetric=False, split_n0_turb=False):
self.work_dir = work_dir
self.file_indices = file_indices if file_indices is not None else [0]
self.is_axisymmetric = is_axisymmetric
self.split_n0_turb = split_n0_turb
self.data = {}
if is_axisymmetric:
self.required_vars = [
"depsidt", "dethetadt", "dpot", "eden",
"epsi", "etheta", "iden", "pot0", "time"
]
self._read_2d_files(mesh)
else:
self.required_vars = [
"aparh", "apars", "dBphi", "dBpsi", "dBtheta",
"dpot", "eden", "ejpar", "epara", "epara2",
"epsi", "etheta", "iden", "ijpar", "phi_right",
"pot0", "time"
]
self._read_3d_files(mesh)
def _read_2d_files(self, mesh):
for i, idx in enumerate(self.file_indices):
fname = os.path.join(self.work_dir, f"xgc.2d.{idx:05d}.bp")
file_data = ReadBPFile(fname)
for step, variables in file_data.items():
for var in self.required_vars:
val = variables.get(var, 0.0)
if var == "time":
self.data.setdefault(var, {})[i] = val
continue
if isinstance(val, np.ndarray):
if val.ndim == 2 and val.shape[0] == 1:
val = val[0]
# Wrap in MeshData with 1 plane
val = MeshData(
mesh,
data_array=val[np.newaxis,:],
n_components=1,
dtype=val.dtype,
mesh_is_axisym=True
)
self.data.setdefault(var, {})[i] = val
def _read_3d_files(self, mesh):
for i, idx in enumerate(self.file_indices):
fname = os.path.join(self.work_dir, f"xgc.3d.{idx:05d}.bp")
file_data = ReadBPFile(fname)
for step, variables in file_data.items():
for var in self.required_vars:
val = variables.get(var, 0.0)
if var == "time":
self.data.setdefault(var, {})[i] = val
continue
if var == "phi_right":
self.data.setdefault(var, {})[i] = np.squeeze(val)
continue
if isinstance(val, np.ndarray):
if val.ndim == 2 and val.shape[0] == mesh.nphi:
# val shape is (nphi, n_n)
val = MeshData(
mesh,
data_array=val,
n_components=1,
dtype=val.dtype,
mesh_is_axisym=mesh.is_axisymmetric
)
elif val.ndim == 1:
# reshape to (1, n_n)
val = MeshData(
mesh,
data_array=np.repeat(val[np.newaxis, :], mesh.nphi, axis=0),
n_components=1,
dtype=val.dtype,
mesh_is_axisym=mesh.is_axisymmetric
)
self.data.setdefault(var, {})[i] = val
[docs]
def get_array(self, var_name):
"""
Returns the raw NumPy array for all steps of a given variable.
Parameters
----------
var_name : str
Name of the variable to extract (e.g., "eden", "time", etc.).
Returns
-------
array : np.ndarray
A NumPy array of shape:
- (n_steps,) for scalar variables (e.g., time)
- (n_steps, n_planes, n_vertices) for mesh-based variables
"""
if var_name not in self.data:
raise KeyError(f"Variable '{var_name}' not found in FieldData.")
step_dict = self.data[var_name]
n_steps = len(step_dict)
ordered_steps = sorted(step_dict.keys())
# First entry determines data type
first_val = step_dict[ordered_steps[0]]
if var_name == "time" or not isinstance(first_val, MeshData):
# Time or other scalar data
return np.array([step_dict[step] for step in ordered_steps])
# MeshData: convert each step to (n_planes, n_vertices)
arrays = [step_dict[step].get_data() for step in ordered_steps]
return np.stack(arrays, axis=0) # shape: (n_steps, n_planes, n_vertices)
[docs]
def export_vtu(self, mesh):
import pyvista as pv
# Ensure the output directory for VTK files exists
vtu_dir = os.path.join(self.work_dir, "vtus")
if not os.path.exists(vtu_dir):
os.makedirs(vtu_dir)
# Warn if multiple file indices are used, as this may use significant memory
if len(self.file_indices) > 1:
warnings.warn(
"`export_vtu()` is called with multiple file_indices. "
"This will work, but may require a large amount of memory if you plan to generate many VTK files. "
"Consider instantiating separate FieldData objects for individual VTK file indices. ",
UserWarning
)
# Generate triangle connectivity (could be replaced with TriangulationData from plane.py)
triObj = Triangulation(mesh.rz[:,0], mesh.rz[:,1], mesh.nd_connect_list)
# Prepare 3D points for VTK: 2D poloidal points with a zero Z component (VTK expects 3D coordinates)
points_vtu = np.column_stack([mesh.rz[:,0], mesh.rz[:,1], np.zeros_like(mesh.rz[:,0])])
# Prepare cells for VTK
num_cells = triObj.triangles.shape[0]
cell_types = np.full(len(triObj.triangles), pv.CellType.TRIANGLE, dtype=np.uint8)
cell_array = np.hstack([
np.full((num_cells,1), 3), # triangle has 3 vertices
triObj.triangles
]).astype(np.int64)
# Choose filename format based on dimensionality
dim = "2d" if self.is_axisymmetric else "3d"
for i, idx in enumerate(self.file_indices):
fname = os.path.join(vtu_dir, f"xgc.{dim}.{idx:05d}.vtu")
# Create unstructured grid
grid = pv.UnstructuredGrid(cell_array, cell_types, points_vtu)
# -----------------------------------------------------
# Dataset 1: Point data on XGC poloidal grid
# expected shape: [nindices, nphi, nnode]
# -----------------------------------------------------
for var in self.required_vars:
if var in ("time", "phi_right"): # TODO: handle these more robustly
continue
data_array = self.get_array(var)[i, 0, :] # TODO: generalize to all planes
grid.point_data[var] = data_array
# -----------------------------------------------------
# Dataset 2: Global field data
# expected shape: [nindices]
# -----------------------------------------------------
grid.field_data["time"] = np.array([self.get_array("time")[i]], dtype=np.float64)
# -----------------------------------------------------
# Dataset 3: Additional data not present in the original xgc.3d.#####.bp file
# Manually added to support post-processing and diagnostics
# -----------------------------------------------------
grid.field_data["step"] = np.array([idx], dtype=np.int32)
# Save to .vtu file
grid.save(fname)