Skip to content

ivpsolvers

Probabilistic IVP solvers.

correction_slr0(cubature_fun=cubature_third_order_spherical) -> _Correction ¤

Zeroth-order statistical linear regression.

Source code in probdiffeq/ivpsolvers.py
508
509
510
511
512
513
514
515
def correction_slr0(cubature_fun=cubature_third_order_spherical) -> _Correction:
    """Zeroth-order statistical linear regression."""
    linearise_fun = impl.linearise.ode_statistical_1st(cubature_fun)
    return _correction_constraint_ode_statistical(
        ode_order=1,
        linearise_fun=linearise_fun,
        string_repr=f"<SLR1 with ode_order={1}>",
    )

correction_slr1(cubature_fun=cubature_third_order_spherical) -> _Correction ¤

First-order statistical linear regression.

Source code in probdiffeq/ivpsolvers.py
518
519
520
521
522
523
524
525
def correction_slr1(cubature_fun=cubature_third_order_spherical) -> _Correction:
    """First-order statistical linear regression."""
    linearise_fun = impl.linearise.ode_statistical_0th(cubature_fun)
    return _correction_constraint_ode_statistical(
        ode_order=1,
        linearise_fun=linearise_fun,
        string_repr=f"<SLR0 with ode_order={1}>",
    )

correction_ts0(*, ode_order=1) -> _Correction ¤

Zeroth-order Taylor linearisation.

Source code in probdiffeq/ivpsolvers.py
455
456
457
458
459
460
461
def correction_ts0(*, ode_order=1) -> _Correction:
    """Zeroth-order Taylor linearisation."""
    return _correction_constraint_ode_taylor(
        ode_order=ode_order,
        linearise_fun=impl.linearise.ode_taylor_0th(ode_order=ode_order),
        string_repr=f"<TS0 with ode_order={ode_order}>",
    )

correction_ts1(*, ode_order=1) -> _Correction ¤

First-order Taylor linearisation.

Source code in probdiffeq/ivpsolvers.py
464
465
466
467
468
469
470
def correction_ts1(*, ode_order=1) -> _Correction:
    """First-order Taylor linearisation."""
    return _correction_constraint_ode_taylor(
        ode_order=ode_order,
        linearise_fun=impl.linearise.ode_taylor_1st(ode_order=ode_order),
        string_repr=f"<TS1 with ode_order={ode_order}>",
    )

cubature_gauss_hermite(input_shape, degree=5) -> _PositiveCubatureRule ¤

