Source code for stochkin.mfep

"""
MFEP utilities for 2D free-energy surfaces.

This submodule provides:
  1) Grid-based MFEP search (Dijkstra/minimax)
  2) NEB-style path refinement
  3) Export of a 1D profile s -> F(s) compatible with 1D CTMC workflows
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import heapq

import numpy as np

from scipy.interpolate import RectBivariateSpline as _RBS

from .fes import load_plumed_fes_2d


def _as_xy(point: Sequence[float], name: str) -> Tuple[float, float]:
    arr = np.asarray(point, dtype=float).ravel()
    if arr.size < 2:
        raise ValueError(f"{name} must contain at least two values [x, y].")
    return float(arr[0]), float(arr[1])


def _cumulative_arclength(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    x = np.asarray(x, dtype=float).ravel()
    y = np.asarray(y, dtype=float).ravel()
    if x.size != y.size:
        raise ValueError("x and y must have the same length.")
    if x.size == 0:
        return np.asarray([], dtype=float)
    ds = np.hypot(np.diff(x), np.diff(y))
    s = np.zeros(x.size, dtype=float)
    if ds.size:
        s[1:] = np.cumsum(ds)
    return s


def _resample_polyline(path_xy: np.ndarray, n_points: int) -> np.ndarray:
    path_xy = np.asarray(path_xy, dtype=float)
    if path_xy.ndim != 2 or path_xy.shape[1] != 2:
        raise ValueError("path_xy must have shape (N, 2).")
    if path_xy.shape[0] < 2:
        raise ValueError("path_xy must contain at least two points.")
    if n_points < 2:
        raise ValueError("n_points must be >= 2.")

    s = _cumulative_arclength(path_xy[:, 0], path_xy[:, 1])
    total = float(s[-1])
    if total <= 0.0:
        out = np.repeat(path_xy[:1], n_points, axis=0)
        out[-1] = path_xy[-1]
        return out

    st = np.linspace(0.0, total, int(n_points))
    xr = np.interp(st, s, path_xy[:, 0])
    yr = np.interp(st, s, path_xy[:, 1])
    return np.column_stack([xr, yr])


[docs] @dataclass class MFEPPath: """ MFEP trajectory and projected 1D free-energy profile. Attributes ---------- x, y : 1D arrays MFEP coordinates in the original CV space. s : 1D array Curvilinear coordinate along the path (same length as x/y). F : 1D array Free energy sampled along the path (same length as x/y). method : str Path construction method, e.g. "grid" or "neb". objective : str Optimization objective, e.g. "integral", "barrier", "refined". indices : list[(ix,iy)] or None Optional grid indices (available for grid paths). metadata : dict Additional run details. """ x: np.ndarray y: np.ndarray s: np.ndarray F: np.ndarray method: str objective: str indices: Optional[List[Tuple[int, int]]] = None metadata: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): self.x = np.asarray(self.x, dtype=float).ravel() self.y = np.asarray(self.y, dtype=float).ravel() self.s = np.asarray(self.s, dtype=float).ravel() self.F = np.asarray(self.F, dtype=float).ravel() n = self.x.size if n == 0: raise ValueError("MFEPPath is empty.") if self.y.size != n or self.s.size != n or self.F.size != n: raise ValueError("x, y, s, F must have the same length.")
[docs] def as_xy(self) -> np.ndarray: """Return path as shape (N,2) array.""" return np.column_stack([self.x, self.y])
[docs] def save_profile_1d( self, filename: str | Path, *, subtract_min: bool = True, fmt: str = "%.10f", ) -> Path: """ Save 1D profile for CTMC scripts as two columns: s, F. Output format is PLUMED-like and compatible with loaders that read the first two numeric columns as (coordinate, free energy). """ out = Path(filename).expanduser().resolve() out.parent.mkdir(parents=True, exist_ok=True) f = self.F - np.nanmin(self.F) if subtract_min else np.asarray(self.F, dtype=float) arr = np.column_stack([self.s, f]) header = ( f"# s F | method={self.method} objective={self.objective} " f"| n={self.s.size} | subtract_min={subtract_min}\n# s F" ) np.savetxt(out, arr, fmt=fmt, header=header) return out
[docs] def save_path_xyf( self, filename: str | Path, *, subtract_min: bool = False, fmt: str = "%.10f", ) -> Path: """Save full path as four columns: s, x, y, F.""" out = Path(filename).expanduser().resolve() out.parent.mkdir(parents=True, exist_ok=True) f = self.F - np.nanmin(self.F) if subtract_min else np.asarray(self.F, dtype=float) arr = np.column_stack([self.s, self.x, self.y, f]) header = ( f"# s x y F | method={self.method} objective={self.objective} " f"| n={self.s.size} | subtract_min={subtract_min}" ) np.savetxt(out, arr, fmt=fmt, header=header) return out
[docs] class GridMFEP: """ Grid-based MFEP finder on a 2D FES. Supports two objectives: - "integral": minimum integrated free energy along the path - "barrier": minimax barrier path (minimum maximum encountered F) """ def __init__(self, x_grid: np.ndarray, y_grid: np.ndarray, fes_grid: np.ndarray): self.x = np.asarray(x_grid, dtype=float).ravel() self.y = np.asarray(y_grid, dtype=float).ravel() self.F = np.asarray(fes_grid, dtype=float) if self.x.size < 2 or self.y.size < 2: raise ValueError("x_grid and y_grid must contain at least 2 points each.") if not np.all(np.diff(self.x) > 0): raise ValueError("x_grid must be strictly increasing.") if not np.all(np.diff(self.y) > 0): raise ValueError("y_grid must be strictly increasing.") if self.F.shape != (self.x.size, self.y.size): raise ValueError( f"fes_grid shape mismatch: expected {(self.x.size, self.y.size)}, got {self.F.shape}" ) self._finite_mask = np.isfinite(self.F) if not np.any(self._finite_mask): raise ValueError("fes_grid contains no finite values.") self._fmin = float(np.nanmin(self.F)) self._F_cost = np.where(self._finite_mask, self.F - self._fmin, np.nan)
[docs] @classmethod def from_plumed( cls, filename: str | Path, *, x_col: int = 0, y_col: int = 1, fes_col: int = 2, subtract_min: bool = True, verbose: bool = False, ) -> "GridMFEP": x, y, F = load_plumed_fes_2d( str(filename), x_col=x_col, y_col=y_col, fes_col=fes_col, subtract_min=subtract_min, verbose=verbose, ) return cls(x, y, F)
[docs] def coordinate_to_index(self, point: Sequence[float]) -> Tuple[int, int]: x0, y0 = _as_xy(point, "point") ix = int(np.argmin(np.abs(self.x - x0))) iy = int(np.argmin(np.abs(self.y - y0))) return ix, iy
def _nearest_finite_index(self, idx: Tuple[int, int]) -> Tuple[int, int]: ix, iy = int(idx[0]), int(idx[1]) if self._finite_mask[ix, iy]: return ix, iy finite_idx = np.argwhere(self._finite_mask) if finite_idx.size == 0: raise ValueError("No finite cells available in FES grid.") dx = self.x[finite_idx[:, 0]] - self.x[ix] dy = self.y[finite_idx[:, 1]] - self.y[iy] k = int(np.argmin(dx * dx + dy * dy)) return int(finite_idx[k, 0]), int(finite_idx[k, 1]) def _neighbors(self, ix: int, iy: int, connectivity: int): if connectivity not in (4, 8): raise ValueError("connectivity must be 4 or 8.") steps = [(-1, 0), (1, 0), (0, -1), (0, 1)] if connectivity == 8: steps += [(-1, -1), (-1, 1), (1, -1), (1, 1)] nx, ny = self.F.shape for dxi, dyi in steps: jx = ix + dxi jy = iy + dyi if 0 <= jx < nx and 0 <= jy < ny: ds = float(np.hypot(self.x[jx] - self.x[ix], self.y[jy] - self.y[iy])) yield jx, jy, ds def _allowed( self, ix: int, iy: int, *, f_threshold: Optional[float], start: Tuple[int, int], end: Tuple[int, int], ) -> bool: if not self._finite_mask[ix, iy]: return False if f_threshold is None: return True if (ix, iy) == start or (ix, iy) == end: return True return bool(self.F[ix, iy] <= float(f_threshold)) def _reconstruct_path( self, parent: np.ndarray, start: Tuple[int, int], end: Tuple[int, int], ) -> List[Tuple[int, int]]: path = [end] cur = end while cur != start: px, py = parent[cur[0], cur[1]] if px < 0 or py < 0: raise RuntimeError("Path reconstruction failed (disconnected graph).") cur = (int(px), int(py)) path.append(cur) path.reverse() return path def _build_result( self, indices: List[Tuple[int, int]], *, method: str, objective: str, metadata: Optional[Dict[str, Any]] = None, ) -> MFEPPath: xs = np.asarray([self.x[i] for i, _ in indices], dtype=float) ys = np.asarray([self.y[j] for _, j in indices], dtype=float) fs = np.asarray([self.F[i, j] for i, j in indices], dtype=float) ss = _cumulative_arclength(xs, ys) return MFEPPath( x=xs, y=ys, s=ss, F=fs, method=method, objective=objective, indices=list(indices), metadata=dict(metadata or {}), )
[docs] def find_path( self, start: Sequence[float], end: Sequence[float], *, objective: str = "integral", connectivity: int = 8, f_threshold: Optional[float] = None, project_to_finite: bool = True, ) -> MFEPPath: """ Compute a grid MFEP between start and end points. Parameters ---------- start, end : sequence[float] Start/end coordinates [x, y]. objective : {"integral", "barrier"} integral -> minimum integrated F(s) barrier -> minimax path (minimum maximum F) connectivity : {4, 8} Grid connectivity. f_threshold : float or None Optional hard mask: allow only cells with F <= f_threshold (start/end are always allowed). project_to_finite : bool If True, snap start/end to nearest finite cell if needed. """ objective = str(objective).strip().lower() if objective not in ("integral", "barrier"): raise ValueError("objective must be 'integral' or 'barrier'.") start_idx = self.coordinate_to_index(start) end_idx = self.coordinate_to_index(end) if project_to_finite: start_idx = self._nearest_finite_index(start_idx) end_idx = self._nearest_finite_index(end_idx) if not self._allowed(*start_idx, f_threshold=f_threshold, start=start_idx, end=end_idx): raise ValueError("Start point is not on an allowed finite cell.") if not self._allowed(*end_idx, f_threshold=f_threshold, start=start_idx, end=end_idx): raise ValueError("End point is not on an allowed finite cell.") nx, ny = self.F.shape parent = np.full((nx, ny, 2), -1, dtype=int) if objective == "integral": dist = np.full((nx, ny), np.inf, dtype=float) sx, sy = start_idx dist[sx, sy] = 0.0 heap = [(0.0, sx, sy)] while heap: cur_d, ix, iy = heapq.heappop(heap) if cur_d > dist[ix, iy]: continue if (ix, iy) == end_idx: break fi = self._F_cost[ix, iy] for jx, jy, ds in self._neighbors(ix, iy, connectivity): if not self._allowed( jx, jy, f_threshold=f_threshold, start=start_idx, end=end_idx ): continue fj = self._F_cost[jx, jy] w = max(0.0, 0.5 * (fi + fj)) * ds nd = cur_d + w if nd < dist[jx, jy]: dist[jx, jy] = nd parent[jx, jy] = (ix, iy) heapq.heappush(heap, (nd, jx, jy)) if not np.isfinite(dist[end_idx[0], end_idx[1]]): raise RuntimeError("No path found under current constraints.") path_idx = self._reconstruct_path(parent, start_idx, end_idx) return self._build_result( path_idx, method="grid", objective="integral", metadata={ "start_idx": start_idx, "end_idx": end_idx, "connectivity": int(connectivity), "f_threshold": None if f_threshold is None else float(f_threshold), "cost_integral": float(dist[end_idx[0], end_idx[1]]), }, ) # barrier objective: lexicographic (barrier, integral) best_barrier = np.full((nx, ny), np.inf, dtype=float) best_integral = np.full((nx, ny), np.inf, dtype=float) sx, sy = start_idx best_barrier[sx, sy] = float(self._F_cost[sx, sy]) best_integral[sx, sy] = 0.0 heap = [(best_barrier[sx, sy], 0.0, sx, sy)] tol = 1e-15 while heap: cur_b, cur_i, ix, iy = heapq.heappop(heap) if cur_b > best_barrier[ix, iy] + tol: continue if abs(cur_b - best_barrier[ix, iy]) <= tol and cur_i > best_integral[ix, iy] + tol: continue if (ix, iy) == end_idx: break fi = self._F_cost[ix, iy] for jx, jy, ds in self._neighbors(ix, iy, connectivity): if not self._allowed( jx, jy, f_threshold=f_threshold, start=start_idx, end=end_idx ): continue fj = self._F_cost[jx, jy] edge_b = max(fi, fj) cand_b = max(cur_b, edge_b) cand_i = cur_i + max(0.0, 0.5 * (fi + fj)) * ds better = ( cand_b < best_barrier[jx, jy] - tol or ( abs(cand_b - best_barrier[jx, jy]) <= tol and cand_i < best_integral[jx, jy] - tol ) ) if better: best_barrier[jx, jy] = cand_b best_integral[jx, jy] = cand_i parent[jx, jy] = (ix, iy) heapq.heappush(heap, (cand_b, cand_i, jx, jy)) if not np.isfinite(best_barrier[end_idx[0], end_idx[1]]): raise RuntimeError("No path found under current constraints.") path_idx = self._reconstruct_path(parent, start_idx, end_idx) return self._build_result( path_idx, method="grid", objective="barrier", metadata={ "start_idx": start_idx, "end_idx": end_idx, "connectivity": int(connectivity), "f_threshold": None if f_threshold is None else float(f_threshold), "cost_barrier": float(best_barrier[end_idx[0], end_idx[1]] + self._fmin), "cost_integral_tiebreak": float(best_integral[end_idx[0], end_idx[1]]), }, )
[docs] class NEBMFEP: """ NEB-style path refiner on a 2D FES grid. Intended as a refinement step starting from a grid MFEP. """ def __init__( self, x_grid: np.ndarray, y_grid: np.ndarray, fes_grid: np.ndarray, *, fill_value: Optional[float] = None, ): self.x = np.asarray(x_grid, dtype=float).ravel() self.y = np.asarray(y_grid, dtype=float).ravel() self.F = np.asarray(fes_grid, dtype=float) if self.x.size < 2 or self.y.size < 2: raise ValueError("x_grid and y_grid must contain at least 2 points each.") if not np.all(np.diff(self.x) > 0): raise ValueError("x_grid must be strictly increasing.") if not np.all(np.diff(self.y) > 0): raise ValueError("y_grid must be strictly increasing.") if self.F.shape != (self.x.size, self.y.size): raise ValueError( f"fes_grid shape mismatch: expected {(self.x.size, self.y.size)}, got {self.F.shape}" ) finite = np.isfinite(self.F) if not np.any(finite): raise ValueError("fes_grid contains no finite values.") fmin = float(np.nanmin(self.F)) fmax = float(np.nanmax(self.F)) if fill_value is None: span = max(1.0, fmax - fmin) fill_value = fmax + 0.25 * span self.fill_value = float(fill_value) self.F_work = np.where(finite, self.F, self.fill_value) edge_order = 2 if min(self.F_work.shape) >= 3 else 1 self.dFdx, self.dFdy = np.gradient( self.F_work, self.x, self.y, edge_order=edge_order ) # Smooth bicubic spline for gradient evaluation (C² continuous) self._spl = _RBS(self.x, self.y, self.F_work, kx=3, ky=3) def _interp_scalar(self, field: np.ndarray, xq: float, yq: float) -> float: xq = float(np.clip(xq, self.x[0], self.x[-1])) yq = float(np.clip(yq, self.y[0], self.y[-1])) ix = int(np.searchsorted(self.x, xq) - 1) iy = int(np.searchsorted(self.y, yq) - 1) ix = int(np.clip(ix, 0, self.x.size - 2)) iy = int(np.clip(iy, 0, self.y.size - 2)) x0, x1 = self.x[ix], self.x[ix + 1] y0, y1 = self.y[iy], self.y[iy + 1] tx = 0.0 if x1 == x0 else (xq - x0) / (x1 - x0) ty = 0.0 if y1 == y0 else (yq - y0) / (y1 - y0) f00 = field[ix, iy] f10 = field[ix + 1, iy] f01 = field[ix, iy + 1] f11 = field[ix + 1, iy + 1] return float( (1.0 - tx) * (1.0 - ty) * f00 + tx * (1.0 - ty) * f10 + (1.0 - tx) * ty * f01 + tx * ty * f11 ) def _interp_grad(self, xq: float, yq: float) -> np.ndarray: """Gradient via bicubic spline – C² smooth, no grid-cell kinks.""" xq = float(np.clip(xq, self.x[0], self.x[-1])) yq = float(np.clip(yq, self.y[0], self.y[-1])) gx = float(self._spl(xq, yq, dx=1, grid=False)) gy = float(self._spl(xq, yq, dy=1, grid=False)) return np.asarray([gx, gy], dtype=float)
[docs] def refine( self, initial_path: np.ndarray, *, n_images: int = 120, k_spring: float = 1.0, step_size: float = 0.01, max_iter: int = 8000, tol: float = 1.0, smooth: float = 0.0, clip_to_bounds: bool = True, max_step: Optional[float] = None, adaptive_step: bool = True, step_shrink: float = 0.5, step_grow: float = 1.02, step_min: Optional[float] = None, reparam_every: int = 10, ) -> MFEPPath: """ Refine an initial path with a simple NEB-style iterative scheme. """ if n_images < 3: raise ValueError("n_images must be >= 3.") if max_iter < 1: raise ValueError("max_iter must be >= 1.") if step_size <= 0.0: raise ValueError("step_size must be > 0.") if not (0.0 <= smooth <= 1.0): raise ValueError("smooth must be in [0, 1].") if max_step is not None and float(max_step) <= 0.0: raise ValueError("max_step must be > 0 when provided.") if step_min is not None and float(step_min) <= 0.0: raise ValueError("step_min must be > 0 when provided.") if not (0.0 < float(step_shrink) < 1.0): raise ValueError("step_shrink must be in (0,1).") if float(step_grow) < 1.0: raise ValueError("step_grow must be >= 1.") if int(reparam_every) < 0: raise ValueError("reparam_every must be >= 0.") p0 = np.asarray(initial_path, dtype=float) if p0.ndim != 2 or p0.shape[1] != 2: raise ValueError("initial_path must have shape (N,2).") if p0.shape[0] < 2: raise ValueError("initial_path must contain at least two points.") images = _resample_polyline(p0, int(n_images)) images[0] = p0[0] images[-1] = p0[-1] dx = float(np.median(np.diff(self.x))) dy = float(np.median(np.diff(self.y))) grid_step = float(np.hypot(dx, dy)) max_step_eff = float(max_step) if max_step is not None else 3.0 * grid_step alpha = float(step_size) alpha_max = float(step_size) alpha_min = float(step_min) if step_min is not None else max(1e-6, 1e-3 * alpha_max) converged = False max_force = np.inf n_done = 0 prev_max_force = np.inf for it in range(int(max_iter)): forces = np.zeros_like(images) max_force = 0.0 for i in range(1, images.shape[0] - 1): t = images[i + 1] - images[i - 1] nt = float(np.linalg.norm(t)) if nt <= 1e-15: t = np.asarray([1.0, 0.0], dtype=float) else: t = t / nt grad = self._interp_grad(images[i, 0], images[i, 1]) true_force = -grad true_perp = true_force - np.dot(true_force, t) * t d_f = float(np.linalg.norm(images[i + 1] - images[i])) d_b = float(np.linalg.norm(images[i] - images[i - 1])) spring = float(k_spring) * (d_f - d_b) * t f_tot = true_perp + spring forces[i] = f_tot max_force = max(max_force, float(np.linalg.norm(f_tot))) n_done = it + 1 if max_force < float(tol): converged = True break if adaptive_step and np.isfinite(prev_max_force): if max_force > 1.05 * prev_max_force: alpha = max(alpha * float(step_shrink), alpha_min) elif max_force < 0.98 * prev_max_force: alpha = min(alpha * float(step_grow), alpha_max) disp = alpha * forces if max_step_eff > 0.0: dn = np.linalg.norm(disp[1:-1], axis=1) scale = np.ones_like(dn) m = dn > max_step_eff scale[m] = max_step_eff / (dn[m] + 1e-15) disp[1:-1] *= scale[:, None] images[1:-1] += disp[1:-1] if smooth > 0.0: images[1:-1] = ( (1.0 - float(smooth)) * images[1:-1] + 0.5 * float(smooth) * (images[:-2] + images[2:]) ) if int(reparam_every) > 0 and ((it + 1) % int(reparam_every) == 0): images = _resample_polyline(images, int(n_images)) if clip_to_bounds: images[:, 0] = np.clip(images[:, 0], self.x[0], self.x[-1]) images[:, 1] = np.clip(images[:, 1], self.y[0], self.y[-1]) images[0] = p0[0] images[-1] = p0[-1] prev_max_force = max_force f_path = np.asarray( [self._interp_scalar(self.F_work, px, py) for px, py in images], dtype=float ) s_path = _cumulative_arclength(images[:, 0], images[:, 1]) return MFEPPath( x=images[:, 0], y=images[:, 1], s=s_path, F=f_path, method="neb", objective="refined", indices=None, metadata={ "n_images": int(n_images), "k_spring": float(k_spring), "step_size": float(step_size), "max_iter": int(max_iter), "tol": float(tol), "smooth": float(smooth), "clip_to_bounds": bool(clip_to_bounds), "adaptive_step": bool(adaptive_step), "step_shrink": float(step_shrink), "step_grow": float(step_grow), "alpha_final": float(alpha), "alpha_min": float(alpha_min), "alpha_max": float(alpha_max), "max_step": float(max_step_eff), "reparam_every": int(reparam_every), "converged": bool(converged), "n_iter": int(n_done), "final_max_force": float(max_force), }, )
[docs] def refine_between( self, start: Sequence[float], end: Sequence[float], *, initializer: Optional[GridMFEP] = None, initializer_objective: str = "integral", initializer_connectivity: int = 8, initializer_f_threshold: Optional[float] = None, **neb_kwargs, ) -> MFEPPath: """ Build an initial grid path between start/end, then refine with NEB. """ if initializer is None: initializer = GridMFEP(self.x, self.y, self.F) coarse = initializer.find_path( start, end, objective=initializer_objective, connectivity=initializer_connectivity, f_threshold=initializer_f_threshold, project_to_finite=True, ) init_xy = coarse.as_xy() refined = self.refine(init_xy, **neb_kwargs) refined.metadata["initializer_method"] = coarse.method refined.metadata["initializer_objective"] = coarse.objective refined.metadata["initializer_points"] = int(coarse.s.size) return refined
[docs] def refine_fire( self, initial_path: np.ndarray, *, n_images: int = 120, k_spring: float = 1.0, dt_start: float = 0.005, dt_max: float = 0.05, max_iter: int = 8000, tol: float = 1.0, n_min: int = 5, f_inc: float = 1.1, f_dec: float = 0.5, alpha_start: float = 0.1, f_alpha: float = 0.99, smooth: float = 0.0, clip_to_bounds: bool = True, reparam_every: int = 200, ) -> MFEPPath: """Refine an NEB path using the FIRE algorithm (Bitzek et al. 2006). FIRE (Fast Inertial Relaxation Engine) uses velocity-based dynamics with adaptive time-stepping. It converges much faster than steepest descent for NEB optimisation. Parameters ---------- initial_path : (N, 2) array n_images, k_spring : NEB parameters dt_start : initial time step dt_max : maximum allowed time step max_iter : iteration cap tol : force convergence tolerance (kJ mol⁻¹ per CV-unit) n_min : FIRE delay before acceleration f_inc, f_dec : time-step scale factors alpha_start, f_alpha : FIRE mixing parameters smooth : smoothing factor ∈ [0, 1] clip_to_bounds : clip images to grid domain reparam_every : reparametrize path every N steps """ p0 = np.asarray(initial_path, dtype=float) images = _resample_polyline(p0, int(n_images)) images[0] = p0[0] images[-1] = p0[-1] dx = float(np.median(np.diff(self.x))) dy = float(np.median(np.diff(self.y))) grid_step = float(np.hypot(dx, dy)) max_step_eff = 2.0 * grid_step # limit displacement to ~2 grid cells dt = float(dt_start) alpha = float(alpha_start) v = np.zeros_like(images) n_pos = 0 # steps since last negative P converged = False max_force = np.inf n_done = 0 for it in range(int(max_iter)): # --- Compute NEB forces on interior images --- forces = np.zeros_like(images) max_force = 0.0 for i in range(1, images.shape[0] - 1): t = images[i + 1] - images[i - 1] nt = float(np.linalg.norm(t)) if nt <= 1e-15: t = np.asarray([1.0, 0.0], dtype=float) else: t = t / nt grad = self._interp_grad(images[i, 0], images[i, 1]) true_force = -grad true_perp = true_force - np.dot(true_force, t) * t d_f = float(np.linalg.norm(images[i + 1] - images[i])) d_b = float(np.linalg.norm(images[i] - images[i - 1])) spring = float(k_spring) * (d_f - d_b) * t f_tot = true_perp + spring forces[i] = f_tot max_force = max(max_force, float(np.linalg.norm(f_tot))) n_done = it + 1 if max_force < float(tol): converged = True break # --- FIRE dynamics --- P = float(np.sum(v * forces)) # power if P > 0.0: v_norm = float(np.linalg.norm(v)) f_norm = float(np.linalg.norm(forces)) if f_norm > 1e-30: v = (1.0 - alpha) * v + alpha * (v_norm / f_norm) * forces n_pos += 1 if n_pos > n_min: dt = min(dt * f_inc, dt_max) alpha = alpha * f_alpha else: v[:] = 0.0 dt = dt * f_dec alpha = float(alpha_start) n_pos = 0 # velocity Verlet half-step: v += 0.5*dt*F, x += dt*v v += 0.5 * dt * forces disp = dt * v[1:-1] # Clamp per-image displacement to max_step dn = np.linalg.norm(disp, axis=1) mask = dn > max_step_eff if np.any(mask): disp[mask] *= (max_step_eff / (dn[mask, None] + 1e-15)) # Also limit velocity to prevent further overshoot v[1:-1][mask] *= (max_step_eff / (dn[mask, None] + 1e-15)) images[1:-1] += disp if smooth > 0.0: images[1:-1] = ( (1.0 - float(smooth)) * images[1:-1] + 0.5 * float(smooth) * (images[:-2] + images[2:]) ) if int(reparam_every) > 0 and ((it + 1) % int(reparam_every) == 0): images = _resample_polyline(images, int(n_images)) v = np.zeros_like(images) n_pos = 0 alpha = float(alpha_start) if clip_to_bounds: images[:, 0] = np.clip(images[:, 0], self.x[0], self.x[-1]) images[:, 1] = np.clip(images[:, 1], self.y[0], self.y[-1]) images[0] = p0[0] images[-1] = p0[-1] f_path = np.asarray( [self._interp_scalar(self.F_work, px, py) for px, py in images], dtype=float, ) s_path = _cumulative_arclength(images[:, 0], images[:, 1]) return MFEPPath( x=images[:, 0], y=images[:, 1], s=s_path, F=f_path, method="neb-fire", objective="refined", indices=None, metadata={ "n_images": int(n_images), "k_spring": float(k_spring), "dt_start": float(dt_start), "dt_max": float(dt_max), "max_iter": int(max_iter), "tol": float(tol), "converged": bool(converged), "n_iter": int(n_done), "final_max_force": float(max_force), }, )
[docs] def compute_mfep_profile_1d( x_grid: np.ndarray, y_grid: np.ndarray, fes_grid: np.ndarray, start: Sequence[float], end: Sequence[float], *, objective: str = "integral", connectivity: int = 8, f_threshold: Optional[float] = None, use_neb: bool = True, neb_images: int = 120, neb_k_spring: float = 1.0, neb_step_size: float = 0.01, neb_max_iter: int = 8000, neb_tol: float = 1.0, neb_smooth: float = 0.0, neb_max_step: Optional[float] = None, neb_adaptive_step: bool = True, neb_reparam_every: Optional[int] = None, neb_optimizer: str = "sd", ) -> MFEPPath: """ Convenience wrapper: 1) grid MFEP between start/end 2) optional NEB refinement 3) return MFEPPath containing s,F(s) """ grid = GridMFEP(x_grid, y_grid, fes_grid) coarse = grid.find_path( start, end, objective=objective, connectivity=connectivity, f_threshold=f_threshold, project_to_finite=True, ) if not use_neb: return coarse neb = NEBMFEP(x_grid, y_grid, fes_grid) reparam = neb_reparam_every if neb_reparam_every is not None else (200 if neb_optimizer == "fire" else 10) if neb_optimizer == "fire": refined = neb.refine_fire( coarse.as_xy(), n_images=neb_images, k_spring=neb_k_spring, dt_start=neb_step_size, max_iter=neb_max_iter, tol=neb_tol, smooth=neb_smooth, clip_to_bounds=True, reparam_every=reparam, ) else: refined = neb.refine( coarse.as_xy(), n_images=neb_images, k_spring=neb_k_spring, step_size=neb_step_size, max_iter=neb_max_iter, tol=neb_tol, smooth=neb_smooth, clip_to_bounds=True, max_step=neb_max_step, adaptive_step=neb_adaptive_step, reparam_every=reparam, ) refined.metadata["initializer_objective"] = coarse.objective refined.metadata["initializer_points"] = int(coarse.s.size) return refined
__all__ = [ "MFEPPath", "GridMFEP", "NEBMFEP", "compute_mfep_profile_1d", ]