Source code for xgc_analysis.field_data

import os
import numpy as np
from xgc_analysis.mesh_data import MeshData
from xgc_analysis.read_bp_file import ReadBPFile

[docs] class FieldData: def __init__(self, mesh, work_dir="./", file_indices=None, is_axisymmetric=False, split_n0_turb=False): self.work_dir = work_dir self.file_indices = file_indices if file_indices is not None else [0] self.is_axisymmetric = is_axisymmetric self.split_n0_turb = split_n0_turb self.data = {} if is_axisymmetric: self.required_vars = [ "depsidt", "dethetadt", "dpot", "eden", "epsi", "etheta", "iden", "pot0", "time" ] self._read_2d_files(mesh) else: self.required_vars = [ "aparh", "apars", "dBphi", "dBpsi", "dBtheta", "dpot", "eden", "ejpar", "epara", "epara2", "epsi", "etheta", "iden", "ijpar", "phi_right", "pot0", "time" ] self._read_3d_files(mesh) def _read_2d_files(self, mesh): for i, idx in enumerate(self.file_indices): fname = os.path.join(self.work_dir, f"xgc.2d.{idx:05d}.bp") file_data = ReadBPFile(fname) for step, variables in file_data.items(): for var in self.required_vars: val = variables.get(var, 0.0) if var == "time": self.data.setdefault(var, {})[i] = val continue if isinstance(val, np.ndarray): if val.ndim == 2 and val.shape[0] == 1: val = val[0] # Wrap in MeshData with 1 plane val = MeshData( mesh, data_array=val[np.newaxis,:], n_components=1, dtype=val.dtype, mesh_is_axisym=True ) self.data.setdefault(var, {})[i] = val def _read_3d_files(self, mesh): for i, idx in enumerate(self.file_indices): fname = os.path.join(self.work_dir, f"xgc.3d.{idx:05d}.bp") file_data = ReadBPFile(fname) for step, variables in file_data.items(): for var in self.required_vars: val = variables.get(var, 0.0) if var == "time": self.data.setdefault(var, {})[i] = val continue if var == "phi_right": self.data.setdefault(var, {})[i] = np.squeeze(val) continue if isinstance(val, np.ndarray): if val.ndim == 2 and val.shape[0] == mesh.nphi: # val shape is (nphi, n_n) val = MeshData( mesh, data_array=val, n_components=1, dtype=val.dtype, mesh_is_axisym=mesh.is_axisymmetric ) elif val.ndim == 1: # reshape to (1, n_n) val = MeshData( mesh, data_array=np.repeat(val[np.newaxis, :], mesh.nphi, axis=0), n_components=1, dtype=val.dtype, mesh_is_axisym=mesh.is_axisymmetric ) self.data.setdefault(var, {})[i] = val
[docs] def get_array(self, var_name): """ Returns the raw NumPy array for all steps of a given variable. Parameters ---------- var_name : str Name of the variable to extract (e.g., "eden", "time", etc.). Returns ------- array : np.ndarray A NumPy array of shape: - (n_steps,) for scalar variables (e.g., time) - (n_steps, n_planes, n_vertices) for mesh-based variables """ if var_name not in self.data: raise KeyError(f"Variable '{var_name}' not found in FieldData.") step_dict = self.data[var_name] n_steps = len(step_dict) ordered_steps = sorted(step_dict.keys()) # First entry determines data type first_val = step_dict[ordered_steps[0]] if var_name == "time" or not isinstance(first_val, MeshData): # Time or other scalar data return np.array([step_dict[step] for step in ordered_steps]) # MeshData: convert each step to (n_planes, n_vertices) arrays = [step_dict[step].get_data() for step in ordered_steps] return np.stack(arrays, axis=0) # shape: (n_steps, n_planes, n_vertices)