(Statistician's) Gauss-Hermite cubature.

The number of cubature points is prod(input_shape)**degree.

Source code in probdiffeq/ivpsolvers.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def cubature_gauss_hermite(input_shape, degree=5) -> _PositiveCubatureRule:
    """(Statistician's) Gauss-Hermite cubature.

    The number of cubature points is `prod(input_shape)**degree`.
    """
    assert len(input_shape) == 1
    (dim,) = input_shape

    # Roots of the probabilist/statistician's Hermite polynomials (in Numpy...)
    _roots = special.roots_hermitenorm(n=degree, mu=True)
    pts, weights, sum_of_weights = _roots
    weights = weights / sum_of_weights

    # Transform into jax arrays and take square root of weights
    pts = np.asarray(pts)
    weights_sqrtm = np.sqrt(np.asarray(weights))

    # Build a tensor grid and return class
    tensor_pts = _tensor_points(pts, d=dim)
    tensor_weights_sqrtm = _tensor_weights(weights_sqrtm, d=dim)
    return _PositiveCubatureRule(points=tensor_pts, weights_sqrtm=tensor_weights_sqrtm)

cubature_third_order_spherical(input_shape) -> _PositiveCubatureRule ¤

Third-order spherical cubature integration.

Source code in probdiffeq/ivpsolvers.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def cubature_third_order_spherical(input_shape) -> _PositiveCubatureRule:
    """Third-order spherical cubature integration."""
    assert len(input_shape) <= 1
    if len(input_shape) == 1:
        (d,) = input_shape
        points_mat, weights_sqrtm = _third_order_spherical_params(d=d)
        return _PositiveCubatureRule(points=points_mat, weights_sqrtm=weights_sqrtm)

    # If input_shape == (), compute weights via input_shape=(1,)
    # and 'squeeze' the points.
    points_mat, weights_sqrtm = _third_order_spherical_params(d=1)
    (S, _) = points_mat.shape
    points = np.reshape(points_mat, (S,))
    return _PositiveCubatureRule(points=points, weights_sqrtm=weights_sqrtm)

cubature_unscented_transform(input_shape, r=1.0) -> _PositiveCubatureRule ¤

Unscented transform.

Source code in probdiffeq/ivpsolvers.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def cubature_unscented_transform(input_shape, r=1.0) -> _PositiveCubatureRule:
    """Unscented transform."""
    assert len(input_shape) <= 1
    if len(input_shape) == 1:
        (d,) = input_shape
        points_mat, weights_sqrtm = _unscented_transform_params(d=d, r=r)
        return _PositiveCubatureRule(points=points_mat, weights_sqrtm=weights_sqrtm)

    # If input_shape == (), compute weights via input_shape=(1,)
    # and 'squeeze' the points.
    points_mat, weights_sqrtm = _unscented_transform_params(d=1, r=r)
    (S, _) = points_mat.shape
    points = np.reshape(points_mat, (S,))
    return _PositiveCubatureRule(points=points, weights_sqrtm=weights_sqrtm)

prior_ibm(num_derivatives, output_scale=None) ¤

Construct an adaptive(/continuous-time), multiply-integrated Wiener process.

Source code in probdiffeq/ivpsolvers.py
773
774
775
776
777
def prior_ibm(num_derivatives, output_scale=None):
    """Construct an adaptive(/continuous-time), multiply-integrated Wiener process."""
    output_scale = output_scale or np.ones_like(impl.prototypes.output_scale())
    discretise = impl.conditional.ibm_transitions(num_derivatives, output_scale)
    return discretise, num_derivatives

prior_ibm_discrete(ts, *, num_derivatives, output_scale=None) ¤

Compute a time-discretised, multiply-integrated Wiener process.

Source code in probdiffeq/ivpsolvers.py
780
781
782
783
784
785
786
787
788
789
790
def prior_ibm_discrete(ts, *, num_derivatives, output_scale=None):
    """Compute a time-discretised, multiply-integrated Wiener process."""
    discretise, _ = prior_ibm(num_derivatives, output_scale=output_scale)
    transitions, (p, p_inv) = functools.vmap(discretise)(np.diff(ts))

    preconditioner_apply_vmap = functools.vmap(impl.conditional.preconditioner_apply)
    conditionals = preconditioner_apply_vmap(transitions, p, p_inv)

    output_scale = np.ones_like(impl.prototypes.output_scale())
    init = impl.normal.standard(num_derivatives + 1, output_scale=output_scale)
    return stats.MarkovSeq(init, conditionals)

solver(strategy) ¤

Create a solver that does not calibrate the output scale automatically.

Source code in probdiffeq/ivpsolvers.py
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
def solver(strategy, /):
    """Create a solver that does not calibrate the output scale automatically."""

    def step(state: _SolverState, *, vector_field, dt, calibration):
        del calibration  # unused

        error, _observed, state_strategy = strategy.begin(
            state.strategy, dt=dt, vector_field=vector_field
        )
        state_strategy = strategy.complete(
            state_strategy, output_scale=state.output_scale
        )
        # Extract and return solution
        state = _SolverState(strategy=state_strategy, output_scale=state.output_scale)
        return dt * error, state

    string_repr = f"<Uncalibrated solver with {strategy}>"
    return _solver_calibrated(
        strategy=strategy,
        calibration=_calibration_none(),
        impl_step=step,
        string_repr=string_repr,
        requires_rescaling=False,
    )

solver_dynamic(strategy) ¤

Create a solver that calibrates the output scale dynamically.

Source code in probdiffeq/ivpsolvers.py
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
def solver_dynamic(strategy):
    """Create a solver that calibrates the output scale dynamically."""
    string_repr = f"<Dynamic solver with {strategy}>"

    def step_dynamic(state, /, *, dt, vector_field, calibration):
        error, observed, state_strategy = strategy.begin(
            state.strategy, dt=dt, vector_field=vector_field
        )

        output_scale = calibration.update(state.output_scale, observed=observed)

        prior, _calibrated = calibration.extract(output_scale)
        state_strategy = strategy.complete(state_strategy, output_scale=prior)

        # Return solution
        state = _SolverState(strategy=state_strategy, output_scale=output_scale)
        return dt * error, state

    return _solver_calibrated(
        strategy=strategy,
        calibration=_calibration_most_recent(),
        string_repr=string_repr,
        impl_step=step_dynamic,
        requires_rescaling=False,
    )

solver_mle(strategy) ¤

Create a solver that calibrates the output scale via maximum-likelihood.

Warning: needs to be combined with a call to stats.calibrate() after solving if the MLE-calibration shall be used.

Source code in probdiffeq/ivpsolvers.py
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
def solver_mle(strategy):
    """Create a solver that calibrates the output scale via maximum-likelihood.

    Warning: needs to be combined with a call to stats.calibrate()
    after solving if the MLE-calibration shall be *used*.
    """
    string_repr = f"<MLE-solver with {strategy}>"

    def step_mle(state, /, *, dt, vector_field, calibration):
        output_scale_prior, _calibrated = calibration.extract(state.output_scale)
        error, _, state_strategy = strategy.begin(
            state.strategy, dt=dt, vector_field=vector_field
        )

        state_strategy = strategy.complete(
            state_strategy, output_scale=output_scale_prior
        )
        observed = state_strategy.aux_corr

        # Calibrate
        output_scale = calibration.update(state.output_scale, observed=observed)

        # Return
        state = _SolverState(strategy=state_strategy, output_scale=output_scale)
        return dt * error, state

    return _solver_calibrated(
        calibration=_calibration_running_mean(),
        impl_step=step_mle,
        strategy=strategy,
        string_repr=string_repr,
        requires_rescaling=True,
    )

strategy_filter(prior, correction: _Correction) -> _Strategy ¤

Construct a filter.

Source code in probdiffeq/ivpsolvers.py
642
643
644
645
646
647
648
649
650
651
652
def strategy_filter(prior, correction: _Correction, /) -> _Strategy:
    """Construct a filter."""
    extrapolation_impl = _extra_impl_precon_filter(*prior)
    return _strategy(
        extrapolation_impl,
        correction,
        string_repr=f"<Filter with {extrapolation_impl}, {correction}>",
        is_suitable_for_save_at=True,
        is_suitable_for_offgrid_marginals=True,
        is_suitable_for_save_every_step=True,
    )

strategy_fixedpoint(prior, correction: _Correction) -> _Strategy ¤

Construct a fixedpoint-smoother.

Source code in probdiffeq/ivpsolvers.py
629
630
631
632
633
634
635
636
637
638
639
def strategy_fixedpoint(prior, correction: _Correction, /) -> _Strategy:
    """Construct a fixedpoint-smoother."""
    extrapolation_impl = _extra_impl_precon_fixedpoint(*prior)
    return _strategy(
        extrapolation_impl,
        correction,
        is_suitable_for_save_at=True,
        is_suitable_for_save_every_step=False,
        is_suitable_for_offgrid_marginals=False,
        string_repr=f"<Fixedpoint smoother with {extrapolation_impl}, {correction}>",
    )

strategy_smoother(prior, correction: _Correction) -> _Strategy ¤

Construct a smoother.

Source code in probdiffeq/ivpsolvers.py
616
617
618
619
620
621
622
623
624
625
626
def strategy_smoother(prior, correction: _Correction, /) -> _Strategy:
    """Construct a smoother."""
    extrapolation_impl = _extra_impl_precon_smoother(*prior)
    return _strategy(
        extrapolation_impl,
        correction,
        is_suitable_for_save_at=False,
        is_suitable_for_save_every_step=True,
        is_suitable_for_offgrid_marginals=True,
        string_repr=f"<Smoother with {extrapolation_impl}, {correction}>",
    )