-
Notifications
You must be signed in to change notification settings - Fork 108
/
gaussian.py
312 lines (246 loc) · 10.9 KB
/
gaussian.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""GaussianMultivariate module."""
import logging
import sys
import numpy as np
import pandas as pd
from scipy import stats
from copulas import (
EPSILON, check_valid_values, get_instance, get_qualified_name, random_state, store_args,
validate_random_state)
from copulas.multivariate.base import Multivariate
from copulas.univariate import GaussianUnivariate, Univariate
LOGGER = logging.getLogger(__name__)
DEFAULT_DISTRIBUTION = Univariate
class GaussianMultivariate(Multivariate):
"""Class for a multivariate distribution that uses the Gaussian copula.
Args:
distribution (str or dict):
Fully qualified name of the class to be used for modeling the marginal
distributions or a dictionary mapping column names to the fully qualified
distribution names.
"""
correlation = None
columns = None
univariates = None
@store_args
def __init__(self, distribution=DEFAULT_DISTRIBUTION, random_state=None):
self.random_state = validate_random_state(random_state)
self.distribution = distribution
def __repr__(self):
"""Produce printable representation of the object."""
if self.distribution == DEFAULT_DISTRIBUTION:
distribution = ''
elif isinstance(self.distribution, type):
distribution = f'distribution="{self.distribution.__name__}"'
else:
distribution = f'distribution="{self.distribution}"'
return f'GaussianMultivariate({distribution})'
def _transform_to_normal(self, X):
if isinstance(X, pd.Series):
X = X.to_frame().T
elif not isinstance(X, pd.DataFrame):
if len(X.shape) == 1:
X = [X]
X = pd.DataFrame(X, columns=self.columns)
U = []
for column_name, univariate in zip(self.columns, self.univariates):
if column_name in X:
column = X[column_name]
U.append(univariate.cdf(column.to_numpy()).clip(EPSILON, 1 - EPSILON))
return stats.norm.ppf(np.column_stack(U))
def _get_correlation(self, X):
"""Compute correlation matrix with transformed data.
Args:
X (numpy.ndarray):
Data for which the correlation needs to be computed.
Returns:
numpy.ndarray:
computed correlation matrix.
"""
result = self._transform_to_normal(X)
correlation = pd.DataFrame(data=result).corr().to_numpy()
correlation = np.nan_to_num(correlation, nan=0.0)
# If singular, add some noise to the diagonal
if np.linalg.cond(correlation) > 1.0 / sys.float_info.epsilon:
correlation = correlation + np.identity(correlation.shape[0]) * EPSILON
return pd.DataFrame(correlation, index=self.columns, columns=self.columns)
@check_valid_values
def fit(self, X):
"""Compute the distribution for each variable and then its correlation matrix.
Arguments:
X (pandas.DataFrame):
Values of the random variables.
"""
LOGGER.info('Fitting %s', self)
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
columns = []
univariates = []
for column_name, column in X.items():
if isinstance(self.distribution, dict):
distribution = self.distribution.get(column_name, DEFAULT_DISTRIBUTION)
else:
distribution = self.distribution
LOGGER.debug('Fitting column %s to %s', column_name, distribution)
univariate = get_instance(distribution)
try:
univariate.fit(column)
except BaseException:
log_message = (
f'Unable to fit to a {distribution} distribution for column {column_name}. '
'Using a Gaussian distribution instead.'
)
LOGGER.info(log_message)
univariate = GaussianUnivariate()
univariate.fit(column)
columns.append(column_name)
univariates.append(univariate)
self.columns = columns
self.univariates = univariates
LOGGER.debug('Computing correlation')
self.correlation = self._get_correlation(X)
self.fitted = True
LOGGER.debug('GaussianMultivariate fitted successfully')
def probability_density(self, X):
"""Compute the probability density for each point in X.
Arguments:
X (pandas.DataFrame):
Values for which the probability density will be computed.
Returns:
numpy.ndarray:
Probability density values for points in X.
Raises:
NotFittedError:
if the model is not fitted.
"""
self.check_fit()
transformed = self._transform_to_normal(X)
return stats.multivariate_normal.pdf(transformed, cov=self.correlation)
def cumulative_distribution(self, X):
"""Compute the cumulative distribution value for each point in X.
Arguments:
X (pandas.DataFrame):
Values for which the cumulative distribution will be computed.
Returns:
numpy.ndarray:
Cumulative distribution values for points in X.
Raises:
NotFittedError:
if the model is not fitted.
"""
self.check_fit()
transformed = self._transform_to_normal(X)
return stats.multivariate_normal.cdf(transformed, cov=self.correlation)
def _get_conditional_distribution(self, conditions):
"""Compute the parameters of a conditional multivariate normal distribution.
The parameters of the conditioned distribution are computed as specified here:
https://en.wikipedia.org/wiki/Multivariate_normal_distribution#Conditional_distributions
Args:
conditions (pandas.Series):
Mapping of the column names and column values to condition on.
The input values have already been transformed to their normal distribution.
Returns:
tuple:
* means (numpy.array):
mean values to use for the conditioned multivariate normal.
* covariance (numpy.array):
covariance matrix to use for the conditioned
multivariate normal.
* columns (list):
names of the columns that will be sampled conditionally.
"""
columns2 = conditions.index
columns1 = self.correlation.columns.difference(columns2)
sigma11 = self.correlation.loc[columns1, columns1].to_numpy()
sigma12 = self.correlation.loc[columns1, columns2].to_numpy()
sigma21 = self.correlation.loc[columns2, columns1].to_numpy()
sigma22 = self.correlation.loc[columns2, columns2].to_numpy()
mu1 = np.zeros(len(columns1))
mu2 = np.zeros(len(columns2))
sigma12sigma22inv = sigma12 @ np.linalg.inv(sigma22)
mu_bar = mu1 + sigma12sigma22inv @ (conditions - mu2)
sigma_bar = sigma11 - sigma12sigma22inv @ sigma21
return mu_bar, sigma_bar, columns1
def _get_normal_samples(self, num_rows, conditions):
"""Get random rows in the standard normal space.
If no conditions are given, the values are sampled from a standard normal
multivariate.
If conditions are given, they are transformed to their equivalent standard
normal values using their marginals and then the values are sampled from
a standard normal multivariate conditioned on the given condition values.
"""
if conditions is None:
covariance = self.correlation
columns = self.columns
means = np.zeros(len(columns))
else:
conditions = pd.Series(conditions)
normal_conditions = self._transform_to_normal(conditions)[0]
normal_conditions = pd.Series(normal_conditions, index=conditions.index)
means, covariance, columns = self._get_conditional_distribution(normal_conditions)
samples = np.random.multivariate_normal(means, covariance, size=num_rows)
return pd.DataFrame(samples, columns=columns)
@random_state
def sample(self, num_rows=1, conditions=None):
"""Sample values from this model.
Argument:
num_rows (int):
Number of rows to sample.
conditions (dict or pd.Series):
Mapping of the column names and column values to condition on.
Returns:
numpy.ndarray:
Array of shape (n_samples, *) with values randomly
sampled from this model distribution. If conditions have been
given, the output array also contains the corresponding columns
populated with the given values.
Raises:
NotFittedError:
if the model is not fitted.
"""
self.check_fit()
samples = self._get_normal_samples(num_rows, conditions)
output = {}
for column_name, univariate in zip(self.columns, self.univariates):
if conditions and column_name in conditions:
# Use the values that were given as conditions in the original space.
output[column_name] = np.full(num_rows, conditions[column_name])
else:
cdf = stats.norm.cdf(samples[column_name])
output[column_name] = univariate.percent_point(cdf)
return pd.DataFrame(data=output)
def to_dict(self):
"""Return a `dict` with the parameters to replicate this object.
Returns:
dict:
Parameters of this distribution.
"""
self.check_fit()
univariates = [univariate.to_dict() for univariate in self.univariates]
return {
'correlation': self.correlation.to_numpy().tolist(),
'univariates': univariates,
'columns': self.columns,
'type': get_qualified_name(self),
}
@classmethod
def from_dict(cls, copula_dict):
"""Create a new instance from a parameters dictionary.
Args:
params (dict):
Parameters of the distribution, in the same format as the one
returned by the ``to_dict`` method.
Returns:
Multivariate:
Instance of the distribution defined on the parameters.
"""
instance = cls()
instance.univariates = []
columns = copula_dict['columns']
instance.columns = columns
for parameters in copula_dict['univariates']:
instance.univariates.append(Univariate.from_dict(parameters))
correlation = copula_dict['correlation']
instance.correlation = pd.DataFrame(correlation, index=columns, columns=columns)
instance.fitted = True
return instance