Skip to content

ivpsolve

Routines for estimating solutions of initial value problems.

Solution ¤

Estimated initial value problem solution.

Source code in probdiffeq/ivpsolve.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Solution:
    """Estimated initial value problem solution."""

    def __init__(self, t, u, output_scale, marginals, posterior, num_steps):
        """Construct a solution object."""
        self.t = t
        self.u = u
        self.output_scale = output_scale
        self.marginals = marginals
        self.posterior = posterior
        self.num_steps = num_steps

    def __repr__(self):
        """Evaluate a string-representation of the solution object."""
        return (
            f"{self.__class__.__name__}("
            f"t={self.t},"
            f"u={self.u},"
            f"output_scale={self.output_scale},"
            f"marginals={self.marginals},"
            f"posterior={self.posterior},"
            f"num_steps={self.num_steps},"
            ")"
        )

    def __len__(self):
        """Evaluate the length of a solution."""
        if np.ndim(self.t) < 1:
            msg = "Solution object not batched :("
            raise ValueError(msg)
        return self.t.shape[0]

    def __getitem__(self, item):
        """Access a single item of the solution."""
        if np.ndim(self.t) < 1:
            msg = "Solution object not batched :("
            raise ValueError(msg)

        if np.ndim(self.t) == 1 and item != -1:
            msg = "Access to non-terminal states is not available."
            raise ValueError(msg)

        return tree_util.tree_map(lambda s: s[item, ...], self)

    def __iter__(self):
        """Iterate through the solution."""
        if np.ndim(self.t) <= 1:
            msg = "Solution object not batched :("
            raise ValueError(msg)

        for i in range(self.t.shape[0]):
            yield self[i]

__getitem__(item) ¤

Access a single item of the solution.

Source code in probdiffeq/ivpsolve.py
52
53
54
55
56
57
58
59
60
61
62
def __getitem__(self, item):
    """Access a single item of the solution."""
    if np.ndim(self.t) < 1:
        msg = "Solution object not batched :("
        raise ValueError(msg)

    if np.ndim(self.t) == 1 and item != -1:
        msg = "Access to non-terminal states is not available."
        raise ValueError(msg)

    return tree_util.tree_map(lambda s: s[item, ...], self)

__init__(t, u, output_scale, marginals, posterior, num_steps) ¤

Construct a solution object.

Source code in probdiffeq/ivpsolve.py
23
24
25
26
27
28
29
30
def __init__(self, t, u, output_scale, marginals, posterior, num_steps):
    """Construct a solution object."""
    self.t = t
    self.u = u
    self.output_scale = output_scale
    self.marginals = marginals
    self.posterior = posterior
    self.num_steps = num_steps

__iter__() ¤

Iterate through the solution.

Source code in probdiffeq/ivpsolve.py
64
65
66
67
68
69
70
71
def __iter__(self):
    """Iterate through the solution."""
    if np.ndim(self.t) <= 1:
        msg = "Solution object not batched :("
        raise ValueError(msg)

    for i in range(self.t.shape[0]):
        yield self[i]

__len__() ¤

Evaluate the length of a solution.

Source code in probdiffeq/ivpsolve.py
45
46
47
48
49
50
def __len__(self):
    """Evaluate the length of a solution."""
    if np.ndim(self.t) < 1:
        msg = "Solution object not batched :("
        raise ValueError(msg)
    return self.t.shape[0]

__repr__() ¤

Evaluate a string-representation of the solution object.

Source code in probdiffeq/ivpsolve.py
32
33
34
35
36
37
38
39
40
41
42
43
def __repr__(self):
    """Evaluate a string-representation of the solution object."""
    return (
        f"{self.__class__.__name__}("
        f"t={self.t},"
        f"u={self.u},"
        f"output_scale={self.output_scale},"
        f"marginals={self.marginals},"
        f"posterior={self.posterior},"
        f"num_steps={self.num_steps},"
        ")"
    )

simulate_terminal_values(vector_field, initial_condition, t0, t1, adaptive_solver, dt0) -> Solution ¤

Simulate the terminal values of an initial value problem.

Source code in probdiffeq/ivpsolve.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def simulate_terminal_values(
    vector_field, initial_condition, t0, t1, adaptive_solver, dt0
) -> Solution:
    """Simulate the terminal values of an initial value problem."""
    save_at = np.asarray([t1])
    (_t, solution_save_at), _, num_steps = _solve_and_save_at(
        tree_util.Partial(vector_field),
        t0,
        initial_condition,
        save_at=save_at,
        adaptive_solver=adaptive_solver,
        dt0=dt0,
    )
    # "squeeze"-type functionality (there is only a single state!)
    squeeze_fun = functools.partial(np.squeeze_along_axis, axis=0)
    solution_save_at = tree_util.tree_map(squeeze_fun, solution_save_at)
    num_steps = tree_util.tree_map(squeeze_fun, num_steps)

    # I think the user expects marginals, so we compute them here
    posterior, output_scale = solution_save_at
    marginals = posterior.init if isinstance(posterior, markov.MarkovSeq) else posterior
    u = impl.hidden_model.qoi(marginals)
    return Solution(
        t=t1,
        u=u,
        marginals=marginals,
        posterior=posterior,
        output_scale=output_scale,
        num_steps=num_steps,
    )

