import logging
from functools import cached_property
import numpy as np
import pyvista as pv
from fem2geo.internal.io import load_solver_output
from fem2geo.internal.schema import ModelSchema
from fem2geo.utils import tensor
__all__ = ["Model"]
log = logging.getLogger("fem2geoLogger")
[docs]
class Model:
"""
Interface to a FEM/BEM model for geomechanical post-processing.
Wraps a mesh loaded from VTK/VTU files and provides named access to stress,
strain, kinematic, and scalar fields in ENU coordinates.
Parameters
----------
grid : pyvista.UnstructuredGrid
Mesh with arrays renamed to canonical names by
:func:`~fem2geo.internal.io.load_solver_output`.
schema : ModelSchema
Schema mapping canonical names to solver-specific array names and units.
See Also
--------
Model.from_file : Construct a Model directly from a VTK/VTU path.
"""
def __init__(self, grid: pv.UnstructuredGrid, schema: ModelSchema):
self.grid = grid
self.schema = schema
[docs]
@classmethod
def from_file(cls, path, schema: ModelSchema | str = "adeli") -> "Model":
"""
Load a model from a VTK/VTU file.
Parameters
----------
path : str or Path
Path to the mesh file.
schema : ModelSchema or str
Schema instance, name of a built-in schema (e.g. ``"adeli"``,
``"adeli2"``), or path to a custom schema YAML file.
Returns
-------
Model
"""
if isinstance(schema, str):
schema = ModelSchema.load(schema)
return cls(load_solver_output(path, schema), schema)
# tensors
@cached_property
def strain(self) -> np.ndarray:
"""Total strain tensor :math:`\\varepsilon_{ij}`, shape (N, 3, 3)."""
return self._assemble_tensor("strain")
@cached_property
def strain_rate(self) -> np.ndarray:
"""Total strain rate tensor :math:`\\dot{\\varepsilon}_{ij}`, shape (N, 3, 3)."""
return self._assemble_tensor("strain_rate")
# kinematics
@cached_property
def u(self) -> np.ndarray:
"""Displacement vector :math:`u_i`, shape (N, 3)."""
return self.get("u")
@cached_property
def v(self) -> np.ndarray:
"""Velocity vector :math:`v_i`, shape (N, 3)."""
return self.get("v")
@cached_property
def t(self) -> np.ndarray:
"""Time field :math:`t`, shape (N,)."""
return self.get("t")
# scalar fields
@cached_property
def i1_strain(self) -> np.ndarray:
"""First strain invariant :math:`I_1(\\varepsilon) = \\mathrm{tr}\\, \\varepsilon`, shape (N,)."""
return self.get("i1_strain")
@cached_property
def j2_strain(self) -> np.ndarray:
"""Second deviatoric strain invariant :math:`J_2(\\varepsilon) = \\tfrac{1}{2} e_{ij} e_{ij}`, shape (N,)."""
return self.get("j2_strain")
@cached_property
def j2_stress(self) -> np.ndarray:
"""Second deviatoric stress invariant :math:`J_2(\\sigma) = \\tfrac{1}{2} s_{ij} s_{ij}`, shape (N,)."""
return self.get("j2_stress")
@cached_property
def i1_strain_rate(self) -> np.ndarray:
"""First strain rate invariant :math:`I_1(\\dot{\\varepsilon}) = \\mathrm{tr}\\,\\dot{\\varepsilon}`, shape (N,)."""
return self.get("i1_strain_rate")
@cached_property
def j2_strain_rate(self) -> np.ndarray:
"""Second deviatoric strain rate invariant :math:`J_2(\\dot{\\varepsilon})`,
shape (N,)."""
return self.get("j2_strain_rate")
@cached_property
def plastic_eff(self) -> np.ndarray:
"""Effective plastic strain :math:`\\varepsilon^p_{\\mathrm{eff}}`, shape (N,
)."""
return self.get("plastic_eff")
@cached_property
def plastic_vol(self) -> np.ndarray:
"""Volumetric plastic strain :math:`\\varepsilon^p_{\\mathrm{vol}}`,
shape (N,)."""
return self.get("plastic_vol")
@cached_property
def plastic_yield(self) -> np.ndarray:
"""Plastic yield indicator, shape (N,)."""
return self.get("plastic_yield")
@cached_property
def plastic_mode(self) -> np.ndarray:
"""Plastic failure mode, shape (N,)."""
return self.get("plastic_mode")
@cached_property
def mean_stress(self) -> np.ndarray:
"""Mean stress (pressure) :math:`p = \\tfrac{1}{3}\\,\\mathrm{tr}\\,\\sigma`,
shape (N,)."""
return self.get("mean_stress")
@cached_property
def viscosity(self) -> np.ndarray:
"""Effective viscosity :math:`\\eta`, shape (N,)."""
return self.get("viscosity")
@cached_property
def threshold_ratio(self) -> np.ndarray:
"""Yield threshold ratio, shape (N,)."""
return self.get("threshold_ratio")
@cached_property
def temperature(self) -> np.ndarray:
"""Temperature field :math:`T`, shape (N,)."""
return self.get("temperature")
@cached_property
def fluid_pressure(self) -> np.ndarray:
"""Fluid pressure :math:`p_f`, shape (N,)."""
return self.get("fluid_pressure")
@cached_property
def darcy_vel(self) -> np.ndarray:
"""Darcy velocity vector :math:`q_i`, shape (N, 3)."""
return self.get("darcy_vel")
@cached_property
def heat_flux(self) -> np.ndarray:
"""Heat flux vector :math:`q^T_i`, shape (N, 3)."""
return self.get("heat_flux")
# geometry
@property
def points(self) -> np.ndarray:
"""Mesh node coordinates, shape (N_points, 3)."""
return self.grid.points
@property
def n_points(self) -> np.ndarray:
"""Mesh node coordinates, shape (N_points, 3)."""
return self.grid.n_points
@property
def cells(self) -> int:
"""Number of cells in the mesh."""
return self.grid.cells
@property
def n_cells(self) -> int:
"""Number of cells in the mesh."""
return self.grid.number_of_cells
@cached_property
def cell_volumes(self) -> np.ndarray:
"""Cell volumes :math:`V_c`, shape (N,)."""
return self.grid.compute_cell_sizes().cell_data["Volume"]
@cached_property
def cell_centers(self) -> np.ndarray:
"""Cell center coordinates, shape (N, 3)."""
return self.grid.cell_centers().points
# stress
@cached_property
def stress(self) -> np.ndarray:
"""
Full stress tensor :math:`\\sigma_{ij}`, shape (N, 3, 3).
Assembled from the schema tensor if available. Otherwise, reconstructed from
principal values and directions.
"""
if "stress" in self.schema.tensors:
try:
return self._assemble_tensor("stress")
except KeyError:
log.warning("Full stress tensor not found")
required = ("val_s1", "val_s3", "dir_s1", "dir_s3")
missing = [
k for k in required
if k not in self.grid.cell_data
and k not in self.grid.point_data
]
if missing:
raise KeyError(f"Cannot assemble stress — missing: {missing}")
log.warning("Reconstructing stress tensor from principals.")
vals = np.column_stack([self.val_s1, self.val_s2, self.val_s3])
dirs = np.stack([self.dir_s1, self.dir_s2, self.dir_s3], axis=-1)
return tensor.reconstruct_from_principals(vals, dirs)
@cached_property
def stress_dev(self) -> np.ndarray:
r"""
Deviatoric stress tensor, shape (N, 3, 3).
.. math::
s_{ij} = \sigma_{ij} - \tfrac{1}{3}\,\sigma_{kk}\,\delta_{ij}
"""
s = self.stress
trace = np.trace(s, axis1=1, axis2=2)
return s - (trace / 3.0)[:, None, None] * np.eye(3)
# strain
@cached_property
def strain_plastic(self) -> np.ndarray:
"""
Plastic strain tensor :math:`\\varepsilon^p_{ij}`, shape (N, 3, 3).
Loaded from schema if available. Otherwise, reconstructed from
``plastic_eff`` and ``plastic_vol`` assuming isotropic flow rules.
"""
if "strain_plastic" in self.schema.tensors:
try:
return self._assemble_tensor("strain_plastic")
except KeyError:
log.warning(
"strain_plastic missing — reconstructing assuming coaxiality."
)
try:
eff = self.plastic_eff
vol = self.plastic_vol
except KeyError:
log.info("No plastic strain data — assuming elastic model.")
return np.zeros((self.n_cells, 3, 3))
if np.all(eff == 0) and np.all(vol == 0):
return np.zeros((self.n_cells, 3, 3))
dev = np.column_stack([self.val_s1, self.val_s2, self.val_s3])
j2 = np.sqrt(0.5 * np.sum(dev**2, axis=1, keepdims=True))
j2 = np.where(j2 < 1e-30, 1.0, j2)
shape = dev / j2
ep = eff[:, None] * shape + (vol / 3.0)[:, None]
dirs = np.stack([self.dir_s1, self.dir_s2, self.dir_s3], axis=-1)
return tensor.reconstruct_from_principals(ep, dirs)
@cached_property
def strain_elastic(self) -> np.ndarray:
r"""
Elastic strain tensor, shape (N, 3, 3).
Loaded from schema if available, otherwise computed as
.. math::
\varepsilon^e_{ij} = \varepsilon_{ij} - \varepsilon^p_{ij}
"""
if "strain_elastic" in self.schema.tensors:
try:
return self._assemble_tensor("strain_elastic")
except KeyError:
log.warning(
"strain_elastic missing. Computing as total - plastic."
)
return self.strain - self.strain_plastic
# principal stress directions
@cached_property
def dir_s1(self) -> np.ndarray:
"""Maximum compressive principal stress direction :math:`\\mathbf{n}_1`,
shape (N, 3)."""
if "dir_s1" in self.grid.array_names:
return self.get("dir_s1")
else:
return self.eigenvectors("stress")[:, :, 2]
@cached_property
def dir_s2(self) -> np.ndarray:
"""Intermediate principal stress direction :math:`\\mathbf{n}_2`, shape (N,
3)."""
if "dir_s2" in self.grid.array_names:
return self.get("dir_s2")
elif "dir_s3" in self.grid.array_names:
d = np.cross(self.dir_s1, self.dir_s3)
norms = np.linalg.norm(d, axis=1, keepdims=True)
return d / np.where(norms < 1e-12, 1.0, norms)
else:
return self.eigenvectors("stress")[:, :, 1]
@cached_property
def dir_s3(self) -> np.ndarray:
"""Minimum compressive principal stress direction :math:`\\mathbf{n}_3`,
shape (N, 3)."""
if "dir_s3" in self.grid.array_names:
return self.get("dir_s3")
else:
return self.eigenvectors("stress")[:, :, 0]
# eigendecomposition
[docs]
def eigenvectors(self, name: str) -> np.ndarray:
"""
Eigenvectors of a tensor field, sorted by ascending eigenvalue (in continuum
mechanics convention: compression is negative).
Parameters
----------
name : str
Canonical tensor name (e.g. ``'stress'``, ``'strain_plastic'``).
Returns
-------
numpy.ndarray, shape (N, 3, 3)
Eigenvectors as columns per cell.
"""
return tensor.eigenvectors(getattr(self, name))
[docs]
def eigenvalues(self, name: str) -> np.ndarray:
"""
Eigenvalues of a tensor field, sorted ascending (in continuum mechanics
convention: compression is negative).
Parameters
----------
name : str
Canonical tensor name (e.g. ``'stress'``, ``'strain_plastic'``).
Returns
-------
numpy.ndarray, shape (N, 3)
Eigenvalues per cell, column 0 smallest (most compressive).
"""
return tensor.eigenvalues(getattr(self, name))
# principal stress values
@cached_property
def val_s1(self) -> np.ndarray:
"""
Most compressive principal stress :math:`\\sigma_1`, shape (N,).
Loaded from the grid if available, otherwise the smallest eigenvalue
of the stress tensor.
"""
if "val_s1" in self.grid.array_names:
return self.get("val_s1")
return self.eigenvalues("stress")[:, 0]
@cached_property
def val_s2(self) -> np.ndarray:
"""
Intermediate principal stress :math:`\\sigma_2`, shape (N,).
Loaded from the grid if available, otherwise computed as
``-(val_s1 + val_s3)`` (deviatoric trace-free condition).
"""
if "val_s2" in self.grid.array_names:
return self.get("val_s2")
return -(self.val_s1 + self.val_s3)
@cached_property
def val_s3(self) -> np.ndarray:
"""
Least compressive principal stress :math:`\\sigma_3`, shape (N,).
Loaded from the grid if available, otherwise the largest eigenvalue
of the stress tensor.
"""
if "val_s3" in self.grid.array_names:
return self.get("val_s3")
return self.eigenvalues("stress")[:, 2]
# extraction
# averages
[docs]
def avg_tensor(self, name: str) -> np.ndarray:
r"""
Cell-volume-weighted average of a tensor field.
.. math::
\bar{T}_{ij} = \frac{\sum_c V_c\, T_{ij}^{(c)}}{\sum_c V_c}
where :math:`V_c` is the volume of cell :math:`c` and
:math:`T_{ij}^{(c)}` is the tensor value at that cell.
Parameters
----------
name : str
Tensor name. Accepts schema tensors and computed
properties: ``'stress'``, ``'stress_dev'``,
``'strain'``, ``'strain_rate'``,
``'strain_plastic'``, ``'strain_elastic'``.
Returns
-------
numpy.ndarray, shape (3, 3)
Symmetric average tensor.
"""
t = getattr(self, name)
w = self.cell_volumes
avg = np.einsum("ijk,i->jk", t, w) / w.sum()
return 0.5 * (avg + avg.T)
[docs]
def avg_principals(self, name: str = "stress") -> tuple:
"""
Eigenvectors of the the volume-weighted average of a tensor field.
Parameters
----------
name : str
Canonical tensor name (default: ``'stress'``).
Returns
-------
val : numpy.ndarray, shape (3,)
Eigenvalues sorted ascending.
vec : numpy.ndarray, shape (3, 3)
Eigenvectors as columns in ENU coordinates.
"""
avg = self.avg_tensor(name)
val, vec = np.linalg.eigh(avg)
order = np.argsort(val)
return val[order], vec[:, order]
# persistence
[docs]
def save(self, path) -> None:
"""
Save the model grid to a VTK/VTU file.
Parameters
----------
path : str or Path
Output file path.
"""
self.grid.save(str(path))
# internal
[docs]
def get(self, canonical: str) -> np.ndarray:
"""Look up a named array from cell or point data."""
for store in (self.grid.cell_data, self.grid.point_data):
if canonical in store:
return np.asarray(store[canonical])
raise KeyError(f"Field '{canonical}' not found in grid.")
def _assemble_tensor(self, name: str) -> np.ndarray:
if name not in self.schema.tensors:
raise KeyError(f"No tensor '{name}' in schema '{self.schema.name}'.")
entry = self.schema.tensors[name]
if entry.is_packed:
return tensor.unpack_voigt6(self.get(name))
arrays = {comp: self.get(f"_tensor_{name}_{comp}") for comp in entry.components}
return tensor.unpack_components(arrays)
def _extract_cells(self, point_mask: np.ndarray) -> pv.UnstructuredGrid:
conn = self.grid.cell_connectivity
starts = self.grid.offset[:-1]
flagged = point_mask[conn].astype(np.intp)
counts = np.add.reduceat(flagged, starts)
cell_ids = np.where(counts > 0)[0]
log.info(f"Extracted {len(cell_ids)} cells")
if len(cell_ids) == 0:
return pv.UnstructuredGrid()
return self.grid.extract_cells(cell_ids)