Merge pull request #80 from HDI-Project/68_modeler_parameter_not_used
Issue 68: Add support for Vine Copulas as a Modeler, different distributions for Gaussian Copula.
ManuelAlvarezC authored Feb 5, 2019
2 parents 8d074d8 + c811c0e commit 04b3a34
Showing 5 changed files with 747 additions and 209 deletions.
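For context, the parameters this PR introduces can be combined as follows. This sketch is not part of the diff: it assumes the `CSVDataLoader` workflow from the project README, a placeholder metadata path (`demo_meta.json`), and that `VineCopula` and `KDEUnivariate` are importable from `copulas` alongside the `TreeTypes` used in the diff below.

```python
from copulas.multivariate import TreeTypes, VineCopula
from copulas.univariate import KDEUnivariate

from sdv import CSVDataLoader, Modeler

# Load and transform a dataset as in the README ('demo_meta.json' is a placeholder).
data_navigator = CSVDataLoader('demo_meta.json').load_data()
data_navigator.transform_data()

# Gaussian Copula (the default model) with a non-default marginal distribution.
modeler = Modeler(data_navigator, distribution=KDEUnivariate)
modeler.model_database()

# Vine Copula model, configured through model_kwargs.
vine_modeler = Modeler(
    data_navigator,
    model=VineCopula,
    model_kwargs={'vine_type': TreeTypes.REGULAR},
)
vine_modeler.model_database()
```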
4 changes: 2 additions & 2 deletions README.md
@@ -248,14 +248,14 @@ The modeler can also be saved to a file using the `save()` method. This will sav
on the specified path.

```python
->>> modeler.save('models/demo_model.pkl')
+>>> modeler.save('demo_model.pkl')
```

If you have stored a model in a previous session using the command above, you can load the model
using the `load()` method:

```python
->>> modeler = Modeler.load('models/demo_model.pkl')
+>>> modeler = Modeler.load('demo_model.pkl')
```

### Using the Sampler
226 changes: 166 additions & 60 deletions sdv/modeler.py
@@ -1,37 +1,65 @@
import logging
import pickle

import numpy as np
import pandas as pd
-from copulas.multivariate import GaussianMultivariate
+from copulas import get_qualified_name
+from copulas.multivariate import GaussianMultivariate, TreeTypes
from copulas.univariate import GaussianUnivariate

# Configure logger
logger = logging.getLogger(__name__)

DEFAULT_MODEL = GaussianMultivariate
DEFAULT_DISTRIBUTION = GaussianUnivariate
+IGNORED_DICT_KEYS = ['fitted', 'distribution', 'type']
+
+MODELLING_ERROR_MESSAGE = (
+    'There was an error while trying to model the database. If you are using a custom '
+    'distribution or model, please try again using the default ones. If the problem '
+    'persists, please report it here: https://github.com/HDI-Project/SDV/issues'
+)


class Modeler:
"""Class responsible for modeling database."""
"""Class responsible for modeling database.
Args:
data_navigator (DataNavigator): object for the dataset.
model (type): Class of model to use.
distribution (type): Class of distribution to use. Will be deprecated shortly.
model_kwargs (dict): Keyword arguments to pass to model.
"""

DEFAULT_PRIMARY_KEY = 'GENERATED_PRIMARY_KEY'

-    def __init__(self, data_navigator, model=DEFAULT_MODEL, distribution=DEFAULT_DISTRIBUTION):
-        """Instantiates a modeler object.
-
-        Args:
-            data_navigator (DataNavigator): object for the dataset.
-            transformed_data (dict): transformed tables {table_name:dataframe}.
-            model (type): Class of model to use.
-            distribution (type): Class of model to use.
-        """
+    def __init__(self, data_navigator, model=DEFAULT_MODEL, distribution=None, model_kwargs=None):
        self.tables = {}
        self.models = {}
        self.child_locs = {}  # maps table->{child: col #}
        self.dn = data_navigator
        self.model = model
        self.distribution = distribution

+        if distribution and model != DEFAULT_MODEL:
+            raise ValueError(
+                '`distribution` argument is only supported for `GaussianMultivariate` model.')
+
+        if distribution:
+            distribution = get_qualified_name(distribution)
+        else:
+            distribution = get_qualified_name(DEFAULT_DISTRIBUTION)
+
+        if not model_kwargs:
+            if model == DEFAULT_MODEL:
+                model_kwargs = {'distribution': distribution}
+
+            else:
+                model_kwargs = {'vine_type': TreeTypes.REGULAR}
+
+        self.model_kwargs = model_kwargs
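Note that the `distribution` class is stored by its fully qualified name before being handed to the model. An illustrative session (the exact output string assumes `copulas`' current module layout):

```python
>>> from copulas import get_qualified_name
>>> from copulas.univariate import GaussianUnivariate
>>> get_qualified_name(GaussianUnivariate)
'copulas.univariate.gaussian.GaussianUnivariate'
```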

    def save(self, file_name):
        """Saves model to file destination.
@@ -60,27 +88,81 @@ def get_pk_value(self, pk, index, mapping):

        return val

-    def flatten_model(self, model):
+    @classmethod
+    def _flatten_array(cls, nested, prefix=''):
+        """Return a dictionary with the values of the given nested array.
+
+        Args:
+            nested (list, np.array): Iterable to flatten.
+            prefix (str): Name to append to the array indices.
+
+        Returns:
+            dict
+        """
+        result = {}
+        for index in range(len(nested)):
+            prefix_key = '__'.join([prefix, str(index)]) if len(prefix) else str(index)
+
+            if isinstance(nested[index], (list, np.ndarray)):
+                result.update(cls._flatten_array(nested[index], prefix=prefix_key))
+
+            else:
+                result[prefix_key] = nested[index]
+
+        return result
+
+    @classmethod
+    def _flatten_dict(cls, nested, prefix=''):
+        """Return a flattened dict from a nested one.
+
+        This method returns a flattened version of a dictionary, concatenating nested
+        key names with double underscores.
+
+        Args:
+            nested (dict): Original dictionary to flatten.
+            prefix (str): Prefix to append to key name.
+
+        Returns:
+            dict: Flattened dictionary. That is, all its keys hold a primitive value.
+        """
+        result = {}
+
+        for key in nested.keys():
+            prefix_key = '__'.join([prefix, str(key)]) if len(prefix) else key
+
+            if key in IGNORED_DICT_KEYS:
+                continue
+
+            elif isinstance(nested[key], dict):
+                result.update(cls._flatten_dict(nested[key], prefix_key))
+
+            elif isinstance(nested[key], (np.ndarray, list)):
+                result.update(cls._flatten_array(nested[key], prefix_key))
+
+            else:
+                result[prefix_key] = nested[key]
+
+        return result
+
+    @classmethod
+    def flatten_model(cls, model, name=''):
        """Flatten a model's parameters into an array.

        Args:
-            model: a model object
+            model (self.model): Instance of model.
+            name (str): Prefix to the parameter name.

        Returns:
            pd.Series: parameters for model
        """
-        params = list(model.covariance.flatten())
-
-        for col_model in model.distribs.values():
-            params.extend([col_model.std, col_model.mean])
-
-        return pd.Series(params)
+        return pd.Series(cls._flatten_dict(model.to_dict(), name))

    def get_foreign_key(self, fields, primary):
        """Get foreign key from primary key.

        Args:
-            fields (dict): metadata's fields key for a given table.
+            fields (dict): metadata `fields` key for a given table.
            primary (str): Name of primary key in original table.

        Return:
@@ -98,7 +180,7 @@ def impute_table(table):
"""Fill in any NaN values in a table.
Args:
table(pandas.DataFrame):
table(pandas.DataFrame): Table to fill NaN values
Returns:
pandas.DataFrame
@@ -122,85 +204,105 @@ def fit_model(self, data):
            data (pandas.DataFrame): Data to train the model with.

        Returns:
-            GaussianMultivariate: Fitted model.
+            model: Instance of self.model fitted with data.
        """
-        model = self.model()
+        model = self.model(**self.model_kwargs)
        model.fit(data)

        return model

-    def _create_extension(self, df, transformed_child_table):
-        """Return the flattened model from a dataframe."""
-        # remove column of foreign key
+    def _create_extension(self, foreign, transformed_child_table, table_info):
+        """Return the flattened model from a dataframe.
+
+        Args:
+            foreign (pandas.DataFrame): Object with the index of children table elements
+                for a given foreign_key value.
+            transformed_child_table (pandas.DataFrame): Table of data to fill.
+            table_info (tuple(str, str)): foreign_key and child table names.
+
+        Returns:
+            pd.Series: Parameter extension
+        """
+        foreign_key, child_name = table_info
        try:
-            conditional_data = transformed_child_table.loc[df.index]
+            conditional_data = transformed_child_table.loc[foreign.index].copy()
+            conditional_data = conditional_data.drop(foreign_key, axis=1)

        except KeyError:
            return None

        clean_df = self.impute_table(conditional_data)
-
-        return self.flatten_model(self.fit_model(clean_df))
+        return self.flatten_model(self.fit_model(clean_df), child_name)
-    def _extension_from_group(self, transformed_child_table):
-        """Wrapper around _create_extension to use it with pd.DataFrame.apply."""
-        def f(group):
-            return self._create_extension(group, transformed_child_table)
-        return f
-
-    def _get_extensions(self, pk, children, table_name):
-        """Generate list of extension for child tables."""
-        # keep track of which columns belong to which child
-        start = 0
-        end = 0
-        extensions = []
-
-        # make sure child_locs has value for table name
-        self.child_locs[table_name] = self.child_locs.get(table_name, {})
+    def _get_extensions(self, pk, children):
+        """Generate list of extensions for child tables.
+
+        Args:
+            pk (str): Name of the primary_key column in the parent table.
+            children (set[str]): Names of the children.
+
+        Returns:
+            list(pandas.DataFrame)
+
+        Each element of the list is generated for one single child.
+        That dataframe should have the `foreign_key` name as its index.name, and the
+        foreign key values as its index.
+
+        The values for a given index are generated by flattening a model fitted with
+        the data related to that index in the children table.
+        """
+        extensions = []

        # find children that ref primary key
        for child in children:
            child_table = self.dn.tables[child].data
            child_meta = self.dn.tables[child].meta

+            fields = child_meta['fields']
+            fk = self.get_foreign_key(fields, pk)
+
+            if not fk:
+                continue
+
            # check if leaf node
            if not self.dn.get_children(child):
                transformed_child_table = self.dn.transformed_data[child]

            else:
                transformed_child_table = self.tables[child]

-            fields = child_meta['fields']
-            fk = self.get_foreign_key(fields, pk)
+            table_info = (fk, '__' + child)

-            if not fk:
-                continue
+            foreign_key_values = child_table[fk].unique()
+            parameters = {}

-            extension = child_table.groupby(fk)
-            extension = extension.apply(self._extension_from_group(transformed_child_table))
+            for foreign_key in foreign_key_values:
+                foreign_index = child_table[child_table[fk] == foreign_key]
+                parameter = self._create_extension(
+                    foreign_index, transformed_child_table, table_info)

-            if len(extension):
-                # keep track of child column indices
-                end = max(end, start + extension.shape[1])
+                if parameter is not None:
+                    parameters[foreign_key] = parameter.to_dict()

-                self.child_locs[table_name][child] = (start, end)
+            extension = pd.DataFrame(parameters).T
+            extension.index.name = fk

-                # rename columns
-                extension.columns = range(start, end)
+            if len(extension):
                extensions.append(extension)
-                start = end

        return extensions

    def CPA(self, table):
        """Run CPA algorithm on a table.

-        Conditional Parameter Aggregation. It will take the tab
+        Conditional Parameter Aggregation. It will take the table's children and generate
+        extensions (parameters from modelling the related children for each foreign key)
+        and merge them to the original `table`.

        Args:
            table (string): name of table.

        Returns:
-            None:
+            None
        """
        logger.info('Modeling %s', table)
        # Grab table
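To make the extension rows concrete: the flattening helpers added above turn a fitted model's nested `to_dict()` output into flat scalar columns, dropping the keys listed in `IGNORED_DICT_KEYS`. A hand-built illustration (the nested values are made up; real ones come from `model.to_dict()`):

```python
>>> from sdv.modeler import Modeler
>>> nested = {
...     'covariance': [[1.0, 0.5], [0.5, 1.0]],
...     'distribs': {'amount': {'mean': 0.0, 'std': 1.0, 'fitted': True}},
...     'type': 'copulas.multivariate.gaussian.GaussianMultivariate',
... }
>>> Modeler._flatten_dict(nested)
{'covariance__0__0': 1.0, 'covariance__0__1': 0.5, 'covariance__1__0': 0.5, 'covariance__1__1': 1.0, 'distribs__amount__mean': 0.0, 'distribs__amount__std': 1.0}
```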
@@ -214,7 +316,7 @@ def CPA(self, table):

        # start with transformed table
        extended_table = self.dn.transformed_data[table]
-        extensions = self._get_extensions(pk, children, table)
+        extensions = self._get_extensions(pk, children)

        # add extensions
        for extension in extensions:
@@ -237,12 +339,16 @@ def RCPA(self, table):

    def model_database(self):
        """Use RCPA and store model for database."""
-        for table in self.dn.tables:
-            if not self.dn.get_parents(table):
-                self.RCPA(table)
+        try:
+            for table in self.dn.tables:
+                if not self.dn.get_parents(table):
+                    self.RCPA(table)

-        for table in self.tables:
-            clean_table = self.impute_table(self.tables[table])
-            self.models[table] = self.fit_model(clean_table)
+            for table in self.tables:
+                clean_table = self.impute_table(self.tables[table])
+                self.models[table] = self.fit_model(clean_table)
+
+        except (ValueError, np.linalg.linalg.LinAlgError):
+            raise ValueError(MODELLING_ERROR_MESSAGE)

        logger.info('Modeling Complete')
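Finally, a sketch of what `fit_model` now instantiates under each configuration, per the `model_kwargs` defaults built in `__init__`. This assumes `GaussianMultivariate` accepts a qualified distribution name and `VineCopula` a `vine_type` keyword, as those defaults imply; the DataFrame is toy data.

```python
import pandas as pd
from copulas.multivariate import GaussianMultivariate, TreeTypes, VineCopula

data = pd.DataFrame({'a': [1.0, 2.0, 3.0, 4.0], 'b': [0.1, 0.4, 0.2, 0.3]})

# Default model: self.model(**{'distribution': <qualified name>})
gaussian = GaussianMultivariate(distribution='copulas.univariate.gaussian.GaussianUnivariate')
gaussian.fit(data)

# Non-default model with no model_kwargs: self.model(**{'vine_type': TreeTypes.REGULAR})
vine = VineCopula(vine_type=TreeTypes.REGULAR)
vine.fit(data)
```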