-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add csp.random module with initial versions of a Poisson timer and Br…
…ownian Motion generator, with time-varying parameters and test cases Remove brownian_increments and replace with return_increments flag on brownian_motion node. Incorporate PR feedback Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
- Loading branch information
Showing
3 changed files
with
391 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
import numpy as np | ||
from datetime import timedelta | ||
from typing import TypeVar | ||
|
||
import csp | ||
from csp import ts | ||
from csp.stats import numpy_to_list | ||
from csp.typing import Numpy1DArray, NumpyNDArray | ||
|
||
__all__ = ("poisson_timer", "brownian_motion", "brownian_motion_1d") | ||
|
||
|
||
T = TypeVar("T") | ||
|
||
|
||
@csp.node | ||
def poisson_timer(rate: ts[float], seed: object, value: "~T" = True) -> ts["T"]: | ||
"""Generate events according to a Poisson process with time-varying rate. | ||
For a fixed-interval timer see csp.timer. | ||
Args: | ||
rate: The rate of the Poisson process (per second), must be non-negative | ||
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng | ||
value: The value to tick when there are events (similar to csp.timer) | ||
""" | ||
with csp.alarms(): | ||
event = csp.alarm(bool) | ||
|
||
with csp.state(): | ||
s_rng = np.random.default_rng(seed) | ||
s_scheduled_event = None | ||
|
||
if csp.ticked(rate): | ||
if rate < 0: | ||
raise ValueError(f"{csp.now()}: rate must be non-negative") | ||
if s_scheduled_event: | ||
csp.cancel_alarm(event, s_scheduled_event) | ||
if rate > 0: | ||
seconds = s_rng.exponential(1.0 / rate) | ||
s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True) | ||
|
||
if csp.ticked(event): | ||
seconds = s_rng.exponential(1.0 / rate) | ||
s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True) | ||
return value | ||
|
||
|
||
def _make_brownian_increment(t, s_rng, drift, s_cov_decomp): | ||
# Make the brownian increment as efficiently as possible | ||
values = s_rng.normal(scale=np.sqrt(t), size=drift.size) | ||
np.dot(s_cov_decomp, values, out=values) | ||
np.add(drift * t, values, out=values) | ||
return values | ||
|
||
|
||
def _matrix_decomposition(matrix, now): | ||
# Split matrix decomposition into its own function to help with profiling and make it easier to replace | ||
# We use SVD instead of Cholesky because it's more stable and handles the zero variance case. | ||
# Could also use eigenvalue decomposition, but we choose svd because it's the default for np.random.Generator.multivariate_normal | ||
U, S, Vt = np.linalg.svd(matrix) | ||
if not np.allclose(matrix, matrix.T): | ||
raise ValueError(f"{now}: covariance not symmetric") | ||
if not np.allclose(U, Vt): | ||
raise ValueError(f"{now}: covariance not positive semidefinite") | ||
return np.dot(U, np.sqrt(np.diag(S)), out=U) | ||
|
||
|
||
@csp.node | ||
def brownian_motion( | ||
trigger: ts[object], | ||
drift: ts[Numpy1DArray[float]], | ||
covariance: ts[NumpyNDArray[float]], | ||
seed: object, | ||
return_increments: bool = False, | ||
) -> ts[Numpy1DArray[float]]: | ||
"""Generate multi-dimensional Brownian motion (or increments) at the trigger times, with time-varying drift and covariance. | ||
The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero. | ||
To use a different start value, use csp.const(initial_value) + brownian_motion(...) | ||
Args: | ||
trigger: When to return the value of the process | ||
drift: Drift parameter (per second), i.e. array of length n | ||
covariance: Covariance matrix (per second), i.e. array of size nxn | ||
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng | ||
return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself | ||
""" | ||
with csp.state(): | ||
s_rng = np.random.default_rng(seed) | ||
s_cov_decomp = None # Placeholder for covariance matrix decomposition | ||
s_last_change = None # Placeholder for last csp.now() | ||
s_last_drift = None # Placeholder for last value of drift | ||
s_last_value = None # Placeholder for cumulative value between trigger ticks | ||
|
||
if csp.ticked(drift, covariance) and csp.valid(drift, covariance): | ||
if s_last_change is None: | ||
if not drift.ndim == 1: | ||
raise ValueError(f"{csp.now()}: drift must be 1-dimensional") | ||
if not covariance.ndim == 2: | ||
raise ValueError(f"{csp.now()}: covariance must be 2-dimensional") | ||
if not drift.size == covariance.shape[0]: | ||
raise ValueError(f"{csp.now()}: drift and covariance must have same length") | ||
s_last_value = np.zeros_like(drift) | ||
else: | ||
t = (csp.now() - s_last_change).total_seconds() | ||
values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp) | ||
np.add(values, s_last_value, out=s_last_value) | ||
s_last_change = csp.now() | ||
if s_last_drift is not None and drift.shape != s_last_drift.shape: | ||
raise ValueError(f"{csp.now()}: shape of drift is not allowed to change") | ||
s_last_drift = drift | ||
|
||
if csp.ticked(covariance): | ||
# Only do the covariance decomposition when it changes | ||
if s_cov_decomp is not None and covariance.shape != s_cov_decomp.shape: | ||
raise ValueError(f"{csp.now()}: shape of covariance is not allowed to change") | ||
s_cov_decomp = _matrix_decomposition(covariance, csp.now()) | ||
|
||
if csp.ticked(trigger) and csp.valid(drift, covariance): | ||
if s_last_change is not None: | ||
t = (csp.now() - s_last_change).total_seconds() | ||
values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp) | ||
s_last_change = csp.now() | ||
if return_increments: | ||
# Add in "last value" in case drift/covariance changed in between trigger updates | ||
np.add(s_last_value, values, out=values) | ||
s_last_value.fill(0.0) | ||
return values | ||
else: | ||
np.add(s_last_value, values, out=s_last_value) | ||
return s_last_value | ||
|
||
|
||
@csp.graph | ||
def brownian_motion_1d( | ||
trigger: ts[object], drift: ts[float], variance: ts[float], seed: object, return_increments: bool = False | ||
) -> ts[float]: | ||
"""Generate one-dimensional Brownian motion at the trigger times, with time-varying drift and variance. | ||
The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero. | ||
To use a different start value, use csp.const(initial_value) + brownian_motion_1d(...) | ||
Args: | ||
trigger: When to return the value of the process | ||
drift: Drift parameter (per second) | ||
variance: Variance parameter (per second) | ||
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng | ||
return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself | ||
""" | ||
drift = csp.apply(drift, lambda x: np.array([x]), Numpy1DArray[float]) | ||
covariance = csp.apply(covariance, lambda x: np.array([[x]]), NumpyNDArray[float]) | ||
bm = brownian_motion(trigger, drift, covariance, seed=seed, return_increments=return_increments) | ||
return numpy_to_list(bm, 1)[0] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
import numpy as np | ||
import unittest | ||
from datetime import datetime, timedelta | ||
|
||
import csp | ||
from csp.random import brownian_motion, brownian_motion_1d, poisson_timer | ||
from csp.typing import Numpy1DArray, NumpyNDArray | ||
|
||
|
||
class TestPoissonTimer(unittest.TestCase): | ||
def test_simple(self): | ||
rate = csp.const(2.0) # two events per second | ||
out = csp.run( | ||
poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1000) | ||
)[0] | ||
# Should be about 2k events, subject to random noise | ||
self.assertGreater(len(out), 1900) | ||
self.assertLess(len(out), 2100) | ||
self.assertEqual(out[0][1], "foo") | ||
|
||
def test_delayed_start(self): | ||
rate = csp.const(0.1, delay=timedelta(days=1)) # two events per second | ||
out = csp.run( | ||
poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(days=2) | ||
)[0] | ||
self.assertGreater(out[0][0], datetime(2020, 1, 2)) | ||
|
||
def test_changing_rate(self): | ||
rate1 = 1.0 | ||
rate3 = 0.1 | ||
rate = csp.curve( | ||
float, [(datetime(2020, 1, 1, 0), rate1), (datetime(2020, 1, 1, 1), 0.0), (datetime(2020, 1, 1, 2), rate3)] | ||
) | ||
out = csp.run(poisson_timer, rate, seed=1234, starttime=datetime(2020, 1, 1), endtime=datetime(2020, 1, 1, 3))[ | ||
0 | ||
] | ||
first_hour = [v for t, v in out if t <= datetime(2020, 1, 1, 1)] | ||
second_hour = [v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)] | ||
third_hour = [v for t, v in out if t > datetime(2020, 1, 1, 2)] | ||
self.assertGreater(len(first_hour), 60 * 60 * rate1 * 0.9) | ||
self.assertLess(len(first_hour), 60 * 60 * rate1 * 1.1) | ||
self.assertEqual(len(second_hour), 0) | ||
self.assertGreater(len(third_hour), 60 * 60 * rate3 * 0.9) | ||
self.assertLess(len(third_hour), 60 * 60 * rate3 * 1.1) | ||
|
||
|
||
class TestBrownianMotion(unittest.TestCase): | ||
def test_simple_increments(self): | ||
mean = np.array([10.0, 0.0]) | ||
cov = np.array([[1.0, 0.5], [0.5, 2.0]]) | ||
# Use two second increment to make sure time scaling is ok | ||
n_seconds = 2 | ||
trigger = csp.timer(timedelta(seconds=n_seconds)) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
csp.const(mean), | ||
csp.const(cov), | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=2000), | ||
)[0] | ||
self.assertEqual(len(out), 1000) | ||
data = np.vstack([o[1] for o in out]) | ||
err_mean = data.mean(axis=0) - mean * n_seconds | ||
err_cov = np.cov(data.T) - cov * n_seconds | ||
self.assertLess(np.abs(err_mean).max(), 0.2) | ||
self.assertLess(np.abs(err_cov).max(), 0.2) | ||
|
||
def test_bad_covariance(self): | ||
mean = np.array([10.0, 0.0]) | ||
covs = [] | ||
covs.append(np.array([[1.0], [2.0]])) # Not square | ||
covs.append(np.array([[1.0]])) # Not same length as drift | ||
covs.append(np.array([[1.0, 0.5], [1.0, 2.0]])) # Not symmetric | ||
covs.append(np.array([[1.0, 10.0], [10.0, 2.0]])) # Not positive semi-definite | ||
trigger = csp.timer(timedelta(seconds=1)) | ||
for cov in covs: | ||
self.assertRaises( | ||
ValueError, | ||
csp.run, | ||
brownian_motion, | ||
trigger, | ||
csp.const(mean), | ||
csp.const(cov), | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=2), | ||
) | ||
|
||
def test_brownian_motion(self): | ||
# Test that increments add to brownian motion | ||
mean = np.array([10.0, 0.0]) | ||
cov = np.array([[1.0, 0.5], [0.5, 2.0]]) | ||
trigger = csp.timer(timedelta(seconds=1)) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
csp.const(mean), | ||
csp.const(cov), | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=100), | ||
)[0] | ||
data = np.vstack([o[1] for o in out]) | ||
bm_out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
csp.const(mean), | ||
csp.const(cov), | ||
seed=1234, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=100), | ||
)[0] | ||
err = bm_out[-1][1] - data.sum(axis=0) | ||
self.assertAlmostEquals(np.abs(err).max(), 0.0) | ||
|
||
def test_brownian_motion_1d(self): | ||
mean = 10.0 | ||
cov = 1.0 | ||
trigger = csp.timer(timedelta(seconds=1)) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
csp.const(np.array([mean])), | ||
csp.const(np.array([[cov]])), | ||
seed=1234, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=100), | ||
)[0] | ||
out1 = csp.run( | ||
brownian_motion_1d, | ||
trigger, | ||
csp.const(mean), | ||
csp.const(cov), | ||
seed=1234, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=100), | ||
)[0] | ||
self.assertEqual(out[-1][1][0], out1[-1][1]) | ||
|
||
def test_change_at_trigger(self): | ||
mean1 = np.array([1.0]) | ||
mean2 = np.array([-1.0]) | ||
cov = np.array([[0.0]]) | ||
trigger = csp.timer(timedelta(seconds=1)) | ||
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)]) | ||
cov = csp.const(cov) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
drift, | ||
cov, | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=datetime(2020, 1, 1, 0, 0, 2), | ||
)[0] | ||
target = [(datetime(2020, 1, 1, 0, 0, 1), np.array([1.0])), (datetime(2020, 1, 1, 0, 0, 2), np.array([-1.0]))] | ||
np.testing.assert_equal(out, target) | ||
|
||
def test_change_between_triggers(self): | ||
mean1 = np.array([1.0]) | ||
mean2 = np.array([2.0]) | ||
cov = np.array([[0.0]]) | ||
trigger = csp.timer(timedelta(seconds=2)) | ||
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)]) | ||
cov = csp.const(cov) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
drift, | ||
cov, | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=datetime(2020, 1, 1, 0, 0, 2), | ||
)[0] | ||
target = [(datetime(2020, 1, 1, 0, 0, 2), np.array([3.0]))] | ||
np.testing.assert_equal(out, target) | ||
|
||
def test_changing_parameters(self): | ||
mean1 = np.array([10.0, 0.0]) | ||
mean2 = np.array([1.0, 0.0]) | ||
cov1 = np.array([[1.0, 0.5], [0.5, 2.0]]) | ||
cov2 = np.array([[0.0, 0.0], [0.0, 1.0]]) # Note zero covariance for first dim! | ||
n_seconds = 1 | ||
trigger = csp.timer(timedelta(seconds=n_seconds)) | ||
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1, 0), mean1), (datetime(2020, 1, 1, 1), mean2)]) | ||
cov = csp.curve(NumpyNDArray[float], [(datetime(2020, 1, 1, 0), cov1), (datetime(2020, 1, 1, 1), cov2)]) | ||
out = csp.run( | ||
brownian_motion, | ||
trigger, | ||
drift, | ||
cov, | ||
seed=1234, | ||
return_increments=True, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=datetime(2020, 1, 1, 2), | ||
)[0] | ||
data1 = np.vstack([v for t, v in out if t <= datetime(2020, 1, 1, 1)]) | ||
data2 = np.vstack([v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)]) | ||
|
||
err_mean1 = data1.mean(axis=0) - mean1 * n_seconds | ||
err_cov1 = np.cov(data1.T) - cov1 * n_seconds | ||
self.assertLess(np.abs(err_mean1).max(), 0.05) | ||
self.assertLess(np.abs(err_cov1).max(), 0.05) | ||
|
||
err_mean2 = data2.mean(axis=0) - mean2 * n_seconds | ||
err_cov2 = np.cov(data2.T) - cov2 * n_seconds | ||
self.assertLess(np.abs(err_mean2).max(), 0.05) | ||
self.assertLess(np.abs(err_cov2).max(), 0.05) | ||
|
||
def disable_test_performance(self): | ||
dim = 1_000 | ||
N = 100_000 | ||
mean = np.zeros(dim) | ||
cov = np.diag(np.ones(dim)) | ||
trigger = csp.timer(timedelta(seconds=1)) | ||
|
||
def graph(): | ||
out = brownian_motion(trigger, csp.const(mean), csp.const(cov), seed=1234) | ||
csp.add_graph_output("BM", out, tick_count=1) | ||
|
||
start = datetime.utcnow() | ||
|
||
from threadpoolctl import threadpool_limits # May need to pip install separately | ||
|
||
with threadpool_limits(limits=1): # To limit numpy parallelism | ||
csp.run( | ||
graph, | ||
starttime=datetime(2020, 1, 1), | ||
endtime=timedelta(seconds=N), | ||
) | ||
end = datetime.utcnow() | ||
print(f"Elapsed: {end-start}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ develop = [ | |
"pytest-cov", | ||
"pytest-sugar", | ||
"scikit-build", | ||
"threadpoolctl", | ||
"tornado", | ||
] | ||
|
||
|