diff --git a/dask_ml/model_selection/_hyperband.py b/dask_ml/model_selection/_hyperband.py
index 17ebe2797..bc5e2dd38 100644
--- a/dask_ml/model_selection/_hyperband.py
+++ b/dask_ml/model_selection/_hyperband.py
@@ -154,6 +154,24 @@ class HyperbandSearchCV(BaseIncrementalSearchCV):
     prefix : str, optional, default=""
         While logging, add ``prefix`` to each message.
 
+    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict_proba`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``transform`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
     Examples
     --------
     >>> import numpy as np
@@ -340,6 +358,9 @@ def __init__(
         scoring=None,
         verbose=False,
         prefix="",
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
     ):
         self.aggressiveness = aggressiveness
 
@@ -354,6 +375,9 @@ def __init__(
             scoring=scoring,
             verbose=verbose,
             prefix=prefix,
+            predict_meta=predict_meta,
+            predict_proba_meta=predict_proba_meta,
+            transform_meta=transform_meta,
         )
 
     def _get_SHAs(self, brackets):
diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py
index 180c2a23d..cedd89dca 100644
--- a/dask_ml/model_selection/_incremental.py
+++ b/dask_ml/model_selection/_incremental.py
@@ -128,7 +128,7 @@ def _score(
 
 
 def _create_model(model: Model, ident: Int, **params: Params) -> Tuple[Model, Meta]:
-    """ Create a model by cloning and then setting params """
+    """Create a model by cloning and then setting params"""
     with log_errors():
         model = clone(model).set_params(**params)
         return model, {"model_id": ident, "params": params, "partial_fit_calls": 0}
@@ -515,6 +515,9 @@ def __init__(
         tol=1e-3,
         verbose=False,
         prefix="",
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
     ):
         self.parameters = parameters
         self.test_size = test_size
@@ -524,7 +527,13 @@ def __init__(
         self.tol = tol
         self.verbose = verbose
         self.prefix = prefix
-        super(BaseIncrementalSearchCV, self).__init__(estimator, scoring=scoring)
+        super(BaseIncrementalSearchCV, self).__init__(
+            estimator,
+            scoring=scoring,
+            predict_meta=predict_meta,
+            predict_proba_meta=predict_proba_meta,
+            transform_meta=transform_meta,
+        )
 
     async def _validate_parameters(self, X, y):
         if (self.max_iter is not None) and self.max_iter < 1:
@@ -846,6 +855,24 @@ class IncrementalSearchCV(BaseIncrementalSearchCV):
     prefix : str, optional, default=""
         While logging, add ``prefix`` to each message.
 
+    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict_proba`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``transform`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
     Attributes
     ----------
     cv_results_ : dict of np.ndarrays
@@ -977,6 +1004,9 @@ def __init__(
         verbose=False,
         prefix="",
         scores_per_fit=None,
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
     ):
 
         self.n_initial_parameters = n_initial_parameters
@@ -995,6 +1025,9 @@ def __init__(
             tol=tol,
             verbose=verbose,
             prefix=prefix,
+            predict_meta=predict_meta,
+            predict_proba_meta=predict_proba_meta,
+            transform_meta=transform_meta,
         )
 
     def _decay_deprecated(self):
@@ -1338,7 +1371,7 @@ def _adapt(self, info):
         start = self.n_initial_parameters
 
         def inverse(time):
-            """ Decrease target number of models inversely with time """
+            """Decrease target number of models inversely with time"""
             return int(start / (1 + time) ** self.decay_rate)
 
         example = toolz.first(info.values())
diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py
index 1d70f09c0..436c2a0f1 100644
--- a/dask_ml/wrappers.py
+++ b/dask_ml/wrappers.py
@@ -46,6 +46,24 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
         a single NumPy array, which may exhaust the memory of your worker.
         You probably want to always specify `scoring`.
 
+    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict_proba`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``transform`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
     Notes
     -----
 
@@ -115,9 +133,19 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi
            [0.99407016, 0.00592984]])
     """
 
-    def __init__(self, estimator=None, scoring=None):
+    def __init__(
+        self,
+        estimator=None,
+        scoring=None,
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
+    ):
         self.estimator = estimator
         self.scoring = scoring
+        self.predict_meta = predict_meta
+        self.predict_proba_meta = predict_proba_meta
+        self.transform_meta = transform_meta
 
     def _check_array(self, X):
         """Validate an array for post-fit tasks.
@@ -202,13 +230,24 @@ def transform(self, X):
         """
         self._check_method("transform")
         X = self._check_array(X)
+        meta = self.transform_meta
 
         if isinstance(X, da.Array):
-            xx = np.zeros((1, X.shape[1]), dtype=X.dtype)
-            dt = _transform(xx, self._postfit_estimator).dtype
-            return X.map_blocks(_transform, estimator=self._postfit_estimator, dtype=dt)
+            if meta is None:
+                meta = _get_output_dask_ar_meta_for_estimator(
+                    _transform, self._postfit_estimator, X
+                )
+            return X.map_blocks(
+                _transform, estimator=self._postfit_estimator, meta=meta
+            )
         elif isinstance(X, dd._Frame):
-            return X.map_partitions(_transform, estimator=self._postfit_estimator)
+            if meta is None:
+                # dask-dataframe relies on dd.core.no_default
+                # for inferring meta
+                meta = dd.core.no_default
+            return X.map_partitions(
+                _transform, estimator=self._postfit_estimator, meta=meta
+            )
         else:
             return _transform(X, estimator=self._postfit_estimator)
 
@@ -271,18 +310,25 @@ def predict(self, X):
         """
         self._check_method("predict")
         X = self._check_array(X)
+        meta = self.predict_meta
 
         if isinstance(X, da.Array):
+            if meta is None:
+                meta = _get_output_dask_ar_meta_for_estimator(
+                    _predict, self._postfit_estimator, X
+                )
+
             result = X.map_blocks(
-                _predict, dtype="int", estimator=self._postfit_estimator, drop_axis=1
+                _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta
             )
             return result
 
         elif isinstance(X, dd._Frame):
+            if meta is None:
+                meta = dd.core.no_default
             return X.map_partitions(
-                _predict, estimator=self._postfit_estimator, meta=np.array([1])
+                _predict, estimator=self._postfit_estimator, meta=meta
             )
-
         else:
             return _predict(X, estimator=self._postfit_estimator)
 
@@ -308,16 +354,26 @@ def predict_proba(self, X):
 
         self._check_method("predict_proba")
 
+        meta = self.predict_proba_meta
+
         if isinstance(X, da.Array):
+            if meta is None:
+                meta = _get_output_dask_ar_meta_for_estimator(
+                    _predict_proba, self._postfit_estimator, X
+                )
             # XXX: multiclass
             return X.map_blocks(
                 _predict_proba,
                 estimator=self._postfit_estimator,
-                dtype="float",
+                meta=meta,
                 chunks=(X.chunks[0], len(self._postfit_estimator.classes_)),
             )
         elif isinstance(X, dd._Frame):
-            return X.map_partitions(_predict_proba, estimator=self._postfit_estimator)
+            if meta is None:
+                meta = dd.core.no_default
+            return X.map_partitions(
+                _predict_proba, estimator=self._postfit_estimator, meta=meta
+            )
         else:
             return _predict_proba(X, estimator=self._postfit_estimator)
 
@@ -424,6 +480,24 @@ class Incremental(ParallelPostFit):
         of the Dask arrays (default), or to fit in sequential order. This does
         not control shuffle between blocks or shuffling each block.
 
+    predict_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    predict_proba_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``predict_proba`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
+    transform_meta : pd.Series, pd.DataFrame, or np.ndarray, default=None (infer)
+        An empty ``pd.Series``, ``pd.DataFrame``, or ``np.ndarray`` that matches
+        the output type of the estimator's ``transform`` call.
+        This meta is necessary for some estimators to work with
+        ``dask.dataframe`` and ``dask.array``.
+
     Attributes
     ----------
     estimator_ : Estimator
@@ -460,11 +534,20 @@ def __init__(
         shuffle_blocks=True,
         random_state=None,
         assume_equal_chunks=True,
+        predict_meta=None,
+        predict_proba_meta=None,
+        transform_meta=None,
     ):
         self.shuffle_blocks = shuffle_blocks
         self.random_state = random_state
         self.assume_equal_chunks = assume_equal_chunks
-        super(Incremental, self).__init__(estimator=estimator, scoring=scoring)
+        super(Incremental, self).__init__(
+            estimator=estimator,
+            scoring=scoring,
+            predict_meta=predict_meta,
+            predict_proba_meta=predict_proba_meta,
+            transform_meta=transform_meta,
+        )
 
     @property
     def _postfit_estimator(self):
@@ -552,3 +635,36 @@ def _predict_proba(part, estimator):
 
 def _transform(part, estimator):
     return estimator.transform(part)
+
+
+def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar):
+    """
+    Returns the output metadata array
+    for the model function (predict, transform etc.)
+    by running the appropriate function on dummy data
+    of shape (1, n_features)
+
+    Parameters
+    ----------
+
+    model_fn : Model function
+        _predict, _transform etc.
+
+    estimator : Estimator
+        The underlying estimator that is fit.
+
+    input_dask_ar : The input dask array
+
+    Returns
+    -------
+    metadata : metadata of the output dask array
+
+    """
+    # sklearn fails if the input array has size 0;
+    # it requires at least 1 sample to run successfully
+    ar = np.zeros(
+        shape=(1, input_dask_ar.shape[1]),
+        dtype=input_dask_ar.dtype,
+        like=input_dask_ar._meta,
+    )
+    return model_fn(ar, estimator)
diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py
index f9390b7ca..44b701f0f 100644
--- a/tests/model_selection/test_hyperband.py
+++ b/tests/model_selection/test_hyperband.py
@@ -275,7 +275,14 @@ async def test_correct_params(c, s, a, b):
         "verbose",
         "prefix",
     }
-    assert set(search.get_params().keys()) == base.union({"aggressiveness"})
+
+    search_keys = set(search.get_params().keys())
+    # remove the meta params because they are dask-specific attributes
+    search_keys.remove("predict_meta")
+    search_keys.remove("predict_proba_meta")
+    search_keys.remove("transform_meta")
+
+    assert search_keys == base.union({"aggressiveness"})
     meta = search.metadata
     SHAs_params = [
         bracket["SuccessiveHalvingSearchCV params"] for bracket in meta["brackets"]
diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py
index 5038a9eb6..9c0ae64df 100644
--- a/tests/model_selection/test_incremental.py
+++ b/tests/model_selection/test_incremental.py
@@ -169,7 +169,7 @@ async def test_explicit(c, s, a, b):
     params = [{"alpha": 0.1}, {"alpha": 0.2}]
 
     def additional_calls(scores):
-        """ Progress through predefined updates, checking along the way """
+        """Progress through predefined updates, checking along the way"""
        ts = scores[0][-1]["partial_fit_calls"]
         ts -= 1  # partial_fit_calls = time step + 1
         if ts == 0:
diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py
index 5a42786f9..95356fee0 100644
--- a/tests/test_parallel_post_fit.py
+++ b/tests/test_parallel_post_fit.py
@@ -2,11 +2,14 @@
 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
+import pandas as pd
 import pytest
 import sklearn.datasets
 from sklearn.decomposition import PCA
 from sklearn.ensemble import GradientBoostingClassifier
 from sklearn.linear_model import LinearRegression, LogisticRegression
+from sklearn.naive_bayes import CategoricalNB
+from sklearn.preprocessing import OneHotEncoder
 
 from dask_ml.datasets import make_classification
 from dask_ml.utils import assert_eq_ar, assert_estimator_equal
@@ -49,6 +52,88 @@ def test_laziness():
     assert 0 < x.compute() < 1
 
 
+def test_predict_meta_override():
+    X = pd.DataFrame({"c_0": [1, 2, 3, 4]})
+    y = np.array([1, 2, 3, 4])
+
+    base = CategoricalNB()
+    base.fit(pd.DataFrame(X), y)
+
+    dd_X = dd.from_pandas(X, npartitions=2)
+    dd_X._meta = pd.DataFrame({"c_0": [5]})
+
+    # Failure when not providing predict_meta
+    # because of the value-dependent model
+    wrap = ParallelPostFit(base)
+    with pytest.raises(ValueError):
+        wrap.predict(dd_X)
+
+    # Success when providing a meta override
+    wrap = ParallelPostFit(base, predict_meta=np.array([1]))
+    result = wrap.predict(dd_X)
+    expected = base.predict(X)
+    assert_eq_ar(result, expected)
+
+
+def test_predict_proba_meta_override():
+    X = pd.DataFrame({"c_0": [1, 2, 3, 4]})
+    y = np.array([1, 2, 3, 4])
+
+    base = CategoricalNB()
+    base.fit(pd.DataFrame(X), y)
+
+    dd_X = dd.from_pandas(X, npartitions=2)
+    dd_X._meta = pd.DataFrame({"c_0": [5]})
+
+    # Failure when not providing predict_proba_meta
+    # because of the value-dependent model
+    wrap = ParallelPostFit(base)
+    with pytest.raises(ValueError):
+        wrap.predict_proba(dd_X)
+
+    # Success when providing a meta override
+    wrap = ParallelPostFit(base, predict_proba_meta=np.array([[0.0, 0.1, 0.8, 0.1]]))
+    result = wrap.predict_proba(dd_X)
+    expected = base.predict_proba(X)
+    assert_eq_ar(result, expected)
+
+
+def test_transform_meta_override():
+    X = pd.DataFrame({"cat_s": ["a", "b", "c", "d"]})
+    dd_X = dd.from_pandas(X, npartitions=2)
+
+    base = OneHotEncoder(sparse=False)
+    base.fit(pd.DataFrame(X))
+
+    # Failure when not providing transform_meta
+    # because of the value-dependent model
+    wrap = ParallelPostFit(base)
+    with pytest.raises(ValueError):
+        wrap.transform(dd_X)
+
+    wrap = ParallelPostFit(
+        base, transform_meta=np.array([[0, 0, 0, 0]], dtype=np.float64)
+    )
+    result = wrap.transform(dd_X)
+    expected = base.transform(X)
+    assert_eq_ar(result, expected)
+
+
+def test_predict_correct_output_dtype():
+    X, y = make_classification(chunks=100)
+    X_ddf = dd.from_dask_array(X)
+
+    base = LinearRegression(n_jobs=1)
+    base.fit(X, y)
+
+    wrap = ParallelPostFit(base)
+
+    base_output = base.predict(X_ddf.compute())
+    wrap_output = wrap.predict(X_ddf)
+
+    assert wrap_output.dtype == base_output.dtype
+
+
 @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"])
 def test_predict(kind):
     X, y = make_classification(chunks=100)
@@ -60,7 +145,9 @@ def test_predict(kind):
         y = dd.from_dask_array(y)
 
     base = LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs")
-    wrap = ParallelPostFit(LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs"))
+    wrap = ParallelPostFit(
+        LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs"),
+    )
 
     base.fit(*dask.compute(X, y))
     wrap.fit(*dask.compute(X, y))
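Reviewer note: a minimal usage sketch of the new keywords, mirroring test_transform_meta_override above. The estimator and the concrete meta value are illustrative; any empty or dummy array matching the output's shape and dtype would work.

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import OneHotEncoder

    from dask_ml.wrappers import ParallelPostFit

    X = pd.DataFrame({"cat_s": ["a", "b", "c", "d"]})
    dd_X = dd.from_pandas(X, npartitions=2)

    # OneHotEncoder's output width depends on the categories seen at fit
    # time, so dask cannot reliably infer it from a dummy partition;
    # supplying transform_meta skips that (possibly failing) inference step.
    wrap = ParallelPostFit(
        OneHotEncoder(sparse=False).fit(X),
        transform_meta=np.array([[0, 0, 0, 0]], dtype=np.float64),
    )
    result = wrap.transform(dd_X)  # lazy; computes to a (4, 4) float64 array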