Source code for xgc_analysis.mesh

import numpy as np
from adios2 import FileReader
from xgc_analysis.plane import Plane
from scipy.sparse import csr_matrix as _CSR
from typing import List


def _mapping_to_csr_list(mapping: dict,
                         nd_connect: np.ndarray,
                         vertex_count: int) -> List[_CSR]:
    """
    Convert ``compute_fieldline_mapping`` output into a list of CSR matrices,
    one per toroidal step.  For an axisymmetric mesh every matrix has shape
    ``(n_vert, n_vert)`` and projects data forward by Δφ.

    Each matrix Pₛ fulfils
        f(φ₀ + (s+1)Δφ) = Pₛ @ f(φ₀)
    """
    tri  = mapping["triangle_index"]        # (n_vert, n_steps)
    w    = mapping["bary_weights"]          # (n_vert, n_steps, 3)
    n_v, n_steps = tri.shape

    # auto-detect 1-based connectivity
    conn = nd_connect.copy()
    if conn.max() >= vertex_count:
        conn -= 1

    mats = []
    for s in range(n_steps):
        tri_s   = tri[:, s]
        valid   = tri_s >= 0
        if not np.any(valid):
            mats.append(_CSR((vertex_count, vertex_count)))
            continue

        rows    = np.repeat(np.nonzero(valid)[0], 3)
        cols    = conn[tri_s[valid]].reshape(-1)
        data    = w[valid, s, :].reshape(-1)

        mats.append(_CSR((data, (rows, cols)),
                         shape=(vertex_count, vertex_count)))
    return mats

###############################################################################
# The "mesh" class: an overarching container for one or more planes.
###############################################################################

