-
Notifications
You must be signed in to change notification settings - Fork 1
/
JAC_Utility.py
246 lines (211 loc) · 8.72 KB
/
JAC_Utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 15 23:55:02 2021
@author: wdu
"""
from HARK.distribution import DiscreteDistribution,combine_indep_dstns, Lognormal, MeanOneLogNormal, Uniform, calc_expectation, Uniform, Distribution
import numpy as np
class DiscreteDistribution2(Distribution):
"""
A representation of a discrete probability distribution.
Parameters
----------
pmf : np.array
An array of floats representing a probability mass function.
X : np.array or [np.array]
Discrete point values for each probability mass.
May be multivariate (list of arrays).
seed : int
Seed for random number generator.
"""
pmf = None
X = None
def __init__(self, pmf, X, seed=0):
self.pmf = pmf
self.X = X
# Set up the RNG
super().__init__(seed)
# Very quick and incomplete parameter check:
# TODO: Check that pmf and X arrays have same length.
def dim(self):
if isinstance(self.X, list):
return len(self.X)
else:
return 1
def draw_events(self, n):
"""
Draws N 'events' from the distribution PMF.
These events are indices into X.
"""
# Generate a cumulative distribution
base_draws = self.RNG.uniform(size=n)
cum_dist = np.cumsum(self.pmf)
# Convert the basic uniform draws into discrete draws
indices = cum_dist.searchsorted(base_draws)
return indices
def draw_perf(self, N, X=None, exact_match=True):
"""
Simulates N draws from a discrete distribution with probabilities P and outcomes X.
Parameters
----------
N : int
Number of draws to simulate.
X : None, int, or np.array
If None, then use this distribution's X for point values.
If an int, then the index of X for the point values.
If an np.array, use the array for the point values.
exact_match : boolean
Whether the draws should "exactly" match the discrete distribution (as
closely as possible given finite draws). When True, returned draws are
a random permutation of the N-length list that best fits the discrete
distribution. When False (default), each draw is independent from the
others and the result could deviate from the input.
Returns
-------
draws : np.array
An array of draws from the discrete distribution; each element is a value in X.
"""
if X is None:
X = self.X
J = self.dim()
elif isinstance(X, int):
X = self.X[X]
J = 1
else:
X = X
J = 1
if exact_match:
events = np.arange(self.pmf.size) # just a list of integers
cutoffs = np.round(np.cumsum(self.pmf) * N).astype(
int
) # cutoff points between discrete outcomes
top = 0
# Make a list of event indices that closely matches the discrete distribution
event_list = []
for j in range(events.size):
bot = top
top = cutoffs[j]
event_list += (top - bot) * [events[j]]
indices = self.RNG.permutation(event_list)
# Randomly permute the event indices
return indices
def draw(self, N, X=None, exact_match=True):
"""
Simulates N draws from a discrete distribution with probabilities P and outcomes X.
Parameters
----------
N : int
Number of draws to simulate.
X : None, int, or np.array
If None, then use this distribution's X for point values.
If an int, then the index of X for the point values.
If an np.array, use the array for the point values.
exact_match : boolean
Whether the draws should "exactly" match the discrete distribution (as
closely as possible given finite draws). When True, returned draws are
a random permutation of the N-length list that best fits the discrete
distribution. When False (default), each draw is independent from the
others and the result could deviate from the input.
Returns
-------
draws : np.array
An array of draws from the discrete distribution; each element is a value in X.
"""
if X is None:
X = self.X
J = self.dim()
elif isinstance(X, int):
X = self.X[X]
J = 1
else:
X = X
J = 1
if exact_match:
events = np.arange(self.pmf.size) # just a list of integers
cutoffs = np.round(np.cumsum(self.pmf) * N).astype(
int
) # cutoff points between discrete outcomes
top = 0
# Make a list of event indices that closely matches the discrete distribution
event_list = []
for j in range(events.size):
bot = top
top = cutoffs[j]
event_list += (top - bot) * [events[j]]
# Randomly permute the event indices
indices = self.RNG.permutation(event_list)
# Draw event indices randomly from the discrete distribution
else:
indices = self.draw_events(N)
# Create and fill in the output array of draws based on the output of event indices
if J > 1:
draws = np.zeros((J, N))
for j in range(J):
draws[j, :] = X[j][indices]
else:
draws = np.asarray(X)[indices]
return draws
def combine_indep_dstns2(*distributions, seed=0):
"""
Given n lists (or tuples) whose elements represent n independent, discrete
probability spaces (probabilities and values), construct a joint pmf over
all combinations of these independent points. Can take multivariate discrete
distributions as inputs.
Parameters
----------
distributions : [np.array]
Arbitrary number of distributions (pmfs). Each pmf is a list or tuple.
For each pmf, the first vector is probabilities and all subsequent vectors
are values. For each pmf, this should be true:
len(X_pmf[0]) == len(X_pmf[j]) for j in range(1,len(distributions))
Returns
-------
A DiscreteDistribution, consisting of:
P_out: np.array
Probability associated with each point in X_out.
X_out: np.array (as many as in *distributions)
Discrete points for the joint discrete probability mass function.
"""
# Get information on the distributions
dist_lengths = ()
dist_dims = ()
for dist in distributions:
dist_lengths += (len(dist.pmf),)
dist_dims += (dist.dim(),)
number_of_distributions = len(distributions)
# Initialize lists we will use
X_out = []
P_temp = []
# Now loop through the distributions, tiling and flattening as necessary.
for dd, dist in enumerate(distributions):
# The shape we want before we tile
dist_newshape = (
(1,) * dd + (len(dist.pmf),) + (1,) * (number_of_distributions - dd)
)
# The tiling we want to do
dist_tiles = dist_lengths[:dd] + (1,) + dist_lengths[dd + 1 :]
# Now we are ready to tile.
# We don't use the np.meshgrid commands, because they do not
# easily support non-symmetric grids.
# First deal with probabilities
Pmesh = np.tile(dist.pmf.reshape(dist_newshape), dist_tiles) # Tiling
flatP = Pmesh.ravel() # Flatten the tiled arrays
P_temp += [
flatP,
] # Add the flattened arrays to the output lists
# Then loop through each value variable
for n in range(dist_dims[dd]):
if dist.dim() > 1:
Xmesh = np.tile(dist.X[n].reshape(dist_newshape), dist_tiles)
else:
Xmesh = np.tile(dist.X.reshape(dist_newshape), dist_tiles)
flatX = Xmesh.ravel()
X_out += [
flatX,
]
# We're done getting the flattened X_out arrays we wanted.
# However, we have a bunch of flattened P_temp arrays, and just want one
# probability array. So get the probability array, P_out, here.
P_out = np.prod(np.array(P_temp), axis=0)
assert np.isclose(np.sum(P_out), 1), "Probabilities do not sum to 1!"
return DiscreteDistribution2(P_out, X_out, seed=seed)