Change predict, transform, predict_proba to infer metadata by default for `ParallelPostFit` (#862)
VibhuJawa authored Nov 17, 2021
1 parent 11f9703 commit f752e29
Showing 6 changed files with 284 additions and 17 deletions.
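In short: `ParallelPostFit` (and the search estimators built on top of it) now infers the dask `meta` of `predict`, `predict_proba`, and `transform` outputs by default instead of hard-coding a dtype, while still accepting an explicit meta. A minimal sketch of the new behavior, using scikit-learn's `LogisticRegression` and illustrative data (not taken from this commit):

```python
import dask.array as da
import numpy as np
from sklearn.linear_model import LogisticRegression

from dask_ml.wrappers import ParallelPostFit

rng = np.random.RandomState(0)
X_np = rng.random_sample((100, 4))
y_np = (X_np[:, 0] > 0.5).astype(int)

# No predict_meta / predict_proba_meta needed: the meta is inferred
# by running the estimator on a one-row dummy block.
clf = ParallelPostFit(LogisticRegression()).fit(X_np, y_np)

X_da = da.from_array(X_np, chunks=(50, 4))
preds = clf.predict(X_da)        # lazy 1-D dask array
probs = clf.predict_proba(X_da)  # lazy (n_samples, n_classes) dask array
print(preds.dtype, probs.shape)
```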
24 changes: 24 additions & 0 deletions dask_ml/model_selection/_hyperband.py
@@ -154,6 +154,24 @@ class HyperbandSearchCV(BaseIncrementalSearchCV):
prefix : str, optional, default=""
While logging, add ``prefix`` to each message.
predict_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
predict_proba_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict_proba`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
transform_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``transform`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
Examples
--------
>>> import numpy as np
@@ -340,6 +358,9 @@ def __init__(
scoring=None,
verbose=False,
prefix="",
predict_meta=None,
predict_proba_meta=None,
transform_meta=None,
):
self.aggressiveness = aggressiveness

@@ -354,6 +375,9 @@ def __init__(
scoring=scoring,
verbose=verbose,
prefix=prefix,
predict_meta=predict_meta,
predict_proba_meta=predict_proba_meta,
transform_meta=transform_meta,
)

def _get_SHAs(self, brackets):
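The same three keyword arguments are threaded through `HyperbandSearchCV` (and, below, `BaseIncrementalSearchCV` and `IncrementalSearchCV`). A sketch of passing one explicitly; the estimator, grid, and dtype here are illustrative assumptions, not part of this diff:

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

from dask_ml.model_selection import HyperbandSearchCV

params = {"alpha": np.logspace(-4, 0, 10)}
search = HyperbandSearchCV(
    SGDClassifier(),
    params,
    max_iter=9,
    # Leave as None (the default) to infer; pass an empty array that
    # matches predict's output when inference fails for an estimator.
    predict_meta=np.empty((0,), dtype=np.int64),
)
```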
39 changes: 36 additions & 3 deletions dask_ml/model_selection/_incremental.py
@@ -128,7 +128,7 @@ def _score(


def _create_model(model: Model, ident: Int, **params: Params) -> Tuple[Model, Meta]:
""" Create a model by cloning and then setting params """
"""Create a model by cloning and then setting params"""
with log_errors():
model = clone(model).set_params(**params)
return model, {"model_id": ident, "params": params, "partial_fit_calls": 0}
@@ -515,6 +515,9 @@ def __init__(
tol=1e-3,
verbose=False,
prefix="",
predict_meta=None,
predict_proba_meta=None,
transform_meta=None,
):
self.parameters = parameters
self.test_size = test_size
@@ -524,7 +527,13 @@ def __init__(
self.tol = tol
self.verbose = verbose
self.prefix = prefix
super(BaseIncrementalSearchCV, self).__init__(estimator, scoring=scoring)
super(BaseIncrementalSearchCV, self).__init__(
estimator,
scoring=scoring,
predict_meta=predict_meta,
predict_proba_meta=predict_proba_meta,
transform_meta=transform_meta,
)

async def _validate_parameters(self, X, y):
if (self.max_iter is not None) and self.max_iter < 1:
@@ -846,6 +855,24 @@ class IncrementalSearchCV(BaseIncrementalSearchCV):
prefix : str, optional, default=""
While logging, add ``prefix`` to each message.
predict_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
predict_proba_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict_proba`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
transform_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``transform`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
Attributes
----------
cv_results_ : dict of np.ndarrays
@@ -977,6 +1004,9 @@ def __init__(
verbose=False,
prefix="",
scores_per_fit=None,
predict_meta=None,
predict_proba_meta=None,
transform_meta=None,
):

self.n_initial_parameters = n_initial_parameters
@@ -995,6 +1025,9 @@ def __init__(
tol=tol,
verbose=verbose,
prefix=prefix,
predict_meta=predict_meta,
predict_proba_meta=predict_proba_meta,
transform_meta=transform_meta,
)

def _decay_deprecated(self):
@@ -1338,7 +1371,7 @@ def _adapt(self, info):
start = self.n_initial_parameters

def inverse(time):
""" Decrease target number of models inversely with time """
"""Decrease target number of models inversely with time"""
return int(start / (1 + time) ** self.decay_rate)

example = toolz.first(info.values())
138 changes: 127 additions & 11 deletions dask_ml/wrappers.py
@@ -46,6 +46,24 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
a single NumPy array, which may exhaust the memory of your worker.
You probably want to always specify `scoring`.
predict_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
predict_proba_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict_proba`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
transform_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``transform`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
Notes
-----
@@ -115,9 +133,19 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
[0.99407016, 0.00592984]])
"""

def __init__(self, estimator=None, scoring=None):
def __init__(
self,
estimator=None,
scoring=None,
predict_meta=None,
predict_proba_meta=None,
transform_meta=None,
):
self.estimator = estimator
self.scoring = scoring
self.predict_meta = predict_meta
self.predict_proba_meta = predict_proba_meta
self.transform_meta = transform_meta

def _check_array(self, X):
"""Validate an array for post-fit tasks.
@@ -202,13 +230,24 @@ def transform(self, X):
"""
self._check_method("transform")
X = self._check_array(X)
meta = self.transform_meta

if isinstance(X, da.Array):
xx = np.zeros((1, X.shape[1]), dtype=X.dtype)
dt = _transform(xx, self._postfit_estimator).dtype
return X.map_blocks(_transform, estimator=self._postfit_estimator, dtype=dt)
if meta is None:
meta = _get_output_dask_ar_meta_for_estimator(
_transform, self._postfit_estimator, X
)
return X.map_blocks(
_transform, estimator=self._postfit_estimator, meta=meta
)
elif isinstance(X, dd._Frame):
return X.map_partitions(_transform, estimator=self._postfit_estimator)
if meta is None:
# dask-dataframe relies on dd.core.no_default
# for inferring meta
meta = dd.core.no_default
return X.map_partitions(
_transform, estimator=self._postfit_estimator, meta=meta
)
else:
return _transform(X, estimator=self._postfit_estimator)
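For dask dataframes the wrapper now forwards `dd.core.no_default` as `meta`, which asks `map_partitions` to infer the output schema from a dummy partition. A standalone sketch of that mechanism, with a plain doubling function standing in for `_transform`:

```python
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"a": [1.0, 2.0, 3.0]}), npartitions=2)

# meta=dd.core.no_default (also the default) makes dask run the
# function on an empty dummy partition to infer output metadata.
out = df.map_partitions(lambda part: part * 2, meta=dd.core.no_default)
print(out.dtypes)
```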

@@ -271,6 +310,25 @@ def predict(self, X):
"""
self._check_method("predict")
X = self._check_array(X)
meta = self.predict_meta

if isinstance(X, da.Array):
if meta is None:
meta = _get_output_dask_ar_meta_for_estimator(
_predict, self._postfit_estimator, X
)

result = X.map_blocks(
_predict, dtype="int", estimator=self._postfit_estimator, drop_axis=1
_predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta
)
return result

elif isinstance(X, dd._Frame):
if meta is None:
meta = dd.core.no_default
return X.map_partitions(
_predict, estimator=self._postfit_estimator, meta=np.array([1])
_predict, estimator=self._postfit_estimator, meta=meta
)

else:
return _predict(X, estimator=self._postfit_estimator)
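Note that `predict` collapses each `(n, k)` block to an `(n,)` block, hence `drop_axis=1`; a user-supplied `predict_meta` should therefore be one-dimensional. A sketch of that shape contract with a hypothetical stand-in for the estimator's predict:

```python
import dask.array as da
import numpy as np

def predict_block(block):
    # hypothetical stand-in for _predict(block, estimator)
    return block.sum(axis=1) > 1.0

X = da.ones((6, 3), chunks=(3, 3))
out = X.map_blocks(
    predict_block,
    drop_axis=1,                      # (n, k) blocks -> (n,) blocks
    meta=np.empty((0,), dtype=bool),  # 1-D meta matching the output
)
print(out.compute())
```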

@@ -308,16 +354,26 @@ def predict_proba(self, X):

self._check_method("predict_proba")

meta = self.predict_proba_meta

if isinstance(X, da.Array):
if meta is None:
meta = _get_output_dask_ar_meta_for_estimator(
_predict_proba, self._postfit_estimator, X
)
# XXX: multiclass
return X.map_blocks(
_predict_proba,
estimator=self._postfit_estimator,
dtype="float",
meta=meta,
chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
)
elif isinstance(X, dd._Frame):
return X.map_partitions(_predict_proba, estimator=self._postfit_estimator)
if meta is None:
meta = dd.core.no_default
return X.map_partitions(
_predict_proba, estimator=self._postfit_estimator, meta=meta
)
else:
return _predict_proba(X, estimator=self._postfit_estimator)
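`predict_proba` instead widens each `(n, k)` block to `(n, n_classes)`, so the output chunks are pinned from `classes_`. The same bookkeeping with a stand-in function returning uniform probabilities:

```python
import dask.array as da
import numpy as np

n_classes = 2

def proba_block(block):
    # hypothetical stand-in for _predict_proba(block, estimator)
    return np.full((block.shape[0], n_classes), 1.0 / n_classes)

X = da.ones((6, 3), chunks=(3, 3))
out = X.map_blocks(
    proba_block,
    chunks=(X.chunks[0], n_classes),  # rows unchanged, cols = classes
    meta=np.empty((0, n_classes), dtype="float64"),
)
print(out.shape, out.compute().sum(axis=1))
```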

@@ -424,6 +480,24 @@ class Incremental(ParallelPostFit):
of the Dask arrays (default), or to fit in sequential order. This does
not control shuffle between blocks or shuffling each block.
predict_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
predict_proba_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``predict_proba`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
transform_meta: pd.Series, pd.DataFrame, np.array, default: None (infer)
An empty ``pd.Series``, ``pd.DataFrame``, or ``np.array`` that matches the output
type of the estimator's ``transform`` call.
This meta is necessary for some estimators to work with
``dask.dataframe`` and ``dask.array``.
Attributes
----------
estimator_ : Estimator
@@ -460,11 +534,20 @@ def __init__(
shuffle_blocks=True,
random_state=None,
assume_equal_chunks=True,
predict_meta=None,
predict_proba_meta=None,
transform_meta=None,
):
self.shuffle_blocks = shuffle_blocks
self.random_state = random_state
self.assume_equal_chunks = assume_equal_chunks
super(Incremental, self).__init__(estimator=estimator, scoring=scoring)
super(Incremental, self).__init__(
estimator=estimator,
scoring=scoring,
predict_meta=predict_meta,
predict_proba_meta=predict_proba_meta,
transform_meta=transform_meta,
)

@property
def _postfit_estimator(self):
@@ -552,3 +635,36 @@ def _predict_proba(part, estimator):

def _transform(part, estimator):
return estimator.transform(part)


def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
"""
Returns the output metadata array
for the model function (predict, transform etc)
by running the appropriate function on dummy data
of shape (1, n_features)
Parameters
----------
model_fun: Model function
_predict, _transform etc
estimator : Estimator
The underlying estimator that is fit.
input_dask_ar: The input dask_array
Returns
-------
metadata: metadata of output dask array
"""
# sklearn fails if input array has size size
# It requires at least 1 sample to run successfully
ar = np.zeros(
shape=(1, input_dask_ar.shape[1]),
dtype=input_dask_ar.dtype,
like=input_dask_ar._meta,
)
return model_fn(ar, estimator)
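The one-row dummy matters because scikit-learn estimators reject empty input, and `like=input_dask_ar._meta` (NumPy >= 1.20) creates the dummy with the same array type as the input's meta, e.g. a CuPy array for GPU-backed dask arrays. A rough standalone equivalent, assuming an already-fitted scikit-learn estimator:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

est = LogisticRegression().fit(np.eye(3), [0, 1, 0])

# One dummy row is enough to discover the output's type and dtype;
# a zero-row input would make sklearn raise instead.
dummy = np.zeros((1, 3))
meta = est.predict(dummy)
print(type(meta), meta.dtype, meta.shape)
```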
9 changes: 8 additions & 1 deletion tests/model_selection/test_hyperband.py
@@ -275,7 +275,14 @@ async def test_correct_params(c, s, a, b):
"verbose",
"prefix",
}
assert set(search.get_params().keys()) == base.union({"aggressiveness"})

search_keys = set(search.get_params().keys())
# remove the meta kwargs because they are Dask-specific attributes
search_keys.remove("predict_meta")
search_keys.remove("predict_proba_meta")
search_keys.remove("transform_meta")

assert search_keys == base.union({"aggressiveness"})
meta = search.metadata
SHAs_params = [
bracket["SuccessiveHalvingSearchCV params"] for bracket in meta["brackets"]
2 changes: 1 addition & 1 deletion tests/model_selection/test_incremental.py
@@ -169,7 +169,7 @@ async def test_explicit(c, s, a, b):
params = [{"alpha": 0.1}, {"alpha": 0.2}]

def additional_calls(scores):
""" Progress through predefined updates, checking along the way """
"""Progress through predefined updates, checking along the way"""
ts = scores[0][-1]["partial_fit_calls"]
ts -= 1 # partial_fit_calls = time step + 1
if ts == 0: