# MCMC.py
from typing import Callable, Optional, Tuple
from numpy.typing import NDArray
import numpy as np
from model_types import GradModel, LogDensityModel

class Metropolis:
    # random-walk Metropolis with a user-supplied symmetric proposal increment
    def __init__(
        self,
        model: LogDensityModel,
        proposal_rng: Callable[[], NDArray[np.float64]],
        init: Optional[NDArray[np.float64]] = None,
    ):
        self._model = model
        self._dim = self._model.dims()
        self._q_rng = proposal_rng
        # `init or ...` raises for non-empty arrays (ambiguous truth value),
        # so test against None explicitly
        self._theta = init if init is not None else np.random.normal(size=self._dim)
        self._log_p_theta = self._model.log_density(self._theta)

    def __iter__(self):
        return self

    def __next__(self):
        return self.sample()

    def sample(self) -> Tuple[NDArray[np.float64], float]:
        # does not include initial value as first draw
        theta_star = self._theta + self._q_rng()
        log_p_theta_star = self._model.log_density(theta_star)
        # Metropolis accept/reject in log space (symmetric proposal, so no
        # proposal-density correction)
        if np.log(np.random.uniform()) < log_p_theta_star - self._log_p_theta:
            self._theta = theta_star
            self._log_p_theta = log_p_theta_star
        return self._theta, self._log_p_theta
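
# Example usage (a sketch, not from the original file): `StdNormal` stands in
# for any LogDensityModel implementation with dims() == 1; the proposal scale
# 0.5 is an arbitrary illustrative choice.
#
#     sampler = Metropolis(StdNormal(), lambda: np.random.normal(scale=0.5, size=1))
#     draws = np.array([sampler.sample()[0] for _ in range(10_000)])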


class HMCDiag:
    def __init__(
        self,
        model: GradModel,
        stepsize: float,
        steps: int,
        metric_diag: Optional[NDArray[np.float64]] = None,
        init: Optional[NDArray[np.float64]] = None,
    ):
        self._model = model
        self._dim = self._model.dims()
        self._stepsize = stepsize
        self._steps = steps
        # `x or default` fails for arrays; compare against None explicitly
        self._metric = metric_diag if metric_diag is not None else np.ones(self._dim)
        self._theta = init if init is not None else np.random.normal(size=self._dim)

    def __iter__(self):
        return self

    def __next__(self):
        return self.sample()

    def joint_logp(self, theta: NDArray[np.float64], rho: NDArray[np.float64]) -> float:
        # log joint density of (theta, rho) up to a constant: log p(theta)
        # minus the kinetic energy 0.5 * rho' * diag(metric) * rho
        return self._model.log_density(theta) - 0.5 * np.dot(
            rho, np.multiply(self._metric, rho)
        )

    def leapfrog(
        self, theta: NDArray[np.float64], rho: NDArray[np.float64]
    ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
        # TODO(bob-carpenter): refactor to share non-initial and non-final updates
        for n in range(self._steps):
            # half-step momentum, full-step position, half-step momentum
            lp, grad = self._model.log_density_gradient(theta)
            rho_mid = rho + 0.5 * self._stepsize * np.multiply(self._metric, grad)
            theta = theta + self._stepsize * rho_mid
            lp, grad = self._model.log_density_gradient(theta)
            rho = rho_mid + 0.5 * self._stepsize * np.multiply(self._metric, grad)
        return (theta, rho)

    def sample(self) -> Tuple[NDArray[np.float64], float]:
        rho = np.random.normal(size=self._dim)
        logp = self.joint_logp(self._theta, rho)
        theta_prop, rho_prop = self.leapfrog(self._theta, rho)
        logp_prop = self.joint_logp(theta_prop, rho_prop)
        # Metropolis correction for the leapfrog discretization error
        if np.log(np.random.uniform()) < logp_prop - logp:
            self._theta = theta_prop
            return self._theta, logp_prop
        return self._theta, logp
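
# Example usage (a sketch, not from the original file): `StdNormal` is the same
# hypothetical model, here assumed to implement GradModel; the stepsize and
# step count are arbitrary illustrative choices.
#
#     sampler = HMCDiag(StdNormal(), stepsize=0.1, steps=10)
#     draws = np.array([sampler.sample()[0] for _ in range(1_000)])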


class DelayedRejection:
    def __init__(
        self,
        model: LogDensityModel,
        proposal1_rng: Callable[[], NDArray[np.float64]],
        proposal2_rng: Callable[[], NDArray[np.float64]],
        init: Optional[NDArray[np.float64]] = None,
    ):
        self._model = model
        self._dim = self._model.dims()
        self._q1_rng = proposal1_rng
        self._q2_rng = proposal2_rng
        # avoids the mutable default `init=[]` and the array-unfriendly `or` idiom
        self._theta = init if init is not None else np.random.normal(size=self._dim)
        self._log_p_theta = self._model.log_density(self._theta)

    def sample(self) -> Tuple[NDArray[np.float64], float]:
        # stage 1: ordinary Metropolis accept/reject with the first proposal
        theta1 = self._theta + self._q1_rng()
        log_p_theta1 = self._model.log_density(theta1)
        accept = np.log(np.random.uniform()) < log_p_theta1 - self._log_p_theta
        if accept:
            self._theta = theta1
            self._log_p_theta = log_p_theta1
            return self._theta, self._log_p_theta
        # stage 2: on rejection, try a second proposal
        theta2 = self._theta + self._q2_rng()
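        log_p_theta2 = self._model.log_density(theta2)
        # NOTE: the source file is truncated at this point; the remainder is a
        # sketch of the standard two-stage delayed-rejection acceptance rule
        # (Tierney & Mira), not the original author's code. It assumes both
        # proposals are symmetric and drops the first-stage proposal density
        # ratio q1(theta1 | theta2) / q1(theta1 | theta), a common simplification.
        log_alpha1_rev = min(0.0, log_p_theta1 - log_p_theta2)
        log_alpha1_fwd = min(0.0, log_p_theta1 - self._log_p_theta)
        # alpha2 = [p(theta2) (1 - alpha1(theta2, theta1))]
        #        / [p(theta)  (1 - alpha1(theta,  theta1))], in log space
        log_accept2 = (
            log_p_theta2
            + np.log1p(-np.exp(log_alpha1_rev))
            - self._log_p_theta
            - np.log1p(-np.exp(log_alpha1_fwd))
        )
        if np.log(np.random.uniform()) < log_accept2:
            self._theta = theta2
            self._log_p_theta = log_p_theta2
        return self._theta, self._log_p_theta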