"""stochkin.workflows
===================
High-level, one-call wrappers for the most common stochastic-kinetics
pipelines. Each function bundles loading, basin detection, and CTMC
generator construction into a single call and returns a uniform result
dictionary.
Result dictionary keys
----------------------
s 1-D grid (CV values)
F Free energy at grid points [kJ/mol]
D_used Diffusion coefficient used on the grid [CV²/time_unit]
kT Thermal energy at the requested temperature [kJ/mol]
K Rate matrix in the input *time_unit* [1/time_unit]
K_ps Rate matrix in picoseconds [1/ps]
exit_mean Mean exit time per basin [time_unit]
exit_ps Mean exit time per basin [ps]
k_out Total exit rate per basin [1/time_unit]
k_out_ps Total exit rate per basin [1/ps]
p_branch Branching-probability matrix [dimensionless]
labels_full Integer label per grid point (basin id, or -1)
basin_ids Sorted array of integer basin ids
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional, Sequence, Tuple, Union
import numpy as np
from itertools import combinations
from .fes import load_plumed_fes_2d, make_fes_potential_from_grid
from .potentials import build_basin_network_from_fes_1d, build_basin_network_from_potential
from .fpe import compute_ctmc_generator_fpe_1d
from .mfep import compute_mfep_profile_1d
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_K_B_KJ: float = 0.008314462618 # kJ mol⁻¹ K⁻¹
# ---------------------------------------------------------------------------
# Time-unit helpers
# ---------------------------------------------------------------------------
def _time_unit_to_ps(unit: str) -> float:
"""Return the number of picoseconds in one *unit*."""
table = {"fs": 1e-3, "ps": 1.0, "ns": 1e3, "us": 1e6, "ms": 1e9}
key = unit.lower()
if key not in table:
raise ValueError(
f"Unknown time unit '{unit}'. Supported: {list(table)}"
)
return table[key]
def _kT(T: float) -> float:
return _K_B_KJ * float(T)
# ---------------------------------------------------------------------------
# Basin-core helpers
# ---------------------------------------------------------------------------
def _build_core_labels(
s: np.ndarray,
F: np.ndarray,
basin_network,
core_fraction: Optional[float],
) -> None:
"""Attach ``core_labels`` to *basin_network* in-place (modifies in place).
Each core is the contiguous segment around the basin minimum that
contains the closest ``core_fraction`` fraction of basin points.
If *core_fraction* is ``None`` the full-basin labels are used.
"""
if core_fraction is None:
return
labels_full = basin_network.labels
core_labels = np.full_like(labels_full, -1, dtype=int)
for b in basin_network.basins:
bid = int(b.id)
idx_b = np.flatnonzero(labels_full == bid)
if idx_b.size == 0:
continue
# Locate the basin minimum — prefer the detected minimum position
# stored on the basin object (``b.minimum``), which corresponds to
# the local minimum found by the basin-detection algorithm.
# Falling back to argmin(F) would pick the *global* F-minimum
# inside the basin, which for shallow basins on a barrier flank
# can sit at the basin boundary rather than at the local dip.
basin_min_pos = getattr(b, "minimum", None)
if basin_min_pos is not None:
i_min = int(idx_b[np.argmin(np.abs(s[idx_b] - float(basin_min_pos)))])
else:
i_min = int(idx_b[np.argmin(F[idx_b])])
mpos = float(s[i_min])
# Sort basin points by distance to minimum; keep top fraction
dist = np.abs(s[idx_b] - mpos)
n_core = max(1, int(np.ceil(core_fraction * idx_b.size)))
order = np.argsort(dist)
core_idx = idx_b[np.sort(order[:n_core])]
# Find the contiguous segment containing i_min
tmp = np.zeros(s.size, dtype=bool)
tmp[core_idx] = True
a = i_min
b_end = i_min
while a - 1 >= 0 and tmp[a - 1]:
a -= 1
while b_end + 1 < s.size and tmp[b_end + 1]:
b_end += 1
core_labels[a : b_end + 1] = bid
basin_network.core_labels = core_labels
# ---------------------------------------------------------------------------
# Result packaging
# ---------------------------------------------------------------------------
def _pack_result(
s: np.ndarray,
F: np.ndarray,
D_used,
kT_val: float,
time_unit_ps: float,
K_raw: np.ndarray,
info: dict,
) -> dict:
"""Collect CTMC outputs into the canonical result dict."""
K_raw = np.asarray(K_raw, dtype=float)
n = K_raw.shape[0]
K_ps = K_raw / time_unit_ps
exit_mean = np.asarray(
info.get("exit_mean", np.full(n, np.nan)), dtype=float
)
k_out = np.asarray(
info.get("k_out", np.full(n, np.nan)), dtype=float
)
p_branch = np.asarray(
info.get("p_branch", np.full((n, n), np.nan)), dtype=float
)
labels_full = np.asarray(
info.get("labels_full", np.zeros(s.size, dtype=int)), dtype=int
)
basin_ids = np.asarray(
info.get("basin_ids", np.arange(n, dtype=int)), dtype=int
)
return {
"s": s,
"F": F,
"D_used": D_used,
"kT": kT_val,
"K": K_raw,
"K_ps": K_ps,
"exit_mean": exit_mean,
"exit_ps": exit_mean * time_unit_ps,
"k_out": k_out,
"k_out_ps": k_out / time_unit_ps,
"p_branch": p_branch,
"labels_full": labels_full,
"basin_ids": basin_ids,
}
def _call_ctmc_1d(s, F, basin_network, D, beta, init_weight, verbose):
"""Call compute_ctmc_generator_fpe_1d and normalise output to (K, info)."""
res = compute_ctmc_generator_fpe_1d(
s=s,
F=F,
basin_network=basin_network,
D=D,
beta=beta,
init_weight=init_weight,
verbose=verbose,
)
if isinstance(res, dict):
K = np.asarray(res["K"], dtype=float)
info = {k: v for k, v in res.items() if k != "K"}
else: # legacy tuple
K = np.asarray(res[0], dtype=float)
info = res[1] if len(res) > 1 else {}
return K, info
# ---------------------------------------------------------------------------
# D-profile helpers (ported from CLI script; reusable by external callers)
# ---------------------------------------------------------------------------
[docs]
def interface_to_centers(
x_interface: np.ndarray,
D_interface: np.ndarray,
method: str = "harmonic",
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Convert Hummer-style interface diffusion values to bin-center values.
Hummer's Bayesian estimator places D_{i+1/2} at internal bin *interfaces*.
This routine reconstructs bin-center D_i by extrapolating one bin to each
edge and then averaging (or taking harmonic means) of adjacent interface
values.
Parameters
----------
x_interface : array (M,)
CV positions of the M internal interfaces.
D_interface : array (M,)
D values at those interfaces.
method : {'harmonic', 'avg'}
``'harmonic'`` (default) gives a better approximation for barriers
(avoids over-estimating D at low-D regions); ``'avg'`` is simpler.
Returns
-------
x_center : array (M+1,)
D_center : array (M+1,)
edges : array (M+2,) full edge positions used internally
"""
xI = np.asarray(x_interface, dtype=float).ravel()
DI = np.asarray(D_interface, dtype=float).ravel()
if xI.size != DI.size:
raise ValueError("x_interface and D_interface must have the same length.")
if xI.size < 2:
raise ValueError("Need at least 2 interface points.")
dx_l = xI[1] - xI[0]
dx_r = xI[-1] - xI[-2]
edges = np.empty(xI.size + 2, dtype=float)
edges[1:-1] = xI
edges[0] = xI[0] - dx_l
edges[-1] = xI[-1] + dx_r
xC = 0.5 * (edges[:-1] + edges[1:]) # shape (M+1,)
DC = np.empty(xC.size, dtype=float)
DC[0] = DI[0]
DC[-1] = DI[-1]
if xC.size > 2:
if method == "harmonic":
with np.errstate(divide="ignore", invalid="ignore"):
DC[1:-1] = 2.0 / (1.0 / DI[:-1] + 1.0 / DI[1:])
bad = ~np.isfinite(DC[1:-1]) | (DC[1:-1] <= 0)
DC[1:-1][bad] = 0.5 * (DI[:-1][bad] + DI[1:][bad])
elif method == "avg":
DC[1:-1] = 0.5 * (DI[:-1] + DI[1:])
else:
raise ValueError("method must be 'harmonic' or 'avg'.")
return xC, DC, edges
[docs]
def interpolate_D_to_grid(
s_grid: np.ndarray,
x_D: np.ndarray,
D_vals: np.ndarray,
method: str = "linear",
) -> np.ndarray:
"""Interpolate a D(s) profile onto *s_grid*, clamping outside the range.
Parameters
----------
s_grid : array (N,)
Target uniform grid.
x_D : array (M,)
CV positions of the diffusion profile.
D_vals : array (M,)
Diffusion values at those positions.
method : {'linear', 'pchip'}
Interpolation method. ``'pchip'`` requires SciPy.
Returns
-------
D_grid : array (N,) non-negative finite diffusion values on s_grid.
"""
s_grid = np.asarray(s_grid, dtype=float).ravel()
x_D = np.asarray(x_D, dtype=float).ravel()
D_vals = np.asarray(D_vals, dtype=float).ravel()
good = D_vals[np.isfinite(D_vals) & (D_vals > 0)]
if good.size == 0:
raise ValueError("D profile has no positive finite values.")
left = float(np.median(good[: min(3, good.size)]))
right = float(np.median(good[max(0, good.size - 3):]))
if method == "linear":
Dg = np.interp(s_grid, x_D, D_vals, left=left, right=right)
elif method == "pchip":
from scipy.interpolate import PchipInterpolator # type: ignore
f = PchipInterpolator(x_D, D_vals, extrapolate=True)
Dg = np.asarray(f(s_grid), dtype=float)
Dg = np.where(s_grid < x_D[0], left, Dg)
Dg = np.where(s_grid > x_D[-1], right, Dg)
else:
raise ValueError("method must be 'linear' or 'pchip'.")
floor = float(good.min() * 1e-6)
Dg = np.where(np.isfinite(Dg) & (Dg > 0), Dg, floor)
return Dg
# ===========================================================================
# Public high-level workflow functions
# ===========================================================================
[docs]
def run_1d_ctmc(
s: Union[np.ndarray, Sequence],
F: Union[np.ndarray, Sequence],
D: Union[float, np.ndarray],
T: float = 300.0,
time_unit: str = "ps",
max_basins: Optional[int] = None,
core_fraction: Optional[float] = 0.05,
init_weight: str = "boltzmann",
resample_n: int = 500,
verbose: bool = True,
) -> dict:
"""Compute 1D CTMC kinetics from arrays *s*, *F*, *D*.
This is the lowest-level workflow: it accepts the free-energy profile and
diffusion coefficient directly as arrays and handles everything else
(basin detection, core labelling, BVP solve, result packing).
Parameters
----------
s : array (N,)
*Uniform* grid of CV values.
F : array (N,)
Free energy in kJ/mol at each grid point.
D : float or array (N,)
Diffusion coefficient in CV²/time_unit. Scalar = constant D;
array = position-dependent D(s).
T : float
Temperature in Kelvin. Default 300 K.
time_unit : str
Time unit of *D* (and of the returned ``K`` / ``exit_mean``).
One of ``'fs'``, ``'ps'`` (default), ``'ns'``, ``'us'``.
max_basins : int, optional
Cap the number of detected basins.
core_fraction : float, optional
Fraction of each basin (ranked by proximity to the minimum) that
counts as the "core" for CTMC entry/exit averaging. 0.05 = 5 %.
Use ``None`` to use the full basin.
init_weight : {'boltzmann', 'uniform'}
Weight for basin averages of exit times.
verbose : bool
Returns
-------
dict
See module-level docstring for key descriptions.
"""
s = np.asarray(s, dtype=float).ravel()
F = np.asarray(F, dtype=float).ravel()
if s.size != F.size:
raise ValueError("s and F must have the same length.")
kT_val = _kT(T)
beta = 1.0 / kT_val
tps = _time_unit_to_ps(time_unit)
bn = build_basin_network_from_fes_1d(s, F, max_basins=max_basins, verbose=verbose)
_build_core_labels(s, F, bn, core_fraction)
K, info = _call_ctmc_1d(s, F, bn, D, beta, init_weight, verbose)
info["labels_full"] = bn.labels
info["basin_ids"] = np.array([b.id for b in bn.basins], dtype=int)
return _pack_result(s, F, D, kT_val, tps, K, info)
[docs]
def run_1d_ctmc_from_plumed(
fes_path: Union[str, Path],
D: Union[float, np.ndarray],
T: float = 300.0,
time_unit: str = "ps",
crop: Optional[Tuple[float, float]] = None,
resample_n: Optional[int] = None,
s_col: int = 0,
F_col: int = 1,
max_basins: Optional[int] = None,
core_fraction: Optional[float] = 0.05,
init_weight: str = "boltzmann",
verbose: bool = True,
) -> dict:
"""Load a 1D PLUMED FES and compute CTMC kinetics with a constant *D*.
Parameters
----------
fes_path : str or Path
Path to a PLUMED-style 1D FES file (comments start with ``#``).
D : float
Diffusion coefficient in CV²/time_unit.
T : float
Temperature in K.
time_unit : str
Time unit for *D* and for the returned rates.
crop : (s_lo, s_hi), optional
Restrict the FES to this CV range before analysis.
resample_n : int, optional
Resample to this many uniform points (useful after cropping).
s_col, F_col : int
Column indices for CV and free energy in the FES file.
resample_n : int
Resample the MFEP arc-length profile to this many uniform points
before running the FPE solver (which requires a uniform grid).
Default 500.
max_basins, core_fraction, init_weight, verbose :
Forwarded to :func:`run_1d_ctmc`.
Returns
-------
dict (same keys as :func:`run_1d_ctmc`)
"""
from .fes import load_plumed_fes_1d
s, F = load_plumed_fes_1d(
fes_path, x_col=s_col, fes_col=F_col, verbose=verbose
)
if crop is not None:
lo, hi = float(crop[0]), float(crop[1])
mask = (s >= lo) & (s <= hi)
s, F = s[mask], F[mask]
if resample_n is not None:
su = np.linspace(s[0], s[-1], int(resample_n))
F = np.interp(su, s, F)
s = su
return run_1d_ctmc(
s=s, F=F, D=D, T=T, time_unit=time_unit,
max_basins=max_basins, core_fraction=core_fraction,
init_weight=init_weight, verbose=verbose,
)
[docs]
def run_1d_ctmc_with_hummer_D(
fes_path: Union[str, Path],
d_csv: Union[str, Path],
T: float = 300.0,
time_unit: str = "ps",
d_xcol: str = "x_interface",
d_col: str = "D_med",
d_grid: str = "interface",
d_interface_mode: str = "harmonic",
d_time_unit: str = "ps",
d_interp_method: str = "linear",
crop: Optional[Tuple[float, float]] = None,
resample_n: int = 500,
s_col: int = 0,
F_col: int = 1,
max_basins: Optional[int] = None,
core_fraction: Optional[float] = 0.05,
init_weight: str = "boltzmann",
verbose: bool = True,
) -> dict:
"""Load a 1D PLUMED FES and a Hummer-style D(s) profile; compute CTMC.
This is the standard workflow when D(s) has been estimated from short
MD runs using the Hummer (2005) Bayesian inference.
Parameters
----------
fes_path : str or Path
PLUMED 1D FES file.
d_csv : str or Path
CSV file with the diffusion profile. Must contain at least two
columns named *d_xcol* and *d_col*.
T : float
Temperature in K.
time_unit : str
Time unit for the returned rates.
d_xcol : str
Column name for CV positions in the CSV (default ``'x_interface'``).
d_col : str
Column name for D values in the CSV (default ``'D_med'``).
d_grid : {'interface', 'center'}
``'interface'``: the CSV positions are internal bin *interfaces* and
D values are face-centred (Hummer-style). They will be converted to
bin-centre values via :func:`interface_to_centers`.
``'center'``: D values are already at bin centres; no conversion.
d_interface_mode : {'harmonic', 'avg'}
Conversion method when ``d_grid='interface'``.
d_time_unit : str
Time unit of the raw D values in the CSV (default ``'ps'``).
d_interp_method : {'linear', 'pchip'}
How to interpolate D onto the FES grid.
crop : (s_lo, s_hi), optional
CV range to retain from the FES.
resample_n : int
Resample the FES to this many uniform points before interpolating D.
Required because the FPE solver needs a uniform grid. Default 500.
s_col, F_col : int
Column indices in the FES file.
resample_n : int
Resample the MFEP arc-length profile to this many uniform points
before running the FPE solver (which requires a uniform grid).
Default 500.
max_basins, core_fraction, init_weight, verbose :
Forwarded to :func:`run_1d_ctmc`.
Returns
-------
dict (same keys as :func:`run_1d_ctmc`)
"""
import pandas as pd # local import; pandas is an optional dep
from .fes import load_plumed_fes_1d
# Load FES
s, F = load_plumed_fes_1d(
fes_path, x_col=s_col, fes_col=F_col, verbose=verbose
)
if crop is not None:
lo, hi = float(crop[0]), float(crop[1])
mask = (s >= lo) & (s <= hi)
s, F = s[mask], F[mask]
# Resample to uniform grid (mandatory for FPE solver)
s_grid = np.linspace(float(s[0]), float(s[-1]), int(resample_n))
F_grid = np.interp(s_grid, s, F)
# Load D profile
df = pd.read_csv(d_csv)
x_D_raw = df[d_xcol].values.astype(float)
D_raw = df[d_col].values.astype(float)
# Interface → center conversion
if d_grid == "interface":
x_D_src, D_src, _ = interface_to_centers(
x_D_raw, D_raw, method=d_interface_mode
)
else:
x_D_src, D_src = x_D_raw, D_raw
# Unit conversion: raw D [cv²/d_time_unit] → [cv²/time_unit]
ps_per_d = _time_unit_to_ps(d_time_unit)
ps_per_out = _time_unit_to_ps(time_unit)
D_src = D_src * (ps_per_out / ps_per_d)
# Interpolate D onto FES grid
D_grid = interpolate_D_to_grid(
s_grid, x_D_src, D_src, method=d_interp_method
)
return run_1d_ctmc(
s=s_grid, F=F_grid, D=D_grid, T=T, time_unit=time_unit,
max_basins=max_basins, core_fraction=core_fraction,
init_weight=init_weight, verbose=verbose,
)
[docs]
def run_mfep_ctmc(
fes2d_path: Union[str, Path],
start: Tuple[float, float],
end: Tuple[float, float],
D_s: float,
T: float = 300.0,
time_unit: str = "ps",
neb_images: int = 120,
neb_steps: int = 3000,
neb_spring: float = 1.0,
use_neb: bool = True,
max_basins: Optional[int] = None,
core_fraction: Optional[float] = 0.05,
init_weight: str = "boltzmann",
resample_n: int = 500,
verbose: bool = True,
) -> dict:
"""Find the MFEP on a 2D FES and compute 1D CTMC kinetics along it.
Workflow:
1. Load the 2D PLUMED FES.
2. Find the minimum free-energy path (MFEP) from *start* to *end* using
a grid-Dijkstra seed followed by NEB refinement.
3. Extract the 1D free-energy profile F(s) along the arc-length s.
4. Run :func:`run_1d_ctmc` on F(s) with the supplied constant *D_s*.
Parameters
----------
fes2d_path : str or Path
Path to a 2D PLUMED FES file.
start, end : (x, y)
Start and end points for the MFEP in CV space.
D_s : float
Diffusion coefficient along the arc-length s [arc-length²/time_unit].
T : float
Temperature in K.
time_unit : str
Time unit for *D_s* and the returned rates.
neb_images : int
Number of NEB images (controls resolution of the 1D profile).
neb_steps : int
Maximum NEB optimisation steps.
neb_spring : float
NEB spring constant (kJ/mol per CV² or compatible units).
use_neb : bool
If False, skip NEB refinement and use the raw grid path.
resample_n : int
Resample the MFEP arc-length profile to this many uniform points
before running the FPE solver (which requires a uniform grid).
Default 500.
max_basins, core_fraction, init_weight, verbose :
Forwarded to :func:`run_1d_ctmc`.
Returns
-------
dict
Same keys as :func:`run_1d_ctmc` plus ``'mfep_path'``
(:class:`~stochkin.mfep.MFEPPath`).
"""
# Load 2D FES
x_grid, y_grid, fes_grid = load_plumed_fes_2d(fes2d_path, verbose=verbose)
# MFEP (grid Dijkstra seed + optional NEB refinement)
path = compute_mfep_profile_1d(
x_grid=x_grid,
y_grid=y_grid,
fes_grid=fes_grid,
start=start,
end=end,
use_neb=use_neb,
neb_images=neb_images,
neb_k_spring=neb_spring,
neb_max_iter=neb_steps,
)
# Report NEB convergence
if verbose and hasattr(path, "metadata") and "converged" in path.metadata:
md = path.metadata
conv_tag = "converged" if md["converged"] else "NOT converged"
print(f"[NEB] {conv_tag} after {md['n_iter']} iters "
f"(max_force={md['final_max_force']:.3e}, tol={md['tol']:.1e})")
# 1D profile along arc-length – resample to uniform grid
s_raw = path.s
F_raw = path.F - np.nanmin(path.F)
s = np.linspace(s_raw[0], s_raw[-1], int(resample_n))
F_1d = np.interp(s, s_raw, F_raw)
result = run_1d_ctmc(
s=s, F=F_1d, D=D_s, T=T, time_unit=time_unit,
max_basins=max_basins, core_fraction=core_fraction,
init_weight=init_weight, verbose=verbose,
)
result["mfep_path"] = path
return result
# ---------------------------------------------------------------------------
# Multi-MFEP workflow: all-basin pairwise rates from a 2D FES
# ---------------------------------------------------------------------------
[docs]
def run_multi_mfep_ctmc(
fes2d_path: Union[str, Path],
D_s: float,
T: float = 300.0,
time_unit: str = "ps",
neb_images: int =8120,
neb_steps: int = 3000,
neb_spring: float = 1.0,
use_neb: bool = True,
max_basins: Optional[int] = None,
core_fraction: Optional[float] = 0.05,
init_weight: str = "boltzmann",
resample_n: int = 500,
verbose: bool = True,
) -> dict:
"""Detect all basins on a 2D FES and compute pairwise MFEP-based CTMC rates.
Workflow:
1. Load the 2D PLUMED FES and build an interpolated potential.
2. Detect all basins on the 2D surface.
3. For every pair of basins (i, j), compute the MFEP connecting their
minima and run the 1D CTMC pipeline on the resulting F(s) profile.
4. Assemble a full N×N rate matrix from the pairwise 2-basin results.
Parameters
----------
fes2d_path : str or Path
Path to a 2D PLUMED FES file.
D_s : float
Constant diffusion coefficient along the arc-length
[arc-length² / time_unit].
T : float
Temperature in K.
time_unit : str
Time unit for rates (``'ps'``, ``'ns'``, …).
neb_images, neb_steps, neb_spring, use_neb :
NEB parameters forwarded to :func:`compute_mfep_profile_1d`.
max_basins : int or None
Keep only the *max_basins* deepest minima.
core_fraction, init_weight :
Forwarded to :func:`run_1d_ctmc`.
resample_n : int
Uniform resampling density for F(s) before the FPE solve.
verbose : bool
Print progress information.
Returns
-------
dict with keys:
basin_network : BasinNetwork
The detected 2D basins.
basin_ids : np.ndarray
Sorted array of basin indices.
K : np.ndarray, shape (N, N)
Full rate matrix [1/time_unit].
K_ps : np.ndarray
Rate matrix in ps⁻¹.
exit_times : np.ndarray
Mean exit time from each basin [time_unit].
exit_ps : np.ndarray
Mean exit time in ps.
kT : float
Thermal energy [kJ/mol].
legs : dict
``legs[(i, j)]`` is the full result dict from ``run_mfep_ctmc``
for the directed MFEP leg from basin *i* to basin *j*.
"""
# 1. Load the 2D FES
x_grid, y_grid, fes_grid = load_plumed_fes_2d(fes2d_path, verbose=verbose)
kT = _kT(T)
ps_per_unit = _time_unit_to_ps(time_unit)
# 2. Build an interpolated potential and detect 2D basins
fes_pot = make_fes_potential_from_grid(x_grid, y_grid, fes_grid)
basin_net = build_basin_network_from_potential(
fes_pot,
xlim=(float(x_grid[0]), float(x_grid[-1])),
ylim=(float(y_grid[0]), float(y_grid[-1])),
nx=len(x_grid),
ny=len(y_grid),
max_basins=max_basins,
verbose=verbose,
)
n = basin_net.n_basins
basin_ids = np.array([b.id for b in basin_net.basins])
if verbose:
for b in basin_net.basins:
print(f" Basin {b.id}: minimum at ({b.minimum[0]:.2f}, "
f"{b.minimum[1]:.2f}), F_min={b.f_min:.2f}")
# 3. Pairwise MFEPs
K = np.zeros((n, n), dtype=float)
legs: dict = {}
for i, j in combinations(range(n), 2):
bi, bj = basin_net.basins[i], basin_net.basins[j]
start = tuple(bi.minimum.tolist())
end = tuple(bj.minimum.tolist())
if verbose:
print(f"\n--- MFEP leg {bi.id} → {bj.id} "
f"({start} → {end}) ---")
try:
leg = run_mfep_ctmc(
fes2d_path=fes2d_path,
start=start,
end=end,
D_s=D_s,
T=T,
time_unit=time_unit,
neb_images=neb_images,
neb_steps=neb_steps,
neb_spring=neb_spring,
use_neb=use_neb,
core_fraction=core_fraction,
init_weight=init_weight,
resample_n=resample_n,
verbose=verbose,
)
except Exception as exc:
if verbose:
print(f" ⚠ Leg {bi.id}→{bj.id} failed: {exc}")
continue
legs[(i, j)] = leg
# The leg result contains a 2-basin K matrix on the *arc-length*
# sub-problem. Basin 0 in the leg corresponds to the start (basin i)
# and basin 1 corresponds to the end (basin j).
K_leg = leg["K"] # shape (n_leg, n_leg) in [1/time_unit]
n_leg = K_leg.shape[0]
if n_leg >= 2:
# Forward rate: K_leg[0, 1] is k(i→j), K_leg[1, 0] is k(j→i)
K[i, j] += K_leg[0, n_leg - 1]
K[j, i] += K_leg[n_leg - 1, 0]
elif verbose:
print(f" ⚠ Leg {bi.id}→{bj.id}: only {n_leg} basin(s) "
"detected along the MFEP — no rate extracted.")
# 4. Fix diagonal so each row sums to 0
for i in range(n):
K[i, i] = -np.sum(K[i, :])
K_ps = K * ps_per_unit
with np.errstate(divide="ignore"):
exit_times = np.where(K.diagonal() != 0, -1.0 / K.diagonal(), np.inf)
exit_ps = exit_times * ps_per_unit
if verbose:
print(f"\n{'='*60}")
print(f"Full {n}×{n} rate matrix K [{time_unit}⁻¹]:")
print(K)
print(f"\nExit times [{time_unit}]: {exit_times}")
return {
"basin_network": basin_net,
"basin_ids": basin_ids,
"K": K,
"K_ps": K_ps,
"exit_times": exit_times,
"exit_ps": exit_ps,
"kT": kT,
"legs": legs,
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
__all__ = [
# High-level wrappers
"run_1d_ctmc",
"run_1d_ctmc_from_plumed",
"run_1d_ctmc_with_hummer_D",
"run_mfep_ctmc",
"run_multi_mfep_ctmc",
# D-profile helpers exposed for advanced users
"interface_to_centers",
"interpolate_D_to_grid",
]