"""stochkin.committor
===================
Committor (splitting probability) analysis.
The committor :math:`q(x)` is the probability that a trajectory started
at *x* reaches basin **B** before basin **A**. This module provides
two complementary approaches:
1. **Trajectory-based (shooting) committor** – launch many short
trajectories from each grid point and count outcomes.
Works for both underdamped (Langevin) and overdamped (Brownian)
dynamics. Suitable for 1D and 2D grids.
- :func:`run_short_trajectory` – single trajectory worker.
- :func:`committor_point` – estimate *q* at one point (many trials).
- :func:`committor_map_parallel` – 2D grid committor with multiprocessing.
- :func:`committor_profile_1d` – 1D committor profile.
2. **FPE-based (grid) committor** – solve the backward Fokker–Planck
equation :math:`L q = 0` with Dirichlet boundary conditions on a
discrete grid built from a :class:`~stochkin.potentials.BasinNetwork`.
- :func:`committor_map_fpe` – solve for *q(x, y)* via sparse linear algebra.
Example basin functions (:func:`basinA`, :func:`basinB`) for the
central-well / ring potential are included for quick prototyping.
"""
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool
import os
# Optional SciPy sparse solver
try:
import scipy.sparse.linalg as _spla # type: ignore
_HAVE_SCIPY_SPARSE_LINALG = True
except ImportError: # pragma: no cover - optional
_HAVE_SCIPY_SPARSE_LINALG = False
from .fpe import build_fp_generator_from_fes
from .potentials import BasinNetwork
from .integrators import GammaToDiffusion, overdamped_step_euler, baobab_step
from .boundaries import apply_bounds as _apply_bounds, reflect_position_velocity as _reflect_xv
[docs]
def run_short_trajectory(
potential,
x0,
v0,
max_time,
dt,
gamma,
kT,
basinA,
basinB,
m=1.0,
regime="underdamped",
diffusion=None,
overdamped_eps=1e-6,
bounds=None,
boundary=None,
):
"""Run a single short trajectory and report which basin is hit first.
This is the inner workhorse of the shooting-committor algorithm.
Starting from ``(x0, v0)``, the trajectory is integrated until it
enters *basinA* (returns ``'A'``), *basinB* (returns ``'B'``), or
*max_time* is exceeded (returns ``None``).
Parameters
----------
potential : callable
``potential(x) -> (U, F)`` with ``F = −∇U``.
x0 : array_like
Initial position.
v0 : array_like
Initial velocity (ignored for overdamped dynamics).
max_time : float
Maximum simulation time.
dt : float
Integration time step.
gamma : float
Friction coefficient.
kT : float
Thermal energy.
basinA, basinB : callable
``basin(x) -> bool`` indicating membership.
m : float
Mass (default 1).
regime : {'underdamped', 'overdamped'}
Dynamics type.
diffusion : scalar, callable, or None
Diffusion coefficient for overdamped mode.
overdamped_eps : float
Finite-difference step for ∇D.
bounds : sequence of (lo, hi) or None
Bounding box for position enforcement.
boundary : {'reflect', 'clip'} or None
How to enforce *bounds*.
Returns
-------
str or None
``'A'``, ``'B'``, or ``None`` if neither basin was reached.
"""
x = np.array(x0, dtype=float).ravel()
v = np.array(v0, dtype=float).ravel()
t = 0.0
# If bounds are provided but boundary isn't, default to reflecting.
if bounds is not None and boundary is None:
boundary = "reflect"
# Enforce bounds at t=0 so we don't start outside the domain.
if bounds is not None and boundary is not None:
if str(boundary) == "reflect":
if str(regime) == "overdamped":
x = _apply_bounds(x, bounds, mode="reflect")
else:
x, v = _reflect_xv(x, v, bounds)
else:
x = _apply_bounds(x, bounds, mode=str(boundary))
if regime == "overdamped":
beta = 1.0 / kT
if diffusion is None:
diffusion = GammaToDiffusion(gamma=gamma, kT=kT)
while t < max_time:
x = overdamped_step_euler(
potential, x, dt, beta, diffusion, eps=float(overdamped_eps)
)
if bounds is not None and boundary is not None:
x = _apply_bounds(x, bounds, mode=str(boundary))
if basinA(x): return "A"
if basinB(x): return "B"
t += dt
return None
while t < max_time:
x, v, _ = baobab_step(potential, x, v, dt, gamma, kT, m)
if bounds is not None and boundary is not None:
if str(boundary) == "reflect":
x, v = _reflect_xv(x, v, bounds)
else:
x = _apply_bounds(x, bounds, mode=str(boundary))
if basinA(x): return "A"
if basinB(x): return "B"
t += dt
return None
[docs]
def committor_point(args):
"""Estimate the committor at a single grid point via trajectory shooting.
Runs *n_trials* short trajectories from ``(x0, v0)`` and returns
the fraction that reach basin B first:
:math:`q = n_B / (n_A + n_B)`. Returns ``NaN`` if no trajectory
reached either basin.
Parameters
----------
args : tuple
Positional arguments packed as a tuple for compatibility with
``multiprocessing.Pool.map``. Fields (by index)::
0 potential 5 n_trials 10 m
1 x0 6 max_time 11 regime (optional)
2 v0 7 dt 12 diffusion (optional)
3 basinA 8 gamma 13 overdamped_eps (optional)
4 basinB 9 kT 14 bounds (optional)
15 boundary (optional)
16 seed (optional)
Returns
-------
float
Estimated committor *q ∈ [0, 1]* (or ``NaN``).
"""
# Backwards compatible positional parsing: older code passes 11 fields.
potential = args[0]
x0 = args[1]
v0 = args[2]
basinA = args[3]
basinB = args[4]
n_trials = int(args[5])
max_time = float(args[6])
dt = float(args[7])
gamma = float(args[8])
kT = float(args[9])
m = float(args[10])
regime = args[11] if len(args) > 11 else "underdamped"
diffusion = args[12] if len(args) > 12 else None
overdamped_eps = float(args[13]) if len(args) > 13 else 1e-6
bounds = args[14] if len(args) > 14 else None
boundary = args[15] if len(args) > 15 else None
seed = args[16] if len(args) > 16 else None
if seed is not None:
np.random.seed(int(seed) % (2**32 - 1))
# Run trials and count outcomes without storing all per-trial results.
nA = 0
nB = 0
for _ in range(n_trials):
r = run_short_trajectory(
potential,
x0,
v0,
max_time,
dt,
gamma,
kT,
basinA,
basinB,
m=m,
regime=regime,
diffusion=diffusion,
overdamped_eps=overdamped_eps,
bounds=bounds,
boundary=boundary,
)
if r == "A":
nA += 1
elif r == "B":
nB += 1
return nB / (nA + nB) if (nA + nB) > 0 else np.nan
[docs]
def committor_map_parallel(
potential,
basinA,
basinB,
xlim=(-1.5, 1.5),
ylim=(-1.5, 1.5),
grid_size=50,
n_trials=100,
max_time=50.0,
dt=0.01,
gamma=10.0,
kT=0.05,
m=1.0,
processes=None,
regime="underdamped",
diffusion=None,
overdamped_eps=1e-6,
bounds=None,
boundary=None,
base_seed=None,
):
"""Compute a 2D committor map using parallel trajectory shooting.
For each point on a ``grid_size × grid_size`` grid, *n_trials*
short trajectories are launched and the fraction reaching basin B
first is recorded.
Parameters
----------
potential : callable
``potential(x) -> (U, F)``.
basinA, basinB : callable
``basin(x) -> bool``.
xlim, ylim : tuple
Grid bounds.
grid_size : int
Number of grid points per axis.
n_trials : int
Shooting trials per grid point.
max_time : float
Maximum trajectory time.
dt : float
Time step.
gamma : float
Friction.
kT : float
Thermal energy.
m : float
Mass (default 1).
processes : int or None
Number of worker processes (None = all CPUs).
regime : {'underdamped', 'overdamped'}
Dynamics type.
diffusion : scalar, callable, or None
Diffusion for overdamped mode.
overdamped_eps : float
FD step for ∇D.
bounds : sequence of (lo, hi) or None
Bounding box.
boundary : {'reflect', 'clip'} or None
Bound enforcement mode.
base_seed : int or None
Base seed for reproducible per-point seeds.
Returns
-------
xs : ndarray
x-axis grid.
ys : ndarray
y-axis grid.
Q : ndarray, shape (grid_size, grid_size)
Committor values *q(x, y) ∈ [0, 1]*.
"""
xs = np.linspace(xlim[0], xlim[1], grid_size)
ys = np.linspace(ylim[0], ylim[1], grid_size)
if bounds is not None and boundary is None:
boundary = "reflect"
rng = np.random.RandomState(int(base_seed)) if base_seed is not None else None
args = []
for xi in xs:
for yj in ys:
args.append(
(
potential,
[xi, yj],
[0.0, 0.0],
basinA,
basinB,
n_trials,
max_time,
dt,
gamma,
kT,
m,
regime,
diffusion,
overdamped_eps,
bounds,
boundary,
(int(rng.randint(0, 2**31 - 1)) if rng is not None else None),
)
)
n_tasks = len(args)
if n_tasks == 0:
return xs, ys, np.empty((0, 0), dtype=float)
if processes is None:
n_procs = int(os.cpu_count() or 1)
else:
n_procs = int(processes)
n_procs = max(1, min(n_procs, n_tasks))
if n_procs <= 1:
results = [committor_point(a) for a in args]
else:
pool = Pool(processes=n_procs)
try:
chunksize = max(1, n_tasks // max(1, n_procs * 8))
# Use map (ordered) so reshape aligns with the (xi, yj) loop order.
results = pool.map(committor_point, args, chunksize=chunksize)
pool.close()
pool.join()
except Exception:
pool.terminate()
pool.join()
raise
Q = np.array(results).reshape((grid_size, grid_size))
plt.figure(figsize=(6, 5))
plt.imshow(
Q.T,
origin="lower",
extent=[xlim[0], xlim[1], ylim[0], ylim[1]],
cmap="coolwarm",
vmin=0.0,
vmax=1.0,
aspect="auto",
)
plt.colorbar(label="Committor q(x)")
plt.xlabel("x1")
plt.ylabel("x2")
plt.title("Grid-based committor map (parallel)")
plt.show()
return xs, ys, Q
[docs]
def committor_profile_1d(
potential,
basinA,
basinB,
xlim=(-1.5, 1.5),
grid_size=50,
n_trials=100,
max_time=50.0,
dt=0.01,
gamma=10.0,
kT=0.05,
m=1.0,
):
"""Compute the committor q(x) on a 1D grid via short trajectory shooting.
This is the 1D analogue of :func:`committor_map_parallel`.
Parameters are the same as in :func:`committor_map_parallel`, but with a 1D grid.
Returns
-------
xs : (grid_size,) ndarray
Grid points.
q_vals : (grid_size,) ndarray
Committor values in [0, 1].
"""
xs = np.linspace(xlim[0], xlim[1], grid_size)
q_vals = np.empty_like(xs, dtype=float)
for i, xv in enumerate(xs):
args = (
potential,
np.array([xv], dtype=float),
np.array([0.0], dtype=float),
basinA,
basinB,
int(n_trials),
float(max_time),
float(dt),
float(gamma),
float(kT),
float(m),
)
q_vals[i] = committor_point(args)
return xs, q_vals
# Example basins for a central-well/ring potential
[docs]
def basinA(x, rA=0.1):
"""Basin A: central well around the origin."""
return np.linalg.norm(x) < rA
[docs]
def basinB(x, r_inner=0.90, r_outer=1.0):
"""Basin B: circular corona (ring) between r_inner and r_outer."""
r = np.linalg.norm(x)
return (r_inner <= r <= r_outer)
[docs]
def committor_map_fpe(
basin_network: BasinNetwork,
D,
beta,
basinA_id,
basinB_id,
sparse=True,
energy_buffer_kT=None,
):
"""Compute the committor *q(x, y)* by solving the backward FPE on a grid.
Solves :math:`L q = 0` with Dirichlet conditions
:math:`q = 0` on basin A and :math:`q = 1` on basin B, where *L*
is the detailed-balance-preserving discrete Fokker–Planck generator
built from the free-energy surface.
Parameters
----------
basin_network : BasinNetwork
Must provide ``xs``, ``ys``, ``U``, and ``labels``.
D : float or ndarray, shape (nx, ny)
Diffusion coefficient (scalar or field).
beta : float
Inverse temperature :math:`1/(k_BT)`.
basinA_id, basinB_id : int
Integer basin labels for the two absorbing states.
sparse : bool
Use sparse linear algebra (default ``True``).
energy_buffer_kT : float or None
If given, restrict A and B to low-energy cores within
*energy_buffer_kT* of the basin minimum (in kT units).
The remainder of each basin becomes part of the solved domain.
Returns
-------
xs, ys : ndarray
Grid coordinate arrays.
q_grid : ndarray, shape (nx, ny)
Committor values in [0, 1].
Raises
------
ImportError
If SciPy sparse is not available.
"""
if not _HAVE_SCIPY_SPARSE_LINALG:
raise ImportError(
"committor_map_fpe requires scipy.sparse.linalg. "
"Install SciPy or use the trajectory-based committor_map_parallel."
)
xs = np.asarray(basin_network.xs, dtype=float)
ys = np.asarray(basin_network.ys, dtype=float)
U = np.asarray(basin_network.U, dtype=float)
labels = np.asarray(basin_network.labels, dtype=int)
nx, ny = U.shape
N = nx * ny
U_flat = U.reshape(N)
labels_flat = labels.reshape(N)
# 1) Build initial A/B masks based purely on labels
labelA_mask = labels_flat == basinA_id
labelB_mask = labels_flat == basinB_id
if not labelA_mask.any():
raise ValueError(f"No grid points found with basinA_id={basinA_id}.")
if not labelB_mask.any():
raise ValueError(f"No grid points found with basinB_id={basinB_id}.")
# 2) Optionally restrict to low-energy cores inside each basin
if energy_buffer_kT is not None:
# local energy thresholds in each basin
U_A = U_flat[labelA_mask]
U_B = U_flat[labelB_mask]
U_A_min = U_A.min()
U_B_min = U_B.min()
# convert buffer in kT (dimensionless) to energy units
buffer_energy = energy_buffer_kT / beta
A_mask = labelA_mask & (U_flat <= U_A_min + buffer_energy)
B_mask = labelB_mask & (U_flat <= U_B_min + buffer_energy)
# Fallback if thresholds are too strict and we emptied a basin
if not A_mask.any():
A_mask = labelA_mask
if not B_mask.any():
B_mask = labelB_mask
else:
# Use the full basins as boundary sets
A_mask = labelA_mask
B_mask = labelB_mask
# 3) Determine unknown region (where we solve the PDE)
unknown_mask = ~(A_mask | B_mask)
unknown_idx = np.where(unknown_mask)[0]
# If there are no unknown states, the committor is trivial:
# - q=0 in A, q=1 in B
if unknown_idx.size == 0:
q_flat = np.zeros(N, dtype=float)
q_flat[A_mask] = 0.0
q_flat[B_mask] = 1.0
q_grid = q_flat.reshape(nx, ny)
return xs, ys, q_grid
# 4) Build FP generator
L = build_fp_generator_from_fes(xs, ys, U, D, beta, sparse=sparse)
# 5) Restrict generator to unknown states
A_sub = L[unknown_idx, :][:, unknown_idx]
# Right-hand side: b_i = - Σ_{j∈B} L_ij * 1 (q_B = 1), q_A = 0
B_idx = np.where(B_mask)[0]
if B_idx.size > 0:
contrib = L[unknown_idx, :][:, B_idx].sum(axis=1)
b = -np.asarray(contrib).ravel()
else:
b = np.zeros(unknown_idx.size, dtype=float)
# 6) Solve for q on unknown states
q_unknown = _spla.spsolve(A_sub, b)
# 7) Assemble full committor
q_flat = np.zeros(N, dtype=float)
q_flat[A_mask] = 0.0
q_flat[B_mask] = 1.0
q_flat[unknown_idx] = q_unknown
q_grid = q_flat.reshape(nx, ny)
return xs, ys, q_grid