[docs] class Mesh: """ A general mesh class that consists of an array of "plane" objects. The mesh data for all planes comes from the same BP file ("xgc.mesh.bp"). The file also contains the variables "delta_phi" and "wedge_angle". The number of wedges is computed as: wedge_n = 2*pi / wedge_angle, and the number of planes is computed as: nphi = wedge_angle / delta_phi. If the input flag is_axisymmetric is True, then the mesh is axisymmetric and only one plane is set up. Otherwise, nphi planes are created (all identical in this version). """ def __init__(self, is_axisymmetric, data_dir="."): # Read the wedge information from the BP file. with FileReader(data_dir+"/xgc.mesh.bp") as reader: self.delta_phi = float(reader.read("delta_phi").item()) self.wedge_angle = float(reader.read("wedge_angle").item()) # Compute the number of wedges and number of planes. # Here, we round to the nearest integer. self.wedge_n = int(round(2 * np.pi / self.wedge_angle)) self.nphi = int(round(self.wedge_angle / self.delta_phi)) self.is_axisymmetric = is_axisymmetric if is_axisymmetric: # For axisymmetric meshes, all planes are identical, so only one plane is created. self.planes = [Plane(data_dir=data_dir)] self.n_planes = 1 else: # For non-axisymmetric meshes, set up an array of nphi planes. self.planes = [Plane(data_dir=data_dir) for _ in range(self.nphi)] self.n_planes = self.nphi
[docs] def setup_fieldline_mapping(self, magnetic_field, *, direction="forward", rk4_substeps: int = 2, n_int: int = 250): """ Trace all mesh vertices along B and build one CSR propagator matrix for every stored toroidal step. The objects are cached as self.fl_mapping – raw dict from compute_fieldline_mapping self.fl_csr_matrices – list[CSR] (length = n_steps) Call this after construction if the mapping was not required earlier. """ from fieldline_mapping import compute_fieldline_mapping if (not self.is_axisymmetric): raise ValueError("Field-following mapping is currently only available for axisymmetric meshes.") # --- run tracer ---------------------------------------------------- self.fl_mapping = compute_fieldline_mapping( self, magnetic_field, rk4_substeps=rk4_substeps, n_int=n_int, direction=direction, ) # --- assemble sparse matrices ------------------------------------- ref_plane = self.planes[0] # axisymmetric assumption for now self.fl_csr_matrices = _mapping_to_csr_list( self.fl_mapping, ref_plane.nd_connect_list, ref_plane.n_n, )
[docs] def get_plane(self, index=0): """ Returns the plane at the specified index. If the mesh is axisymmetric, index is ignored. """ if self.is_axisymmetric: return self.planes[0] else: if index < 0 or index >= len(self.planes): raise IndexError("Plane index out of range.") return self.planes[index]
[docs] def get_vertex_counts(self): """ Returns a list of the number of vertices on each of the mesh's planes. For an axisymmetric mesh (is_axisymmetric=True), the same number of vertices (from the single plane stored) is repeated for each plane (nphi times). For a non-axisymmetric mesh, the function returns the number of vertices for each plane. """ if self.is_axisymmetric: return [self.planes[0].n_n] * self.nphi else: return [p.n_n for p in self.planes]
# ---------------- global (3‑D) diagnostics -----------------
[docs] def flux_avg_to_surf(self, meshdata: np.ndarray) -> np.ndarray: """Flux‑surface average (⟨f⟩_ψ) of a 3‑D field, averaged over φ. Returns ------- ndarray Shape (nsurf,) for scalar, or (nsurf, n_components) for vector data. """ per_plane = [pl.flux_avg_to_surf(meshdata[i]) for i, pl in enumerate(self.planes)] return np.mean(per_plane, axis=0)
[docs] def flux_avg_from_surf(self, flux_avg: np.ndarray) -> np.ndarray: """Map flux‑surface averaged data back to vertices on *every* plane (producing a representative PlaneData).""" per_plane = [pl.flux_avg_from_surf(flux_avg) for pl in self.planes] return np.mean(per_plane, axis=0)
[docs] def flux_avg_to_from_surf(self, meshdata: np.ndarray) -> np.ndarray: """Compute flux-surface average on each plane and project back to the vertices φ.""" per_plane = [pl.flux_avg_to_surf(meshdata[i]) for i, pl in enumerate(self.planes)] surf_mean = np.mean(per_plane, axis=0) return self.planes[0].flux_avg_from_surf(surf_mean)
[docs] def flux_avg_to_surf_norm(self, meshdata: np.ndarray) -> np.ndarray: """Flux‑surface average of vector‑norm, averaged over the toroidal direction.""" per_plane = [pl.flux_avg_to_surf_norm(meshdata[i]) for i, pl in enumerate(self.planes)] return np.mean(per_plane, axis=0)
# ------------ toroidal / field‑line helpers --------------
[docs] def toroidal_average(self, meshdata: np.ndarray) -> np.ndarray: """Simple arithmetic mean over the toroidal direction (returns vertex‑shaped ndarray).""" return np.mean(meshdata, axis=0)
[docs] def ff_map_real2ff(self, meshdata: np.ndarray) -> np.ndarray: """ Maps mesh data from the regular representation of data on planes (points with the same index on different planes are connected toroidally) to field-following representation (vertices with the same index are connected along the magnetic field). The output format is list of MeshData objects of the same type, one for the left, one for the right plane. """ if (self.nphi == 1 or meshdata.ndim == 1): raise ValueError("Data must not be axisymmetric for field-aligned mapping.") if (meshdata.shape[0] != self.nphi): raise ValueError("Input data has wrong shape.") required = ("ff_hdp_fwd", "ff_hdp_rev") missing = [a for a in required if getattr(self.planes[0], a, None) is None] if missing: raise RuntimeError("Parallel-derivative data missing in plane 0: " + ", ".join(missing)) # Not yet compatible with Stellarator if (meshdata.ndim == 3): # Make the vector components the first dimension n_components = meshdata.shape[2] data = np.transpose(meshdata,(2,0,1)) else: # Add artificial extra dimension n_components = 1 data = meshdata[np.newaxis,:] mapped_data = np.zeros((2,n_components,self.nphi,self.planes[0].n_n)) for i_comp in range(n_components): for i in range(self.nphi): if (i == 0): im = self.nphi-1 else: im = i-1 l_plane_data = data[i_comp,im,:] r_plane_data = data[i_comp,i, :] if (self.is_axisymmetric): l_plane_obj = self.planes[0] r_plane_obj = l_plane_obj else: l_plane_obj = self.planes[im] r_plane_obj = self.planes[i ] mapped_data[1,i_comp,i,:] = r_plane_obj.ff_hdp_fwd.multiply_left(r_plane_data) mapped_data[0,i_comp,i,:] = l_plane_obj.ff_hdp_rev.multiply_left(l_plane_data) if (n_components > 1): # Transpose back to original ordering mapped_data = np.transpose(mapped_data,(0,2,3,1)) else: # Remove dummy dimension mapped_data = np.squeeze(mapped_data) return mapped_data
[docs] def apply_fieldline_mapping(self, meshdata, *, ref_plane: int = 0): """ Pull the data of *every* plane onto the vertices of *ref_plane* along magnetic-field lines. Returns ------- np.ndarray If the stored field is scalar: shape (n_steps, n_vert) If the stored field is vector: shape (n_steps, n_vert, n_components) Axis-0 walks along the field line (φ = φ₀ + (s+1)Δφ); Axis-1 enumerates the vertices of the reference plane. """ # ---------------------------------------------------------- # 0. sanity checks # ---------------------------------------------------------- if not hasattr(self, "fl_csr_matrices") or not hasattr(self, "fl_mapping"): raise RuntimeError("Call mesh.setup_fieldline_mapping(...) first.") if (self.nphi == 1 or meshdata.ndim == 1): raise ValueError("Data must not be axisymmetric.") if (meshdata.shape[0] != self.nphi): raise ValueError("Input data has wrong shape.") if (meshdata.ndim == 3): # Make the vector components the first dimension n_comp = meshdata.shape[2] else: # Add artificial extra dimension n_comp = 1 P_list = self.fl_csr_matrices # list[CSR] plane_idx = self.fl_mapping["plane_index"] # (n_steps,) n_steps = len(P_list) n_vert = P_list[0].shape[0] # rows = reference vertices # ---------------------------------------------------------- # 1. allocate output # ---------------------------------------------------------- if n_comp == 1: out = np.empty((n_steps, n_vert), dtype=meshdata.dtype) else: out = np.empty((n_steps, n_vert, n_comp), dtype=meshdata.dtype) # ---------------------------------------------------------- # 2. loop over toroidal steps # ---------------------------------------------------------- for s in range(n_steps): data_src = meshdata[int(plane_idx[s])] # (n_src, [n_comp]) if n_comp == 1: # -------- scalar out[s] = P_list[s] @ data_src else: # -------- vector for c in range(n_comp): out[s, :, c] = P_list[s] @ data_src[:, c] return out
# ---------------- Gradient -----------------
[docs] def evaluate_scalar_gradients(self, field_2d: np.ndarray) -> np.ndarray: """ Apply per-plane gradient operators to a scalar field on the full 3-D mesh. Parameters ---------- field_2d : np.ndarray, shape (nphi, n_n) `field_2d[i_phi, i_node]` holds the scalar value at node *i_node* on poloidal plane *i_phi*. Returns ------- grads : np.ndarray, shape (nphi, n_n, 2) `grads[i_phi]` is the `(n_n, 2)` array returned by the corresponding plane's `evaluate_scalar_gradients`. """ if (self.planes[0].gradient_r_psi is None or self.planes[0].gradient_z_theta is None): raise RuntimeError("Gradient matrices are unavailable; cannot evaluate " "in-plane gradients on this mesh.") if field_2d.shape != (self.nphi, self.planes[0].n_n): raise ValueError( f"Expected shape ({self.nphi}, {self.planes[0].n_n}), got {field_2d.shape}" ) out = np.empty((self.nphi, self.planes[0].n_n, 2), dtype=field_2d.dtype) for i_phi, (plane, field_slice) in enumerate(zip(self.planes, field_2d)): out[i_phi] = plane.evaluate_scalar_gradients(field_slice) return out
# ------------------------------------------------------------------ # 2-nd–order finite-difference derivative parallel to B # ------------------------------------------------------------------
[docs] def evaluate_grad_par(self, field_2d: np.ndarray, bt_sign: int) -> np.ndarray: """ Compute b·∇(field) on an axisymmetric mesh. Parameters ---------- field_2d : np.ndarray, shape (nphi, n_n) Scalar field values on the full mesh (first index = poloidal plane, second = node). bt_sign : int, sign of the toroidal magnetic field ( +1 or -1 ). Returns ------- np.ndarray Same shape as *field_2d* with the parallel derivative. Notes ----- Uses the centred, 2-nd-order accurate stencil b·∇f ≈ sgn * [ -l_r/(l_l l_tot) f_{i-1} + (l_r-l_l)/(l_l l_r) f_i + l_l/(l_r l_tot) f_{i+1} ] where the forward / reverse projection matrices *ff_1dp_fwd* and *ff_1dp_rev* as well as the parallel arc-lengths *dl_par_1dp_fwd* and *dl_par_1dp_rev* are taken from ``planes[0]`` (valid only when ``self.is_axisymmetric`` is *True*). """ # -------- sanity checks ------------------------------------------------- if not getattr(self, "is_axisymmetric", False): raise RuntimeError("evaluate_grad_par currently supports only " "axisymmetric meshes (self.is_axisymmetric must be True).") required = ("ff_1dp_fwd", "ff_1dp_rev", "dl_par_1dp_fwd", "dl_par_1dp_rev") missing = [a for a in required if getattr(self.planes[0], a, None) is None] if missing: raise RuntimeError("Parallel-derivative data missing in plane 0: " + ", ".join(missing)) nphi, n_n = field_2d.shape if (nphi != self.nphi) or (n_n != self.planes[0].n_n): raise ValueError(f"field_2d has shape {field_2d.shape}, " f"expected ({self.nphi}, {self.planes[0].n_n}).") plane0 = self.planes[0] required = ("ff_1dp_fwd", "ff_1dp_rev", "dl_par_1dp_fwd", "dl_par_1dp_rev", "bfield") if not all(hasattr(plane0, attr) for attr in required): missing = [a for a in required if not hasattr(plane0, a)] raise AttributeError("Plane is missing required attributes: " + ", ".join(missing)) # -------- pre-compute constant coefficients ----------------------------- # forward / reverse arc-length arrays l_r = np.asarray(plane0.dl_par_1dp_fwd) l_l = np.asarray(plane0.dl_par_1dp_rev) l_tot = l_r + l_l coeff_l = -l_r / (l_l * l_tot) # shape (n_n,) coeff_0 = (l_r - l_l) / (l_l * l_r) coeff_r = l_l / (l_r * l_tot) # overall sign (same on all nodes for an axisymmetric mesh) sgn = int(bt_sign) # -------- main loop over φ planes --------------------------------------- out = np.empty_like(field_2d) fwd_mat = plane0.ff_1dp_fwd # CSR (n_n × n_n) rev_mat = plane0.ff_1dp_rev for iphi in range(nphi): iphi_l = iphi - 1 if iphi > 0 else nphi - 1 iphi_r = (iphi + 1) % nphi # project neighbouring planes onto the current one f_l = rev_mat.dot(field_2d[iphi_l]) # (n_n,) f_r = fwd_mat.dot(field_2d[iphi_r]) out[iphi] = sgn * ( coeff_l * f_l + coeff_0 * field_2d[iphi] + coeff_r * f_r ) return out
############################################################################### # Example usage of the overall mesh class. ############################################################################### if __name__ == "__main__": # Create an axisymmetric mesh (only one plane). axis_mesh = Mesh(is_axisymmetric=True) print("Axisymmetric mesh:") print("Number of planes:", len(axis_mesh.planes)) axis_mesh.get_plane().plot_surfaces() # Create a non-axisymmetric mesh (multiple planes). nonaxis_mesh = Mesh(is_axisymmetric=False) print("Non-axisymmetric mesh:") print("Number of planes:", len(nonaxis_mesh.planes))