This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Combine models from transforms, predictors and pipelines into one model. #208

Merged
5 commits, merged on Aug 4, 2019
1 change: 1 addition & 0 deletions src/python/nimbusml.pyproj
@@ -596,6 +596,7 @@
<Compile Include="nimbusml\tests\feature_extraction\text\test_sentiment.py" />
<Compile Include="nimbusml\tests\idv\__init__.py" />
<Compile Include="nimbusml\tests\linear_model\test_linearsvmbinaryclassifier.py" />
<Compile Include="nimbusml\tests\pipeline\test_pipeline_combining.py" />
<Compile Include="nimbusml\tests\pipeline\test_pipeline_subclassing.py" />
<Compile Include="nimbusml\tests\preprocessing\normalization\test_meanvariancescaler.py" />
<Compile Include="nimbusml\tests\timeseries\test_iidchangepointdetector.py" />
11 changes: 11 additions & 0 deletions src/python/nimbusml/base_transform.py
@@ -8,6 +8,8 @@

__all__ = ["BaseTransform"]

import os

from sklearn.base import BaseEstimator

from . import Pipeline
@@ -71,6 +73,15 @@ def fit(self, X, y=None, **params):
set_shape(self, X)
return self

@property
def _is_fitted(self):
"""
Tells if the transform was trained.
"""
return (hasattr(self, 'model_') and
self.model_ and
os.path.isfile(self.model_))

@trace
def transform(self, X, as_binary_data_stream=False, **params):
"""
2 changes: 1 addition & 1 deletion src/python/nimbusml/internal/utils/entrypoints.py
@@ -405,7 +405,7 @@ def remove_multi_level_index(c):
self.inputs['input_data'] = X._filename
elif 'data' in self.inputs:
self.inputs['data'] = X._filename
elif not summary:
elif not (summary or params.get('no_input_data')):
raise RuntimeError(
"data should be a dataframe, FileDataStream or DataView")

255 changes: 166 additions & 89 deletions src/python/nimbusml/pipeline.py
@@ -1442,31 +1442,63 @@ def _fix_ranking_metrics_schema(self, out_metrics):
'DCG@1', 'DCG@2', 'DCG@3', ]
return out_metrics

@trace
def _evaluation(self, evaltype, group_id, **params):
def _evaluation_infer(self, evaltype, label_column, group_id,
**params):
all_nodes = []
if not self.steps:
if evaltype == 'auto':
raise ValueError(
"need to specify 'evaltype' explicitly if model is "
"loaded")
common_eval_args = OrderedDict(data="$scoredVectorData",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(common_eval_args)

if evaltype == 'binary':
all_nodes.extend([
models_binaryclassificationevaluator(**params)
])
elif evaltype == 'multiclass':
all_nodes.extend([
models_classificationevaluator(**params)
])
elif evaltype == 'regression':
all_nodes.extend([
models_regressionevaluator(**params)
])
elif evaltype == 'cluster':
all_nodes.extend([
models_clusterevaluator(**params)
])
elif evaltype == 'anomaly':
all_nodes.extend([
models_anomalydetectionevaluator(**params)
])
elif evaltype == 'ranking':
type_ = self._last_node_type() if evaltype == 'auto' else evaltype

if type_ == 'binary':
all_nodes.extend(
[models_binaryclassificationevaluator(**params)])

elif type_ == 'multiclass':
all_nodes.extend(
[models_classificationevaluator(**params)])

elif type_ in ['regressor', 'regression']:
all_nodes.extend([models_regressionevaluator(**params)])

elif type_ in ['clusterer', 'cluster']:
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
clustering_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(clustering_eval_args)
all_nodes.extend([label_node,
models_clusterevaluator(**params)
])

elif type_ == 'anomaly':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
anom_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column
)
params.update(anom_eval_args)
all_nodes.extend(
[label_node,
models_anomalydetectionevaluator(**params)])

elif type_ == 'ranking':
svd = "$scoredVectorData"
column = [OrderedDict(Source=group_id, Name=group_id)]
algo_args = dict(data=svd, output_data=svd, column=column)
@@ -1477,77 +1509,14 @@ def _evaluation(self, evaltype, group_id, **params):
key_node,
evaluate_node
])

else:
raise ValueError(
"%s is not a valid type for evaluation." %
evaltype)

return all_nodes

def _evaluation_infer(self, evaltype, label_column, group_id,
**params):
all_nodes = []
if len(self.steps) == 0:
if evaltype == 'auto':
raise ValueError(
"need to specify 'evaltype' explicitly if model is "
"loaded")
common_eval_args = OrderedDict(data="$scoredVectorData",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(common_eval_args)
if evaltype == 'auto':
last_node_type = self._last_node_type()
if last_node_type == 'binary':
all_nodes.extend(
[models_binaryclassificationevaluator(**params)])

elif last_node_type == 'multiclass':
all_nodes.extend(
[models_classificationevaluator(**params)])

elif last_node_type == 'regressor':
all_nodes.extend([models_regressionevaluator(**params)])

elif last_node_type == 'clusterer':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
clustering_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column)
params.update(clustering_eval_args)
all_nodes.extend([label_node,
models_clusterevaluator(**params)
])

elif last_node_type == 'anomaly':
label_node = transforms_labelcolumnkeybooleanconverter(
data="$scoredVectorData", label_column=label_column,
output_data="$label_data")
anom_eval_args = OrderedDict(
data="$label_data",
overall_metrics="$output_metrics",
score_column="Score",
label_column=label_column
)
params.update(anom_eval_args)
all_nodes.extend(
[label_node,
models_anomalydetectionevaluator(**params)])

else:
raise ValueError(
"evaltype is %s. Last node type is %s" %
(evaltype, last_node_type))
else:
return self._evaluation(evaltype, group_id, **params)

return all_nodes

def _last_node_type(self):
last_node = self.last_node

@@ -1864,8 +1833,12 @@ def _predict(self, X, y=None,
scored_data="$scoredVectorData")
all_nodes.extend([score_node])

if hasattr(self, 'steps') and len(self.steps) > 0 \
and self.last_node.type == 'classifier':
if (evaltype in ['binary', 'multiclass']) or \
(hasattr(self, 'steps')
and self.steps is not None
and len(self.steps) > 0
and self.last_node.type == 'classifier'):

select_node = transforms_scorecolumnselector(
data="$scoredVectorData",
output_data="$scoreColumnsOnlyData", score_column="Score")
@@ -2476,3 +2449,107 @@ def score(
else:
raise ValueError(
"cannot generate score for {0}).".format(task_type))


@classmethod
def combine_models(cls, *items, **params):
"""
Combine the models of multiple pipelines, transforms
and/or predictors into a single model. The models are
combined in the order they are seen.

:param items: the fitted pipelines, transforms and/or
predictors which contain the models to join.

:param contains_predictor: Set to `True` if the
last item contains or is a predictor. Set to
`False` if `items` only contains transforms.
The default is True.

:return: A new Pipeline which is backed by a model that
is the combination of all the models passed in
through `items`.
"""
if len(items) == 0:
raise RuntimeError(
'At least one transform, predictor '
'or pipeline must be specified.')

for item in items:
if not item._is_fitted:
raise RuntimeError(
'Item must be fitted before '
'models can be combined.')

contains_predictor = params.get('contains_predictor', True)
verbose = params.get('verbose', 0)

get_model = lambda x: x.model if hasattr(x, 'model') else x.model_

if len(items) == 1:
return Pipeline(model=get_model(items[0]))

start_time = time.time()

nodes = []
inputs = {}
transform_models = []

for index, item in enumerate(items[:-1], start=1):
var_name = 'transform_model' + str(index)
inputs[var_name] = get_model(item)
transform_models.append("$" + var_name)

if contains_predictor:
inputs['predictor_model'] = get_model(items[-1])

combine_models_node = transforms_manyheterogeneousmodelcombiner(
transform_models=transform_models,
predictor_model='$predictor_model',
model='$output_model')
nodes.append(combine_models_node)

else:
var_name = 'transform_model' + str(len(items))
inputs[var_name] = get_model(items[-1])
transform_models.append("$" + var_name)

combine_models_node = transforms_modelcombiner(
models=transform_models,
output_model='$output_model')
nodes.append(combine_models_node)

outputs = dict(output_model="")

graph = Graph(
inputs,
outputs,
False,
*nodes)

class_name = cls.__name__
method_name = inspect.currentframe().f_code.co_name
telemetry_info = ".".join([class_name, method_name])

try:
(out_model, _, _) = graph.run(
X=None,
y=None,
random_state=None,
model=None,
verbose=verbose,
is_summary=False,
telemetry_info=telemetry_info,
no_input_data=True,
**params)
except RuntimeError as e:
raise e

pipeline = Pipeline(model=out_model)

# stop the clock
pipeline._run_time = time.time() - start_time
pipeline._write_csv_time = graph._write_csv_time

return pipeline
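
A minimal usage sketch of the new combine_models API (not part of this diff). The transform/predictor choice, column names and data below are assumptions for illustration; the call pattern follows the docstring above: fit each step on its own, then join the fitted models into a single model-backed Pipeline.

# Hypothetical example; the scaler, regressor and data are illustrative only.
import pandas as pd
from nimbusml import Pipeline
from nimbusml.linear_model import OnlineGradientDescentRegressor
from nimbusml.preprocessing.normalization import MeanVarianceScaler

df = pd.DataFrame(dict(x=[1.0, 2.0, 3.0, 4.0], y=[1.1, 2.2, 2.9, 4.1]))
X, y = df[['x']], df['y']

# Fit the transform and the predictor as separate steps.
scaler = MeanVarianceScaler().fit(X)
regressor = OnlineGradientDescentRegressor().fit(scaler.transform(X), y)

# Join the two fitted models into one pipeline backed by a single model
# (contains_predictor defaults to True, matching the trailing regressor).
combined = Pipeline.combine_models(scaler, regressor)
print(combined.predict(X))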
