"""stochkin.fpe
=============
Fokker–Planck equation (FPE) solvers and grid-based kinetic operators.
This module provides numerical tools for forward and backward
Smoluchowski / Fokker–Planck equations on free-energy surfaces:
**Forward FPE (probability evolution)**
- :func:`solve_fp_steady_state` – 2D time-stepping toward the steady-state
density using FiPy (preferred) or an explicit NumPy fallback.
- :func:`solve_fp_1d_from_fes` – 1D time-stepping on a free-energy
profile :math:`F(s)` with position-dependent diffusion :math:`D(s)`.
**Discrete Fokker–Planck generator**
- :func:`build_fp_generator_from_fes` – build a detailed-balance-preserving
rate matrix :math:`L` on a 2D grid (SciPy sparse or dense).
**Backward BVP solvers (1D, pure NumPy)**
- :func:`solve_committor_1d_from_fes` – committor :math:`q(s)` from a
tridiagonal backward equation.
- :func:`solve_exit_time_1d_from_fes` – mean exit time :math:`\\tau(s)`.
- :func:`compute_ctmc_generator_fpe_1d` – multi-basin CTMC generator
from backward 1D solves.
- :func:`mfpt_1d_smolu_integral` – analytic Smoluchowski integral for
:math:`\\tau_{i \\to j}`.
**Backward BVP solvers (2D, FiPy)**
- :func:`solve_committor_fipy` – 2D committor on a FiPy mesh.
- :func:`solve_exit_time_fipy` – 2D mean exit time.
- :func:`compute_ctmc_generator_fpe_fipy` – multi-basin CTMC generator
from backward 2D FiPy solves.
"""
from __future__ import annotations
import numpy as np
# -------------------------------------------------------------------------
# Optional progress bar
# -------------------------------------------------------------------------
try: # pragma: no cover
from tqdm import tqdm as _tqdm
except Exception: # pragma: no cover
def _tqdm(it):
return it
# -------------------------------------------------------------------------
# Optional matplotlib (only used for quick diagnostics / plots)
# -------------------------------------------------------------------------
try: # pragma: no cover
import matplotlib.pyplot as plt
_HAVE_MPL = True
except Exception: # pragma: no cover
plt = None
_HAVE_MPL = False
# -------------------------------------------------------------------------
# Optional FiPy backend
# -------------------------------------------------------------------------
try:
from fipy import (
CellVariable,
Grid1D,
Grid2D,
# PDE terms
TransientTerm,
DiffusionTerm,
ExponentialConvectionTerm,
ImplicitSourceTerm,
)
from fipy.tools import numerix
try:
from fipy.meshes.nonUniformGrid2D import NonUniformGrid2D
except Exception: # pragma: no cover
try:
from fipy.meshes import NonUniformGrid2D # type: ignore
except Exception: # pragma: no cover
NonUniformGrid2D = None
_HAVE_FIPY = True
except Exception: # pragma: no cover
CellVariable = None # type: ignore
Grid1D = None # type: ignore
Grid2D = None # type: ignore
TransientTerm = None # type: ignore
DiffusionTerm = None # type: ignore
ExponentialConvectionTerm = None # type: ignore
ImplicitSourceTerm = None # type: ignore
numerix = None # type: ignore
NonUniformGrid2D = None # type: ignore
_HAVE_FIPY = False
def _default_fipy_solver(tolerance: float = 1e-10, iterations: int = 2000):
"""Return a reasonable default FiPy linear solver if available.
FiPy solver class locations differ between installations; this helper tries
a handful of common ones and falls back to FiPy's defaults if needed.
"""
if not _HAVE_FIPY:
return None
for mod_name, cls_name in [
("fipy.solvers.scipy", "LinearGMRESSolver"),
("fipy.solvers.scipy", "LinearBicgstabSolver"),
("fipy.solvers.scipy", "LinearPCGSolver"),
("fipy.solvers.scipy", "LinearLUSolver"),
("fipy.solvers", "LinearGMRESSolver"),
("fipy.solvers", "LinearLUSolver"),
]:
try:
mod = __import__(mod_name, fromlist=[cls_name])
cls = getattr(mod, cls_name)
try:
return cls(tolerance=float(tolerance), iterations=int(iterations))
except TypeError:
return cls(tolerance=float(tolerance))
except Exception:
continue
return None
# -------------------------------------------------------------------------
# Optional SciPy sparse backend
# -------------------------------------------------------------------------
try: # pragma: no cover
import scipy.sparse as sp # type: ignore
_HAVE_SCIPY_SPARSE = True
except Exception: # pragma: no cover
sp = None
_HAVE_SCIPY_SPARSE = False
# -------------------------------------------------------------------------
# Utilities
# -------------------------------------------------------------------------
def _compute_potential_grid(potential, xs, ys):
"""Sample U(x,y) and ∇U(x,y) on a regular grid.
Parameters
----------
potential : callable
potential([x, y]) -> (U, F) with F = -∇U.
xs, ys : 1D arrays
Grid coordinates.
Returns
-------
U, Ux, Uy : (nx, ny)
U(x,y), dU/dx, dU/dy on the grid.
"""
xs = np.asarray(xs, dtype=float)
ys = np.asarray(ys, dtype=float)
nx = xs.size
ny = ys.size
U = np.empty((nx, ny), dtype=float)
Ux = np.empty_like(U)
Uy = np.empty_like(U)
for i, xv in enumerate(xs):
for j, yv in enumerate(ys):
E, F = potential([xv, yv])
U[i, j] = float(E)
# F = -∇U ⇒ ∇U = -F
Ux[i, j] = -float(F[0])
Uy[i, j] = -float(F[1])
return U, Ux, Uy
# -------------------------------------------------------------------------
# Explicit NumPy fallback (conservative finite-volume scheme)
# -------------------------------------------------------------------------
def _solve_fp_steady_state_explicit(
potential,
xlim,
ylim,
nx,
ny,
D,
beta,
dt,
n_steps,
initial,
normalize_each_step,
plot_final,
):
"""Explicit finite-volume integration for the 2D forward FPE.
This is a fallback when FiPy is not available.
We integrate the divergence-form forward equation:
∂_t p = -∇·J
J = -D (∇p + β p ∇U)
with reflecting (no-flux) boundaries implemented by setting face fluxes
to zero at the domain boundary.
Notes
-----
This explicit scheme can require small `dt` for stability.
"""
x0, x1 = float(xlim[0]), float(xlim[1])
y0, y1 = float(ylim[0]), float(ylim[1])
xs = np.linspace(x0, x1, int(nx))
ys = np.linspace(y0, y1, int(ny))
if xs.size < 2 or ys.size < 2:
raise ValueError("nx and ny must be >= 2.")
dx = float(xs[1] - xs[0])
dy = float(ys[1] - ys[0])
U, Ux, Uy = _compute_potential_grid(potential, xs, ys)
# Initial condition
if initial == "uniform":
p = np.ones((xs.size, ys.size), dtype=float)
elif callable(initial):
X, Y = np.meshgrid(xs, ys, indexing="ij")
coords = np.vstack((X.ravel(), Y.ravel()))
p0 = np.asarray(initial(coords), dtype=float).ravel()
if p0.size != xs.size * ys.size:
raise ValueError(
f"Initial condition callable must return {xs.size*ys.size} values, got {p0.size}."
)
p = np.maximum(p0.reshape((xs.size, ys.size)), 0.0)
else:
raise ValueError("initial must be 'uniform' or a callable(coords)->array")
Dbeta = float(D) * float(beta)
def _normalize():
total = float(p.sum() * dx * dy)
if total > 0.0:
p[:] = p / total
_normalize()
# Face flux arrays
Jx = np.zeros((xs.size + 1, ys.size), dtype=float)
Jy = np.zeros((xs.size, ys.size + 1), dtype=float)
for _ in range(int(n_steps)):
# X-fluxes on vertical faces (i+1/2, j)
pL = p[:-1, :]
pR = p[1:, :]
p_face = 0.5 * (pL + pR)
Ux_face = 0.5 * (Ux[:-1, :] + Ux[1:, :])
dpdx = (pR - pL) / dx
Jx[1:-1, :] = -float(D) * (dpdx + Dbeta * p_face * Ux_face)
# Reflecting boundary faces
Jx[0, :] = 0.0
Jx[-1, :] = 0.0
# Y-fluxes on horizontal faces (i, j+1/2)
pB = p[:, :-1]
pT = p[:, 1:]
p_face = 0.5 * (pB + pT)
Uy_face = 0.5 * (Uy[:, :-1] + Uy[:, 1:])
dpdy = (pT - pB) / dy
Jy[:, 1:-1] = -float(D) * (dpdy + Dbeta * p_face * Uy_face)
Jy[:, 0] = 0.0
Jy[:, -1] = 0.0
# Divergence (cell-centered)
dJx_dx = (Jx[1:, :] - Jx[:-1, :]) / dx
dJy_dy = (Jy[:, 1:] - Jy[:, :-1]) / dy
dpdt = -(dJx_dx + dJy_dy)
p[:] = np.maximum(p + float(dt) * dpdt, 0.0)
if normalize_each_step:
_normalize()
_normalize()
if plot_final and _HAVE_MPL:
X, Y = np.meshgrid(xs, ys, indexing="ij")
fig, ax = plt.subplots()
c = ax.pcolormesh(X, Y, p, shading="auto")
fig.colorbar(c, ax=ax, label="p(x,y)")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_title("Steady-state probability density p(x,y) (explicit solver)")
ax.set_aspect("equal")
plt.show()
return {
"p_grid": p,
"xs": xs,
"ys": ys,
"U_grid": U,
"Ux": Ux,
"Uy": Uy,
}
# -------------------------------------------------------------------------
# FiPy-based implicit solver (preferred)
# -------------------------------------------------------------------------
[docs]
def solve_fp_steady_state(
potential,
xlim=(-2.0, 2.0),
ylim=(-2.0, 2.0),
nx=100,
ny=100,
D=1.0,
beta=1.0,
dt=1e-3,
n_steps=500,
initial="uniform",
normalize_each_step=True,
viewer=False,
plot_final=True,
):
"""Solve the 2D forward Fokker–Planck equation toward steady state.
Evolves the probability density :math:`p(x,y,t)` of the overdamped
Langevin equation
.. math::
dX_t = -D\\beta\\,\\nabla U(X_t)\\,dt + \\sqrt{2D}\\,dW_t
via the forward FPE in divergence form:
.. math::
\\partial_t p = \\nabla\\cdot(D\\nabla p + D\\beta\\, p\\,\\nabla U)
Uses FiPy (implicit, preferred) when available; otherwise falls back
to an explicit NumPy FTCS scheme.
Parameters
----------
potential : callable
``potential([x, y]) -> (U, F)`` with ``F = -\\nabla U``.
xlim, ylim : (float, float)
Domain bounds.
nx, ny : int
Grid points per axis.
D : float
Diffusion coefficient (constant).
beta : float
Inverse temperature.
dt : float
Time step.
n_steps : int
Number of integration steps.
initial : {'uniform'} or callable
Initial distribution.
normalize_each_step : bool
Re-normalise :math:`p` at every step.
viewer : bool
Use FiPy’s live viewer (FiPy only).
plot_final : bool
Plot the final density.
Returns
-------
dict
Keys: ``'p_grid'``, ``'xs'``, ``'ys'``, ``'U_grid'``,
``'Ux'``, ``'Uy'``.
"""
if not _HAVE_FIPY:
return _solve_fp_steady_state_explicit(
potential=potential,
xlim=xlim,
ylim=ylim,
nx=nx,
ny=ny,
D=D,
beta=beta,
dt=dt,
n_steps=n_steps,
initial=initial,
normalize_each_step=normalize_each_step,
plot_final=plot_final,
)
x0, x1 = float(xlim[0]), float(xlim[1])
y0, y1 = float(ylim[0]), float(ylim[1])
Lx = x1 - x0
Ly = y1 - y0
nx = int(nx)
ny = int(ny)
if nx < 1 or ny < 1:
raise ValueError("nx and ny must be positive")
dx = Lx / nx
dy = Ly / ny
# Base mesh [0,Lx]×[0,Ly] then translate.
mesh = Grid2D(dx=dx, dy=dy, nx=nx, ny=ny)
mesh = mesh + ((x0,), (y0,))
# Sample potential and gradient on FiPy cell centers
cx = np.asarray(mesh.cellCenters[0], dtype=float)
cy = np.asarray(mesh.cellCenters[1], dtype=float)
n_cells = cx.size
U_vals = np.empty(n_cells, dtype=float)
Ux_vals = np.empty_like(U_vals)
Uy_vals = np.empty_like(U_vals)
for k in range(n_cells):
E, F = potential([cx[k], cy[k]])
U_vals[k] = float(E)
Ux_vals[k] = -float(F[0])
Uy_vals[k] = -float(F[1])
Dbeta = float(D) * float(beta)
convCoeff = CellVariable(mesh=mesh, name="Dbeta_gradU", rank=1)
convCoeff.value = np.vstack((Dbeta * Ux_vals, Dbeta * Uy_vals))
# Initial condition
if initial == "uniform":
p0 = np.ones(n_cells, dtype=float)
elif callable(initial):
coords = np.vstack((cx, cy))
p0 = np.asarray(initial(coords), dtype=float).ravel()
if p0.size != n_cells:
raise ValueError(
f"Initial callable must return array of length {n_cells}, got {p0.size}."
)
p0 = np.maximum(p0, 0.0)
else:
raise ValueError("initial must be 'uniform' or a callable(coords)->array")
p = CellVariable(mesh=mesh, name="p", value=p0)
volumes = np.asarray(mesh.cellVolumes, dtype=float)
def _normalize():
total = float((p.value * volumes).sum())
if total > 0.0:
p.setValue(p.value / total)
_normalize()
view = None
if viewer:
try: # pragma: no cover
from fipy import Viewer
view = Viewer(vars=p)
except Exception:
view = None
# FiPy equation: ∂_t p = ∇·(D ∇p) + ∇·(convCoeff * p)
eq = (
TransientTerm(var=p)
== DiffusionTerm(coeff=float(D), var=p)
+ ExponentialConvectionTerm(coeff=convCoeff, var=p)
)
for _ in _tqdm(range(int(n_steps))):
eq.solve(var=p, dt=float(dt))
p.setValue(numerix.where(p.value < 0.0, 0.0, p.value))
if normalize_each_step:
_normalize()
if view is not None:
view.plot()
_normalize()
# Map FiPy cells back to grid arrays
xs = x0 + dx * (np.arange(nx) + 0.5)
ys = y0 + dy * (np.arange(ny) + 0.5)
p_grid = np.zeros((nx, ny), dtype=float)
U_grid = np.zeros((nx, ny), dtype=float)
Ux_grid = np.zeros((nx, ny), dtype=float)
Uy_grid = np.zeros((nx, ny), dtype=float)
i_idx = np.floor((cx - x0) / dx).astype(int)
j_idx = np.floor((cy - y0) / dy).astype(int)
p_vals = np.asarray(p.value, dtype=float)
p_grid[i_idx, j_idx] = p_vals
U_grid[i_idx, j_idx] = U_vals
Ux_grid[i_idx, j_idx] = Ux_vals
Uy_grid[i_idx, j_idx] = Uy_vals
if plot_final and _HAVE_MPL:
X, Y = np.meshgrid(xs, ys, indexing="ij")
fig, ax = plt.subplots()
c = ax.pcolormesh(X, Y, p_grid, shading="auto")
fig.colorbar(c, ax=ax, label="p(x,y)")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_title("Steady-state probability density p(x,y) (FiPy solver)")
ax.set_aspect("equal")
plt.show()
return {
"p_grid": p_grid,
"xs": xs,
"ys": ys,
"U_grid": U_grid,
"Ux": Ux_grid,
"Uy": Uy_grid,
}
# -------------------------------------------------------------------------
# Discrete FP generator on a 2D FES grid (for kinetic analysis)
# -------------------------------------------------------------------------
[docs]
def build_fp_generator_from_fes(xs, ys, U_grid, D, beta, sparse=True):
r"""Build a detailed-balance-preserving discrete FP generator on a regular grid.
Parameters
----------
xs, ys : 1D arrays
Grid coordinates (monotonic, uniform spacing expected).
U_grid : (nx, ny) array
Potential / free energy on the grid.
D : float or (nx, ny) array
Diffusion coefficient (scalar or field).
beta : float
Inverse temperature.
sparse : bool
If True, return a SciPy CSR matrix (requires SciPy). If False, return dense.
Returns
-------
L : (N, N)
Generator with off-diagonals >=0 and row sums 0. Indexing: k=i*ny + j.
Notes
-----
A common symmetric ("square-root") discretization is used:
.. math::
k_{i \to j} = \frac{D_{\text{face}}}{\Delta^2}
\exp\!\bigl[-\tfrac{\beta}{2}(U_j - U_i)\bigr]
so that for constant *D* the stationary distribution satisfies
:math:`\pi_i \propto \exp(-\beta U_i)` up to discretization error.
"""
xs = np.asarray(xs, dtype=float)
ys = np.asarray(ys, dtype=float)
U = np.asarray(U_grid, dtype=float)
if U.ndim != 2:
raise ValueError("U_grid must be 2D with shape (nx, ny).")
nx, ny = U.shape
# Ensure coordinate vectors match the grid shape
if xs.size != nx or ys.size != ny:
raise ValueError(
f"xs/ys lengths ({xs.size},{ys.size}) do not match U_grid shape ({nx},{ny})."
)
if not np.all(np.isfinite(U)):
raise ValueError(
"U_grid contains non-finite values; mask/crop invalid regions before building L."
)
if np.isscalar(D):
D_grid = np.full_like(U, float(D), dtype=float)
else:
D_arr = np.asarray(D, dtype=float)
if D_arr.shape != U.shape:
raise ValueError(f"D has shape {D_arr.shape}, expected scalar or {U.shape}.")
if not np.all(np.isfinite(D_arr)):
raise ValueError("D contains non-finite values.")
if np.any(D_arr <= 0.0):
raise ValueError("D must be strictly positive everywhere.")
D_grid = D_arr
if xs.size < 2 or ys.size < 2:
raise ValueError("xs and ys must have length >= 2")
dx = float(xs[1] - xs[0])
dy = float(ys[1] - ys[0])
if dx <= 0.0 or dy <= 0.0:
raise ValueError("Grid coordinates xs, ys must be strictly increasing")
# Validate (near-)uniform spacing. The symmetric detailed-balance discretization
# used below assumes constant dx, dy.
dxs = np.diff(xs)
dys = np.diff(ys)
if not (np.allclose(dxs, dx, rtol=1e-6, atol=1e-12) and np.allclose(dys, dy, rtol=1e-6, atol=1e-12)):
raise ValueError(
"Non-uniform grid spacing detected in xs/ys. "
"build_fp_generator_from_fes currently requires a uniform grid. "
"Resample/interpolate to a regular grid or implement a non-uniform discretization."
)
N = nx * ny
if sparse:
if not _HAVE_SCIPY_SPARSE:
raise ImportError(
"build_fp_generator_from_fes(sparse=True) requires scipy.sparse. "
"Install SciPy or call with sparse=False (small grids only)."
)
L = sp.lil_matrix((N, N), dtype=float)
else:
L = np.zeros((N, N), dtype=float)
def idx(i, j):
return i * ny + j
for i in range(nx):
for j in range(ny):
row = idx(i, j)
Ui = U[i, j]
Di = D_grid[i, j]
rate_sum = 0.0
# +x
if i + 1 < nx:
Uj = U[i + 1, j]
Dj = D_grid[i + 1, j]
D_face = 0.5 * (Di + Dj)
k = D_face / (dx * dx) * np.exp(-0.5 * float(beta) * (Uj - Ui))
L[row, idx(i + 1, j)] = k
rate_sum += k
# -x
if i - 1 >= 0:
Uj = U[i - 1, j]
Dj = D_grid[i - 1, j]
D_face = 0.5 * (Di + Dj)
k = D_face / (dx * dx) * np.exp(-0.5 * float(beta) * (Uj - Ui))
L[row, idx(i - 1, j)] = k
rate_sum += k
# +y
if j + 1 < ny:
Uj = U[i, j + 1]
Dj = D_grid[i, j + 1]
D_face = 0.5 * (Di + Dj)
k = D_face / (dy * dy) * np.exp(-0.5 * float(beta) * (Uj - Ui))
L[row, idx(i, j + 1)] = k
rate_sum += k
# -y
if j - 1 >= 0:
Uj = U[i, j - 1]
Dj = D_grid[i, j - 1]
D_face = 0.5 * (Di + Dj)
k = D_face / (dy * dy) * np.exp(-0.5 * float(beta) * (Uj - Ui))
L[row, idx(i, j - 1)] = k
rate_sum += k
L[row, row] = -rate_sum
return L.tocsr() if sparse else L
# -------------------------------------------------------------------------
# 1D forward FPE on a free energy profile (FiPy)
# -------------------------------------------------------------------------
[docs]
def solve_fp_1d_from_fes(
s,
F,
D,
beta=1.0,
dt=1e-3,
n_steps=1000,
initial="boltzmann",
normalize_each_step=True,
viewer=False,
):
"""1D forward Fokker–Planck solver on a free-energy profile.
Solves
.. math::
\\partial_t p(s,t) = \\partial_s\\bigl[D(s)\\bigl(
\\partial_s p + \\beta\\, p\\, F'(s)\\bigr)\\bigr]
with reflecting (no-flux) boundaries via FiPy.
Parameters
----------
s : array_like
Uniform 1D grid.
F : array_like
Free-energy values on *s*.
D : array_like
Diffusion coefficient on *s*.
beta : float
Inverse temperature.
dt : float
Time step.
n_steps : int
Number of integration steps.
initial : {'uniform', 'boltzmann'} or array_like
Initial distribution.
normalize_each_step : bool
Re-normalise at every step.
viewer : bool
Use FiPy’s live viewer.
Returns
-------
dict
Keys: ``'s'``, ``'p'``, ``'F'``, ``'D'``, ``'dF_ds'``.
Raises
------
ImportError
If FiPy is not installed.
"""
s = np.asarray(s, dtype=float)
F = np.asarray(F, dtype=float)
D = np.asarray(D, dtype=float)
if s.ndim != 1:
raise ValueError("s must be a 1D grid")
if not (F.shape == s.shape and D.shape == s.shape):
raise ValueError("F and D must have the same shape as s")
if not _HAVE_FIPY:
raise ImportError("FiPy is required for solve_fp_1d_from_fes()")
ns = s.size
if ns < 3:
raise ValueError("Need at least 3 grid points")
ds_arr = np.diff(s)
ds = float(ds_arr.mean())
if np.max(np.abs(ds_arr - ds)) > 1e-6 * abs(ds):
raise ValueError("solve_fp_1d_from_fes currently assumes a uniform grid in s")
x0 = float(s[0] - 0.5 * ds)
mesh = Grid1D(dx=ds, nx=ns) + ((x0,),)
cell_s = np.asarray(mesh.cellCenters[0], dtype=float)
F_vals = np.interp(cell_s, s, F)
D_vals = np.interp(cell_s, s, D)
dF_ds = np.gradient(F_vals, cell_s)
D_var = CellVariable(mesh=mesh, name="D(s)", value=D_vals)
# ExponentialConvectionTerm expects a *vector* coefficient.
# In 1D, that is shape (1, nCells).
convCoeff = CellVariable(mesh=mesh, name="Dbeta_dFds", rank=1)
convCoeff.value = np.array([D_vals * float(beta) * dF_ds], dtype=float)
# Initial condition
if isinstance(initial, str):
if initial.lower() == "uniform":
p0 = np.ones_like(cell_s)
elif initial.lower() == "boltzmann":
p0 = np.exp(-float(beta) * F_vals)
else:
raise ValueError("initial must be 'uniform', 'boltzmann', or an array")
else:
p0 = np.asarray(initial, dtype=float)
if p0.shape != cell_s.shape:
raise ValueError("Initial array must have the same shape as s")
p0 = np.maximum(p0, 0.0)
p = CellVariable(mesh=mesh, name="p(s)", value=p0)
volumes = np.asarray(mesh.cellVolumes, dtype=float)
def _normalize():
total = float((p.value * volumes).sum())
if total > 0.0:
p.setValue(p.value / total)
_normalize()
view = None
if viewer:
try: # pragma: no cover
from fipy import Viewer
view = Viewer(vars=p)
except Exception:
view = None
eq = (
TransientTerm(var=p)
== DiffusionTerm(coeff=D_var, var=p)
+ ExponentialConvectionTerm(coeff=convCoeff, var=p)
)
for _ in _tqdm(range(int(n_steps))):
eq.solve(var=p, dt=float(dt))
p.setValue(numerix.where(p.value < 0.0, 0.0, p.value))
if normalize_each_step:
_normalize()
if view is not None:
view.plot()
_normalize()
return {
"s": cell_s.copy(),
"p": np.asarray(p.value, dtype=float),
"F": F_vals,
"D": D_vals,
"dF_ds": dF_ds,
}
[docs]
def mfpt_1d_smolu_integral(s, F, D, beta, i_index, j_index):
"""Analytic Smoluchowski integral for the 1D MFPT :math:`\\tau_{i\\to j}`.
Uses the classical formula
.. math::
\\tau_{i\\to j}
= \\int_{s_i}^{s_j}
\\frac{e^{\\beta F(z)}}{D(z)}
\\int_{s_{\\min}}^{z} e^{-\\beta F(y)}\\,dy\\;dz
evaluated via the trapezoidal rule.
Parameters
----------
s : array_like
1D grid (must be sorted).
F : array_like
Free-energy values on *s*.
D : array_like
Diffusion coefficient on *s*.
beta : float
Inverse temperature.
i_index, j_index : int
Grid indices of source and target.
Returns
-------
float
Mean first-passage time :math:`\\tau_{i\\to j}`.
"""
s = np.asarray(s, dtype=float)
F = np.asarray(F, dtype=float)
D = np.asarray(D, dtype=float)
if not (s.ndim == F.ndim == D.ndim == 1):
raise ValueError("s, F and D must be 1D arrays")
if not (s.shape == F.shape == D.shape):
raise ValueError("s, F and D must have the same shape")
n = s.size
if i_index == j_index:
return 0.0
if not (0 <= i_index < n and 0 <= j_index < n):
raise IndexError("i_index and j_index must be valid indices")
# If target is left of start, reverse arrays to reuse same formula.
if j_index < i_index:
s = s[::-1]
F = F[::-1]
D = D[::-1]
i_index = n - 1 - i_index
j_index = n - 1 - j_index
f = np.exp(-float(beta) * F)
# A(z) = ∫_{s_min}^{z} e^{-βF}
A = np.zeros_like(s)
for k in range(1, n):
ds = s[k] - s[k - 1]
A[k] = A[k - 1] + 0.5 * (f[k - 1] + f[k]) * ds
g = np.exp(float(beta) * F) * A / D
tau = 0.0
for k in range(i_index + 1, j_index + 1):
ds = s[k] - s[k - 1]
tau += 0.5 * (g[k - 1] + g[k]) * ds
return float(tau)
# =============================================================================
# 1D backward Smoluchowski solves (NumPy tridiagonal; no FiPy required)
#
# We use the same divergence-form coefficient as the 2D FiPy backward solver:
#
# d/ds ( A(s) d u/ds ) = b(s)
#
# with
# w(s) = exp(-beta * (F(s) - Fmin))
# A(s) = D(s) * w(s)
#
# This corresponds to the backward generator
# L u = (1/w) d/ds ( D w d u/ds )
# and yields the standard committor and exit-time BVPs:
# d/ds (A q') = 0, with q=1 on B and q=0 on A
# d/ds (A tau') = -w, with tau=0 on absorbing set
#
# We discretize on a uniform 1D grid using conservative face fluxes.
# =============================================================================
def _safe_exp(x: np.ndarray) -> np.ndarray:
"""Compute exp(x) with clipping to avoid overflow/underflow."""
x = np.asarray(x, dtype=float)
return np.exp(np.clip(x, -700.0, 700.0))
def _require_uniform_grid_1d(s: np.ndarray, rtol: float = 1e-10, atol: float = 1e-12) -> float:
"""Return uniform spacing ds, or raise if s is not (approximately) uniform."""
s = np.asarray(s, dtype=float).ravel()
if s.size < 3:
raise ValueError("Need at least 3 grid points")
ds = np.diff(s)
if not np.all(ds > 0):
raise ValueError("Grid s must be strictly increasing")
ds0 = float(ds.mean())
if not np.allclose(ds, ds0, rtol=rtol, atol=atol):
raise ValueError(
"This solver requires an (approximately) uniform grid in s. "
"Resample/interpolate your FES to a uniform grid first."
)
return ds0
def _as_1d_array(x, n: int, name: str) -> np.ndarray:
x = np.asarray(x, dtype=float).ravel()
if x.size == 1:
return np.full(n, float(x[0]), dtype=float)
if x.size != n:
raise ValueError(f"{name} must be scalar or have length {n}")
return x
def _build_tridiag_div_A_grad_1d(A_cell: np.ndarray, ds: float):
"""Build tridiagonal for u -> -d/ds(A du/ds) with Neumann at domain ends.
Notes
-----
We return the matrix for the *negative* divergence operator so that the
resulting discrete operator matches the sign convention of a Markov
generator (diagonal negative, off-diagonal positive) when used as
L = (1/w) * (-div(A grad ·)).
"""
A_cell = np.asarray(A_cell, dtype=float).ravel()
n = int(A_cell.size)
if n < 3:
raise ValueError("Need at least 3 cells")
# arithmetic face values (FiPy's arithmeticFaceValue analogue)
A_face = 0.5 * (A_cell[:-1] + A_cell[1:]) # length n-1
inv_ds2 = 1.0 / (float(ds) * float(ds))
lower = np.zeros(n - 1, dtype=float)
diag = np.zeros(n, dtype=float)
upper = np.zeros(n - 1, dtype=float)
# i=0 (left boundary, Neumann: no A_{-1/2} term)
diag[0] = -A_face[0] * inv_ds2
upper[0] = +A_face[0] * inv_ds2
# interior
for i in range(1, n - 1):
Am = A_face[i - 1]
Ap = A_face[i]
lower[i - 1] = +Am * inv_ds2
diag[i] = -(Am + Ap) * inv_ds2
upper[i] = +Ap * inv_ds2
# i=n-1 (right boundary)
diag[n - 1] = -A_face[n - 2] * inv_ds2
lower[n - 2] = +A_face[n - 2] * inv_ds2
return lower, diag, upper
def _apply_dirichlet_tridiag(lower, diag, upper, rhs, idx, value):
"""Impose u[idx]=value in-place on a tridiagonal system."""
n = diag.size
idx = int(idx)
val = float(value)
# adjust neighbors to eliminate the fixed variable
if idx - 1 >= 0:
# row idx-1 has upper coefficient to u[idx]
rhs[idx - 1] -= upper[idx - 1] * val
upper[idx - 1] = 0.0
if idx + 1 <= n - 1:
# row idx+1 has lower coefficient to u[idx]
rhs[idx + 1] -= lower[idx] * val
lower[idx] = 0.0
# overwrite row idx
diag[idx] = 1.0
rhs[idx] = val
if idx - 1 >= 0:
lower[idx - 1] = 0.0
if idx <= n - 2:
upper[idx] = 0.0
def _solve_tridiagonal_thomas(lower, diag, upper, rhs):
"""Solve a tridiagonal system using the Thomas algorithm."""
lower = np.asarray(lower, dtype=float).copy()
diag = np.asarray(diag, dtype=float).copy()
upper = np.asarray(upper, dtype=float).copy()
rhs = np.asarray(rhs, dtype=float).copy()
n = diag.size
if lower.size != n - 1 or upper.size != n - 1 or rhs.size != n:
raise ValueError("Tridiagonal shapes are inconsistent")
# forward sweep
for i in range(1, n):
piv = diag[i - 1]
if piv == 0.0 or not np.isfinite(piv):
raise RuntimeError("Singular/ill-conditioned tridiagonal system")
w = lower[i - 1] / piv
diag[i] -= w * upper[i - 1]
rhs[i] -= w * rhs[i - 1]
# back substitution
x = np.zeros(n, dtype=float)
piv = diag[-1]
if piv == 0.0 or not np.isfinite(piv):
raise RuntimeError("Singular/ill-conditioned tridiagonal system")
x[-1] = rhs[-1] / piv
for i in range(n - 2, -1, -1):
piv = diag[i]
if piv == 0.0 or not np.isfinite(piv):
raise RuntimeError("Singular/ill-conditioned tridiagonal system")
x[i] = (rhs[i] - upper[i] * x[i + 1]) / piv
return x
[docs]
def solve_committor_1d_from_fes(
s,
F,
D=1.0,
beta=1.0,
mask_q1=None,
mask_q0=None,
):
"""Solve the 1D committor BVP: d/ds(A q') = 0 with internal Dirichlet sets.
Parameters
----------
s, F : 1D arrays
Grid and free energy (or potential). Must be (approximately) uniform.
D : float or 1D array
Diffusion coefficient on the grid.
beta : float
Inverse thermal energy in units compatible with F.
mask_q1, mask_q0 : boolean arrays
Dirichlet sets for q=1 and q=0.
Returns
-------
q : 1D array
Committor values.
"""
s = np.asarray(s, dtype=float).ravel()
F = np.asarray(F, dtype=float).ravel()
if s.size != F.size:
raise ValueError("s and F must have the same length")
ds = _require_uniform_grid_1d(s)
n = s.size
D = _as_1d_array(D, n, name="D")
# conditioning shift by min(F) (does not change the solution)
F0 = float(np.nanmin(F[np.isfinite(F)]))
w = _safe_exp(-float(beta) * (F - F0))
A_cell = D * w
# Build div(A grad q) and rescale rows by 1/w to solve the better-conditioned
# backward equation: (1/w) div(A grad q) = 0.
lower, diag, upper = _build_tridiag_div_A_grad_1d(A_cell, ds)
rhs = np.zeros(n, dtype=float)
w_safe = np.maximum(w, 1e-300)
# scale row i coefficients by 1/w_i
diag /= w_safe
rhs /= w_safe
upper /= w_safe[:-1]
lower /= w_safe[1:]
if mask_q1 is None or mask_q0 is None:
raise ValueError("mask_q1 and mask_q0 must be provided")
m1 = np.asarray(mask_q1, dtype=bool).ravel()
m0 = np.asarray(mask_q0, dtype=bool).ravel()
if m1.size != n or m0.size != n:
raise ValueError("mask_q1 and mask_q0 must have the same length as s")
if np.any(m1 & m0):
raise ValueError("mask_q1 and mask_q0 overlap")
# Impose Dirichlet sets
for idx in np.where(m1)[0]:
_apply_dirichlet_tridiag(lower, diag, upper, rhs, int(idx), 1.0)
for idx in np.where(m0)[0]:
_apply_dirichlet_tridiag(lower, diag, upper, rhs, int(idx), 0.0)
q = _solve_tridiagonal_thomas(lower, diag, upper, rhs)
return np.clip(q, 0.0, 1.0)
[docs]
def solve_exit_time_1d_from_fes(
s,
F,
D=1.0,
beta=1.0,
mask_absorb=None,
):
"""Solve the 1D exit-time BVP: d/ds(A tau') = -w with tau=0 on absorbing set."""
s = np.asarray(s, dtype=float).ravel()
F = np.asarray(F, dtype=float).ravel()
if s.size != F.size:
raise ValueError("s and F must have the same length")
ds = _require_uniform_grid_1d(s)
n = s.size
D = _as_1d_array(D, n, name="D")
F0 = float(np.nanmin(F[np.isfinite(F)]))
w = _safe_exp(-float(beta) * (F - F0))
A_cell = D * w
# Build div(A grad tau) and rescale rows by 1/w to solve the better-conditioned
# backward equation: (1/w) div(A grad tau) = -1.
lower, diag, upper = _build_tridiag_div_A_grad_1d(A_cell, ds)
# IMPORTANT:
# In divergence form the exit-time equation reads:
# d/ds(A(s) * d tau/ds) = -w(s), with A(s)=D(s)*w(s), w(s)=exp(-beta(F-F0))
# For numerical conditioning we solve the *row-scaled* system:
# (1/w) d/ds(A * d tau/ds) = -1
# Therefore the RHS must be -w *before* scaling; after scaling it becomes -1.
# A previous implementation used rhs=-1 and then divided by w, which turns the RHS
# into -1/w and makes tau blow up in high-free-energy regions (w -> 0).
w_safe = np.maximum(w, 1e-300)
rhs = -w_safe.copy()
diag /= w_safe
rhs /= w_safe
upper /= w_safe[:-1]
lower /= w_safe[1:]
if mask_absorb is None:
raise ValueError("mask_absorb must be provided")
ma = np.asarray(mask_absorb, dtype=bool).ravel()
if ma.size != n:
raise ValueError("mask_absorb must have the same length as s")
for idx in np.where(ma)[0]:
_apply_dirichlet_tridiag(lower, diag, upper, rhs, int(idx), 0.0)
tau = _solve_tridiagonal_thomas(lower, diag, upper, rhs)
tau = np.where(tau < 0.0, 0.0, tau)
return tau
def _weighted_average_1d(var, weight, mask):
var = np.asarray(var, dtype=float).ravel()
weight = np.asarray(weight, dtype=float).ravel()
mask = np.asarray(mask, dtype=bool).ravel()
if var.size != weight.size or var.size != mask.size:
raise ValueError("var, weight and mask must have the same length")
w = weight[mask]
if w.size == 0:
return np.nan
den = float(np.sum(w))
if den <= 0 or not np.isfinite(den):
return np.nan
return float(np.sum(var[mask] * w) / den)
[docs]
def compute_ctmc_generator_fpe_1d(
s,
F,
basin_network,
D=1.0,
beta=1.0,
init_weight="boltzmann",
verbose=True,
):
"""Compute a CTMC generator between 1D basins using backward Smoluchowski BVPs.
This is the 1D analogue of :func:`compute_ctmc_generator_fpe_fipy`, but it
uses a NumPy tridiagonal solver (no FiPy dependency).
Parameters
----------
s, F : 1D arrays
Uniform grid and free energy/potential.
basin_network : BasinNetwork1D
Must provide `labels` defined on the same grid.
D : float or 1D array
Diffusion coefficient.
beta : float
Inverse thermal energy.
init_weight : {"boltzmann", "uniform"}
Weight for basin-averages of exit times and committors.
Returns
-------
dict with keys: K, exit_mean, k_out, p_branch, basin_ids, method.
"""
s = np.asarray(s, dtype=float).ravel()
F = np.asarray(F, dtype=float).ravel()
if s.size != F.size:
raise ValueError("s and F must have the same length")
_ = _require_uniform_grid_1d(s)
labels_src = getattr(basin_network, "core_labels", None)
if labels_src is None:
labels_src = basin_network.labels
labels = np.asarray(labels_src, dtype=int).ravel()
if labels.size != s.size:
raise ValueError("basin_network.labels must have the same length as s")
basin_ids = np.unique(labels[labels >= 0])
basin_ids = np.sort(basin_ids)
n_basins = int(basin_ids.size)
if n_basins < 1:
raise ValueError("No basins found (labels>=0) — cannot build CTMC")
in_any_basin = labels >= 0
basin_masks = [(labels == bid) for bid in basin_ids]
# weights for basin averages
if str(init_weight).lower() == "uniform":
weight = np.ones_like(F, dtype=float)
elif str(init_weight).lower() == "boltzmann":
F0 = float(np.nanmin(F[np.isfinite(F)]))
weight = _safe_exp(-float(beta) * (F - F0))
else:
raise ValueError("init_weight must be 'boltzmann' or 'uniform'")
exit_mean = np.full(n_basins, np.nan, dtype=float)
k_out = np.full(n_basins, np.nan, dtype=float)
p_branch = np.full((n_basins, n_basins), np.nan, dtype=float)
K = np.zeros((n_basins, n_basins), dtype=float)
if verbose:
print(f"[1D CTMC] n_basins={n_basins}, init_weight={init_weight}")
# --- exit times ---
for i in range(n_basins):
mask_i = basin_masks[i]
if not np.any(mask_i):
continue
mask_abs = in_any_basin & (~mask_i)
tau = solve_exit_time_1d_from_fes(s=s, F=F, D=D, beta=beta, mask_absorb=mask_abs)
tau_mean = _weighted_average_1d(tau, weight, mask_i)
exit_mean[i] = float(tau_mean) if np.isfinite(tau_mean) else np.nan
if np.isfinite(exit_mean[i]) and exit_mean[i] > 0:
k_out[i] = 1.0 / exit_mean[i]
if verbose:
print(f"[1D CTMC] basin {i}: <tau_exit>={exit_mean[i]:.6g}, k_out={k_out[i]:.6g}")
# --- committors and branching ---
if n_basins == 2:
for i in range(n_basins):
if not np.isfinite(k_out[i]) or k_out[i] <= 0:
continue
j = 1 - i
p_branch[i, j] = 1.0
K[i, j] = float(k_out[i])
K[i, i] = -float(k_out[i])
if verbose:
print("[1D CTMC] n_basins=2 -> setting p_branch=1 and K_ij=k_out(i) by construction")
else:
for i in range(n_basins):
if not np.isfinite(k_out[i]) or k_out[i] <= 0:
continue
mask_i = basin_masks[i]
for j in range(n_basins):
if i == j:
continue
mask_j = basin_masks[j]
if not np.any(mask_j):
continue
mask_q1 = mask_j
mask_q0 = in_any_basin & (~mask_i) & (~mask_j)
if not np.any(mask_q0):
p_branch[i, j] = 1.0
continue
q = solve_committor_1d_from_fes(s=s, F=F, D=D, beta=beta, mask_q1=mask_q1, mask_q0=mask_q0)
pij = _weighted_average_1d(q, weight, mask_i)
p_branch[i, j] = float(np.clip(pij, 0.0, 1.0)) if np.isfinite(pij) else np.nan
row = p_branch[i, :].copy()
row[i] = np.nan
srow = np.nansum(row)
if not (np.isfinite(srow) and srow > 0):
raise RuntimeError(
f"1D CTMC: branching probabilities are all zero for basin {i}. "
"This usually indicates a labeling or mask construction issue."
)
for j in range(n_basins):
if i == j:
continue
if np.isfinite(p_branch[i, j]):
p_branch[i, j] /= float(srow)
for j in range(n_basins):
if i == j:
continue
if np.isfinite(p_branch[i, j]) and p_branch[i, j] > 0:
K[i, j] = float(k_out[i] * p_branch[i, j])
K[i, i] = -float(np.sum(K[i, :]))
if verbose:
pr = p_branch[i, :].copy(); pr[i] = 0.0
print(f"[1D CTMC] basin {i}: sum p_branch={np.nansum(pr):.6g}, row={pr}")
if verbose:
rs = K.sum(axis=1)
print(f"[1D CTMC] row-sum check (should be ~0): min={rs.min():.3e}, max={rs.max():.3e}")
return {
"K": K,
"exit_mean": exit_mean,
"k_out": k_out,
"p_branch": p_branch,
"basin_ids": np.asarray(basin_ids, dtype=int),
"method": "numpy_backward_ctmc_1d",
}
# =============================================================================
# Backward FPE kinetics with FiPy (CTMC generator from basins)
# =============================================================================
def _infer_edges_from_centers_1d(centers):
"""Infer cell-edge coordinates from a 1D array of cell centers."""
c = np.asarray(centers, dtype=float).ravel()
if c.size < 2:
raise ValueError("Need at least two centers to infer edges")
if not np.all(np.diff(c) > 0):
raise ValueError("Centers must be strictly increasing")
edges = np.empty(c.size + 1, dtype=float)
edges[1:-1] = 0.5 * (c[:-1] + c[1:])
edges[0] = c[0] - (edges[1] - c[0])
edges[-1] = c[-1] + (c[-1] - edges[-2])
return edges
def _is_uniform_spacing(edges, rtol=1e-10, atol=1e-12):
e = np.asarray(edges, dtype=float).ravel()
if e.size < 3:
return True
d = np.diff(e)
return bool(np.allclose(d, d.mean(), rtol=rtol, atol=atol))
[docs]
def build_mesh_2d_from_edges(x_edges, y_edges, overlap=2):
"""Build a FiPy 2D rectangular mesh from 1D edge arrays."""
if not _HAVE_FIPY:
raise ImportError("FiPy is required for FiPy-based kinetics solvers")
xe = np.asarray(x_edges, dtype=float).ravel()
ye = np.asarray(y_edges, dtype=float).ravel()
if xe.size < 2 or ye.size < 2:
raise ValueError("Edge arrays must have length >= 2")
if not np.all(np.diff(xe) > 0) or not np.all(np.diff(ye) > 0):
raise ValueError("Edge arrays must be strictly increasing")
dx = np.diff(xe)
dy = np.diff(ye)
nx = int(dx.size)
ny = int(dy.size)
if _is_uniform_spacing(xe) and _is_uniform_spacing(ye):
mesh = Grid2D(nx=nx, ny=ny, dx=float(dx.mean()), dy=float(dy.mean()), overlap=int(overlap))
else:
if NonUniformGrid2D is None:
raise ImportError(
"NonUniformGrid2D is not available in this FiPy installation; "
"please provide uniform edges or upgrade FiPy"
)
mesh = NonUniformGrid2D(nx=nx, ny=ny, dx=dx, dy=dy, overlap=int(overlap))
mesh = mesh + ((float(xe[0]),), (float(ye[0]),))
return mesh, nx, ny, dx, dy
def _fipy_cells_to_ij_from_edges(mesh, x_edges, y_edges):
"""Map FiPy cells to (i,j) indices on the (nx,ny) grid defined by edges."""
x_edges = np.asarray(x_edges, dtype=float).ravel()
y_edges = np.asarray(y_edges, dtype=float).ravel()
nx = x_edges.size - 1
ny = y_edges.size - 1
cx = np.asarray(numerix.array(mesh.cellCenters[0]), dtype=float)
cy = np.asarray(numerix.array(mesh.cellCenters[1]), dtype=float)
i = np.searchsorted(x_edges, cx, side="right") - 1
j = np.searchsorted(y_edges, cy, side="right") - 1
i = np.clip(i, 0, nx - 1)
j = np.clip(j, 0, ny - 1)
xc = 0.5 * (x_edges[i] + x_edges[i + 1])
yc = 0.5 * (y_edges[j] + y_edges[j + 1])
dx = np.maximum(np.abs(x_edges[i + 1] - x_edges[i]), 1e-12)
dy = np.maximum(np.abs(y_edges[j + 1] - y_edges[j]), 1e-12)
err = float(np.nanmax(np.maximum(np.abs(cx - xc) / dx, np.abs(cy - yc) / dy)))
if (not np.isfinite(err)) or err > 0.25:
raise RuntimeError(
"FiPy CTMC: failed to map FiPy cells to (i,j) indices from edges. "
f"max normalized center error={err}."
)
return i.astype(int), j.astype(int)
def _ij_field_to_fipy_cells(field_ij, i_of_cell, j_of_cell, dtype=float):
"""Map a (nx,ny) array to a FiPy cell-ordered 1D array using i/j index arrays."""
a = np.asarray(field_ij)
if a.ndim != 2:
raise ValueError("Expected a 2D array (nx, ny)")
return np.asarray(a[i_of_cell, j_of_cell], dtype=dtype)
[docs]
def make_backward_coefficient_A_face_from_cell_values(mesh, U_cell, D_cell, beta, name_U="U", name_D="D"):
"""Construct w=exp(-beta U) and A_face = (D*w).arithmeticFaceValue."""
if not _HAVE_FIPY:
raise ImportError("FiPy is required for FiPy-based kinetics solvers")
U_cell = np.asarray(U_cell, dtype=float).ravel()
if U_cell.size != mesh.numberOfCells:
raise ValueError("U_cell must be one value per FiPy cell")
# ---- sanitize U_cell to avoid NaN/inf in FiPy coefficients ----
finiteU = np.isfinite(U_cell)
if not np.any(finiteU):
raise ValueError("U_cell has no finite values")
U_min_finite = float(np.min(U_cell[finiteU]))
if float(beta) <= 0:
raise ValueError("beta must be > 0")
# Fill non-finite energies with a high barrier (in U units)
# Barrier height chosen in kT units for numerical stability.
barrier_kT = 50.0
U_fill = U_min_finite + float(barrier_kT) / float(beta)
if not np.all(finiteU):
U_cell = U_cell.copy()
U_cell[~finiteU] = U_fill
U_var = CellVariable(mesh=mesh, name=name_U, value=U_cell)
# Shift by min(U) for conditioning: exp(-beta(U-Umin)) has max 1.
U_min = float(np.min(U_cell))
w_var = numerix.exp(-float(beta) * (U_var - U_min))
if np.isscalar(D_cell):
D_scalar = float(D_cell)
if (not np.isfinite(D_scalar)) or (D_scalar <= 0.0):
D_scalar = 1e-12
D_var = D_scalar
A_cell = D_scalar * w_var
else:
D_cell = np.asarray(D_cell, dtype=float).ravel()
if D_cell.size != mesh.numberOfCells:
raise ValueError("D_cell must be one value per FiPy cell")
# Sanitize D: FiPy does not like NaN/inf and negative diffusion
badD = (~np.isfinite(D_cell)) | (D_cell <= 0)
if np.any(badD):
D_cell = D_cell.copy()
D_cell[badD] = 1e-12
D_var = CellVariable(mesh=mesh, name=name_D, value=D_cell)
A_cell = D_var * w_var
A_face = A_cell.arithmeticFaceValue
return U_var, w_var, A_face, D_var
def _internal_dirichlet_penalty(var, mask_cells, value, large_value):
"""Return (implicit_term, rhs_add) to enforce an internal Dirichlet constraint."""
m = numerix.array(mask_cells, dtype=float).ravel()
n = int(var.mesh.numberOfCells)
if m.size != n:
raise ValueError(f"mask_cells length {m.size} != mesh.numberOfCells {n}")
lv = float(large_value)
mask_var = CellVariable(mesh=var.mesh, name="_dirichlet_mask", value=m)
pen = lv * mask_var
return ImplicitSourceTerm(coeff=pen, var=var), (pen * float(value))
def _safe_scalar_max(coeff) -> float:
"""Robust scalar max() for FiPy coefficients."""
if isinstance(coeff, (tuple, list)):
return float(max(_safe_scalar_max(c) for c in coeff))
if hasattr(coeff, "value"):
coeff = coeff.value
arr = np.asarray(coeff, dtype=float)
if arr.size == 0:
return 0.0
return float(np.nanmax(arr))
[docs]
def solve_committor_fipy(
mesh,
A_face,
mask_q1,
mask_q0,
large_value=None,
enforce_reflecting=True,
solver=None,
sweep_tol=1e-10,
max_sweeps=2000,
verbose=False,
):
"""Solve the 2D committor BVP on a FiPy mesh.
Solves :math:`\\nabla\\cdot(A\\,\\nabla q) = 0` with internal
Dirichlet constraints :math:`q=1` (*mask_q1*) and :math:`q=0`
(*mask_q0*), enforced via large penalty terms.
Parameters
----------
mesh : FiPy mesh
2D mesh.
A_face : FiPy FaceVariable
Backward coefficient :math:`A = D\\,e^{-\\beta U}` on faces.
mask_q1, mask_q0 : array_like of bool
Cell masks for the two Dirichlet sets.
large_value : float or None
Penalty magnitude (auto-scaled if ``None``).
enforce_reflecting : bool
Impose no-flux on exterior faces.
solver, sweep_tol, max_sweeps : optional
FiPy solver parameters.
verbose : bool
Print diagnostics.
Returns
-------
q : CellVariable
Committor field.
info : dict
``{'residual', 'n_sweeps'}``.
"""
if not _HAVE_FIPY:
raise ImportError("FiPy is required for FiPy-based kinetics solvers")
m1 = numerix.array(mask_q1, dtype=bool)
m0 = numerix.array(mask_q0, dtype=bool)
q = CellVariable(mesh=mesh, name="committor", value=0.0)
if large_value is None:
large_value = max(1e6, 1e4 * float(_safe_scalar_max(A_face)))
if solver is None:
solver = _default_fipy_solver(tolerance=float(sweep_tol), iterations=int(max_sweeps))
if enforce_reflecting:
q.faceGrad.constrain(((0.0,), (0.0,)), where=mesh.exteriorFaces)
lhs = DiffusionTerm(coeff=A_face, var=q)
imp1, rhs1 = _internal_dirichlet_penalty(q, m1, 1.0, large_value)
imp0, rhs0 = _internal_dirichlet_penalty(q, m0, 0.0, large_value)
eq = (lhs + imp1 + imp0 == rhs1 + rhs0)
eq.solve(var=q, solver=solver)
residual = float("nan")
n_sweeps = 1
if verbose:
print(f"[FiPy committor] sweeps={n_sweeps}, residual={residual}")
return q, {"residual": residual, "n_sweeps": n_sweeps}
[docs]
def solve_exit_time_fipy(
mesh,
A_face,
w_var,
mask_absorb,
large_value=None,
enforce_reflecting=True,
solver=None,
sweep_tol=1e-10,
max_sweeps=4000,
verbose=False,
):
"""Solve the 2D mean exit-time BVP on a FiPy mesh.
Solves :math:`\\nabla\\cdot(A\\,\\nabla\\tau) = -w` with
:math:`\\tau=0` on absorbing cells, enforced via penalty.
Parameters
----------
mesh : FiPy mesh
2D mesh.
A_face : FiPy FaceVariable
Backward coefficient on faces.
w_var : CellVariable
Boltzmann weight :math:`w = e^{-\\beta(U-U_{\\min})}`.
mask_absorb : array_like of bool
Absorbing cell mask.
large_value : float or None
Penalty magnitude.
enforce_reflecting : bool
Impose no-flux on exterior faces.
solver, sweep_tol, max_sweeps : optional
FiPy solver parameters.
verbose : bool
Print diagnostics.
Returns
-------
tau : CellVariable
Mean exit-time field.
info : dict
``{'residual', 'n_sweeps'}``.
"""
if not _HAVE_FIPY:
raise ImportError("FiPy is required for FiPy-based kinetics solvers")
ma = numerix.array(mask_absorb, dtype=bool)
tau = CellVariable(mesh=mesh, name="exit_time", value=0.0)
if large_value is None:
large_value = max(1e6, 1e4 * float(_safe_scalar_max(A_face)))
if solver is None:
solver = _default_fipy_solver(tolerance=float(sweep_tol), iterations=int(max_sweeps))
if enforce_reflecting:
tau.faceGrad.constrain(((0.0,), (0.0,)), where=mesh.exteriorFaces)
lhs = DiffusionTerm(coeff=A_face, var=tau)
imp, rhs_add = _internal_dirichlet_penalty(tau, ma, 0.0, large_value)
eq = (lhs + imp == -w_var + rhs_add)
residual = None
n_sweeps = 1
try:
eq.solve(var=tau, solver=solver)
except Exception:
residual = np.inf
n_sweeps = 0
for n_sweeps in range(1, int(max_sweeps) + 1):
residual = float(eq.sweep(var=tau, solver=solver))
if residual < float(sweep_tol):
break
if verbose:
print(f"[FiPy exit-time] sweeps={n_sweeps}, residual={residual}")
return tau, {"residual": residual, "n_sweeps": n_sweeps}
def _weighted_basin_average(mesh, var_cell, weight_cell, basin_mask):
"""Volume-weighted average of var_cell over basin_mask with weights weight_cell."""
vol = numerix.array(mesh.cellVolumes, dtype=float)
v = numerix.array(var_cell, dtype=float)
w = numerix.array(weight_cell, dtype=float)
m = numerix.array(basin_mask, dtype=float)
num = float((v * w * vol * m).sum())
den = float((w * vol * m).sum())
if den <= 0 or not np.isfinite(den):
return np.nan
return num / den
[docs]
def compute_ctmc_generator_fpe_fipy(
x_centers,
y_centers,
U_ij,
basin_network,
D=1.0,
beta=1.0,
x_edges=None,
y_edges=None,
init_weight="boltzmann",
large_value=None,
enforce_reflecting=True,
solver=None,
sweep_tol=1e-10,
verbose=True,
):
r"""Compute a CTMC generator between basins using backward FPE FiPy solves.
For each basin *i*:
1. Solve exit time :math:`\tau_i` to the union of other basins (absorbing).
:math:`k_{\text{out}}(i) = 1 / \langle\tau_i\rangle` (average over basin *i*).
2. For each :math:`j \neq i`, solve committor :math:`q_{ij}` with
:math:`q=1` on basin *j* and :math:`q=0` on all other basins.
:math:`p_{ij} = \langle q_{ij}\rangle` averaged over basin *i*.
3. Set :math:`K_{ij} = k_{\text{out}}(i)\, p_{ij}` and
:math:`K_{ii} = -\sum_{j \neq i} K_{ij}`.
Returns
-------
dict with keys: K, exit_mean, k_out, p_branch, basin_ids, mesh, info, method.
"""
if not _HAVE_FIPY:
raise ImportError("FiPy is required for compute_ctmc_generator_fpe_fipy()")
x = np.asarray(x_centers, dtype=float).ravel()
y = np.asarray(y_centers, dtype=float).ravel()
U = np.asarray(U_ij, dtype=float)
if U.shape != (x.size, y.size):
raise ValueError(f"U_ij shape {U.shape} must match (nx,ny)=({x.size},{y.size})")
labels_src = getattr(basin_network, 'core_labels', None)
if labels_src is None:
labels_src = basin_network.labels
labels = np.asarray(labels_src, dtype=int)
if labels.shape != U.shape:
raise ValueError("basin_network.labels must have the same shape as U_ij")
# Basin IDs from labels (do not assume contiguous 0..n_basins-1)
basin_ids = np.unique(labels[labels >= 0])
basin_ids = np.sort(basin_ids)
n_basins = int(basin_ids.size)
if n_basins < 1:
raise ValueError("No basins found (labels>=0) — cannot build CTMC")
if x_edges is None:
x_edges = _infer_edges_from_centers_1d(x)
if y_edges is None:
y_edges = _infer_edges_from_centers_1d(y)
mesh, nx, ny, dx, dy = build_mesh_2d_from_edges(x_edges, y_edges)
# Robust mapping FiPy cells -> (i,j) on the provided (nx,ny) grid
i_of_cell, j_of_cell = _fipy_cells_to_ij_from_edges(mesh, x_edges, y_edges)
# Map fields to FiPy cell ordering
U_cell = _ij_field_to_fipy_cells(U, i_of_cell, j_of_cell, dtype=float)
if callable(D):
cx = np.asarray(mesh.cellCenters[0], dtype=float)
cy = np.asarray(mesh.cellCenters[1], dtype=float)
D_cell = np.asarray(D(cx, cy), dtype=float).ravel()
if D_cell.size != mesh.numberOfCells:
raise ValueError("Callable D(x,y) must return one value per cell")
elif np.isscalar(D):
D_cell = float(D)
else:
D_cell = _ij_field_to_fipy_cells(np.asarray(D, dtype=float), i_of_cell, j_of_cell, dtype=float)
# Backward coefficient A = D * exp(-beta U)
U_var, w_var, A_face, D_var = make_backward_coefficient_A_face_from_cell_values(
mesh, U_cell, D_cell, beta
)
# weights for basin averages
if init_weight.lower() == "uniform":
weight_cell = numerix.ones(mesh.numberOfCells, dtype=float)
elif init_weight.lower() == "boltzmann":
weight_cell = w_var
else:
raise ValueError("init_weight must be 'boltzmann' or 'uniform'")
labels_cell = _ij_field_to_fipy_cells(labels, i_of_cell, j_of_cell, dtype=int)
basin_masks = [(labels_cell == bid) for bid in basin_ids]
in_any_basin = (labels_cell >= 0)
exit_mean = np.full(n_basins, np.nan, dtype=float)
k_out = np.full(n_basins, np.nan, dtype=float)
p_branch = np.full((n_basins, n_basins), np.nan, dtype=float)
K = np.zeros((n_basins, n_basins), dtype=float)
info = {"exit": {}, "committor": {}}
if verbose:
lv_str = "None" if large_value is None else f"{float(large_value):g}"
print(f"[FiPy CTMC] n_basins={n_basins}, init_weight={init_weight}, large_value={lv_str}")
# --- exit time per basin ---
for i in range(n_basins):
mask_i = basin_masks[i]
if not np.any(mask_i):
if verbose:
print(f"[FiPy CTMC] basin {i}: empty mask -> skipping")
continue
# absorbing = all other basins
mask_abs = in_any_basin & (~mask_i)
tau, inf_tau = solve_exit_time_fipy(
mesh=mesh,
A_face=A_face,
w_var=w_var,
mask_absorb=mask_abs,
large_value=large_value,
enforce_reflecting=enforce_reflecting,
solver=solver,
sweep_tol=sweep_tol,
verbose=verbose,
)
info["exit"][int(i)] = inf_tau
tau_mean = _weighted_basin_average(mesh, tau, weight_cell, mask_i)
exit_mean[i] = float(tau_mean) if np.isfinite(tau_mean) else np.nan
if np.isfinite(exit_mean[i]) and exit_mean[i] > 0:
k_out[i] = 1.0 / exit_mean[i]
if verbose:
print(f"[FiPy CTMC] basin {i}: <tau_exit>={exit_mean[i]:.6g}, k_out={k_out[i]:.6g}")
# --- committors and branching ---
if n_basins == 2:
for i in range(n_basins):
if not np.isfinite(k_out[i]) or k_out[i] <= 0:
continue
j = 1 - i
p_branch[i, j] = 1.0
K[i, j] = float(k_out[i])
K[i, i] = -float(k_out[i])
if verbose:
print("[FiPy CTMC] n_basins=2 -> setting p_branch=1 and K_ij=k_out(i) by construction")
else:
for i in range(n_basins):
if not np.isfinite(k_out[i]) or k_out[i] <= 0:
continue
mask_i = basin_masks[i]
for j in range(n_basins):
if i == j:
continue
mask_j = basin_masks[j]
if not np.any(mask_j):
continue
mask_q1 = mask_j
mask_q0 = in_any_basin & (~mask_i) & (~mask_j)
if np.any(mask_i & mask_q0) or np.any(mask_i & mask_q1) or np.any(mask_q0 & mask_q1):
raise RuntimeError(
f"FiPy CTMC: committor masks overlap for (i={i}, j={j}). "
"This indicates a label→cell mapping or labeling bug."
)
if not np.any(mask_q0):
p_branch[i, j] = 1.0
continue
q, inf_q = solve_committor_fipy(
mesh=mesh,
A_face=A_face,
mask_q1=mask_q1,
mask_q0=mask_q0,
large_value=large_value,
enforce_reflecting=enforce_reflecting,
solver=solver,
sweep_tol=sweep_tol,
verbose=False,
)
info["committor"][(int(i), int(j))] = inf_q
pij = _weighted_basin_average(mesh, q, weight_cell, mask_i)
if np.isfinite(pij):
p_branch[i, j] = float(np.clip(float(pij), 0.0, 1.0))
else:
p_branch[i, j] = np.nan
# normalize branching row defensively
row = p_branch[i, :].copy()
row[i] = np.nan
srow = np.nansum(row)
if not (np.isfinite(srow) and srow > 0):
if verbose:
print(
f"[FiPy CTMC][ERROR] basin {i}: sum p_branch=0. "
"Cannot build a meaningful CTMC row. "
"Check committor masks and label→FiPy mapping."
)
raise RuntimeError(
f"FiPy CTMC: branching probabilities are all zero for basin {i}. "
"This usually means a committor mask bug (start basin clamped to 0) "
"or a mismatch between BasinNetwork labels and FiPy cell ordering."
)
for j in range(n_basins):
if i == j:
continue
if np.isfinite(p_branch[i, j]):
p_branch[i, j] /= float(srow)
if verbose:
pr = p_branch[i, :].copy()
pr[i] = 0.0
print(f"[FiPy CTMC] basin {i}: sum p_branch={np.nansum(pr):.6g}, row={pr}")
for j in range(n_basins):
if i == j:
continue
if np.isfinite(p_branch[i, j]) and p_branch[i, j] > 0:
K[i, j] = float(k_out[i] * p_branch[i, j])
K[i, i] = -float(np.sum(K[i, :]))
if verbose:
rs = K.sum(axis=1)
print(f"[FiPy CTMC] row-sum check (should be ~0): min={rs.min():.3e}, max={rs.max():.3e}")
return {
"K": K,
"exit_mean": exit_mean,
"k_out": k_out,
"p_branch": p_branch,
"mesh": mesh,
"info": info,
"basin_ids": np.asarray(basin_ids, dtype=int),
"method": "fipy_backward_ctmc",
"nx": nx,
"ny": ny,
"x_edges": np.asarray(x_edges, dtype=float),
"y_edges": np.asarray(y_edges, dtype=float),
}