This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Combine models from transforms, predictors and pipelines into one model. (#208)

* Initial test implementation of combining two or more models into one.

* Added support in Pipeline.combine_models for combining other types of items
and transform-only inputs.

* Combine Pipeline._evaluation_infer and _evaluation into one method.
This fixes an issue where a classifier graph would not contain the
correct nodes after calling Pipeline._predict().

* Missing part of previous check-in.

* Fix the Pipeline.combine_models signature to work with Python 2.7.
pieths authored and ganik committed Aug 4, 2019
1 parent 5306833 commit 1f97c9e
Showing 5 changed files with 507 additions and 90 deletions.
1 change: 1 addition & 0 deletions src/python/nimbusml.pyproj
@@ -596,6 +596,7 @@
<Compile Include="nimbusml\tests\feature_extraction\text\test_sentiment.py" />
<Compile Include="nimbusml\tests\idv\__init__.py" />
<Compile Include="nimbusml\tests\linear_model\test_linearsvmbinaryclassifier.py" />
<Compile Include="nimbusml\tests\pipeline\test_pipeline_combining.py" />
<Compile Include="nimbusml\tests\pipeline\test_pipeline_subclassing.py" />
<Compile Include="nimbusml\tests\preprocessing\normalization\test_meanvariancescaler.py" />
<Compile Include="nimbusml\tests\timeseries\test_iidchangepointdetector.py" />
11 changes: 11 additions & 0 deletions src/python/nimbusml/base_transform.py
@@ -8,6 +8,8 @@

__all__ = ["BaseTransform"]

import os

from sklearn.base import BaseEstimator

from . import Pipeline
@@ -71,6 +73,15 @@ def fit(self, X, y=None, **params):
set_shape(self, X)
return self

@property
def _is_fitted(self):
"""
Indicates whether the transform has been trained.
"""
return (hasattr(self, 'model_') and
self.model_ and
os.path.isfile(self.model_))

@trace
def transform(self, X, as_binary_data_stream=False, **params):
"""
2 changes: 1 addition & 1 deletion src/python/nimbusml/internal/utils/entrypoints.py
@@ -405,7 +405,7 @@ def remove_multi_level_index(c):
self.inputs['input_data'] = X._filename
elif 'data' in self.inputs:
self.inputs['data'] = X._filename
elif not summary:
elif not (summary or params.get('no_input_data')):
raise RuntimeError(
"data should be a dataframe, FileDataStream or DataView")

255 changes: 166 additions & 89 deletions src/python/nimbusml/pipeline.py
@@ -1442,31 +1442,63 @@ def _fix_ranking_metrics_schema(self, out_metrics):
'DCG@1', 'DCG@2', 'DCG@3', ]
return out_metrics

@trace
def _evaluation(self, evaltype, group_id, **params):
def _evaluation_infer(self, evaltype, label_column, group_id,
**params):
all_nodes = []
if not self.steps:
if evaltype == 'auto':
raise ValueError(
"need to specify 'evaltype' explicitly if model is "
"loaded")
common_eval_args = OrderedDict(data="$scoredVectorData",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(common_eval_args)

if evaltype == 'binary':
all_nodes.extend([
models_binaryclassificationevaluator(**params)
])
elif evaltype == 'multiclass':
all_nodes.extend([
models_classificationevaluator(**params)
])
elif evaltype == 'regression':
all_nodes.extend([
models_regressionevaluator(**params)
])
elif evaltype == 'cluster':
all_nodes.extend([
models_clusterevaluator(**params)
])
elif evaltype == 'anomaly':
all_nodes.extend([
models_anomalydetectionevaluator(**params)
])
elif evaltype == 'ranking':
type_ = self._last_node_type() if evaltype == 'auto' else evaltype

if type_ == 'binary':
all_nodes.extend(
[models_binaryclassificationevaluator(**params)])

elif type_ == 'multiclass':
all_nodes.extend(
[models_classificationevaluator(**params)])

elif type_ in ['regressor', 'regression']:
all_nodes.extend([models_regressionevaluator(**params)])

elif type_ in ['clusterer', 'cluster']:
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
clustering_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(clustering_eval_args)
all_nodes.extend([label_node,
models_clusterevaluator(**params)
])

elif type_ == 'anomaly':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
anom_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column
)
params.update(anom_eval_args)
all_nodes.extend(
[label_node,
models_anomalydetectionevaluator(**params)])

elif type_ == 'ranking':
svd = "$scoredVectorData"
column = [OrderedDict(Source=group_id, Name=group_id)]
algo_args = dict(data=svd, output_data=svd, column=column)
@@ -1477,77 +1509,14 @@ def _evaluation(self, evaltype, group_id, **params):
key_node,
evaluate_node
])

else:
raise ValueError(
"%s is not a valid type for evaluation." %
evaltype)

return all_nodes

def _evaluation_infer(self, evaltype, label_column, group_id,
**params):
all_nodes = []
if len(self.steps) == 0:
if evaltype == 'auto':
raise ValueError(
"need to specify 'evaltype' explicitly if model is "
"loaded")
common_eval_args = OrderedDict(data="$scoredVectorData",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(common_eval_args)
if evaltype == 'auto':
last_node_type = self._last_node_type()
if last_node_type == 'binary':
all_nodes.extend(
[models_binaryclassificationevaluator(**params)])

elif last_node_type == 'multiclass':
all_nodes.extend(
[models_classificationevaluator(**params)])

elif last_node_type == 'regressor':
all_nodes.extend([models_regressionevaluator(**params)])

elif last_node_type == 'clusterer':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
clustering_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(clustering_eval_args)
all_nodes.extend([label_node,
models_clusterevaluator(**params)
])

elif last_node_type == 'anomaly':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
anom_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column
)
params.update(anom_eval_args)
all_nodes.extend(
[label_node,
models_anomalydetectionevaluator(**params)])

else:
raise ValueError(
"evaltype is %s. Last node type is %s" %
(evaltype, last_node_type))
else:
return self._evaluation(evaltype, group_id, **params)

return all_nodes

def _last_node_type(self):
last_node = self.last_node

@@ -1864,8 +1833,12 @@ def _predict(self, X, y=None,
scored_data="$scoredVectorData")
all_nodes.extend([score_node])

if hasattr(self, 'steps') and len(self.steps) > 0 \
and self.last_node.type == 'classifier':
if (evaltype in ['binary', 'multiclass']) or \
(hasattr(self, 'steps')
and self.steps is not None
and len(self.steps) > 0
and self.last_node.type == 'classifier'):

select_node = transforms_scorecolumnselector(
data="$scoredVectorData",
output_data="$scoreColumnsOnlyData", score_column="Score")
@@ -2476,3 +2449,107 @@ def score(
else:
raise ValueError(
"cannot generate score for {0}).".format(task_type))


@classmethod
def combine_models(cls, *items, **params):
"""
Combine the models of multiple pipelines, transforms
and/or predictors into a single model. The models are
combined in the order they are seen.

:param items: the fitted pipelines, transforms and/or
predictors which contain the models to join.
:param contains_predictor: Set to `True` if the
last item contains or is a predictor. Set to
`False` if `items` only contains transforms.
The default is True.
:return: A new Pipeline which is backed by a model that
is the combination of all the models passed in
through `items`.
"""
if len(items) == 0:
raise RuntimeError(
'At least one transform, predictor '
'or pipeline must be specified.')

for item in items:
if not item._is_fitted:
raise RuntimeError(
'Item must be fitted before '
'models can be combined.')

contains_predictor = params.get('contains_predictor', True)
verbose = params.get('verbose', 0)

get_model = lambda x: x.model if hasattr(x, 'model') else x.model_

if len(items) == 1:
return Pipeline(model=get_model(items[0]))

start_time = time.time()

nodes = []
inputs = {}
transform_models = []

for index, item in enumerate(items[:-1], start=1):
var_name = 'transform_model' + str(index)
inputs[var_name] = get_model(item)
transform_models.append("$" + var_name)

if contains_predictor:
inputs['predictor_model'] = get_model(items[-1])

combine_models_node = transforms_manyheterogeneousmodelcombiner(
transform_models=transform_models,
predictor_model='$predictor_model',
model='$output_model')
nodes.append(combine_models_node)

else:
var_name = 'transform_model' + str(len(items))
inputs[var_name] = get_model(items[-1])
transform_models.append("$" + var_name)

combine_models_node = transforms_modelcombiner(
models=transform_models,
output_model='$output_model')
nodes.append(combine_models_node)

outputs = dict(output_model="")

graph = Graph(
inputs,
outputs,
False,
*nodes)

class_name = cls.__name__
method_name = inspect.currentframe().f_code.co_name
telemetry_info = ".".join([class_name, method_name])

try:
(out_model, _, _) = graph.run(
X=None,
y=None,
random_state=None,
model=None,
verbose=verbose,
is_summary=False,
telemetry_info=telemetry_info,
no_input_data=True,
**params)
except RuntimeError as e:
raise e

pipeline = Pipeline(model=out_model)

# stop the clock
pipeline._run_time = time.time() - start_time
pipeline._write_csv_time = graph._write_csv_time

return pipeline

328 changes: 328 additions & 0 deletions src/python/nimbusml/tests/pipeline/test_pipeline_combining.py
(diff not shown)
