[REVIEW] Change predict, transform, predict_proba to infer metadata by default for ParallelPostFit #862

Merged: 21 commits, Nov 17, 2021
Changes from 15 commits

Commits
11f72af
working with dask_cudf dataframe
VibhuJawa Oct 15, 2021
fa752e5
Merge branch 'main' of https://github.com/dask/dask-ml into dask_cudf…
VibhuJawa Oct 19, 2021
9c26f37
cleaned up code
VibhuJawa Oct 19, 2021
9fb42ee
added meta fallback and pytests
VibhuJawa Oct 25, 2021
f29dd19
fixed flake8 changes
VibhuJawa Oct 25, 2021
fd83064
Trigger Build
VibhuJawa Oct 26, 2021
c8197c2
Merge branch 'dask:main' into dask_cudf_fix
VibhuJawa Nov 1, 2021
f6316a9
Added futurewarning to ParallelPostFit.predict for empty meta
VibhuJawa Nov 4, 2021
3d6b38f
Trigger Build
VibhuJawa Nov 4, 2021
6c29844
Ignore warning raised in ParallelPostFit.predict due to meta=None
VibhuJawa Nov 4, 2021
749f690
Trigger Build
VibhuJawa Nov 4, 2021
094c31a
Added meta as a class attribute and added relevant doc strings, tests…
VibhuJawa Nov 8, 2021
4e46d8c
addessed review
VibhuJawa Nov 9, 2021
18c023a
renamed some variables
VibhuJawa Nov 9, 2021
5887dcb
Merge branch 'dask:main' into dask_cudf_fix
VibhuJawa Nov 11, 2021
e16d475
Updated implimentation to add separate metas for predict, predict_pro…
VibhuJawa Nov 16, 2021
433cd08
fix docstring in Incremental
VibhuJawa Nov 16, 2021
6ac4602
Merge branch 'dask_cudf_fix' of https://github.com/VibhuJawa/dask-ml …
VibhuJawa Nov 16, 2021
6cb28a6
remove minor typo in hyper-parameter-search.rst
VibhuJawa Nov 16, 2021
05091cb
removed comma from tests/test_incremental.py
VibhuJawa Nov 16, 2021
25bbfca
Added meta tests for predict, predict_proba, transform
VibhuJawa Nov 16, 2021
9 changes: 9 additions & 0 deletions dask_ml/model_selection/_hyperband.py
@@ -154,6 +154,13 @@ class HyperbandSearchCV(BaseIncrementalSearchCV):
prefix : str, optional, default=""
While logging, add ``prefix`` to each message.

meta : pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, default: None
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the ``predict`` and ``transform`` calls.
This meta is necessary for the ``predict`` and ``transform`` calls
of some estimators to work with ``dask.dataframe`` and ``dask.array``.


Examples
--------
>>> import numpy as np
@@ -340,6 +347,7 @@ def __init__(
scoring=None,
verbose=False,
prefix="",
meta=None,
):
self.aggressiveness = aggressiveness

@@ -354,6 +362,7 @@ def __init__(
scoring=scoring,
verbose=verbose,
prefix=prefix,
meta=meta,
)

def _get_SHAs(self, brackets):
17 changes: 14 additions & 3 deletions dask_ml/model_selection/_incremental.py
@@ -128,7 +128,7 @@ def _score(


def _create_model(model: Model, ident: Int, **params: Params) -> Tuple[Model, Meta]:
""" Create a model by cloning and then setting params """
"""Create a model by cloning and then setting params"""
with log_errors():
model = clone(model).set_params(**params)
return model, {"model_id": ident, "params": params, "partial_fit_calls": 0}
@@ -515,6 +515,7 @@ def __init__(
tol=1e-3,
verbose=False,
prefix="",
meta=None,
):
self.parameters = parameters
self.test_size = test_size
@@ -524,7 +525,9 @@ def __init__(
self.tol = tol
self.verbose = verbose
self.prefix = prefix
super(BaseIncrementalSearchCV, self).__init__(estimator, scoring=scoring)
super(BaseIncrementalSearchCV, self).__init__(
estimator, scoring=scoring, meta=meta
)

async def _validate_parameters(self, X, y):
if (self.max_iter is not None) and self.max_iter < 1:
@@ -846,6 +849,12 @@ class IncrementalSearchCV(BaseIncrementalSearchCV):
prefix : str, optional, default=""
While logging, add ``prefix`` to each message.

meta : pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, default: None
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the ``predict`` and ``transform`` calls.
This meta is necessary for the ``predict`` and ``transform`` calls
of some estimators to work with ``dask.dataframe`` and ``dask.array``.

Attributes
----------
cv_results_ : dict of np.ndarrays
@@ -977,6 +986,7 @@ def __init__(
verbose=False,
prefix="",
scores_per_fit=None,
meta=None,
):

self.n_initial_parameters = n_initial_parameters
@@ -995,6 +1005,7 @@
tol=tol,
verbose=verbose,
prefix=prefix,
meta=meta,
)

def _decay_deprecated(self):
@@ -1338,7 +1349,7 @@ def _adapt(self, info):
start = self.n_initial_parameters

def inverse(time):
""" Decrease target number of models inversely with time """
"""Decrease target number of models inversely with time"""
return int(start / (1 + time) ** self.decay_rate)

example = toolz.first(info.values())
55 changes: 46 additions & 9 deletions dask_ml/wrappers.py
@@ -1,5 +1,6 @@
"""Meta-estimators for parallelizing estimators using the scikit-learn API."""
import logging
from warnings import warn

import dask.array as da
import dask.dataframe as dd
@@ -46,6 +47,12 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
a single NumPy array, which may exhaust the memory of your worker.
You probably want to always specify `scoring`.

meta : pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, default: None
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the ``predict`` and ``transform`` calls.
This meta is necessary for the ``predict`` and ``transform`` calls
of some estimators to work with ``dask.dataframe`` and ``dask.array``.

Notes
-----

@@ -115,9 +122,10 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
[0.99407016, 0.00592984]])
"""

def __init__(self, estimator=None, scoring=None):
def __init__(self, estimator=None, scoring=None, meta=None):
self.estimator = estimator
self.scoring = scoring
self.meta = meta

def _check_array(self, X):
"""Validate an array for post-fit tasks.
@@ -202,13 +210,24 @@ def transform(self, X):
"""
self._check_method("transform")
X = self._check_array(X)
meta = self.meta

if isinstance(X, da.Array):
xx = np.zeros((1, X.shape[1]), dtype=X.dtype)
dt = _transform(xx, self._postfit_estimator).dtype
return X.map_blocks(_transform, estimator=self._postfit_estimator, dtype=dt)
if meta is None:
xx = np.zeros((1, X.shape[1]), dtype=X.dtype)
x_t = _transform(xx, self._postfit_estimator)
meta = type(x_t)((), dtype=x_t.dtype)
return X.map_blocks(
_transform, estimator=self._postfit_estimator, meta=meta
)
elif isinstance(X, dd._Frame):
return X.map_partitions(_transform, estimator=self._postfit_estimator)
if meta is None:
# dask-dataframe relies on dd.core.no_default
# for inferring meta
meta = dd.core.no_default
return X.map_partitions(
_transform, estimator=self._postfit_estimator, meta=meta
)
else:
return _transform(X, estimator=self._postfit_estimator)
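To see what the inference fallback above relies on, here is a self-contained sketch of how ``meta`` steers ``map_blocks`` in plain dask (not dask-ml code; the function and arrays are illustrative):

import dask.array as da
import numpy as np

x = da.ones((4, 3), chunks=(2, 3))

def square(block):
    return block ** 2

# With no meta, dask calls square() on an empty block to infer the
# output container type and dtype -- the same trick transform() uses
# above when self.meta is None for array inputs.
inferred = x.map_blocks(square)

# An explicit meta skips that inference; the empty array is a promise
# about the result's type and dtype.
explicit = x.map_blocks(square, meta=np.empty((0, 0), dtype=np.float64))

assert inferred.dtype == explicit.dtype == np.float64

For dask dataframes the diff instead passes ``dd.core.no_default``, which tells ``map_partitions`` to run its own meta inference on an empty partition.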

@@ -271,18 +290,27 @@ def predict(self, X):
"""
self._check_method("predict")
X = self._check_array(X)
meta = self.meta

if meta is None:
warn(
"No meta provided to `ParallelPostFit.predict`. "
"Defaulting meta to np.empty(1, dtype=np.int64).\n"
"Will default to None(infer) in the future release",
FutureWarning,
)
meta = np.empty(1, dtype=np.int64)

if isinstance(X, da.Array):
result = X.map_blocks(
_predict, dtype="int", estimator=self._postfit_estimator, drop_axis=1
_predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta
)
return result

elif isinstance(X, dd._Frame):
return X.map_partitions(
_predict, estimator=self._postfit_estimator, meta=np.array([1])
_predict, estimator=self._postfit_estimator, meta=meta
)

else:
return _predict(X, estimator=self._postfit_estimator)
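A usage sketch of how a caller interacts with this default at this point in the PR (the regressor is illustrative, and shows why the int64 fallback can be wrong):

import warnings

import dask.array as da
import numpy as np
from sklearn.linear_model import LinearRegression

from dask_ml.wrappers import ParallelPostFit

X = da.random.random((100, 4), chunks=(25, 4))
y = da.random.random(100, chunks=25)

# Passing meta states that predict() returns float64, avoiding both
# the FutureWarning and the incorrect int64 assumption:
clf = ParallelPostFit(LinearRegression(), meta=np.empty(0, dtype=np.float64))
clf.fit(X.compute(), y.compute())
assert clf.predict(X).dtype == np.float64

# Relying on the default still works but warns; it can be silenced:
clf2 = ParallelPostFit(LinearRegression()).fit(X.compute(), y.compute())
with warnings.catch_warnings():
    warnings.simplefilter("ignore", FutureWarning)
    y_hat = clf2.predict(X)  # meta defaults to np.empty(1, dtype=np.int64)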

@@ -424,6 +452,12 @@ class Incremental(ParallelPostFit):
of the Dask arrays (default), or to fit in sequential order. This does
not control shuffle between blocks or shuffling each block.

meta : pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, default: None
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the ``predict`` and ``transform`` calls.
This meta is necessary for the ``predict`` and ``transform`` calls
of some estimators to work with ``dask.dataframe`` and ``dask.array``.

Attributes
----------
estimator_ : Estimator
@@ -460,11 +494,14 @@ def __init__(
shuffle_blocks=True,
random_state=None,
assume_equal_chunks=True,
meta=None,
):
self.shuffle_blocks = shuffle_blocks
self.random_state = random_state
self.assume_equal_chunks = assume_equal_chunks
super(Incremental, self).__init__(estimator=estimator, scoring=scoring)
super(Incremental, self).__init__(
estimator=estimator, scoring=scoring, meta=meta
)

@property
def _postfit_estimator(self):
2 changes: 1 addition & 1 deletion docs/source/hyper-parameter-search.rst
@@ -469,7 +469,7 @@ to use post-estimation features like scoring or prediction, we recommend using
from dask_ml.wrappers import ParallelPostFit
params = {'estimator__alpha': loguniform(1e-2, 1e0),
'estimator__l1_ratio': uniform(0, 1)}
est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0))
est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0), meta=np.empty(1, dtype=np.int64))
search = HyperbandSearchCV(est, params, max_iter=9, random_state=0)
search.fit(X_train, y_train, classes=[0, 1]);
search.score(X_test, y_test)
2 changes: 1 addition & 1 deletion docs/source/incremental.rst
@@ -57,7 +57,7 @@ between machines.
X

estimator = SGDClassifier(random_state=10, max_iter=100)
clf = Incremental(estimator)
clf = Incremental(estimator, meta=np.empty(1, dtype=np.int64))
clf.fit(X, y, classes=[0, 1])

In this example, we make a (small) random Dask Array. It has 100 samples,
2 changes: 1 addition & 1 deletion docs/source/meta-estimators.rst
@@ -49,7 +49,7 @@ copying over learned attributes, that's all that ``ParallelPostFit`` does.

.. ipython:: python

clf = ParallelPostFit(estimator=GradientBoostingClassifier())
clf = ParallelPostFit(estimator=GradientBoostingClassifier(), meta=np.empty(1, dtype=np.int32))
clf.fit(X, y)

This class is useful for predicting for or transforming large datasets.
13 changes: 10 additions & 3 deletions tests/model_selection/test_hyperband.py
@@ -60,7 +60,7 @@ async def test_basic(c, s, a, b, array_type, library, max_iter):
}
model = SGDClassifier(tol=-np.inf, penalty="elasticnet", random_state=42, eta0=0.1)
if library == "dask-ml":
model = Incremental(model)
model = Incremental(model, meta=np.empty(1, dtype=np.int64))
params = {"estimator__" + k: v for k, v in params.items()}
elif library == "ConstantFunction":
model = ConstantFunction()
@@ -259,7 +259,9 @@ async def test_correct_params(c, s, a, b):
est = ConstantFunction()
X, y = make_classification(n_samples=10, n_features=4, chunks=10)
params = {"value": np.linspace(0, 1)}
search = HyperbandSearchCV(est, params, max_iter=9)
search = HyperbandSearchCV(
est, params, max_iter=9, meta=np.empty(1, dtype=np.int64)
)

base = {
"estimator",
@@ -275,7 +277,12 @@
"verbose",
"prefix",
}
assert set(search.get_params().keys()) == base.union({"aggressiveness"})

search_keys = set(search.get_params().keys())
# remove meta because it is a dask-specific attribute
search_keys.remove("meta")

assert search_keys == base.union({"aggressiveness"})
meta = search.metadata
SHAs_params = [
bracket["SuccessiveHalvingSearchCV params"] for bracket in meta["brackets"]
18 changes: 14 additions & 4 deletions tests/model_selection/test_incremental.py
@@ -169,7 +169,7 @@ async def test_explicit(c, s, a, b):
params = [{"alpha": 0.1}, {"alpha": 0.2}]

def additional_calls(scores):
""" Progress through predefined updates, checking along the way """
"""Progress through predefined updates, checking along the way"""
ts = scores[0][-1]["partial_fit_calls"]
ts -= 1 # partial_fit_calls = time step + 1
if ts == 0:
@@ -350,7 +350,13 @@ def score(self, *args, **kwargs):
model = ConstantClassifier()

search = IncrementalSearchCV(
model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10,
model,
params,
n_initial_parameters=10,
patience=5,
tol=0,
max_iter=10,
meta=np.empty(1, dtype=np.int64),
)
await search.fit(X, y, classes=[0, 1])

@@ -466,7 +472,9 @@ async def test_small(c, s, a, b):
X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
model = SGDClassifier(tol=1e-3, penalty="elasticnet")
params = {"alpha": [0.1, 0.5, 0.75, 1.0]}
search = IncrementalSearchCV(model, params, n_initial_parameters="grid")
search = IncrementalSearchCV(
model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64)
)
await search.fit(X, y, classes=[0, 1])
X_ = await c.compute(X)
search.predict(X_)
@@ -478,7 +486,9 @@ async def test_smaller(c, s, a, b):
X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5))
model = SGDClassifier(tol=1e-3, penalty="elasticnet")
params = {"alpha": [0.1, 0.5]}
search = IncrementalSearchCV(model, params, n_initial_parameters="grid")
search = IncrementalSearchCV(
model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64)
)
await search.fit(X, y, classes=[0, 1])
X_ = await c.compute(X)
search.predict(X_)
16 changes: 11 additions & 5 deletions tests/test_incremental.py
@@ -51,7 +51,7 @@ def test_incremental_basic(scheduler, dataframes):
est1 = SGDClassifier(random_state=0, tol=1e-3, average=True)
est2 = clone(est1)

clf = Incremental(est1, random_state=0)
clf = Incremental(est1, random_state=0, meta=np.empty(1, dtype=np.int64))
result = clf.fit(X, y, classes=[0, 1])
assert result is clf

@@ -95,7 +95,9 @@

def test_in_gridsearch(scheduler, xy_classification):
X, y = xy_classification
clf = Incremental(SGDClassifier(random_state=0, tol=1e-3))
clf = Incremental(
SGDClassifier(random_state=0, tol=1e-3), meta=np.empty(1, dtype=np.int64)
)
param_grid = {"estimator__alpha": [0.1, 10]}
gs = sklearn.model_selection.GridSearchCV(clf, param_grid, cv=3)

@@ -115,7 +117,9 @@ def test_scoring(scheduler, xy_classification, scoring=dask_ml.metrics.accuracy_
def test_scoring_string(scheduler, xy_classification, scoring):
X, y = xy_classification
with scheduler() as (s, [a, b]):
clf = Incremental(SGDClassifier(tol=1e-3), scoring=scoring)
clf = Incremental(
SGDClassifier(tol=1e-3), scoring=scoring, meta=np.empty(1, dtype=np.int64)
)
assert callable(check_scoring(clf, scoring=scoring))
clf.fit(X, y, classes=np.unique(y))
clf.score(X, y)
@@ -141,7 +145,7 @@ def test_score_ndarrays():
y = np.ones(10)

sgd = SGDClassifier(tol=1e-3)
inc = Incremental(sgd, scoring="accuracy")
inc = Incremental(sgd, scoring="accuracy", meta=np.empty(1, dtype=np.int64))

inc.partial_fit(X, y, classes=[0, 1])
inc.fit(X, y, classes=[0, 1])
@@ -159,7 +163,9 @@ def test_score(xy_classification):

X, y = xy_classification
inc = Incremental(
SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), scoring="accuracy"
SGDClassifier(max_iter=1000, random_state=0, tol=1e-3),
scoring="accuracy",
meta=np.empty(1, dtype=np.int64),
)

with client: