Source code for xgc_analysis.plane

import os
import warnings
from pathlib import Path
import numpy as np
from adios2 import FileReader
from xgc_analysis.csr_matrix import SparseMatrixCSR, sparse_matrix_reader
from matplotlib.tri import Triangulation, LinearTriInterpolator
from xgc_analysis.read_bp_file import ReadBPFile
from xgc_analysis.plotting import plot_grid_contour


###############################################################################
# The "TriangulationData" class contains a uniform RZ mesh and a triangulation
# needed for ploting data on a planar mesh.
###############################################################################

[docs] class TriangulationData: """ A simple container class to hold interpolation grid data and a Matplotlib Triangulation object. Attributes: RI (np.ndarray): 2D array of R coordinates of the interpolation grid. ZI (np.ndarray): 2D array of Z coordinates of the interpolation grid. triObj (Triangulation): A Matplotlib Triangulation object for the mesh. """ def __init__(self, RI, ZI, triObj): self.RI = RI self.ZI = ZI self.triObj = triObj
############################################################################### # The "plane" class: describes a single planar unstructured triangle mesh. ###############################################################################
[docs] class Plane: """ A class to represent a planar unstructured triangle mesh (in cylindrical coordinates at constant toroidal angle) read from an Adios BP (BP version 5) file using the Adios2 Python FileReader API (v2.10.2+). The BP file (e.g., 'xgc.mesh.bp') is assumed to contain the following variables: Scalars: - n_n : Number of mesh vertices. - n_t : Number of triangles. - nsurf : Number of psi contour surfaces. - surf_maxlen : Maximum number of vertices on any psi surface. Arrays: - rz : (n_n, 2) coordinates (R, Z) of vertices. - nd_connect_list : (n_t, 3) triangle connectivity (1-based indices). - tr_area : (n_t,) triangle areas. - node_vol : (n_n,) Voronoi volume (measure 1). - node_vol_nearest: (n_n,) Voronoi volume (nearest neighbor measure). - psi : (n_n,) poloidal magnetic flux. - theta : (n_n,) straight field-line poloidal angle. - region : (n_n,) region index of the vertices (1) core, (2) SOL, (3) privat flux, (100) wall - surf_len : (nsurf,) number of vertices on each psi surface. - surf_idx : (nsurf, surf_maxlen) 1-based vertex indices on each psi surface. - rmin : (nsurf,) minor radius of each surface. - rmaj : (nsurf,) major radius of each surface. - epsilon : (nsurf,) inverse aspect ratio. - m_max_surf : (nsurf,) Fourier space resolution limit. - psi_surf : (nsurf,) psi value on each surface. - qsafety : (nsurf,) safety factor. - trapped : (nsurf,) trapped particle fraction. Plotting functionality: - triObj : Matplotlib triangulation object for unstructured meshes. This is needed for plotting fields defined on the planar mesh Matrices: - cnv_to_surf : ... - cnv_from_surf : ... - grad_r_psi : ... - grad_z_theta : ... Diagnostic functions: - plot_vertices_with_wall : ... - plot surfaces : ... - plot_triangles : ... """ def __init__(self, filename="xgc.mesh.bp", data_dir=".", setup_ff2real_mapping=False): self.filename = data_dir + "/" + filename data_dir2 = Path(data_dir) # makes joining paths easier # ------------------------------------------------------------------ # helper lambdas: fail-soft matrix / vector loading # ------------------------------------------------------------------ def _safe_sparse(bpfile: Path, keys, tag): """Return SparseMatrixCSR or None if the file is missing.""" if bpfile.exists(): return SparseMatrixCSR(lambda: sparse_matrix_reader(str(bpfile), keys)) warnings.warn(f"{tag}: '{bpfile.name}' not found – skipping.", RuntimeWarning, stacklevel=2) return None # <-- attribute always exists def _safe_readbp(bpfile: Path, variables, tag): """Return np.ndarray or None if the file is missing.""" if bpfile.exists(): return ReadBPFile(str(bpfile), variables=variables) warnings.warn(f"{tag}: '{bpfile.name}' not found – skipping.", RuntimeWarning, stacklevel=2) return None with FileReader(self.filename) as reader: # Read scalar variables. Use .item() to convert the 0-dim numpy array to a Python scalar. self.n_n = int(reader.read("n_n").item()) self.n_t = int(reader.read("n_t").item()) self.nsurf = int(reader.read("nsurf").item()) self.surf_maxlen = int(reader.read("surf_maxlen").item()) self.nwall = int(reader.read("grid_nwall").item()) # Read array variables self.rz = reader.read("rz") # shape: (n_n, 2) self.nd_connect_list = reader.read("nd_connect_list") # shape: (n_t, 3) try: self.wall_nodes = reader.read("grid_wall_nodes")-1 # shape: (nwall,) except: print(f"wall_nodes is not available") self.tr_area = reader.read("tr_area") # shape: (n_t,) self.node_vol = reader.read("node_vol") # shape: (n_n,) self.node_vol_nearest = reader.read("node_vol_nearest") # shape: (n_n,) try: self.node_vol_ff0 = reader.read("node_vol_ff0") # shape: (n_n,) self.node_vol_ff1 = reader.read("node_vol_ff1") # shape: (n_n,) except: print(f"node_vol_ff is not available") # Psi is included in the magnetic field class instead. #self.psi = reader.read("psi") # shape: (n_n,) self.theta = reader.read("theta") # shape: (n_n,) self.region = reader.read("region") # shape: (n_n,) self.surf_len = reader.read("surf_len") # shape: (nsurf,) self.surf_idx = reader.read("surf_idx")-1 # shape: (nsurf, surf_maxlen) self.rmin = reader.read("rmin") # shape: (nsurf,) self.rmaj = reader.read("rmaj") # shape: (nsurf,) self.epsilon = reader.read("epsilon") # shape: (nsurf,) self.m_max_surf = reader.read("m_max_surf") # shape: (nsurf,) self.psi_surf = reader.read("psi_surf") # shape: (nsurf,) self.qsafety = reader.read("qsafety") # shape: (nsurf,) self.trapped = reader.read("trapped") # shape: (nsurf,) # This is an interim solution. Eventually, axis and X-point # information is supposed to be included in xgc.mesh.bp with FileReader(data_dir+"/xgc.equil.bp") as reader: self.axis_r = float(reader.read("eq_axis_r").item()) self.axis_z = float(reader.read("eq_axis_z").item()) self.x_psi = float(reader.read("eq_x_psi").item()) self.x_r = float(reader.read("eq_x_r").item()) self.x_z = float(reader.read("eq_x_z").item()) # Set up a map of flux-surfaces that cross the outer midplane self.surf_map = self.get_psi_surface_index_map() # Read the 1D diagnostic volumes with FileReader(data_dir+"/xgc.volumes.bp") as reader: self.vol_1d = reader.read("diag_1d_vol") if (self.vol_1d.shape[0] != len(self.surf_map)): raise ValueError(f"Length of 1D volume array ({self.vol_1d.shape[0]}) does not equal the length of the surface map ({len(self.surf_map)}).") # Load additional matrices using the SparseMatrixCSR class. # # to_surf: calculates volume-weighted average of the input (data on a planar # unstructured mesh) on a 2D flux-surface on a plane (phi=const.), # i.e. the "local" flux-surface average self.cnv_to_surf = _safe_sparse(data_dir2/"xgc.cnv_to_surf.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "cnv_to_surf") # from_surf: projects the flux-surface averages (on a 1D psi-grid --> psi_surf # back onto the planar unstructured mesh. self.cnv_from_surf = _safe_sparse(data_dir2/"xgc.cnv_from_surf.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "cnv_from_surf") # Read "basis" from "xgc.grad_rz.bp" and check its length. grad_file = data_dir2/"xgc.grad_rz.bp" self.basis = _safe_readbp(data_dir2/"xgc.grad_rz.bp", ("basis",), "basis") if (self.basis is not None): self.basis=self.basis[0]['basis'] #if grad_file.exists(): # with FileReader(grad_file) as reader: # basis = np.asarray(reader.read("basis")) # if basis.shape[0] != self.n_n: # raise ValueError( # f"Length of 'basis' ({basis.shape[0]}) ≠ n_n={self.n_n}" # ) # self.basis = basis #else: # warnings.warn("'xgc.grad_rz.bp' not found – gradients unavailable.", # RuntimeWarning, stacklevel=2) # self.basis = None # gradient_r_psi: calculates the directional derivative in either R # (basis=1) or grad(psi) (basis=0) direction. self.gradient_r_psi = _safe_sparse( grad_file, ["n_r", "m_r", "w_r", "nelement_r", "eindex_r", "value_r"], "gradient_r_psi" ) # gradient_z_psi: calculates the directional derivative in either Z # (basis=1) or theta (poloidal, basis=0) direction. self.gradient_z_theta = _safe_sparse( grad_file, ["n_z", "m_z", "w_z", "nelement_z", "eindex_z", "value_z"], "gradient_z_theta" ) # ----------------------------------------- # mapping matrices # The mapping matrices project vertex positions # along the magnetic field for a full or half toroidal # step and contain the triangle indices and barycentric # weights for each origin vertex on the target plane. # Note that "forward" (counter-clockwise looking from above) # and "backward" refer to the direction # in which the vertex positions are projected. Therefore, if # one wants to project the data of the "right" plane of a toroidal # sector to the left plane (backward direction), one actually has to # multiply the data on the right plane with the forward # projection matrix. (Yes, this is confusing.) # ----------------------------------------- # # 1) Full toroidal step # a) Forward direction (counter-clockwise looking from above self.ff_1dp_fwd = _safe_sparse(data_dir2/"xgc.ff_1dp_fwd.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "ff_1dp_fwd") self.dl_par_1dp_fwd = _safe_readbp(data_dir2/"xgc.ff_1dp_fwd.bp", ("dl_par",), "dl_par_1dp_fwd") if (self.dl_par_1dp_fwd is not None): self.dl_par_1dp_fwd=self.dl_par_1dp_fwd[0]['dl_par'] # # b) Backward direction (counter-clockwise looking from above self.ff_1dp_rev = _safe_sparse(data_dir2/"xgc.ff_1dp_rev.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "ff_1dp_rev") self.dl_par_1dp_rev = _safe_readbp(data_dir2/"xgc.ff_1dp_rev.bp", ("dl_par",), "dl_par_1dp_rev") if (self.dl_par_1dp_rev is not None): self.dl_par_1dp_rev=self.dl_par_1dp_rev[0]['dl_par'] # # 2) Half toroidal step # a) Forward direction (counter-clockwise looking from above self.ff_hdp_fwd = _safe_sparse(data_dir2/"xgc.ff_hdp_fwd.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "ff_hdp_fwd") self.dl_par_hdp_fwd = _safe_readbp(data_dir2/"xgc.ff_hdp_fwd.bp", ("dl_par",), "dl_par_hdp_fwd") if (self.dl_par_hdp_fwd is not None): self.dl_par_hdp_fwd=self.dl_par_hdp_fwd[0]['dl_par'] # # b) Backward direction (counter-clockwise looking from above self.ff_hdp_rev = _safe_sparse(data_dir2/"xgc.ff_hdp_rev.bp", ["ncols", "nrows", "width", "nelement", "eindex", "value"], "ff_hdp_rev") self.dl_par_hdp_rev = _safe_readbp(data_dir2/"xgc.ff_hdp_rev.bp", ("dl_par",), "dl_par_hdp_rev") if (self.dl_par_hdp_rev is not None): self.dl_par_hdp_rev=self.dl_par_hdp_rev[0]['dl_par'] # # For stellarators, this should probably be in the Mesh class, rather than in Plane. if (setup_ff2real_mapping): self.gen_ff2real_mapping()
[docs] def get_triangulation_data( self, n_int: int = 200, R_bounds: tuple[float, float] | None = None, Z_bounds: tuple[float, float] | None = None, ): """ Build a regular (R,Z) interpolation grid and a Matplotlib Triangulation that matches the unstructured mesh. Parameters ---------- n_int : int, default 200 Number of grid points in *each* dimension (R and Z). R_bounds, Z_bounds : (min, max) tuple or None, optional Explicit coordinate limits for the interpolation grid. If ``None`` (default) the routine uses the min/max of the vertex coordinates. Returns ------- TriangulationData Object containing (RI, ZI, triObj). """ if n_int < 2: raise ValueError("n_int must be ≥ 2") # ------------------------------------------------------------ # 1. limits Rmin, Rmax = (self.rz[:, 0].min(), self.rz[:, 0].max()) if R_bounds is None else R_bounds Zmin, Zmax = (self.rz[:, 1].min(), self.rz[:, 1].max()) if Z_bounds is None else Z_bounds if Rmin >= Rmax or Zmin >= Zmax: raise ValueError("Bounds must satisfy min < max in both directions") # 2. uniform grid Ri = np.linspace(Rmin, Rmax, n_int) Zi = np.linspace(Zmin, Zmax, n_int) RI, ZI = np.meshgrid(Ri, Zi) # 3. triangulation built from mesh vertices / connectivity triObj = Triangulation(self.rz[:, 0], self.rz[:, 1], self.nd_connect_list) return TriangulationData(RI, ZI, triObj)
# ------------------------------------------------------------------ # Flux‑surface averaging utilities (geometry‑centric) # ------------------------------------------------------------------
[docs] def flux_avg_to_surf(self, vertex_data: np.ndarray) -> np.ndarray: """Return the flux-surface average ⟨f⟩ for a vertex‑defined scalar or vector field f.""" if self.cnv_to_surf is None: raise RuntimeError("FSA matrices are unavailable for this plane; " "file(s) were missing at construction time.") if vertex_data.ndim == 1: vertex_data = vertex_data[:, None] if vertex_data.shape[0] != self.n_n: raise ValueError( f"vertex_data length {vertex_data.shape[0]} does not match {self.n_n} vertices") comps = [self.cnv_to_surf.multiply_transpose_left(vertex_data[:, i]) for i in range(vertex_data.shape[1])] out = np.column_stack(comps) return out[:, 0] if out.shape[1] == 1 else out
[docs] def flux_avg_to_surf_norm(self, field: np.ndarray) -> np.ndarray: """ Flux-surface average of the *Euclidean norm* of a vertex-centred field on this poloidal plane. Parameters ---------- field : np.ndarray * If shape == (n_n,) → treated as a scalar field. * If shape == (n_n, n_comp) → treated as a vector field; the norm is computed along the last axis. Returns ------- np.ndarray 1-D array of length ``n_psi`` (the number of flux-surfaces on the plane) containing the volume-weighted flux-surface averages. Raises ------ RuntimeError If the conversion matrix ``cnv_to_surf`` is unavailable because its backing .bp file was missing when the Plane was constructed. ValueError If the input array does not have the required first dimension ``n_n``. """ # ---- prerequisite matrix ------------------------------------------------ if self.cnv_to_surf is None: raise RuntimeError("flux_avg_to_surf_norm unavailable: " "'xgc.cnv_to_surf.bp' missing at Plane construction.") # ---- verify shape -------------------------------------------------------- if field.shape[0] != self.n_n: raise ValueError( f"field.shape[0] = {field.shape[0]} does not match n_n = {self.n_n}." ) # ---- compute point-wise norm -------------------------------------------- if field.ndim == 1: # scalar field norms = field else: # vector field norms = np.sqrt(np.linalg.norm(field, axis=-1)) # ---- flux-surface average ------------------------------------------------ return self.cnv_to_surf.multiply_transpose_left(norms)
[docs] def flux_avg_from_surf(self, flux_avg: np.ndarray) -> np.ndarray: """Map a flux-surface averaged quantity <f>(psi) back to vertices.""" if self.cnv_from_surf is None: raise RuntimeError("FSA matrices are unavailable for this plane; " "file(s) were missing at construction time.") if flux_avg.ndim == 1: flux_avg = flux_avg[:, None] if flux_avg.shape[0] != self.nsurf: raise ValueError( f"flux_avg length {flux_avg.shape[0]} does not match {self.nsurf} ψ‑surfaces") comps = [self.cnv_from_surf.multiply_left(flux_avg[:, i]) for i in range(flux_avg.shape[1])] out = np.column_stack(comps) return out[:, 0] if out.shape[1] == 1 else out
[docs] def flux_avg_to_from_surf(self, vertex_data: np.ndarray) -> np.ndarray: """Smooth `vertex_data` by averaging on ψ and mapping back to vertices.""" if self.cnv_to_surf is None or self.cnv_from_surf is None: raise RuntimeError("FSA matrices are unavailable for this plane; " "file(s) were missing at construction time.") return self.flux_avg_from_surf(self.flux_avg_to_surf(vertex_data))
# ------------------------------------------------------------------ # ------------------------------------------------------------------ # In-plane gradient operation (R,Z)/(psi,theta) derivatives # ------------------------------------------------------------------
[docs] def evaluate_scalar_gradients(self, field_1d: np.ndarray) -> np.ndarray: """ Evaluate the radial and poloidal gradients of a scalar field. (Note that it depends on the value of self.basis whether the derivatives are (R,Z) or (psi,theta) directonal derivatives!) Parameters ---------- field_1d : np.ndarray, shape (n_n,) Nodal values of the scalar field on this plane. Returns ------- grads : np.ndarray, shape (n_n, 2) Column 0 → ∂f/∂ψ (gradient_r_psi ⋅ f) Column 1 → ∂f/∂θ (gradient_z_theta ⋅ f) """ if self.gradient_r_psi is None or self.gradient_z_theta is None: raise RuntimeError("Gradient matrices are unavailable for this plane; " "file(s) were missing at construction time.") if field_1d.shape != (self.n_n,): raise ValueError( f"Expected field shape ({self.n_n},), got {field_1d.shape}" ) # Sparse ▽ • f (CSR dot product is cheap) dpsi = self.gradient_r_psi.dot(field_1d) dtht = self.gradient_z_theta.dot(field_1d) # Stack into a (n_n, 2) array return np.column_stack((dpsi, dtht))
# ------------------------------------------------------------------
[docs] def plot_contour( self, values_in: np.ndarray, *, n_int: int = 200, plot_norm: bool = False, i_comp: int = 0, var_name: str = "", title: str = "", levels: int | None = 60, lower: float | None = None, upper: float | None = None, R_bounds: tuple[float, float] | None = None, Z_bounds: tuple[float, float] | None = None, filename: str | None = None, ): """ Plot the field stored in this PlaneData instance as a filled-contour map in the (R,Z) plane. Parameters ---------- values_in : np.ndarray 1-D array of length ``self.n_n`` (= number of vertices). If you pass a 2-D array, its Euclidean norm is taken or a single component (convenient for vector data), controlled by plot_norm and i_comp. n_int : int, default 200 Grid resolution in both R and Z used for interpolation. plot_norm : bool, default False In case of vector data, whether to plot the norm | v | of the vector. i_comp : int, default 0 In case of vector data, which component to plot. var_name : str, optional Label for the colour-bar (e.g. ``"nₑ (m⁻³)"``). Defaults to ``""``. title : str, optional Header for the plot window. Defaults to ``""``. levels : int or sequence, optional If an int, the number of evenly-spaced contour levels (Matplotlib default is fine). If an explicit 1-D sequence, those values are used as contour levels. Defaults to 60. lower, upper : float or None, optional Explicit lower/upper bounds for the colour-map. If either is ``None`` the routine uses the corresponding data extreme. R_bounds, Z_bounds : (min, max) tuple or None, optional Explicit limits for the interpolation grid. Passed straight through to ``plane.get_triangulation_data``. ``None`` ⇒ auto-limits. """ if n_int < 2: raise ValueError("n_int must be ≥ 2") # ----------------------------------------------------------------– # 1. Obtain interpolation grid and triangulation from Plane tri_data = self.get_triangulation_data( n_int=n_int, R_bounds=R_bounds, Z_bounds=Z_bounds, ) RI, ZI, triObj = tri_data.RI, tri_data.ZI, tri_data.triObj # 2. Select scalar values to plot (vector fields → Euclidean norm) if (values_in.ndim==2 and plot_norm): values = np.sqrt(np.linalg.norm(values_in, axis=1)) elif (values_in.ndim==2): values = np.sqeeze(values_in[:,i_comp]) else: values = values_in if values.ndim != 1 or len(values) != self.n_n: raise ValueError("Values must be a 1-D array with length equal " "to the number of mesh vertices") # 3. Interpolate to the regular (R,Z) grid tci = LinearTriInterpolator(triObj, values) data_grid = np.ma.masked_invalid(tci(RI, ZI)) # 2-D masked array return plot_grid_contour( data_grid, RI, ZI, x_label="R (m)", y_label="Z (m)", var_name=var_name, title=title, levels=levels, lower=lower, upper=upper, filename=filename )
# ------------------------------------------------------------------ # ---------------------------------------------------------------------- # plotting along a field-line bundle # ----------------------------------------------------------------------
[docs] def plot_fieldline_contour( self, data: np.ndarray, # shape (n_steps, self.n_n) surf_idx: int, # ψ-surface to take *, var_name: str = "", title: str = "", levels: int | np.ndarray | None = 60, lower: float | None = None, upper: float | None = None, filename: str | None = None, ): """ Filled-contour plot of *data* along the magnetic field on the requested ψ-surface. Parameters ---------- data : ndarray Result of `MeshData.apply_fieldline_mapping`, shape (n_steps, n_vertices). Axis-0 = distance along B, axis-1 = vertex index on *this* plane. surf_idx : int Which ψ-surface to plot (index into self.surf_* arrays). var_name, title : str Labels for colour-bar and figure title. levels : int or 1-D array or None Passed to `contourf`, like in `plot_contour`. lower, upper : float or None Manual colour limits (None → data min/max). """ # ------------------------------------------------------------------ # 1. vertices on the chosen ψ-surface surf_len = self.surf_len[surf_idx] surf_vertices = self.surf_idx[surf_idx, :surf_len] - 1 # 0-based theta_surf = self.theta[surf_vertices] # (surf_len,) # ensure θ is strictly increasing (needed for 1-D interp) order = np.argsort(theta_surf) theta_sorted = theta_surf[order] data_surf = data[:, surf_vertices][:, order] # reorder too # ------------------------------------------------------------------ # 2. build a uniform θ grid (2× resolution) n_uniform = 2 * surf_len theta_uniform = np.linspace(theta_sorted.min(), theta_sorted.max(), n_uniform) # interpolate each toroidal slice onto the uniform grid data_uniform = np.empty((data_surf.shape[0], n_uniform), dtype=data_surf.dtype) for k in range(data_surf.shape[0]): data_uniform[k] = np.interp(theta_uniform, theta_sorted, data_surf[k]) X, Y = np.meshgrid(np.arange(data_uniform.shape[0]), # y-axis = step index theta_uniform) # x-axis = θ (uniform) # The inner dimension (last one in C-ordering) is plotted along the vertical axis. # --> Transpose data so that phi is along X and theta along Y. return self._plot_grid_contour( data_uniform.T, X, Y, x_label="Toroidal step index", y_label=r"$\theta$ (rad)", var_name=var_name, title=title or f"Field-line slice on ψ-surface {surf_idx}", levels=levels, lower=lower, upper=upper, )
# ------------------------------------------------------------------ # Generate a matrix for mapping data from the field-aligned to the natural representation.
[docs] def gen_ff2real_mapping(self): # print("Generating forward half-integer plane ff-to-real mapping...") # cols = np.zeros(self.ff_hdp_fwd.indices.size ,dtype=self.ff_hdp_fwd.indices.dtype) rows = np.zeros(self.ff_hdp_fwd.indices.size ,dtype=self.ff_hdp_fwd.indices.dtype) data = np.zeros(self.ff_hdp_fwd.indices.size ,dtype=self.ff_hdp_fwd.data.dtype) data_sum = np.zeros(self.ff_hdp_fwd.indptr.size-1,dtype=self.ff_hdp_fwd.data.dtype) node_vol_ff = self.node_vol_ff1 # # Step 1: Generate the inverse mapping from the real2ff matrix for i in range(self.ff_hdp_fwd.indices.size): j=self.ff_hdp_fwd.indices[i] data_sum[j]+=self.ff_hdp_fwd.data[i]*node_vol_ff[j] rows[i]=j cols[i]=np.min((np.where(self.ff_hdp_fwd.indptr>i))[0])-1 data[i]=self.ff_hdp_fwd.data[i] if np.fmod(i,10000)==0: print("gen_ff2real, step 1: i=",i,"/",self.ff_hdp_fwd.indices.size-1) # # Step 2: Normalize the matrix rows and fill empty rows using the reverse real2ff mapping for i in range(data_sum.size): if data_sum[i]>0: data[(np.where(rows==i))[0]]/=data_sum[i] else: # Append data from ff_hdp_rev to rows, cols and data j0 = self.ff_hdp_rev.indptr[i] j1 = self.ff_hdp_rev.indptr[i+1]-1 if j1>=j0: data1 = self.ff_hdp_rev.data[j0:j1] cols1 = self.ff_hdp_rev.indices[j0:j1] rows1 = np.zeros(j1-j0+1) + i rows = np.concatenate((rows,rows1)) cols = np.concatenate((cols,cols1)) if np.fmod(i,10000)==0: print("gen_ff2real, step 2: i=",i,"/",data_dum.size-1) # # Step 3: Generate sparse CSR matrix from data: self.ff2real_hdp_fwd = csr_sparse((data,(rows,cols)))
# Accessor methods for mesh vertices and triangles
[docs] def get_vertex_coordinates(self): """ Returns: np.ndarray: Array of vertex coordinates with shape (n_n, 2) representing (R,Z) pairs. """ return self.rz
[docs] def get_triangle_connectivity(self): """ Returns: np.ndarray: Array of triangle connectivity indices (1-based) with shape (n_t, 3). """ return self.nd_connect_list
[docs] def get_triangle_areas(self): """ Returns: np.ndarray: Array of triangle areas with shape (n_t,). """ return self.tr_area
[docs] def get_voronoi_volumes(self): """ Returns: tuple: Two np.ndarray objects representing the vertex Voronoi volumes (node_vol and node_vol_nearest), each with shape (n_n,). """ return self.node_vol, self.node_vol_nearest
# Accessor methods for psi contour surfaces
[docs] def get_surface_indices(self): """ Returns: np.ndarray: 2D array (shape: (nsurf, surf_maxlen)) containing 1-based vertex indices on each surface. """ return self.surf_idx
[docs] def get_surface_lengths(self): """ Returns: np.ndarray: 1D array of length nsurf indicating the number of vertices on each surface. """ return self.surf_len
[docs] def get_surface_properties(self): """ Returns: dict: A dictionary of surface properties with keys: - 'rmin' : Minor radius (np.ndarray of shape (nsurf,)) - 'rmaj' : Major radius (np.ndarray of shape (nsurf,)) - 'epsilon' : Inverse aspect ratio (np.ndarray of shape (nsurf,)) - 'm_max_surf' : Fourier space resolution limit (np.ndarray of shape (nsurf,)) - 'psi_surf' : Psi value on each surface (np.ndarray of shape (nsurf,)) - 'qsafety' : Safety factor (np.ndarray of shape (nsurf,)) - 'trapped' : Trapped particle fraction (np.ndarray of shape (nsurf,)) """ return { "rmin": self.rmin, "rmaj": self.rmaj, "epsilon": self.epsilon, "m_max_surf": self.m_max_surf, "psi_surf": self.psi_surf, "qsafety": self.qsafety, "trapped": self.trapped }
[docs] def get_surface_vertex_indices(self, surface_index): """ Returns the vertex indices for a given psi surface. Args: surface_index (int): The index of the surface (0-based). Returns: np.ndarray: An array of vertex indices (0-based) corresponding to the given surface. """ if surface_index < 0 or surface_index >= self.nsurf: raise IndexError("Surface index out of range.") # Get the number of valid vertices on this surface. num_vertices = int(self.surf_len[surface_index]) # Return only the valid portion of the surf_idx row. return self.surf_idx[surface_index,0:num_vertices]
[docs] def get_psi_surface_index_map(self, tol=1e-4): r""" Identify the **ψ‑surfaces** that are valid for subsequent analysis. Strategy -------- For every surface *i* (whose length is ``surf_len[i]``): 1. **Single‑vertex surface** * If the surface contains exactly **one** vertex and that vertex’s coordinates ``(R, Z)`` are within ``tol`` of the magnetic‑axis position ``(axis_r, axis_z)``, accept the surface. 2. **Multi‑vertex surface** * Restrict to vertices with ``R >= axis_r``. * Among them, find • the vertex whose ``Z`` is **largest below** the axis • the vertex whose ``Z`` is **smallest above** the axis. * Accept the surface only if **both** vertices exist. Parameters ---------- tol : float, optional Tolerance that defines “close to the magnetic axis”, measured in the same units as ``R`` and ``Z``. Default is ``1e‑4``. Returns ------- list[int] Zero‑based indices of the ψ‑surfaces that satisfy the above criteria. """ index_map = [] # Loop over all psi-surfaces (assumed to be 0-indexed; nsurf is the total number) for i in range(self.nsurf): n_points = int(self.surf_len[i]) if n_points == 0: continue # Skip empty surfaces # Get the indices for this surface (already 0-based) and then the vertex coordinates. surf_vertex_indices = self.get_surface_vertex_indices(i) # Extract the (R,Z) coordinates from the global vertex array self.rz. coords = self.rz[surf_vertex_indices, :] # shape (n_points, 2) R_vals = coords[:, 0] Z_vals = coords[:, 1] R_vals.shape Z_vals.shape # For surfaces of length 1, check if the vertex is the magnetic axis. if n_points == 1: if (abs(R_vals[0] - self.axis_r) < tol and abs(Z_vals[0] - self.axis_z) < tol): index_map.append(i) continue # For longer surfaces, consider only vertices with R >= axis_r. valid = (R_vals >= self.axis_r) if not np.any(valid): continue # No vertex meets the criterion R_valid = R_vals[valid] Z_valid = Z_vals[valid] # Initialize best differences. best_lower_diff = np.inf best_upper_diff = np.inf found_lower = False found_upper = False # Loop over valid vertices. for z in Z_valid: diff = abs(z - self.axis_z) if z < self.axis_z: if diff < best_lower_diff: best_lower_diff = diff found_lower = True elif z > self.axis_z: if diff < best_upper_diff: best_upper_diff = diff found_upper = True # If z exactly equals axis_z (within tol) it can serve as either. elif abs(z - self.axis_z) < tol: found_lower = True found_upper = True if found_lower and found_upper: index_map.append(i) return index_map
[docs] def calculate_flux_surface_area(self): """ Calculates the surface area of each flux surface assuming that the 2D ψ-surfaces are rotated around the Z-axis by 2π. For each ψ-surface, the routine: - Loops over its vertices. - For each segment between adjacent vertices, computes the midpoint (R_m, Z_m) and the distance l_seg. - If the surface is closed (i.e. self.surf_region[i] == 1), the last vertex connects to the first; if open (region==2 or 3), the last vertex is skipped. - The area of each segment is 2π * R_m * l_seg. Returns: A list of surface areas (one per ψ-surface). Assumes that the Plane class has the following attributes: - self.nsurf: total number of ψ-surfaces. - self.surf_len: 1D array of length nsurf (number of vertices per surface). - self.surf_idx: 2D array (nsurf × surf_maxlen) of 0-based vertex indices. - self.rz: 2D array (n_n × 2) with (R,Z) coordinates. - self.surf_region: 1D array (length nsurf) with region codes (1 for closed, 2 or 3 for open). """ surface_areas = [] for i in range(self.nsurf): n_points = int(self.surf_len[i]) if n_points < 2: # For surfaces with fewer than 2 vertices, area is zero. surface_areas.append(0.0) continue # Get the region property for this ψ-surface. region_code = self.region[self.surf_idx[0,i]] area = 0.0 if region_code == 1: # Closed surface. for j in range(n_points): idx1 = self.surf_idx[i, j] idx2 = self.surf_idx[i, (j + 1) % n_points] R1, Z1 = self.rz[idx1, :] R2, Z2 = self.rz[idx2, :] Rm = 0.5 * (R1 + R2) lseg = np.sqrt((R2 - R1)**2 + (Z2 - Z1)**2) area += 2 * np.pi * Rm * lseg else: # Open surface (region==2 or 3). # Loop over segments from vertex 0 to vertex n_points-2. for j in range(n_points - 1): idx1 = self.surf_idx[i, j] idx2 = self.surf_idx[i, j + 1] R1, Z1 = self.rz[idx1, :] R2, Z2 = self.rz[idx2, :] Rm = 0.5 * (R1 + R2) lseg = np.sqrt((R2 - R1)**2 + (Z2 - Z1)**2) area += 2 * np.pi * Rm * lseg surface_areas.append(area) return np.array(surface_areas)
[docs] def plot_surfaces(self): """Plot all psi surfaces.""" vertex_coords = self.get_vertex_coordinates() plt.figure(figsize=(8, 8)) for s in range(self.nsurf): num = int(self.surf_len[s]) indices = self.surf_idx[s, :num] coords = vertex_coords[indices, :] plt.plot(coords[:, 0], coords[:, 1]) wall_coords = vertex_coords[self.wall_nodes,:] plt.plot(wall_coords[:,0], wall_coords[:,1], label=f"Wall") plt.xlabel("R") plt.ylabel("Z") plt.title("Psi Surfaces") plt.legend(fontsize="small", loc="upper right", ncol=1) plt.axis("equal") plt.show()
[docs] def plot_vertices_with_wall(self): """ Plots the mesh vertices as dots and overlays the wall as a closed line in the background. The wall is defined by the grid_wall_nodes variable. """ plt.figure(figsize=(8, 8)) # Retrieve the wall coordinates. wall_indices = self.wall_nodes wall_coords = self.rz[wall_indices, :] # shape: (n_wall, 2) # Close the wall contour by appending the first coordinate. wall_coords_closed = np.vstack([wall_coords, wall_coords[0]]) # Plot the wall as a red line. plt.plot(wall_coords_closed[:, 0], wall_coords_closed[:, 1], 'r-', lw=2, label="Wall") # Plot vertices as blue dots. plt.plot(self.rz[:, 0], self.rz[:, 1], 'bo', markersize=2, label="Vertices") plt.xlabel("R") plt.ylabel("Z") plt.title("Mesh Vertices and Wall") plt.legend() plt.axis("equal") plt.show()
[docs] def plot_triangles(self): """ Plots all the triangles of the mesh. For each triangle, the 3 vertex indices from nd_connect_list are used to get the corresponding (R, Z) coordinates from rz. The first vertex is appended to close the triangle, and the outline is plotted. """ import matplotlib.pyplot as plt plt.figure(figsize=(8, 8)) for itr in range(self.n_t): # Get the three vertex indices (0-based) for the triangle. tri_indices = self.nd_connect_list[itr,:] # Retrieve the corresponding (R, Z) coordinates. coords = self.rz[tri_indices, :] # shape: (3, 2) # Append the first coordinate to the end to close the triangle. coords_closed = np.vstack([coords, coords[0]]) # Plot the triangle outline. plt.plot(coords_closed[:, 0], coords_closed[:, 1], 'k-', lw=0.5) plt.xlabel("R") plt.ylabel("Z") plt.title("Mesh Triangles") plt.axis("equal") plt.show()
# Example usage if __name__ == "__main__": # Create a mesh instance from the BP file. plane_obj = Plane("xgc.mesh.bp") # Print basic mesh information. print("Number of vertices:", plane_obj.n_n) print("Number of triangles:", plane_obj.n_t) print("Number of psi surfaces:", plane_obj.nsurf) # Access some data through accessor methods. rz = plane_obj.get_vertex_coordinates() connectivity = plane_obj.get_triangle_connectivity() surface_props = plane_obj.get_surface_properties() print("Vertex coordinates shape:", rz.shape) print("Triangle connectivity shape:", connectivity.shape) print("Surface properties keys:", list(surface_props.keys())) # Plot all psi surfaces using Matplotlib. vertex_coords = plane_obj.get_vertex_coordinates() # (n_n, 2) #plane_obj.plot_surfaces() #plane_obj.plot_vertices() #plane_obj.plot_triangles() # Print shapes of the additional matrices. mat = plane_obj.cnv_to_surf.get_csr_matrix() print("cnv_to_surf shape:", mat.shape) mat = plane_obj.cnv_from_surf.get_csr_matrix() print("cnv_from_surf shape:", mat.shape) mat = plane_obj.gradient_r_psi.get_csr_matrix() print("gradient_r_psi:", mat.shape) mat = plane_obj.gradient_z_theta.get_csr_matrix() print("gradient_z_theta:", mat.shape)