solve_and_save_at(vector_field, initial_condition, save_at, adaptive_solver, dt0) -> Solution ¤

Solve an initial value problem and return the solution at a pre-determined grid.

Warning: highly EXPERIMENTAL feature!

This feature is highly experimental. There is no guarantee that it works correctly. It might be deleted tomorrow and without any deprecation policy.

Source code in probdiffeq/ivpsolve.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def solve_and_save_at(
    vector_field, initial_condition, save_at, adaptive_solver, dt0
) -> Solution:
    """Solve an initial value problem and return the solution at a pre-determined grid.

    !!! warning "Warning: highly EXPERIMENTAL feature!"
        This feature is highly experimental.
        There is no guarantee that it works correctly.
        It might be deleted tomorrow
        and without any deprecation policy.

    """
    if not adaptive_solver.solver.strategy.is_suitable_for_save_at:
        msg = (
            f"Strategy {adaptive_solver.solver.strategy} should not "
            f"be used in solve_and_save_at. "
        )
        warnings.warn(msg, stacklevel=1)

    (_t, solution_save_at), _, num_steps = _solve_and_save_at(
        tree_util.Partial(vector_field),
        save_at[0],
        initial_condition,
        save_at=save_at[1:],
        adaptive_solver=adaptive_solver,
        dt0=dt0,
    )

    # I think the user expects the initial condition to be part of the state
    # (as well as marginals), so we compute those things here
    posterior_t0, *_ = initial_condition
    posterior_save_at, output_scale = solution_save_at
    _tmp = _userfriendly_output(posterior=posterior_save_at, posterior_t0=posterior_t0)
    marginals, posterior = _tmp
    u = impl.hidden_model.qoi(marginals)
    return Solution(
        t=save_at,
        u=u,
        marginals=marginals,
        posterior=posterior,
        output_scale=output_scale,
        num_steps=num_steps,
    )

solve_and_save_every_step(vector_field, initial_condition, t0, t1, adaptive_solver, dt0) -> Solution ¤

Solve an initial value problem and save every step.

This function uses a native-Python while loop.

Warning

Not JITable, not reverse-mode-differentiable.

Source code in probdiffeq/ivpsolve.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def solve_and_save_every_step(
    vector_field, initial_condition, t0, t1, adaptive_solver, dt0
) -> Solution:
    """Solve an initial value problem and save every step.

    This function uses a native-Python while loop.

    !!! warning
        Not JITable, not reverse-mode-differentiable.
    """
    if not adaptive_solver.solver.strategy.is_suitable_for_save_every_step:
        msg = (
            f"Strategy {adaptive_solver.solver.strategy} should not "
            f"be used in solve_and_save_every_step."
        )
        warnings.warn(msg, stacklevel=1)

    generator = _solution_generator(
        tree_util.Partial(vector_field),
        t0,
        initial_condition,
        t1=t1,
        adaptive_solver=adaptive_solver,
        dt0=dt0,
    )
    (t, solution_every_step), _dt, num_steps = tree_array_util.tree_stack(
        list(generator)
    )

    # I think the user expects the initial time-point to be part of the grid
    # (Even though t0 is not computed by this function)
    t = np.concatenate((np.atleast_1d(t0), t))

    # I think the user expects marginals, so we compute them here
    posterior_t0, *_ = initial_condition
    posterior, output_scale = solution_every_step
    _tmp = _userfriendly_output(posterior=posterior, posterior_t0=posterior_t0)
    marginals, posterior = _tmp

    u = impl.hidden_model.qoi(marginals)
    return Solution(
        t=t,
        u=u,
        marginals=marginals,
        posterior=posterior,
        output_scale=output_scale,
        num_steps=num_steps,
    )

solve_fixed_grid(vector_field, initial_condition, grid, solver) -> Solution ¤

Solve an initial value problem on a fixed, pre-determined grid.

Source code in probdiffeq/ivpsolve.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def solve_fixed_grid(vector_field, initial_condition, grid, solver) -> Solution:
    """Solve an initial value problem on a fixed, pre-determined grid."""
    # Compute the solution

    def body_fn(s, dt):
        _error, s_new = solver.step(state=s, vector_field=vector_field, dt=dt)
        return s_new, s_new

    t0 = grid[0]
    state0 = solver.init(t0, initial_condition)
    _, result_state = control_flow.scan(body_fn, init=state0, xs=np.diff(grid))
    _t, (posterior, output_scale) = solver.extract(result_state)

    # I think the user expects marginals, so we compute them here
    posterior_t0, *_ = initial_condition
    _tmp = _userfriendly_output(posterior=posterior, posterior_t0=posterior_t0)
    marginals, posterior = _tmp

    u = impl.hidden_model.qoi(marginals)
    return Solution(
        t=grid,
        u=u,
        marginals=marginals,
        posterior=posterior,
        output_scale=output_scale,
        num_steps=np.arange(1.0, len(grid)),
    )