Source code for stochkin.uncertainty

"""stochkin.uncertainty
=====================

Monte Carlo uncertainty propagation for CTMC kinetics on 1-D
free-energy surfaces.

**Strategy.**  Perturb the inputs *F(s)* and *D(s)* within their error
bars, re-run the CTMC pipeline for each perturbed sample, and collect
statistics (mean, standard deviation, percentile-based confidence
intervals) on every predicted quantity (rates, exit times, branching
probabilities).

Two perturbation models are provided:

* **Gaussian (additive)** – suitable for free energies *F(s)*.
* **Log-normal (multiplicative)** – suitable for diffusion coefficients
  *D(s)*, which must remain positive.

When the Hummer Bayesian estimator supplies posterior credible intervals
(``F_lo``/``F_hi``, ``D_lo``/``D_hi``), the module converts them to
standard deviations automatically.

Main entry points
-----------------
bootstrap_ctmc_1d
    Propagate *F* and *D* uncertainties through
    :func:`~stochkin.workflows.run_1d_ctmc`.
bootstrap_ctmc_with_hummer_D
    Convenience wrapper that reads Hummer posterior intervals from CSV
    and propagates them through
    :func:`~stochkin.workflows.run_1d_ctmc_with_hummer_D`.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Sequence, Tuple, Union

import numpy as np

from .workflows import (
    _kT,
    _time_unit_to_ps,
    run_1d_ctmc,
    run_1d_ctmc_with_hummer_D,
    interface_to_centers,
    interpolate_D_to_grid,
)


# ======================================================================
# Perturbation samplers
# ======================================================================

_Z95 = 1.959964  # z-score for 95 % two-sided CI


def _ci_to_sigma(lo: np.ndarray, hi: np.ndarray,
                 ci_level: float = 0.95) -> np.ndarray:
    """Convert a symmetric credible interval [lo, hi] to a Gaussian σ."""
    from scipy.stats import norm  # type: ignore
    z = norm.ppf(0.5 + ci_level / 2.0)
    return np.maximum((np.asarray(hi) - np.asarray(lo)) / (2.0 * z), 0.0)


def _ci_to_log_sigma(lo: np.ndarray, hi: np.ndarray,
                     ci_level: float = 0.95) -> np.ndarray:
    """Convert [D_lo, D_hi] to log-space σ for log-normal sampling.

    Assumes that (log D_lo, log D_hi) spans a ``ci_level`` credible
    interval on the log scale.
    """
    from scipy.stats import norm  # type: ignore
    z = norm.ppf(0.5 + ci_level / 2.0)
    lo_a = np.asarray(lo, dtype=float)
    hi_a = np.asarray(hi, dtype=float)
    # Protect against non-positive values
    lo_safe = np.where(lo_a > 0, lo_a, 1e-30)
    hi_safe = np.where(hi_a > 0, hi_a, 1e-30)
    return np.maximum((np.log(hi_safe) - np.log(lo_safe)) / (2.0 * z), 0.0)


def _smooth_noise(noise_1d: np.ndarray, s: np.ndarray,
                  corr_length: float) -> np.ndarray:
    """Apply Gaussian smoothing to *noise_1d* with a given correlation length.

    This turns i.i.d. perturbations into spatially correlated noise with
    approximate correlation length *corr_length* in CV units, while
    preserving the point-wise variance (approximately).
    """
    ds = float(np.median(np.diff(s)))
    if corr_length <= ds:
        return noise_1d
    sigma_pts = corr_length / ds
    try:
        from scipy.ndimage import gaussian_filter1d  # type: ignore
    except ImportError:
        # Fallback: simple running-average box filter
        k = max(1, int(round(sigma_pts)))
        kernel = np.ones(2 * k + 1) / (2 * k + 1)
        smoothed = np.convolve(noise_1d, kernel, mode="same")
        # Rescale to preserve variance
        scale = np.std(noise_1d) / max(np.std(smoothed), 1e-30)
        return smoothed * scale

    smoothed = gaussian_filter1d(noise_1d, sigma=sigma_pts, mode="reflect")
    # Rescale to preserve variance
    scale = np.std(noise_1d) / max(np.std(smoothed), 1e-30)
    return smoothed * scale


def _sample_F(
    rng: np.random.RandomState,
    F: np.ndarray,
    F_sigma: Optional[np.ndarray],
    s: np.ndarray,
    corr_length: Optional[float],
) -> np.ndarray:
    """Draw one perturbed F(s) sample (additive Gaussian).

    When *corr_length* is ``None`` a minimal smoothing of 3 grid
    spacings is applied so that i.i.d. noise does not introduce
    spurious local minima (which would change the basin topology).
    """
    if F_sigma is None:
        return F.copy()
    ds = float(np.median(np.diff(s)))
    # Default smoothing: 3 grid spacings (avoids spurious minima)
    cl = corr_length if corr_length is not None else 3.0 * ds
    noise = _smooth_noise(rng.standard_normal(F.size), s, cl) * F_sigma
    return F + noise


def _sample_D(
    rng: np.random.RandomState,
    D: np.ndarray,
    D_log_sigma: Optional[np.ndarray],
    D_sigma: Optional[np.ndarray],
    s: np.ndarray,
    corr_length: Optional[float],
) -> np.ndarray:
    """Draw one perturbed D(s) sample (log-normal or Gaussian, always > 0).

    As with *_sample_F*, a minimal smoothing of 3 grid spacings is
    applied when *corr_length* is ``None``.
    """
    ds = float(np.median(np.diff(s)))
    cl = corr_length if corr_length is not None else 3.0 * ds

    if D_log_sigma is not None:
        noise = _smooth_noise(rng.standard_normal(D.size), s, cl)
        noise /= max(np.std(noise), 1e-30)  # unit variance
        D_samp = D * np.exp(noise * D_log_sigma)
    elif D_sigma is not None:
        noise = _smooth_noise(rng.standard_normal(D.size), s, cl)
        noise /= max(np.std(noise), 1e-30)
        D_samp = D + noise * D_sigma
    else:
        return D.copy()
    # Ensure positivity
    floor = float(np.min(D[D > 0]) * 1e-6) if np.any(D > 0) else 1e-30
    return np.maximum(D_samp, floor)


# ======================================================================
# Result container
# ======================================================================

[docs] @dataclass class UncertaintyResult: """Container for bootstrap uncertainty estimates. All ``*_mean``, ``*_std``, ``*_ci_lo``, ``*_ci_hi`` arrays have the same shape as the corresponding quantity in the reference (unperturbed) result. The ``*_samples`` arrays have an extra leading axis of size ``n_bootstrap``. Attributes ---------- reference : dict Full result dictionary from the *unperturbed* run (see :func:`~stochkin.workflows.run_1d_ctmc`). n_bootstrap : int Number of successful bootstrap replicates. n_failed : int Number of failed replicates (basin detection changed, solver diverged, etc.). confidence_level : float Confidence level for the reported CI (default 0.95). K_mean, K_std, K_ci_lo, K_ci_hi : ndarray Statistics of the rate matrix *K* [1/time_unit]. K_ps_mean, K_ps_std, K_ps_ci_lo, K_ps_ci_hi : ndarray Statistics of *K* in ps⁻¹. exit_mean_mean, exit_mean_std, exit_mean_ci_lo, exit_mean_ci_hi : ndarray Statistics of the mean exit time per basin [time_unit]. k_out_mean, k_out_std, k_out_ci_lo, k_out_ci_hi : ndarray Statistics of the total exit rate per basin [1/time_unit]. p_branch_mean, p_branch_std, p_branch_ci_lo, p_branch_ci_hi : ndarray Statistics of the branching-probability matrix. K_samples : ndarray, shape (n_bootstrap, n_basins, n_basins) Raw bootstrap samples of *K*. exit_mean_samples : ndarray, shape (n_bootstrap, n_basins) Raw samples of exit times. k_out_samples : ndarray, shape (n_bootstrap, n_basins) Raw samples of exit rates. p_branch_samples : ndarray, shape (n_bootstrap, n_basins, n_basins) Raw samples of branching probabilities. """ reference: dict n_bootstrap: int n_failed: int confidence_level: float # Rate matrix K_mean: np.ndarray K_std: np.ndarray K_ci_lo: np.ndarray K_ci_hi: np.ndarray K_samples: np.ndarray K_ps_mean: np.ndarray K_ps_std: np.ndarray K_ps_ci_lo: np.ndarray K_ps_ci_hi: np.ndarray # Exit times exit_mean_mean: np.ndarray exit_mean_std: np.ndarray exit_mean_ci_lo: np.ndarray exit_mean_ci_hi: np.ndarray exit_mean_samples: np.ndarray # Exit rates k_out_mean: np.ndarray k_out_std: np.ndarray k_out_ci_lo: np.ndarray k_out_ci_hi: np.ndarray k_out_samples: np.ndarray # Branching probabilities p_branch_mean: np.ndarray p_branch_std: np.ndarray p_branch_ci_lo: np.ndarray p_branch_ci_hi: np.ndarray p_branch_samples: np.ndarray
[docs] def summary(self, time_unit: str = "") -> str: """Return a human-readable summary string.""" tu = f" [{time_unit}]" if time_unit else "" tu_inv = f" [{time_unit}⁻¹]" if time_unit else "" lines = [ f"Bootstrap CTMC uncertainty ({self.n_bootstrap} replicates, " f"{self.n_failed} failed, {self.confidence_level:.0%} CI)", "", ] nb = self.K_mean.shape[0] # Per-basin exit times and rates lines.append(f"{'Basin':>6} {'<τ_exit>' + tu:>22} " f"{'k_out' + tu_inv:>22}") lines.append(f"{'':>6} {'mean ± std':>22} {'mean ± std':>22}") for i in range(nb): tau_s = (f"{self.exit_mean_mean[i]:.4g} ± " f"{self.exit_mean_std[i]:.2g}") k_s = (f"{self.k_out_mean[i]:.4g} ± " f"{self.k_out_std[i]:.2g}") lines.append(f"{i:>6} {tau_s:>22} {k_s:>22}") lines.append("") lines.append(f"Rate matrix K{tu_inv} (mean ± std):") for i in range(nb): row = " ".join( f"{self.K_mean[i, j]:+.3e}±{self.K_std[i, j]:.1e}" for j in range(nb) ) lines.append(f" [{row}]") lines.append("") lines.append(f"Rate matrix K{tu_inv} ({self.confidence_level:.0%} CI):") for i in range(nb): row = " ".join( f"[{self.K_ci_lo[i, j]:+.3e}, {self.K_ci_hi[i, j]:+.3e}]" for j in range(nb) ) lines.append(f" [{row}]") return "\n".join(lines)
# ====================================================================== # Aggregation helper # ====================================================================== def _aggregate( samples: np.ndarray, confidence: float, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Compute mean, std, CI_lo, CI_hi from a stack of samples (axis 0).""" alpha = (1.0 - confidence) / 2.0 with np.errstate(all="ignore"): mean = np.nanmean(samples, axis=0) std = np.nanstd(samples, axis=0, ddof=1) ci_lo = np.nanpercentile(samples, 100 * alpha, axis=0) ci_hi = np.nanpercentile(samples, 100 * (1 - alpha), axis=0) return mean, std, ci_lo, ci_hi # ====================================================================== # Main bootstrap # ======================================================================
[docs] def bootstrap_ctmc_1d( s: Union[np.ndarray, Sequence], F: Union[np.ndarray, Sequence], D: Union[float, np.ndarray], *, # --- Perturbation specification for F --- F_err: Union[None, float, np.ndarray] = None, F_lo: Optional[np.ndarray] = None, F_hi: Optional[np.ndarray] = None, # --- Perturbation specification for D --- D_err: Union[None, float, np.ndarray] = None, D_rel_err: Union[None, float, np.ndarray] = None, D_lo: Optional[np.ndarray] = None, D_hi: Optional[np.ndarray] = None, # --- Bootstrap / correlation parameters --- n_bootstrap: int = 200, ci_level: float = 0.95, confidence: Optional[float] = None, corr_length: Optional[float] = None, seed: Optional[int] = None, # --- CTMC parameters (forwarded to run_1d_ctmc) --- T: float = 300.0, time_unit: str = "ps", max_basins: Optional[int] = None, core_fraction: Optional[float] = 0.05, init_weight: str = "boltzmann", verbose: bool = False, ) -> UncertaintyResult: """Propagate F(s) / D(s) uncertainties through the 1-D CTMC pipeline. For each bootstrap replicate: 1. Draw a perturbed F(s) from *N(F, σ_F)* (Gaussian, additive). 2. Draw a perturbed D(s) from *LogNormal(D, σ_log D)* or *N(D, σ_D)* (always clamped to D > 0). 3. Run :func:`~stochkin.workflows.run_1d_ctmc` on the perturbed inputs. If the number of detected basins changes for a given replicate (different topology), that replicate is discarded. Parameters ---------- s, F, D : array-like Central (best-estimate) grid, free energy, and diffusion coefficient. Same semantics as :func:`run_1d_ctmc`. F_err : float or array, optional Standard deviation of F(s). Scalar → uniform error. F_lo, F_hi : array, optional Lower / upper bounds of a credible interval on F. Used to compute σ_F when *F_err* is not given. D_err : float or array, optional *Absolute* standard deviation of D(s) (Gaussian perturbation). D_rel_err : float or array, optional *Relative* error of D(s) (e.g. 0.3 = 30 %). Converted to a log-normal σ. D_lo, D_hi : array, optional Lower / upper bounds of a credible interval on D(s). Converted to a log-normal σ when neither *D_err* nor *D_rel_err* is given. n_bootstrap : int Number of bootstrap replicates (default 200). ci_level : float Credible-interval level used to interpret *F_lo/F_hi* and *D_lo/D_hi* (default 0.95 → 95 % CI). confidence : float, optional Confidence level for the *output* intervals (defaults to *ci_level*). corr_length : float, optional Spatial correlation length (in CV units) for the perturbation noise. When set, point-wise i.i.d. noise is smoothed by a Gaussian kernel of this width, producing correlated perturbations. ``None`` → independent noise at each grid point. seed : int, optional Random seed for reproducibility. T, time_unit, max_basins, core_fraction, init_weight : Forwarded to :func:`run_1d_ctmc`. verbose : bool If ``True``, print a progress counter. Returns ------- UncertaintyResult Dataclass with ``*_mean``, ``*_std``, ``*_ci_lo``, ``*_ci_hi`` for every CTMC output, plus the full ``*_samples`` arrays and the *reference* (unperturbed) result. Examples -------- >>> import numpy as np, stochkin as sk >>> s = np.linspace(0, 1, 200) >>> F = 5.0 * (1 - (2*s - 1)**2)**2; F -= F.min() >>> res = sk.bootstrap_ctmc_1d(s, F, D=0.01, F_err=0.5, ... n_bootstrap=50, seed=42) >>> print(res.summary("ps")) """ s = np.asarray(s, dtype=float).ravel() F = np.asarray(F, dtype=float).ravel() n_grid = s.size if np.isscalar(D): D_arr = np.full(n_grid, float(D)) else: D_arr = np.asarray(D, dtype=float).ravel() if confidence is None: confidence = ci_level # ---- resolve F perturbation ---- F_sigma: Optional[np.ndarray] = None if F_err is not None: F_sigma = np.broadcast_to( np.asarray(F_err, dtype=float), n_grid ).copy() elif F_lo is not None and F_hi is not None: F_sigma = _ci_to_sigma( np.asarray(F_lo, dtype=float), np.asarray(F_hi, dtype=float), ci_level=ci_level, ) # ---- resolve D perturbation ---- D_log_sigma: Optional[np.ndarray] = None D_sigma: Optional[np.ndarray] = None if D_lo is not None and D_hi is not None: D_log_sigma = _ci_to_log_sigma( np.asarray(D_lo, dtype=float), np.asarray(D_hi, dtype=float), ci_level=ci_level, ) elif D_rel_err is not None: # Relative error → log-space σ (for small rel_err, σ_log ≈ rel_err) D_log_sigma = np.broadcast_to( np.asarray(D_rel_err, dtype=float), n_grid ).copy() elif D_err is not None: D_sigma = np.broadcast_to( np.asarray(D_err, dtype=float), n_grid ).copy() # ---- reference (unperturbed) run ---- ref = run_1d_ctmc( s=s, F=F, D=D_arr, T=T, time_unit=time_unit, max_basins=max_basins, core_fraction=core_fraction, init_weight=init_weight, verbose=False, ) nb_ref = len(ref["basin_ids"]) # Force every bootstrap replicate to detect the same number of # basins as the reference run. Without this constraint, # perturbation noise can create or destroy shallow local minima, # leading to a different basin topology and an unusable replicate. bootstrap_max_basins = nb_ref # ---- bootstrap loop ---- rng = np.random.RandomState(seed) K_list = [] K_ps_list = [] exit_list = [] k_out_list = [] p_branch_list = [] n_failed = 0 for b in range(n_bootstrap): if verbose and (b + 1) % max(1, n_bootstrap // 10) == 0: print(f" bootstrap {b + 1}/{n_bootstrap} …") F_b = _sample_F(rng, F, F_sigma, s, corr_length) D_b = _sample_D(rng, D_arr, D_log_sigma, D_sigma, s, corr_length) try: res_b = run_1d_ctmc( s=s, F=F_b, D=D_b, T=T, time_unit=time_unit, max_basins=bootstrap_max_basins, core_fraction=core_fraction, init_weight=init_weight, verbose=False, ) except Exception: n_failed += 1 continue # Discard if basin topology still changed (shouldn't happen with # max_basins pinned, but guard against edge cases). if len(res_b["basin_ids"]) != nb_ref: n_failed += 1 continue K_list.append(res_b["K"]) K_ps_list.append(res_b["K_ps"]) exit_list.append(res_b["exit_mean"]) k_out_list.append(res_b["k_out"]) p_branch_list.append(res_b["p_branch"]) if len(K_list) == 0: raise RuntimeError( "All bootstrap replicates failed. Likely the FES perturbation " "is too large relative to the barrier height, causing basin " "topology changes in every sample." ) K_all = np.array(K_list) K_ps_all = np.array(K_ps_list) exit_all = np.array(exit_list) kout_all = np.array(k_out_list) pb_all = np.array(p_branch_list) n_ok = K_all.shape[0] K_m, K_s, K_lo, K_hi = _aggregate(K_all, confidence) Kp_m, Kp_s, Kp_lo, Kp_hi = _aggregate(K_ps_all, confidence) ex_m, ex_s, ex_lo, ex_hi = _aggregate(exit_all, confidence) ko_m, ko_s, ko_lo, ko_hi = _aggregate(kout_all, confidence) pb_m, pb_s, pb_lo, pb_hi = _aggregate(pb_all, confidence) return UncertaintyResult( reference=ref, n_bootstrap=n_ok, n_failed=n_failed, confidence_level=confidence, # K K_mean=K_m, K_std=K_s, K_ci_lo=K_lo, K_ci_hi=K_hi, K_samples=K_all, # K_ps K_ps_mean=Kp_m, K_ps_std=Kp_s, K_ps_ci_lo=Kp_lo, K_ps_ci_hi=Kp_hi, # exit exit_mean_mean=ex_m, exit_mean_std=ex_s, exit_mean_ci_lo=ex_lo, exit_mean_ci_hi=ex_hi, exit_mean_samples=exit_all, # k_out k_out_mean=ko_m, k_out_std=ko_s, k_out_ci_lo=ko_lo, k_out_ci_hi=ko_hi, k_out_samples=kout_all, # p_branch p_branch_mean=pb_m, p_branch_std=pb_s, p_branch_ci_lo=pb_lo, p_branch_ci_hi=pb_hi, p_branch_samples=pb_all, )
# ====================================================================== # Hummer-D convenience wrapper # ======================================================================
[docs] def bootstrap_ctmc_with_hummer_D( fes_path: Union[str, Path], d_csv: Union[str, Path], *, # --- Hummer FES uncertainty --- fes_err_path: Optional[Union[str, Path]] = None, F_err: Union[None, float, np.ndarray] = None, F_lo_col: str = "F_lo", F_hi_col: str = "F_hi", # --- Hummer D uncertainty --- D_lo_col: str = "D_lo", D_hi_col: str = "D_hi", perturb_D: bool = True, perturb_F: bool = True, # --- Bootstrap parameters --- n_bootstrap: int = 200, ci_level: float = 0.95, confidence: Optional[float] = None, corr_length: Optional[float] = None, seed: Optional[int] = None, # --- Parameters forwarded to run_1d_ctmc_with_hummer_D --- T: float = 300.0, time_unit: str = "ps", d_xcol: str = "x_interface", d_col: str = "D_med", d_grid: str = "interface", d_interface_mode: str = "harmonic", d_time_unit: str = "ps", d_interp_method: str = "linear", crop: Optional[Tuple[float, float]] = None, resample_n: int = 500, s_col: int = 0, F_col: int = 1, max_basins: Optional[int] = None, core_fraction: Optional[float] = 0.05, init_weight: str = "boltzmann", verbose: bool = False, ) -> UncertaintyResult: """Propagate Hummer-posterior uncertainties through the 1-D CTMC pipeline. This is a convenience wrapper around :func:`bootstrap_ctmc_1d` that: 1. Loads the PLUMED 1-D FES and Hummer D-profile CSV. 2. Reads ``D_lo`` / ``D_hi`` columns (and optionally ``F_lo`` / ``F_hi``) from the CSV to construct per-point error bars. 3. Converts both inputs to the common uniform grid. 4. Calls :func:`bootstrap_ctmc_1d` with the resolved σ arrays. Parameters ---------- fes_path : str or Path PLUMED 1-D FES file. d_csv : str or Path Hummer diffusion-profile CSV (must contain *d_xcol*, *d_col*, and – if *perturb_D* – *D_lo_col* and *D_hi_col*). fes_err_path : str or Path, optional Separate CSV with the FES credible interval. If given, must contain columns ``x_center``, ``F_lo``, ``F_hi`` (or names set by *F_lo_col* / *F_hi_col*). When the FES uncertainty lives in the same CSV as D, set this to the same file. F_err : float or array, optional Uniform FES standard deviation. Overrides *fes_err_path*. perturb_D, perturb_F : bool Toggle individual perturbation channels (default both ``True``). n_bootstrap, ci_level, confidence, corr_length, seed : Bootstrap and perturbation parameters — see :func:`bootstrap_ctmc_1d`. T, time_unit, d_xcol, d_col, d_grid, d_interface_mode, d_time_unit, d_interp_method, crop, resample_n, s_col, F_col, max_basins, core_fraction, init_weight : Forwarded to the underlying CTMC pipeline (same semantics as :func:`~stochkin.workflows.run_1d_ctmc_with_hummer_D`). verbose : bool Print progress. Returns ------- UncertaintyResult """ import pandas as pd # type: ignore from .fes import load_plumed_fes_1d if confidence is None: confidence = ci_level # ---- Load FES ---- s_raw, F_raw = load_plumed_fes_1d( fes_path, x_col=s_col, fes_col=F_col, verbose=False, ) if crop is not None: lo, hi = float(crop[0]), float(crop[1]) mask = (s_raw >= lo) & (s_raw <= hi) s_raw, F_raw = s_raw[mask], F_raw[mask] s_grid = np.linspace(float(s_raw[0]), float(s_raw[-1]), int(resample_n)) F_grid = np.interp(s_grid, s_raw, F_raw) # ---- Load D profile ---- df = pd.read_csv(d_csv) x_D_raw = df[d_xcol].values.astype(float) D_raw = df[d_col].values.astype(float) if d_grid == "interface": x_D_src, D_src, _ = interface_to_centers( x_D_raw, D_raw, method=d_interface_mode ) else: x_D_src, D_src = x_D_raw, D_raw # Unit conversion ps_per_d = _time_unit_to_ps(d_time_unit) ps_per_out = _time_unit_to_ps(time_unit) D_src = D_src * (ps_per_out / ps_per_d) D_grid = interpolate_D_to_grid( s_grid, x_D_src, D_src, method=d_interp_method ) # ---- Resolve D uncertainty ---- D_lo_grid = None D_hi_grid = None if perturb_D and D_lo_col in df.columns and D_hi_col in df.columns: D_lo_raw = df[D_lo_col].values.astype(float) D_hi_raw = df[D_hi_col].values.astype(float) if d_grid == "interface": _, D_lo_src, _ = interface_to_centers( x_D_raw, D_lo_raw, method=d_interface_mode ) _, D_hi_src, _ = interface_to_centers( x_D_raw, D_hi_raw, method=d_interface_mode ) else: D_lo_src = D_lo_raw D_hi_src = D_hi_raw D_lo_src *= (ps_per_out / ps_per_d) D_hi_src *= (ps_per_out / ps_per_d) D_lo_grid = interpolate_D_to_grid( s_grid, x_D_src, D_lo_src, method=d_interp_method ) D_hi_grid = interpolate_D_to_grid( s_grid, x_D_src, D_hi_src, method=d_interp_method ) # ---- Resolve F uncertainty ---- F_sigma_grid = None if perturb_F: if F_err is not None: F_sigma_grid = np.broadcast_to( np.asarray(F_err, dtype=float), s_grid.size ).copy() elif fes_err_path is not None: df_f = pd.read_csv(fes_err_path) x_f = df_f.iloc[:, 0].values.astype(float) # first col = CV fl = df_f[F_lo_col].values.astype(float) fh = df_f[F_hi_col].values.astype(float) sigma_f_raw = _ci_to_sigma(fl, fh, ci_level=ci_level) F_sigma_grid = np.interp(s_grid, x_f, sigma_f_raw) # ---- Call the core bootstrap ---- return bootstrap_ctmc_1d( s=s_grid, F=F_grid, D=D_grid, F_err=F_sigma_grid, D_lo=D_lo_grid, D_hi=D_hi_grid, n_bootstrap=n_bootstrap, ci_level=ci_level, confidence=confidence, corr_length=corr_length, seed=seed, T=T, time_unit=time_unit, max_basins=max_basins, core_fraction=core_fraction, init_weight=init_weight, verbose=verbose, )
# ====================================================================== # Public API # ====================================================================== __all__ = [ "UncertaintyResult", "bootstrap_ctmc_1d", "bootstrap_ctmc_with_hummer_D", ]