ivpsolvers

Probabilistic IVP solvers.

adaptive(slvr, /, *, ssm, atol=0.0001, rtol=0.01, control=None, norm_ord=None) ¤

Make an IVP solver adaptive.

Source code in probdiffeq/ivpsolvers.py
def adaptive(slvr, /, *, ssm, atol=1e-4, rtol=1e-2, control=None, norm_ord=None):
    """Make an IVP solver adaptive."""
    if control is None:
        control = control_proportional_integral()

    return _AdaSolver(
        slvr, ssm=ssm, atol=atol, rtol=rtol, control=control, norm_ord=norm_ord
    )
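
As a rough end-to-end sketch of how this constructor fits together with the others on this page (not library-blessed example code): the `tcoeffs` values and the `ssm_fact="dense"` string are placeholders, and passing the filter strategy as the positional `extrapolation` argument is inferred from the `_ExtraImpl*` return types documented further down.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

# Hypothetical initial Taylor coefficients (value and first derivative of a 2d ODE).
tcoeffs = [jnp.ones((2,)), jnp.zeros((2,))]
prior, ssm = ivpsolvers.prior_ibm(tcoeffs, ssm_fact="dense")

# Assemble a calibrated solver and wrap it with adaptive step-size control.
correction = ivpsolvers.correction_ts0(ssm=ssm)
strategy = ivpsolvers.strategy_filter(ssm=ssm)
slvr = ivpsolvers.solver_mle(strategy, correction=correction, prior=prior, ssm=ssm)
adaptive_solver = ivpsolvers.adaptive(
    slvr,
    ssm=ssm,
    atol=1e-8,
    rtol=1e-6,
    control=ivpsolvers.control_proportional_integral(),
)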

control_integral(*, clip=False, safety=0.95, factor_min=0.2, factor_max=10.0) -> _Controller ¤

Construct an integral-controller.

Source code in probdiffeq/ivpsolvers.py
def control_integral(
    *, clip=False, safety=0.95, factor_min=0.2, factor_max=10.0
) -> _Controller:
    """Construct an integral-controller."""

    def init(dt, /):
        return dt

    def apply(dt, /, *, error_power):
        # error_power = error_norm ** (-1.0 / error_contraction_rate)
        scale_factor_unclipped = safety * error_power

        scale_factor_clipped_min = np.minimum(scale_factor_unclipped, factor_max)
        scale_factor = np.maximum(factor_min, scale_factor_clipped_min)
        return scale_factor * dt

    def extract(dt, /):
        return dt

    if clip:

        def clip_fun(dt, /, t, t1):
            return np.minimum(dt, t1 - t)

        return _Controller(init=init, apply=apply, extract=extract, clip=clip_fun)

    return _Controller(init=init, apply=apply, extract=extract, clip=lambda v, **_kw: v)
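
A small numeric walk-through of the controller interface defined above, assuming the returned `_Controller` exposes its `init`/`apply`/`extract` fields as attributes (as the keyword construction suggests); the numbers are arbitrary.

from probdiffeq import ivpsolvers

ctrl = ivpsolvers.control_integral(safety=0.95, factor_min=0.2, factor_max=10.0)

state = ctrl.init(0.1)                      # controller state is just the step size
state = ctrl.apply(state, error_power=2.0)  # error below tolerance -> grow the step
dt_next = ctrl.extract(state)               # 0.1 * clip(0.95 * 2.0) = 0.19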

control_proportional_integral(*, clip: bool = False, safety=0.95, factor_min=0.2, factor_max=10.0, power_integral_unscaled=0.3, power_proportional_unscaled=0.4) -> _Controller ¤

Construct a proportional-integral controller, optionally with time-clipping.

Source code in probdiffeq/ivpsolvers.py
def control_proportional_integral(
    *,
    clip: bool = False,
    safety=0.95,
    factor_min=0.2,
    factor_max=10.0,
    power_integral_unscaled=0.3,
    power_proportional_unscaled=0.4,
) -> _Controller:
    """Construct a proportional-integral-controller with time-clipping."""

    class PIState(containers.NamedTuple):
        dt: float
        error_power_previously_accepted: float

    def init(dt: float, /) -> PIState:
        return PIState(dt, 1.0)

    def apply(state: PIState, /, *, error_power) -> PIState:
        # error_power = error_norm ** (-1.0 / error_contraction_rate)
        dt_proposed, error_power_prev = state

        a1 = error_power**power_integral_unscaled
        a2 = (error_power / error_power_prev) ** power_proportional_unscaled
        scale_factor_unclipped = safety * a1 * a2

        scale_factor_clipped_min = np.minimum(scale_factor_unclipped, factor_max)
        scale_factor = np.maximum(factor_min, scale_factor_clipped_min)

        # >= 1.0 because error_power is 1/scaled_error_norm
        error_power_prev = np.where(error_power >= 1.0, error_power, error_power_prev)

        dt_proposed = scale_factor * dt_proposed
        return PIState(dt_proposed, error_power_prev)

    def extract(state: PIState, /) -> float:
        dt_proposed, _error_norm_previously_accepted = state
        return dt_proposed

    if clip:

        def clip_fun(state: PIState, /, t, t1) -> PIState:
            dt_proposed, error_norm_previously_accepted = state
            dt = dt_proposed
            dt_clipped = np.minimum(dt, t1 - t)
            return PIState(dt_clipped, error_norm_previously_accepted)

        return _Controller(init=init, apply=apply, extract=extract, clip=clip_fun)

    return _Controller(init=init, apply=apply, extract=extract, clip=lambda v, **_kw: v)
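
The same interface as above, but with the two-field PI state and the optional clipping enabled; again a sketch with arbitrary numbers, assuming attribute access on the returned `_Controller`.

from probdiffeq import ivpsolvers

ctrl = ivpsolvers.control_proportional_integral(clip=True)

state = ctrl.init(0.1)                      # PIState(dt=0.1, error_power_previously_accepted=1.0)
state = ctrl.apply(state, error_power=2.0)  # scale by 0.95 * 2**0.3 * 2**0.4 ~ 1.54
state = ctrl.clip(state, t=0.9, t1=1.0)     # never step past the end of the interval
dt_next = ctrl.extract(state)               # min(0.154..., 0.1) -> 0.1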

correction_slr0(*, ssm, cubature_fun=cubature_third_order_spherical) -> _Correction ¤

Zeroth-order statistical linear regression.

Source code in probdiffeq/ivpsolvers.py
def correction_slr0(*, ssm, cubature_fun=cubature_third_order_spherical) -> _Correction:
    """Zeroth-order statistical linear regression."""
    linearize = ssm.linearise.ode_statistical_0th(cubature_fun)
    return _CorrectionSLR(ssm=ssm, ode_order=1, linearize=linearize, name="SLR0")

correction_slr1(*, ssm, cubature_fun=cubature_third_order_spherical) -> _Correction ¤

First-order statistical linear regression.

Source code in probdiffeq/ivpsolvers.py
def correction_slr1(*, ssm, cubature_fun=cubature_third_order_spherical) -> _Correction:
    """First-order statistical linear regression."""
    linearize = ssm.linearise.ode_statistical_1st(cubature_fun)
    return _CorrectionSLR(ssm=ssm, ode_order=1, linearize=linearize, name="SLR1")

correction_ts0(*, ssm, ode_order=1) -> _Correction ¤

Zeroth-order Taylor linearisation.

Source code in probdiffeq/ivpsolvers.py
def correction_ts0(*, ssm, ode_order=1) -> _Correction:
    """Zeroth-order Taylor linearisation."""
    linearize = ssm.linearise.ode_taylor_0th(ode_order=ode_order)
    return _CorrectionTS(name="TS0", ode_order=ode_order, ssm=ssm, linearize=linearize)

correction_ts1(*, ssm, ode_order=1) -> _Correction ¤

First-order Taylor linearisation.

Source code in probdiffeq/ivpsolvers.py
def correction_ts1(*, ssm, ode_order=1) -> _Correction:
    """First-order Taylor linearisation."""
    linearize = ssm.linearise.ode_taylor_1st(ode_order=ode_order)
    return _CorrectionTS(name="TS1", ode_order=ode_order, ssm=ssm, linearize=linearize)

cubature_gauss_hermite(input_shape, degree=5) -> _PositiveCubatureRule ¤

(Statistician's) Gauss-Hermite cubature.

The number of cubature points is degree**prod(input_shape).

Source code in probdiffeq/ivpsolvers.py
def cubature_gauss_hermite(input_shape, degree=5) -> _PositiveCubatureRule:
    """(Statistician's) Gauss-Hermite cubature.

    The number of cubature points is `degree**prod(input_shape)`.
    """
    assert len(input_shape) == 1
    (dim,) = input_shape

    # Roots of the probabilist/statistician's Hermite polynomials (from SciPy)
    _roots = special.roots_hermitenorm(n=degree, mu=True)
    pts, weights, sum_of_weights = _roots
    weights = weights / sum_of_weights

    # Transform into jax arrays and take square root of weights
    pts = np.asarray(pts)
    weights_sqrtm = np.sqrt(np.asarray(weights))

    # Build a tensor grid and return class
    tensor_pts = _tensor_points(pts, d=dim)
    tensor_weights_sqrtm = _tensor_weights(weights_sqrtm, d=dim)
    return _PositiveCubatureRule(points=tensor_pts, weights_sqrtm=tensor_weights_sqrtm)
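
A quick check of the tensor-grid size and weight normalisation; it relies only on the constructor above, and the assumption that points are stacked as (n_points, dim) and exposed as attributes of the returned rule.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

rule = ivpsolvers.cubature_gauss_hermite(input_shape=(2,), degree=3)

print(rule.points.shape)               # (9, 2): degree**dim points in dim=2
print(jnp.sum(rule.weights_sqrtm**2))  # 1.0: the squared weights sum to one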

cubature_third_order_spherical(input_shape) -> _PositiveCubatureRule ¤

Third-order spherical cubature integration.

Source code in probdiffeq/ivpsolvers.py
def cubature_third_order_spherical(input_shape) -> _PositiveCubatureRule:
    """Third-order spherical cubature integration."""
    assert len(input_shape) <= 1
    if len(input_shape) == 1:
        (d,) = input_shape
        points_mat, weights_sqrtm = _third_order_spherical_params(d=d)
        return _PositiveCubatureRule(points=points_mat, weights_sqrtm=weights_sqrtm)

    # If input_shape == (), compute weights via input_shape=(1,)
    # and 'squeeze' the points.
    points_mat, weights_sqrtm = _third_order_spherical_params(d=1)
    (S, _) = points_mat.shape
    points = np.reshape(points_mat, (S,))
    return _PositiveCubatureRule(points=points, weights_sqrtm=weights_sqrtm)

cubature_unscented_transform(input_shape, r=1.0) -> _PositiveCubatureRule ¤

Unscented transform.

Source code in probdiffeq/ivpsolvers.py
def cubature_unscented_transform(input_shape, r=1.0) -> _PositiveCubatureRule:
    """Unscented transform."""
    assert len(input_shape) <= 1
    if len(input_shape) == 1:
        (d,) = input_shape
        points_mat, weights_sqrtm = _unscented_transform_params(d=d, r=r)
        return _PositiveCubatureRule(points=points_mat, weights_sqrtm=weights_sqrtm)

    # If input_shape == (), compute weights via input_shape=(1,)
    # and 'squeeze' the points.
    points_mat, weights_sqrtm = _unscented_transform_params(d=1, r=r)
    (S, _) = points_mat.shape
    points = np.reshape(points_mat, (S,))
    return _PositiveCubatureRule(points=points, weights_sqrtm=weights_sqrtm)
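
The SLR corrections above take `cubature_fun` as a function of the input shape; to swap in, say, a Gauss-Hermite rule with a non-default degree, a partially-applied constructor seems like the natural fit (an assumption; the default simply passes `cubature_third_order_spherical`). The `tcoeffs` values and `ssm_fact="dense"` are placeholders.

import functools
import jax.numpy as jnp
from probdiffeq import ivpsolvers

tcoeffs = [jnp.ones((2,)), jnp.zeros((2,))]   # placeholder Taylor coefficients
_, ssm = ivpsolvers.prior_ibm(tcoeffs, ssm_fact="dense")

# Replace the default spherical rule with degree-3 Gauss-Hermite cubature.
gh3 = functools.partial(ivpsolvers.cubature_gauss_hermite, degree=3)
correction = ivpsolvers.correction_slr1(ssm=ssm, cubature_fun=gh3)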

prior_ibm(tcoeffs, *, ssm_fact: str, output_scale=None) ¤

Construct an adaptive(/continuous-time), multiply-integrated Wiener process.

Source code in probdiffeq/ivpsolvers.py
def prior_ibm(tcoeffs, *, ssm_fact: str, output_scale=None):
    """Construct an adaptive(/continuous-time), multiply-integrated Wiener process."""
    ssm = impl.choose(ssm_fact, tcoeffs_like=tcoeffs)

    output_scale_user = output_scale or np.ones_like(ssm.prototypes.output_scale())
    discretize = ssm.conditional.ibm_transitions(output_scale=output_scale_user)

    output_scale_calib = np.ones_like(ssm.prototypes.output_scale())
    prior = _MarkovProcess(tcoeffs, output_scale_calib, discretize=discretize)
    return prior, ssm
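
A minimal sketch of constructing the prior: the number of Taylor coefficients determines how many derivatives the process tracks, so three coefficients give a twice-integrated Wiener process. The coefficient values and the `ssm_fact="dense"` string are placeholders.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

u0 = jnp.ones((2,))
tcoeffs = [u0, jnp.zeros_like(u0), jnp.zeros_like(u0)]  # value, 1st and 2nd derivative
prior, ssm = ivpsolvers.prior_ibm(tcoeffs, ssm_fact="dense")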

prior_ibm_discrete(ts, *, tcoeffs_like, ssm_fact: str, output_scale=None) ¤

Compute a time-discretized, multiply-integrated Wiener process.

Source code in probdiffeq/ivpsolvers.py
def prior_ibm_discrete(ts, *, tcoeffs_like, ssm_fact: str, output_scale=None):
    """Compute a time-discretized, multiply-integrated Wiener process."""
    prior, ssm = prior_ibm(tcoeffs_like, output_scale=output_scale, ssm_fact=ssm_fact)
    transitions, (p, p_inv) = functools.vmap(prior.discretize)(np.diff(ts))

    preconditioner_apply_vmap = functools.vmap(ssm.conditional.preconditioner_apply)
    conditionals = preconditioner_apply_vmap(transitions, p, p_inv)

    output_scale = np.ones_like(ssm.prototypes.output_scale())
    init = ssm.normal.standard(len(tcoeffs_like), output_scale=output_scale)
    return stats.MarkovSeq(init, conditionals), ssm
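
A sketch of the discrete variant on a fixed time grid; the grid, the coefficient shapes, and `ssm_fact="dense"` are placeholders, and only the shapes of the `_like` argument should matter here.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

ts = jnp.linspace(0.0, 1.0, num=20)
tcoeffs_like = [jnp.empty((2,)), jnp.empty((2,))]  # shape placeholders
markov_seq, ssm = ivpsolvers.prior_ibm_discrete(
    ts, tcoeffs_like=tcoeffs_like, ssm_fact="dense"
)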

solver(extrapolation, /, *, correction, prior, ssm) ¤

Create a solver that does not calibrate the output scale automatically.

Source code in probdiffeq/ivpsolvers.py
def solver(extrapolation, /, *, correction, prior, ssm):
    """Create a solver that does not calibrate the output scale automatically."""

    def step(state: _State, *, vector_field, dt, calibration):
        del calibration  # unused

        prior_discretized = prior.discretize(dt)
        hidden, extra = extrapolation.begin(
            state.hidden, state.aux_extra, prior_discretized=prior_discretized
        )
        t = state.t + dt
        error, _, corr = correction.estimate_error(
            hidden, vector_field=vector_field, t=t
        )

        hidden, extra = extrapolation.complete(
            hidden, extra, output_scale=state.output_scale
        )
        hidden, corr = correction.complete(hidden, corr)

        # Extract and return solution
        state = _State(
            t=t, hidden=hidden, aux_extra=extra, output_scale=state.output_scale
        )
        return dt * error, state

    return _ProbabilisticSolver(
        ssm=ssm,
        prior=prior,
        extrapolation=extrapolation,
        correction=correction,
        calibration=_calibration_none(),
        step_implementation=step,
        name="Probabilistic solver",
        requires_rescaling=False,
    )
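
A sketch of assembling the non-calibrating solver; as above, the `tcoeffs` values and `ssm_fact="dense"` are placeholders, and passing a strategy as the positional `extrapolation` argument is inferred from the `_ExtraImpl*` return types below.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

tcoeffs = [jnp.ones((2,)), jnp.zeros((2,))]       # placeholder Taylor coefficients
prior, ssm = ivpsolvers.prior_ibm(tcoeffs, ssm_fact="dense")

strategy = ivpsolvers.strategy_smoother(ssm=ssm)  # used as the extrapolation argument
correction = ivpsolvers.correction_ts0(ssm=ssm)
slvr = ivpsolvers.solver(strategy, correction=correction, prior=prior, ssm=ssm)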

solver_dynamic(extrapolation, *, correction, prior, ssm) ¤

Create a solver that calibrates the output scale dynamically.

Source code in probdiffeq/ivpsolvers.py
def solver_dynamic(extrapolation, *, correction, prior, ssm):
    """Create a solver that calibrates the output scale dynamically."""

    def step_dynamic(state, /, *, dt, vector_field, calibration):
        prior_discretized = prior.discretize(dt)
        hidden, extra = extrapolation.begin(
            state.hidden, state.aux_extra, prior_discretized=prior_discretized
        )
        t = state.t + dt
        error, observed, corr = correction.estimate_error(
            hidden, vector_field=vector_field, t=t
        )

        output_scale = calibration.update(state.output_scale, observed=observed)

        prior_, _calibrated = calibration.extract(output_scale)
        hidden, extra = extrapolation.complete(hidden, extra, output_scale=prior_)
        hidden, corr = correction.complete(hidden, corr)

        # Return solution
        state = _State(t=t, hidden=hidden, aux_extra=extra, output_scale=output_scale)
        return dt * error, state

    return _ProbabilisticSolver(
        prior=prior,
        ssm=ssm,
        extrapolation=extrapolation,
        correction=correction,
        calibration=_calibration_most_recent(ssm=ssm),
        name="Dynamic probabilistic solver",
        step_implementation=step_dynamic,
        requires_rescaling=False,
    )

solver_mle(extrapolation, /, *, correction, prior, ssm) ¤

Create a solver that calibrates the output scale via maximum-likelihood.

Warning: must be combined with a call to stats.calibrate() after solving if the MLE calibration is to be used.

Source code in probdiffeq/ivpsolvers.py
def solver_mle(extrapolation, /, *, correction, prior, ssm):
    """Create a solver that calibrates the output scale via maximum-likelihood.

    Warning: must be combined with a call to stats.calibrate()
    after solving if the MLE calibration is to be *used*.
    """

    def step_mle(state, /, *, dt, vector_field, calibration):
        output_scale_prior, _calibrated = calibration.extract(state.output_scale)

        prior_discretized = prior.discretize(dt)
        hidden, extra = extrapolation.begin(
            state.hidden, state.aux_extra, prior_discretized=prior_discretized
        )
        t = state.t + dt
        error, _, corr = correction.estimate_error(
            hidden, vector_field=vector_field, t=t
        )

        hidden, extra = extrapolation.complete(
            hidden, extra, output_scale=output_scale_prior
        )
        hidden, observed = correction.complete(hidden, corr)

        output_scale = calibration.update(state.output_scale, observed=observed)
        state = _State(t=t, hidden=hidden, aux_extra=extra, output_scale=output_scale)
        return dt * error, state

    return _ProbabilisticSolver(
        ssm=ssm,
        name="Probabilistic solver with MLE calibration",
        prior=prior,
        calibration=_calibration_running_mean(ssm=ssm),
        step_implementation=step_mle,
        extrapolation=extrapolation,
        correction=correction,
        requires_rescaling=True,
    )
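
A sketch of assembling the MLE-calibrated solver; placeholders and inferred arguments as in the previous examples, and the exact call signature of stats.calibrate() is deliberately not shown here.

import jax.numpy as jnp
from probdiffeq import ivpsolvers

tcoeffs = [jnp.ones((2,)), jnp.zeros((2,))]   # placeholder Taylor coefficients
prior, ssm = ivpsolvers.prior_ibm(tcoeffs, ssm_fact="dense")

strategy = ivpsolvers.strategy_filter(ssm=ssm)
correction = ivpsolvers.correction_ts1(ssm=ssm)
slvr = ivpsolvers.solver_mle(strategy, correction=correction, prior=prior, ssm=ssm)
# After solving, apply stats.calibrate to the solution so that the estimated
# MLE output scale is actually used (see the warning above).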

strategy_filter(*, ssm) ¤

Construct a filter.

Source code in probdiffeq/ivpsolvers.py
def strategy_filter(*, ssm):
    """Construct a filter."""
    return _ExtraImplFilter(
        name="Filter",
        ssm=ssm,
        is_suitable_for_save_at=True,
        is_suitable_for_save_every_step=True,
        is_suitable_for_offgrid_marginals=True,
    )

strategy_fixedpoint(*, ssm) ¤

Construct a fixedpoint-smoother.

Source code in probdiffeq/ivpsolvers.py
def strategy_fixedpoint(*, ssm):
    """Construct a fixedpoint-smoother."""
    return _ExtraImplFixedPoint(
        name="Fixed-point smoother",
        ssm=ssm,
        is_suitable_for_save_at=True,
        is_suitable_for_save_every_step=False,
        is_suitable_for_offgrid_marginals=False,
    )

strategy_smoother(*, ssm) ¤

Construct a smoother.

Source code in probdiffeq/ivpsolvers.py
def strategy_smoother(*, ssm):
    """Construct a smoother."""
    return _ExtraImplSmoother(
        name="Smoother",
        ssm=ssm,
        is_suitable_for_save_at=False,
        is_suitable_for_save_every_step=True,
        is_suitable_for_offgrid_marginals=True,
    )
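
The suitability flags above suggest a simple rule of thumb for picking a strategy; the helper below is a sketch of that selection logic, not library code.

from probdiffeq import ivpsolvers

def choose_strategy(*, ssm, save_at_only: bool):
    # Fixed-point smoother for save_at-style solves, smoother for dense output;
    # a filter is suitable for both, but does not propagate information backwards in time.
    if save_at_only:
        return ivpsolvers.strategy_fixedpoint(ssm=ssm)
    return ivpsolvers.strategy_smoother(ssm=ssm)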