Skip to content

Commit

Permalink
MarkovChain.replicate added; MarkovChain.simulate API changed
Browse files Browse the repository at this point in the history
  • Loading branch information
oyamad committed May 9, 2015
1 parent a9ee37a commit 95627f2
Showing 1 changed file with 171 additions and 28 deletions.
199 changes: 171 additions & 28 deletions quantecon/mc_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(self, P):
self.digraph = DiGraph(self.P)

self._stationary_dists = None
self._cdfs = None

def __repr__(self):
msg = "Markov chain with transition matrix \nP = \n{0}"
Expand Down Expand Up @@ -257,10 +258,169 @@ def stationary_distributions(self):
self._compute_stationary()
return self._stationary_dists

def simulate(self, init=0, sample_size=1000):
X = mc_sample_path(self.P, init, sample_size)
@property
def cdfs(self):
if self._cdfs is None:
# See issue #137#issuecomment-96128186
cdfs = np.empty((self.n, self.n), order='C')
np.cumsum(self.P, axis=-1, out=cdfs)
self._cdfs = cdfs
return self._cdfs

def simulate(self, ts_length, init=None):
"""
Simulate a time series of the state transition of length
ts_length.
Parameters
----------
ts_length : scalar(int)
Length of the simulation.
init : scalar(int), optional(default=None)
Initial state. If None, the initial state is randomly drawn.
Returns
-------
X : ndarray(int, ndim=1)
Array containing the sample path.
"""
if init is None:
init = np.random.randint(self.n)
elif not isinstance(init, int):
raise ValueError('init must be int or None')
X = _simulate_markov_chain(self.cdfs, ts_length, init)
return X

def replicate(self, T, num_reps, init=None):
"""
Simulate num_reps observations of the state at time T.
Parameters
----------
T : scalar(int)
Time period of the observation.
num_reps : scalar(int)
Number of replication.
init : array_like(int, ndim=1) or scalar(int),
optional(default=None)
Specifies the initial state for each simulation. If it is an
array_like, its length must be equal to num_reps. If it is
None, the initial state is randomly drawn for each
simulation.
Returns
-------
X_Ts : ndarray(int, ndim=1)
Array containing the num_reps observations of the state at
time T.
"""
if init is None:
init_states = np.random.randint(self.n, size=num_reps)
elif isinstance(init, int):
init_states = np.ones(num_reps, dtype=int) * init
else:
msg = 'init must be int, array_like of length equal to ' + \
'equal to num_reps, or None'
try:
if len(init) == num_reps:
init_states = np.asarray(init)
else:
raise ValueError(msg)
except:
raise ValueError(msg)

X_Ts = _replicate_markov_chain(self.cdfs, T, num_reps, init_states)
return X_Ts


def _simulate_markov_chain(P_cdfs, ts_length, init):
"""
Main body of MarkovChain.simulate.
Parameters
----------
P_cdfs : ndarray(float, ndim=2)
Array containing as rows the CDFs of the state transition.
ts_length : scalar(int)
Length of the simulation.
init : scalar(int)
Initial state.
Returns
-------
X : ndarray(int, ndim=1)
Array containing the sample path.
Notes
-----
This routine is jit-complied if the module Numba is vailable.
"""
# === set up array to store output === #
X = np.empty(ts_length, dtype=int)
X[0] = init

# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=ts_length-1)

# === generate the sample path === #
for t in range(ts_length-1):
X[t+1] = searchsorted(P_cdfs[X[t]], u[t])

return X

if numba_installed:
_simulate_markov_chain = jit(_simulate_markov_chain)


def _replicate_markov_chain(P_cdfs, T, num_reps, init_states):
"""
Main body of MarkovChain.replicate.
Parameters
----------
P_cdfs : ndarray(float, ndim=2)
Array containing as rows the CDFs of the state transition.
num_reps : scalar(int)
Number of replication.
init : ndarray(int, ndim=1)
Array of length num_reps containing the initial states.
Returns
-------
out : ndarray(int, ndim=1)
Array containing the num_reps observations of the state at
time T.
Notes
-----
This routine is jit-complied if the module Numba is vailable.
"""
out = np.empty(num_reps, dtype=int)

for i in range(num_reps):
u = np.random.random(size=T)
x_current = init_states[i]
for t in range(T):
x_next = searchsorted(P_cdfs[x_current], u[t])
x_current = x_next
out[i] = x_current

return out

if numba_installed:
_replicate_markov_chain = jit(_replicate_markov_chain)


def mc_compute_stationary(P):
"""
Expand All @@ -281,31 +441,14 @@ def mc_sample_path(P, init=0, sample_size=1000):
"""
See Section: DocStrings below
"""
n = len(P)

# CDFs, one for each row of P
cdfs = np.empty((n, n), order='C') # see issue #137#issuecomment-96128186
np.cumsum(P, axis=-1, out=cdfs)

# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=sample_size)

# === set up array to store output === #
X = np.empty(sample_size, dtype=int)
if isinstance(init, int):
X[0] = init
X_0 = init
else:
cdf0 = np.cumsum(init)
X[0] = searchsorted(cdf0, u[0])

# === generate the sample path === #
for t in range(sample_size-1):
X[t+1] = searchsorted(cdfs[X[t]], u[t+1])

return X
u_0 = np.random.random(size=1)
X_0 = searchsorted(cdf0, u_0)

if numba_installed:
mc_sample_path = jit(mc_sample_path)
return MarkovChain(P).simulate(ts_length=sample_size, init=X_0)


# ------------ #
Expand Down Expand Up @@ -347,8 +490,8 @@ def mc_sample_path(P, init=0, sample_size=1000):
# -Methods- #

# -Markovchain.simulate()- #
if sys.version_info[0] == 3:
MarkovChain.simulate.__doc__ = _sample_path_docstr.format(p_arg="")
elif sys.version_info[0] == 2:
MarkovChain.simulate.__func__.__doc__ = \
_sample_path_docstr.format(p_arg="")
# if sys.version_info[0] == 3:
# MarkovChain.simulate.__doc__ = _sample_path_docstr.format(p_arg="")
# elif sys.version_info[0] == 2:
# MarkovChain.simulate.__func__.__doc__ = \
# _sample_path_docstr.format(p_arg="")

0 comments on commit 95627f2

Please sign in to comment.