From 11f72afa54f4ce4cff96a76caad09bf0e265f1c1 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 15 Oct 2021 15:02:17 -0700 Subject: [PATCH 01/17] working with dask_cudf dataframe --- dask_ml/wrappers.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 1d70f09c0..124e1e48b 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -3,6 +3,7 @@ import dask.array as da import dask.dataframe as dd +from dask.dataframe.dispatch import make_meta import dask.delayed import numpy as np import sklearn.base @@ -15,6 +16,7 @@ from ._utils import copy_learned_attributes from .metrics import check_scoring, get_scorer + logger = logging.getLogger(__name__) @@ -279,9 +281,10 @@ def predict(self, X): return result elif isinstance(X, dd._Frame): - return X.map_partitions( - _predict, estimator=self._postfit_estimator, meta=np.array([1]) - ) + # create meta series belonging to the appropiate backend + meta_ser = make_meta(X.iloc[:,0]) + result = X.map_partitions( _predict, estimator=self._postfit_estimator, meta=meta_ser) + return result.to_dask_array() else: return _predict(X, estimator=self._postfit_estimator) From 9c26f3777c851e3a218c443540aba7822a709d6a Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 19 Oct 2021 15:48:54 -0700 Subject: [PATCH 02/17] cleaned up code --- dask_ml/wrappers.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 124e1e48b..86d70c0f3 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -3,7 +3,6 @@ import dask.array as da import dask.dataframe as dd -from dask.dataframe.dispatch import make_meta import dask.delayed import numpy as np import sklearn.base @@ -16,7 +15,6 @@ from ._utils import copy_learned_attributes from .metrics import check_scoring, get_scorer - logger = logging.getLogger(__name__) @@ -281,11 +279,8 @@ def predict(self, X): return result elif isinstance(X, dd._Frame): - # create meta series belonging to the appropiate backend - meta_ser = make_meta(X.iloc[:,0]) - result = X.map_partitions( _predict, estimator=self._postfit_estimator, meta=meta_ser) - return result.to_dask_array() - + result = X.map_partitions(_predict, estimator=self._postfit_estimator) + return result else: return _predict(X, estimator=self._postfit_estimator) From 9fb42eebe32eb0cc4f53239f344b3f94fdbc726b Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 25 Oct 2021 13:38:36 -0700 Subject: [PATCH 03/17] added meta fallback and pytests --- dask_ml/wrappers.py | 11 +++++++-- tests/test_parallel_post_fit.py | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 86d70c0f3..b2124e270 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -279,8 +279,15 @@ def predict(self, X): return result elif isinstance(X, dd._Frame): - result = X.map_partitions(_predict, estimator=self._postfit_estimator) - return result + try: + meta = _predict(X._meta_nonempty, estimator=self._postfit_estimator) + except: + # fall back to numpy array if _predict fails on meta data + # to allow value dependent models to succeed + meta = np.array([1]) + return X.map_partitions( + _predict, estimator=self._postfit_estimator, meta=meta + ) else: return _predict(X, estimator=self._postfit_estimator) diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 5a42786f9..c5c9e3353 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -2,11 +2,13 @@ import dask.array as da import dask.dataframe as dd import numpy as np +import pandas as pd import pytest import sklearn.datasets from sklearn.decomposition import PCA from sklearn.ensemble import GradientBoostingClassifier from sklearn.linear_model import LinearRegression, LogisticRegression +from sklearn.naive_bayes import CategoricalNB from dask_ml.datasets import make_classification from dask_ml.utils import assert_eq_ar, assert_estimator_equal @@ -49,6 +51,47 @@ def test_laziness(): assert 0 < x.compute() < 1 +def test_predict_meta_fallback(): + # test to predict + # that fit works when fit fails on meta + # because of value dependent model + X = pd.DataFrame({"c_0": [0, 1, 2, 3]}) + y = np.array([1, 2, 3, 4]) + + base = CategoricalNB() + base.fit(pd.DataFrame(X), y) + wrap = ParallelPostFit(base) + + dd_X = dd.from_pandas(X, npartitions=2) + dd_X._meta = pd.DataFrame({"c_0": [5]}) + with pytest.raises(IndexError) as m: + base.predict(dd_X._meta) + + # ensure we fall back to numpy array as output + # when metadata inference fails + wrap_output = wrap.predict(dd_X) + assert wrap_output.dtype == np.int64 + + result = wrap.predict(dd_X) + expected = base.predict(X) + assert_eq_ar(result, expected) + + +def test_predict_meta_correctness(): + X, y = make_classification(chunks=100) + X_ddf = dd.from_dask_array(X) + y_ddf = dd.from_dask_array(y) + + base = LinearRegression(n_jobs=1) + base.fit(X, y) + wrap = ParallelPostFit(base) + + base_output_dtype = base.predict(X).dtype + wrap_output_dtype = wrap.predict(X_ddf).dtype + + assert base_output_dtype == wrap_output_dtype + + @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) def test_predict(kind): X, y = make_classification(chunks=100) From f29dd194567e3922fcc51da8dedbbb851b77e1cc Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 25 Oct 2021 13:59:11 -0700 Subject: [PATCH 04/17] fixed flake8 changes --- dask_ml/wrappers.py | 2 +- tests/test_parallel_post_fit.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index b2124e270..054b4dc5a 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -281,7 +281,7 @@ def predict(self, X): elif isinstance(X, dd._Frame): try: meta = _predict(X._meta_nonempty, estimator=self._postfit_estimator) - except: + except Exception: # fall back to numpy array if _predict fails on meta data # to allow value dependent models to succeed meta = np.array([1]) diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index c5c9e3353..1a5da9754 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -64,7 +64,7 @@ def test_predict_meta_fallback(): dd_X = dd.from_pandas(X, npartitions=2) dd_X._meta = pd.DataFrame({"c_0": [5]}) - with pytest.raises(IndexError) as m: + with pytest.raises(IndexError): base.predict(dd_X._meta) # ensure we fall back to numpy array as output @@ -80,7 +80,6 @@ def test_predict_meta_fallback(): def test_predict_meta_correctness(): X, y = make_classification(chunks=100) X_ddf = dd.from_dask_array(X) - y_ddf = dd.from_dask_array(y) base = LinearRegression(n_jobs=1) base.fit(X, y) From fd83064cf551f4932b4c0871b58e99b2357d82a6 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 26 Oct 2021 12:22:31 -0700 Subject: [PATCH 05/17] Trigger Build From f6316a978a066a6c2ebdea7dbee4da4a9b9393e8 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 4 Nov 2021 10:35:07 -0700 Subject: [PATCH 06/17] Added futurewarning to ParallelPostFit.predict for empty meta --- dask_ml/wrappers.py | 20 ++++++----- tests/model_selection/test_hyperband.py | 1 + tests/model_selection/test_incremental.py | 3 ++ tests/test_incremental.py | 5 +++ tests/test_parallel_post_fit.py | 42 ++++++----------------- 5 files changed, 31 insertions(+), 40 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 054b4dc5a..c9f9574c2 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -1,5 +1,6 @@ """Meta-estimators for parallelizing estimators using the scikit-learn API.""" import logging +from warnings import warn import dask.array as da import dask.dataframe as dd @@ -254,7 +255,7 @@ def score(self, X, y, compute=True): else: return self._postfit_estimator.score(X, y) - def predict(self, X): + def predict(self, X, meta=None): """Predict for X. For dask inputs, a dask array or dataframe is returned. For other @@ -272,19 +273,22 @@ def predict(self, X): self._check_method("predict") X = self._check_array(X) + if meta is None: + warn( + "No meta provided to `ParallelPostFit.predict`. " + "Defaulting meta to np.array([1]).\n" + "Will default to None(infer) in the future release", + FutureWarning, + ) + meta = np.array([1]) + if isinstance(X, da.Array): result = X.map_blocks( - _predict, dtype="int", estimator=self._postfit_estimator, drop_axis=1 + _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta ) return result elif isinstance(X, dd._Frame): - try: - meta = _predict(X._meta_nonempty, estimator=self._postfit_estimator) - except Exception: - # fall back to numpy array if _predict fails on meta data - # to allow value dependent models to succeed - meta = np.array([1]) return X.map_partitions( _predict, estimator=self._postfit_estimator, meta=meta ) diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index f9390b7ca..11117f172 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -29,6 +29,7 @@ pytestmark = pytest.mark.skipif(not DISTRIBUTED_2_5_0, reason="hangs") +@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize( "array_type, library, max_iter", [ diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 5038a9eb6..0636e9245 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -335,6 +335,7 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b): @gen_cluster(client=True) +@pytest.mark.filterwarnings("ignore::FutureWarning") async def test_search_plateau_patience(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) @@ -461,6 +462,7 @@ async def test_transform_func(c, s, a, b): pytest.xfail(reason="https://github.com/dask/dask-ml/issues/673") +@pytest.mark.filterwarnings("ignore::FutureWarning") @gen_cluster(client=True) async def test_small(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) @@ -472,6 +474,7 @@ async def test_small(c, s, a, b): search.predict(X_) +@pytest.mark.filterwarnings("ignore::FutureWarning") @gen_cluster(client=True) async def test_smaller(c, s, a, b): # infinite loop diff --git a/tests/test_incremental.py b/tests/test_incremental.py index 9a9a063b1..b2e174438 100644 --- a/tests/test_incremental.py +++ b/tests/test_incremental.py @@ -34,6 +34,7 @@ def test_set_params(): assert result["scoring"] == "accuracy" +@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("dataframes", [False, True]) def test_incremental_basic(scheduler, dataframes): # Create observations that we know linear models can recover @@ -93,6 +94,7 @@ def test_incremental_basic(scheduler, dataframes): assert set(dir(clf.estimator_)) == set(dir(est2)) +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_in_gridsearch(scheduler, xy_classification): X, y = xy_classification clf = Incremental(SGDClassifier(random_state=0, tol=1e-3)) @@ -111,6 +113,7 @@ def test_scoring(scheduler, xy_classification, scoring=dask_ml.metrics.accuracy_ clf.fit(X, y, classes=np.unique(y)) +@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("scoring", ["accuracy", "neg_mean_squared_error", "r2", None]) def test_scoring_string(scheduler, xy_classification, scoring): X, y = xy_classification @@ -136,6 +139,7 @@ def test_fit_ndarrays(): assert_eq(inc.coef_, inc.estimator_.coef_) +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_score_ndarrays(): X = np.ones((10, 5)) y = np.ones(10) @@ -153,6 +157,7 @@ def test_score_ndarrays(): assert inc.score(dX, dy) == 1 +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_score(xy_classification): distributed = pytest.importorskip("distributed") client = distributed.Client(n_workers=2) diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 1a5da9754..f6372650a 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -2,19 +2,18 @@ import dask.array as da import dask.dataframe as dd import numpy as np -import pandas as pd import pytest import sklearn.datasets from sklearn.decomposition import PCA from sklearn.ensemble import GradientBoostingClassifier from sklearn.linear_model import LinearRegression, LogisticRegression -from sklearn.naive_bayes import CategoricalNB from dask_ml.datasets import make_classification from dask_ml.utils import assert_eq_ar, assert_estimator_equal from dask_ml.wrappers import ParallelPostFit +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_it_works(): clf = ParallelPostFit(GradientBoostingClassifier()) @@ -41,6 +40,7 @@ def test_no_method_raises(): assert m.match("The wrapped estimator (.|\n)* 'predict_proba' method.") +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_laziness(): clf = ParallelPostFit(LinearRegression()) X, y = make_classification(chunks=50) @@ -51,33 +51,8 @@ def test_laziness(): assert 0 < x.compute() < 1 -def test_predict_meta_fallback(): - # test to predict - # that fit works when fit fails on meta - # because of value dependent model - X = pd.DataFrame({"c_0": [0, 1, 2, 3]}) - y = np.array([1, 2, 3, 4]) - - base = CategoricalNB() - base.fit(pd.DataFrame(X), y) - wrap = ParallelPostFit(base) - - dd_X = dd.from_pandas(X, npartitions=2) - dd_X._meta = pd.DataFrame({"c_0": [5]}) - with pytest.raises(IndexError): - base.predict(dd_X._meta) - - # ensure we fall back to numpy array as output - # when metadata inference fails - wrap_output = wrap.predict(dd_X) - assert wrap_output.dtype == np.int64 - - result = wrap.predict(dd_X) - expected = base.predict(X) - assert_eq_ar(result, expected) - - -def test_predict_meta_correctness(): +@pytest.mark.filterwarnings("ignore::FutureWarning") +def test_predict_with_meta(): X, y = make_classification(chunks=100) X_ddf = dd.from_dask_array(X) @@ -85,12 +60,13 @@ def test_predict_meta_correctness(): base.fit(X, y) wrap = ParallelPostFit(base) - base_output_dtype = base.predict(X).dtype - wrap_output_dtype = wrap.predict(X_ddf).dtype + base_output = base.predict(X) + wrap_output = wrap.predict(X_ddf, meta=np.array([1.00], dtype=np.float64)) - assert base_output_dtype == wrap_output_dtype + assert base_output.dtype == wrap_output.dtype +@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) def test_predict(kind): X, y = make_classification(chunks=100) @@ -145,6 +121,7 @@ def test_transform(kind): assert_eq_ar(result, expected) +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_multiclass(): X, y = sklearn.datasets.make_classification(n_classes=3, n_informative=4) X = da.from_array(X, chunks=50) @@ -172,6 +149,7 @@ def test_multiclass(): assert_eq_ar(result, expected) +@pytest.mark.filterwarnings("ignore::FutureWarning") def test_auto_rechunk(): clf = ParallelPostFit(GradientBoostingClassifier()) X, y = make_classification(n_samples=1000, n_features=20, chunks=100) From 3d6b38fd3a5ab1093443115ba6ba88bfcab87e4a Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 4 Nov 2021 11:15:34 -0700 Subject: [PATCH 07/17] Trigger Build From 6c2984400f14016cc0af008cd60d949083cb5a45 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 4 Nov 2021 13:58:42 -0700 Subject: [PATCH 08/17] Ignore warning raised in ParallelPostFit.predict due to meta=None --- docs/source/hyper-parameter-search.rst | 1 + docs/source/incremental.rst | 1 + docs/source/meta-estimators.rst | 1 + 3 files changed, 3 insertions(+) diff --git a/docs/source/hyper-parameter-search.rst b/docs/source/hyper-parameter-search.rst index 41db9d107..cddfc0fe6 100644 --- a/docs/source/hyper-parameter-search.rst +++ b/docs/source/hyper-parameter-search.rst @@ -465,6 +465,7 @@ to use post-estimation features like scoring or prediction, we recommend using :class:`dask_ml.wrappers.ParallelPostFit`. .. ipython:: python + :okwarning: from dask_ml.wrappers import ParallelPostFit params = {'estimator__alpha': loguniform(1e-2, 1e0), diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index aeee55058..6f73454f8 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -92,6 +92,7 @@ the wrapped ``fit``. We can get the accuracy score on our dataset. .. ipython:: python + :okwarning: clf.score(X, y) diff --git a/docs/source/meta-estimators.rst b/docs/source/meta-estimators.rst index 04642dd6e..9468ffe23 100644 --- a/docs/source/meta-estimators.rst +++ b/docs/source/meta-estimators.rst @@ -56,6 +56,7 @@ This class is useful for predicting for or transforming large datasets. We'll make a larger dask array ``X_big`` with 10,000 samples per block. .. ipython:: python + :okwarning: X_big, _ = dask_ml.datasets.make_classification(n_samples=100000, chunks=10000, From 749f69043caf90c5d0062e919cabc825144273fa Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 4 Nov 2021 14:54:58 -0700 Subject: [PATCH 09/17] Trigger Build From 094c31adc3352c50966f86d501739b1ea50a9c9e Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 8 Nov 2021 14:04:15 -0800 Subject: [PATCH 10/17] Added meta as a class attribute and added relevant doc strings, tests etc --- dask_ml/model_selection/_hyperband.py | 9 +++++ dask_ml/model_selection/_incremental.py | 17 ++++++-- dask_ml/wrappers.py | 49 ++++++++++++++++++----- docs/source/hyper-parameter-search.rst | 3 +- docs/source/incremental.rst | 3 +- docs/source/meta-estimators.rst | 3 +- tests/model_selection/test_hyperband.py | 14 +++++-- tests/model_selection/test_incremental.py | 21 ++++++---- tests/test_incremental.py | 21 +++++----- tests/test_parallel_post_fit.py | 48 +++++++++++++++------- 10 files changed, 134 insertions(+), 54 deletions(-) diff --git a/dask_ml/model_selection/_hyperband.py b/dask_ml/model_selection/_hyperband.py index 17ebe2797..3ce84b044 100644 --- a/dask_ml/model_selection/_hyperband.py +++ b/dask_ml/model_selection/_hyperband.py @@ -154,6 +154,13 @@ class HyperbandSearchCV(BaseIncrementalSearchCV): prefix : str, optional, default="" While logging, add ``prefix`` to each message. + meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the ``predict``, ``transform`` calls. + This meta is necessary for some ``predict``, ``transform`` calls + for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + + Examples -------- >>> import numpy as np @@ -340,6 +347,7 @@ def __init__( scoring=None, verbose=False, prefix="", + meta=None, ): self.aggressiveness = aggressiveness @@ -354,6 +362,7 @@ def __init__( scoring=scoring, verbose=verbose, prefix=prefix, + meta=meta, ) def _get_SHAs(self, brackets): diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 180c2a23d..f2b7504f7 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -128,7 +128,7 @@ def _score( def _create_model(model: Model, ident: Int, **params: Params) -> Tuple[Model, Meta]: - """ Create a model by cloning and then setting params """ + """Create a model by cloning and then setting params""" with log_errors(): model = clone(model).set_params(**params) return model, {"model_id": ident, "params": params, "partial_fit_calls": 0} @@ -515,6 +515,7 @@ def __init__( tol=1e-3, verbose=False, prefix="", + meta=None, ): self.parameters = parameters self.test_size = test_size @@ -524,7 +525,9 @@ def __init__( self.tol = tol self.verbose = verbose self.prefix = prefix - super(BaseIncrementalSearchCV, self).__init__(estimator, scoring=scoring) + super(BaseIncrementalSearchCV, self).__init__( + estimator, scoring=scoring, meta=meta + ) async def _validate_parameters(self, X, y): if (self.max_iter is not None) and self.max_iter < 1: @@ -846,6 +849,12 @@ class IncrementalSearchCV(BaseIncrementalSearchCV): prefix : str, optional, default="" While logging, add ``prefix`` to each message. + meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the ``predict``, ``transform`` calls. + This meta is necessary for some ``predict``, ``transform`` calls + for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + Attributes ---------- cv_results_ : dict of np.ndarrays @@ -977,6 +986,7 @@ def __init__( verbose=False, prefix="", scores_per_fit=None, + meta=None, ): self.n_initial_parameters = n_initial_parameters @@ -995,6 +1005,7 @@ def __init__( tol=tol, verbose=verbose, prefix=prefix, + meta=meta, ) def _decay_deprecated(self): @@ -1338,7 +1349,7 @@ def _adapt(self, info): start = self.n_initial_parameters def inverse(time): - """ Decrease target number of models inversely with time """ + """Decrease target number of models inversely with time""" return int(start / (1 + time) ** self.decay_rate) example = toolz.first(info.values()) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index c9f9574c2..c862d1259 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -47,6 +47,12 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi a single NumPy array, which may exhaust the memory of your worker. You probably want to always specify `scoring`. + meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the ``predict``, ``transform`` calls. + This meta is necessary for some ``predict``, ``transform`` calls + for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + Notes ----- @@ -116,9 +122,10 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi [0.99407016, 0.00592984]]) """ - def __init__(self, estimator=None, scoring=None): + def __init__(self, estimator=None, scoring=None, meta=None): self.estimator = estimator self.scoring = scoring + self.meta = meta def _check_array(self, X): """Validate an array for post-fit tasks. @@ -203,13 +210,27 @@ def transform(self, X): """ self._check_method("transform") X = self._check_array(X) + meta = self.meta if isinstance(X, da.Array): - xx = np.zeros((1, X.shape[1]), dtype=X.dtype) - dt = _transform(xx, self._postfit_estimator).dtype - return X.map_blocks(_transform, estimator=self._postfit_estimator, dtype=dt) + if meta is None: + xx = np.zeros((1, X.shape[1]), dtype=X.dtype) + dt = _transform(xx, self._postfit_estimator).dtype + return X.map_blocks( + _transform, estimator=self._postfit_estimator, dtype=dt + ) + else: + return X.map_blocks( + _transform, estimator=self._postfit_estimator, meta=meta + ) elif isinstance(X, dd._Frame): - return X.map_partitions(_transform, estimator=self._postfit_estimator) + if meta is None: + # dask-dataframe relies on dd.core.no_default + # for infering meta + meta = dd.core.no_default + return X.map_partitions( + _transform, estimator=self._postfit_estimator, meta=meta + ) else: return _transform(X, estimator=self._postfit_estimator) @@ -255,7 +276,7 @@ def score(self, X, y, compute=True): else: return self._postfit_estimator.score(X, y) - def predict(self, X, meta=None): + def predict(self, X): """Predict for X. For dask inputs, a dask array or dataframe is returned. For other @@ -272,15 +293,16 @@ def predict(self, X, meta=None): """ self._check_method("predict") X = self._check_array(X) + meta = self.meta if meta is None: warn( "No meta provided to `ParallelPostFit.predict`. " - "Defaulting meta to np.array([1]).\n" + "Defaulting meta to np.empty(1, dtype=np.int64).\n" "Will default to None(infer) in the future release", FutureWarning, ) - meta = np.array([1]) + meta = np.empty(1, dtype=np.int64) if isinstance(X, da.Array): result = X.map_blocks( @@ -433,6 +455,12 @@ class Incremental(ParallelPostFit): of the Dask arrays (default), or to fit in sequential order. This does not control shuffle between blocks or shuffling each block. + meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the ``predict``, ``transform`` calls. + This meta is necessary for some ``predict``, ``transform`` calls + for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + Attributes ---------- estimator_ : Estimator @@ -469,11 +497,14 @@ def __init__( shuffle_blocks=True, random_state=None, assume_equal_chunks=True, + meta=None, ): self.shuffle_blocks = shuffle_blocks self.random_state = random_state self.assume_equal_chunks = assume_equal_chunks - super(Incremental, self).__init__(estimator=estimator, scoring=scoring) + super(Incremental, self).__init__( + estimator=estimator, scoring=scoring, meta=meta + ) @property def _postfit_estimator(self): diff --git a/docs/source/hyper-parameter-search.rst b/docs/source/hyper-parameter-search.rst index cddfc0fe6..6f3cfc0bf 100644 --- a/docs/source/hyper-parameter-search.rst +++ b/docs/source/hyper-parameter-search.rst @@ -465,12 +465,11 @@ to use post-estimation features like scoring or prediction, we recommend using :class:`dask_ml.wrappers.ParallelPostFit`. .. ipython:: python - :okwarning: from dask_ml.wrappers import ParallelPostFit params = {'estimator__alpha': loguniform(1e-2, 1e0), 'estimator__l1_ratio': uniform(0, 1)} - est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0)) + est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0), meta=np.empty(1, dtype=np.int64)) search = HyperbandSearchCV(est, params, max_iter=9, random_state=0) search.fit(X_train, y_train, classes=[0, 1]); search.score(X_test, y_test) diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index 6f73454f8..9b3583008 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -57,7 +57,7 @@ between machines. X estimator = SGDClassifier(random_state=10, max_iter=100) - clf = Incremental(estimator) + clf = Incremental(estimator, meta=np.empty(1, dtype=np.int64)) clf.fit(X, y, classes=[0, 1]) In this example, we make a (small) random Dask Array. It has 100 samples, @@ -92,7 +92,6 @@ the wrapped ``fit``. We can get the accuracy score on our dataset. .. ipython:: python - :okwarning: clf.score(X, y) diff --git a/docs/source/meta-estimators.rst b/docs/source/meta-estimators.rst index 9468ffe23..c3b0980b2 100644 --- a/docs/source/meta-estimators.rst +++ b/docs/source/meta-estimators.rst @@ -49,14 +49,13 @@ copying over learned attributes, that's all that ``ParallelPostFit`` does. .. ipython:: python - clf = ParallelPostFit(estimator=GradientBoostingClassifier()) + clf = ParallelPostFit(estimator=GradientBoostingClassifier(),meta=np.empty(1,dtype=np.int32)) clf.fit(X, y) This class is useful for predicting for or transforming large datasets. We'll make a larger dask array ``X_big`` with 10,000 samples per block. .. ipython:: python - :okwarning: X_big, _ = dask_ml.datasets.make_classification(n_samples=100000, chunks=10000, diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index 11117f172..2c3378772 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -29,7 +29,6 @@ pytestmark = pytest.mark.skipif(not DISTRIBUTED_2_5_0, reason="hangs") -@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize( "array_type, library, max_iter", [ @@ -61,7 +60,7 @@ async def test_basic(c, s, a, b, array_type, library, max_iter): } model = SGDClassifier(tol=-np.inf, penalty="elasticnet", random_state=42, eta0=0.1) if library == "dask-ml": - model = Incremental(model) + model = Incremental(model, meta=np.empty(1, dtype=np.int64)) params = {"estimator__" + k: v for k, v in params.items()} elif library == "ConstantFunction": model = ConstantFunction() @@ -260,7 +259,9 @@ async def test_correct_params(c, s, a, b): est = ConstantFunction() X, y = make_classification(n_samples=10, n_features=4, chunks=10) params = {"value": np.linspace(0, 1)} - search = HyperbandSearchCV(est, params, max_iter=9) + search = HyperbandSearchCV( + est, params, max_iter=9, meta=np.empty(1, dtype=np.int64) + ) base = { "estimator", @@ -276,7 +277,12 @@ async def test_correct_params(c, s, a, b): "verbose", "prefix", } - assert set(search.get_params().keys()) == base.union({"aggressiveness"}) + + search_keys = set(search.get_params().keys()) + # we remove meta because thats dask specific attribute + search_keys.remove("meta") + + assert search_keys == base.union({"aggressiveness"}) meta = search.metadata SHAs_params = [ bracket["SuccessiveHalvingSearchCV params"] for bracket in meta["brackets"] diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 0636e9245..04582c415 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -169,7 +169,7 @@ async def test_explicit(c, s, a, b): params = [{"alpha": 0.1}, {"alpha": 0.2}] def additional_calls(scores): - """ Progress through predefined updates, checking along the way """ + """Progress through predefined updates, checking along the way""" ts = scores[0][-1]["partial_fit_calls"] ts -= 1 # partial_fit_calls = time step + 1 if ts == 0: @@ -335,7 +335,6 @@ async def _test_search_basic(decay_rate, input_type, memory, c, s, a, b): @gen_cluster(client=True) -@pytest.mark.filterwarnings("ignore::FutureWarning") async def test_search_plateau_patience(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) @@ -351,7 +350,13 @@ def score(self, *args, **kwargs): model = ConstantClassifier() search = IncrementalSearchCV( - model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10, + model, + params, + n_initial_parameters=10, + patience=5, + tol=0, + max_iter=10, + meta=np.empty(1, dtype=np.int64), ) await search.fit(X, y, classes=[0, 1]) @@ -462,26 +467,28 @@ async def test_transform_func(c, s, a, b): pytest.xfail(reason="https://github.com/dask/dask-ml/issues/673") -@pytest.mark.filterwarnings("ignore::FutureWarning") @gen_cluster(client=True) async def test_small(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": [0.1, 0.5, 0.75, 1.0]} - search = IncrementalSearchCV(model, params, n_initial_parameters="grid") + search = IncrementalSearchCV( + model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64) + ) await search.fit(X, y, classes=[0, 1]) X_ = await c.compute(X) search.predict(X_) -@pytest.mark.filterwarnings("ignore::FutureWarning") @gen_cluster(client=True) async def test_smaller(c, s, a, b): # infinite loop X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": [0.1, 0.5]} - search = IncrementalSearchCV(model, params, n_initial_parameters="grid") + search = IncrementalSearchCV( + model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64) + ) await search.fit(X, y, classes=[0, 1]) X_ = await c.compute(X) search.predict(X_) diff --git a/tests/test_incremental.py b/tests/test_incremental.py index b2e174438..32422db54 100644 --- a/tests/test_incremental.py +++ b/tests/test_incremental.py @@ -34,7 +34,6 @@ def test_set_params(): assert result["scoring"] == "accuracy" -@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("dataframes", [False, True]) def test_incremental_basic(scheduler, dataframes): # Create observations that we know linear models can recover @@ -52,7 +51,7 @@ def test_incremental_basic(scheduler, dataframes): est1 = SGDClassifier(random_state=0, tol=1e-3, average=True) est2 = clone(est1) - clf = Incremental(est1, random_state=0) + clf = Incremental(est1, random_state=0, meta=np.empty(1, dtype=np.int64)) result = clf.fit(X, y, classes=[0, 1]) assert result is clf @@ -94,10 +93,11 @@ def test_incremental_basic(scheduler, dataframes): assert set(dir(clf.estimator_)) == set(dir(est2)) -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_in_gridsearch(scheduler, xy_classification): X, y = xy_classification - clf = Incremental(SGDClassifier(random_state=0, tol=1e-3)) + clf = Incremental( + SGDClassifier(random_state=0, tol=1e-3), meta=np.empty(1, dtype=np.int64) + ) param_grid = {"estimator__alpha": [0.1, 10]} gs = sklearn.model_selection.GridSearchCV(clf, param_grid, cv=3) @@ -113,12 +113,13 @@ def test_scoring(scheduler, xy_classification, scoring=dask_ml.metrics.accuracy_ clf.fit(X, y, classes=np.unique(y)) -@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("scoring", ["accuracy", "neg_mean_squared_error", "r2", None]) def test_scoring_string(scheduler, xy_classification, scoring): X, y = xy_classification with scheduler() as (s, [a, b]): - clf = Incremental(SGDClassifier(tol=1e-3), scoring=scoring) + clf = Incremental( + SGDClassifier(tol=1e-3), scoring=scoring, meta=np.empty(1, dtype=np.int64) + ) assert callable(check_scoring(clf, scoring=scoring)) clf.fit(X, y, classes=np.unique(y)) clf.score(X, y) @@ -139,13 +140,12 @@ def test_fit_ndarrays(): assert_eq(inc.coef_, inc.estimator_.coef_) -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_score_ndarrays(): X = np.ones((10, 5)) y = np.ones(10) sgd = SGDClassifier(tol=1e-3) - inc = Incremental(sgd, scoring="accuracy") + inc = Incremental(sgd, scoring="accuracy", meta=np.empty(1, dtype=np.int64)) inc.partial_fit(X, y, classes=[0, 1]) inc.fit(X, y, classes=[0, 1]) @@ -157,14 +157,15 @@ def test_score_ndarrays(): assert inc.score(dX, dy) == 1 -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_score(xy_classification): distributed = pytest.importorskip("distributed") client = distributed.Client(n_workers=2) X, y = xy_classification inc = Incremental( - SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), scoring="accuracy" + SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), + scoring="accuracy", + meta=np.empty(1, dtype=np.int64), ) with client: diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index f6372650a..0e767dcf2 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -13,9 +13,8 @@ from dask_ml.wrappers import ParallelPostFit -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_it_works(): - clf = ParallelPostFit(GradientBoostingClassifier()) + clf = ParallelPostFit(GradientBoostingClassifier(), meta=np.empty(1, np.int64)) X, y = make_classification(n_samples=1000, chunks=100) X_, y_ = dask.compute(X, y) @@ -40,9 +39,8 @@ def test_no_method_raises(): assert m.match("The wrapped estimator (.|\n)* 'predict_proba' method.") -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_laziness(): - clf = ParallelPostFit(LinearRegression()) + clf = ParallelPostFit(LinearRegression(), meta=np.empty(1, dtype=np.float64)) X, y = make_classification(chunks=50) clf.fit(X, y) @@ -51,8 +49,7 @@ def test_laziness(): assert 0 < x.compute() < 1 -@pytest.mark.filterwarnings("ignore::FutureWarning") -def test_predict_with_meta(): +def test_predict_raise_warning(): X, y = make_classification(chunks=100) X_ddf = dd.from_dask_array(X) @@ -60,13 +57,28 @@ def test_predict_with_meta(): base.fit(X, y) wrap = ParallelPostFit(base) - base_output = base.predict(X) - wrap_output = wrap.predict(X_ddf, meta=np.array([1.00], dtype=np.float64)) + expect_warn = ( + "No meta provided to `ParallelPostFit.predict`. Defaulting meta to np.empty" + ) + + with pytest.warns(FutureWarning, match=expect_warn): + wrap.predict(X_ddf) + + +def test_predict_with_meta(): + X, y = make_classification(chunks=100) + X_ddf = dd.from_dask_array(X) + + base = LinearRegression(n_jobs=1) + base.fit(X, y) + meta_ar = np.empty(1, dtype=np.float64) + wrap = ParallelPostFit(base, meta=meta_ar) + + wrap_output = wrap.predict(X_ddf) - assert base_output.dtype == wrap_output.dtype + assert wrap_output.dtype == np.float64 -@pytest.mark.filterwarnings("ignore::FutureWarning") @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) def test_predict(kind): X, y = make_classification(chunks=100) @@ -78,7 +90,10 @@ def test_predict(kind): y = dd.from_dask_array(y) base = LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs") - wrap = ParallelPostFit(LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs")) + wrap = ParallelPostFit( + LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs"), + meta=np.empty(1, dtype=np.int64), + ) base.fit(*dask.compute(X, y)) wrap.fit(*dask.compute(X, y)) @@ -121,14 +136,16 @@ def test_transform(kind): assert_eq_ar(result, expected) -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_multiclass(): X, y = sklearn.datasets.make_classification(n_classes=3, n_informative=4) X = da.from_array(X, chunks=50) y = da.from_array(y, chunks=50) clf = ParallelPostFit( - LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs", multi_class="auto") + LogisticRegression( + random_state=0, n_jobs=1, solver="lbfgs", multi_class="auto" + ), + meta=np.empty(1, dtype=np.int64), ) clf.fit(*dask.compute(X, y)) @@ -149,9 +166,10 @@ def test_multiclass(): assert_eq_ar(result, expected) -@pytest.mark.filterwarnings("ignore::FutureWarning") def test_auto_rechunk(): - clf = ParallelPostFit(GradientBoostingClassifier()) + clf = ParallelPostFit( + GradientBoostingClassifier(), meta=np.empty(1, dtype=np.int32) + ) X, y = make_classification(n_samples=1000, n_features=20, chunks=100) X = X.rechunk({0: 100, 1: 10}) clf.fit(X, y) From 4e46d8cc83ea17be10e0972e4f7ec0622951a59c Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 9 Nov 2021 11:25:45 -0800 Subject: [PATCH 11/17] addessed review --- dask_ml/wrappers.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index c862d1259..50fc2cac0 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -215,14 +215,11 @@ def transform(self, X): if isinstance(X, da.Array): if meta is None: xx = np.zeros((1, X.shape[1]), dtype=X.dtype) - dt = _transform(xx, self._postfit_estimator).dtype - return X.map_blocks( - _transform, estimator=self._postfit_estimator, dtype=dt - ) - else: - return X.map_blocks( - _transform, estimator=self._postfit_estimator, meta=meta - ) + y = _transform(xx, self._postfit_estimator) + meta = type(y)((), dtype=y.dtype) + return X.map_blocks( + _transform, estimator=self._postfit_estimator, meta=meta + ) elif isinstance(X, dd._Frame): if meta is None: # dask-dataframe relies on dd.core.no_default From 18c023aaaf469a0e296ac6692b4b8ece33d7ff42 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 9 Nov 2021 11:34:07 -0800 Subject: [PATCH 12/17] renamed some variables --- dask_ml/wrappers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 50fc2cac0..ac559d335 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -215,8 +215,8 @@ def transform(self, X): if isinstance(X, da.Array): if meta is None: xx = np.zeros((1, X.shape[1]), dtype=X.dtype) - y = _transform(xx, self._postfit_estimator) - meta = type(y)((), dtype=y.dtype) + x_t = _transform(xx, self._postfit_estimator) + meta = type(x_t)((), dtype=x_t.dtype) return X.map_blocks( _transform, estimator=self._postfit_estimator, meta=meta ) From e16d475330cf4c40b025ca277121f3ad9240292f Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 16 Nov 2021 00:12:52 -0800 Subject: [PATCH 13/17] Updated implimentation to add separate metas for predict, predict_proba_meta, transform and ensure parrity in predict, transform --- dask_ml/model_selection/_hyperband.py | 27 +++-- dask_ml/model_selection/_incremental.py | 38 +++++-- dask_ml/wrappers.py | 117 +++++++++++++++++----- docs/source/hyper-parameter-search.rst | 2 +- docs/source/incremental.rst | 2 +- docs/source/meta-estimators.rst | 2 +- tests/model_selection/test_hyperband.py | 10 +- tests/model_selection/test_incremental.py | 16 +-- tests/test_incremental.py | 16 +-- tests/test_parallel_post_fit.py | 41 +++----- 10 files changed, 175 insertions(+), 96 deletions(-) diff --git a/dask_ml/model_selection/_hyperband.py b/dask_ml/model_selection/_hyperband.py index 3ce84b044..bc5e2dd38 100644 --- a/dask_ml/model_selection/_hyperband.py +++ b/dask_ml/model_selection/_hyperband.py @@ -154,12 +154,23 @@ class HyperbandSearchCV(BaseIncrementalSearchCV): prefix : str, optional, default="" While logging, add ``prefix`` to each message. - meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + predict_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output - type of the ``predict``, ``transform`` calls. - This meta is necessary for some ``predict``, ``transform`` calls - for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + type of the estimators ``predict`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + predict_proba_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict_proba`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + transform_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``transform`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` Examples -------- @@ -347,7 +358,9 @@ def __init__( scoring=None, verbose=False, prefix="", - meta=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, ): self.aggressiveness = aggressiveness @@ -362,7 +375,9 @@ def __init__( scoring=scoring, verbose=verbose, prefix=prefix, - meta=meta, + predict_meta=predict_meta, + predict_proba_meta=predict_proba_meta, + transform_meta=transform_meta, ) def _get_SHAs(self, brackets): diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index f2b7504f7..cedd89dca 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -515,7 +515,9 @@ def __init__( tol=1e-3, verbose=False, prefix="", - meta=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, ): self.parameters = parameters self.test_size = test_size @@ -526,7 +528,11 @@ def __init__( self.verbose = verbose self.prefix = prefix super(BaseIncrementalSearchCV, self).__init__( - estimator, scoring=scoring, meta=meta + estimator, + scoring=scoring, + predict_meta=predict_meta, + predict_proba_meta=predict_proba_meta, + transform_meta=transform_meta, ) async def _validate_parameters(self, X, y): @@ -849,11 +855,23 @@ class IncrementalSearchCV(BaseIncrementalSearchCV): prefix : str, optional, default="" While logging, add ``prefix`` to each message. - meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + predict_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + predict_proba_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict_proba`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + transform_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output - type of the ``predict``, ``transform`` calls. - This meta is necessary for some ``predict``, ``transform`` calls - for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + type of the estimators ``transform`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` Attributes ---------- @@ -986,7 +1004,9 @@ def __init__( verbose=False, prefix="", scores_per_fit=None, - meta=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, ): self.n_initial_parameters = n_initial_parameters @@ -1005,7 +1025,9 @@ def __init__( tol=tol, verbose=verbose, prefix=prefix, - meta=meta, + predict_meta=predict_meta, + predict_proba_meta=predict_proba_meta, + transform_meta=transform_meta, ) def _decay_deprecated(self): diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index ac559d335..edaf8dc5d 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -1,6 +1,5 @@ """Meta-estimators for parallelizing estimators using the scikit-learn API.""" import logging -from warnings import warn import dask.array as da import dask.dataframe as dd @@ -47,11 +46,23 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi a single NumPy array, which may exhaust the memory of your worker. You probably want to always specify `scoring`. - meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + predict_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output - type of the ``predict``, ``transform`` calls. - This meta is necessary for some ``predict``, ``transform`` calls - for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + type of the estimators ``predict`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + predict_proba_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict_proba`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + transform_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``transform`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` Notes ----- @@ -122,10 +133,19 @@ class ParallelPostFit(sklearn.base.BaseEstimator, sklearn.base.MetaEstimatorMixi [0.99407016, 0.00592984]]) """ - def __init__(self, estimator=None, scoring=None, meta=None): + def __init__( + self, + estimator=None, + scoring=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, + ): self.estimator = estimator self.scoring = scoring - self.meta = meta + self.predict_meta = predict_meta + self.predict_proba_meta = predict_proba_meta + self.transform_meta = transform_meta def _check_array(self, X): """Validate an array for post-fit tasks. @@ -210,13 +230,13 @@ def transform(self, X): """ self._check_method("transform") X = self._check_array(X) - meta = self.meta + meta = self.transform_meta if isinstance(X, da.Array): if meta is None: - xx = np.zeros((1, X.shape[1]), dtype=X.dtype) - x_t = _transform(xx, self._postfit_estimator) - meta = type(x_t)((), dtype=x_t.dtype) + meta = _get_output_dask_ar_meta_for_estimator( + _transform, self._postfit_estimator, X + ) return X.map_blocks( _transform, estimator=self._postfit_estimator, meta=meta ) @@ -290,24 +310,22 @@ def predict(self, X): """ self._check_method("predict") X = self._check_array(X) - meta = self.meta - - if meta is None: - warn( - "No meta provided to `ParallelPostFit.predict`. " - "Defaulting meta to np.empty(1, dtype=np.int64).\n" - "Will default to None(infer) in the future release", - FutureWarning, - ) - meta = np.empty(1, dtype=np.int64) + meta = self.predict_meta if isinstance(X, da.Array): + if meta is None: + meta = _get_output_dask_ar_meta_for_estimator( + _predict, self._postfit_estimator, X + ) + result = X.map_blocks( _predict, estimator=self._postfit_estimator, drop_axis=1, meta=meta ) return result elif isinstance(X, dd._Frame): + if meta is None: + meta = dd.core.no_default return X.map_partitions( _predict, estimator=self._postfit_estimator, meta=meta ) @@ -336,16 +354,26 @@ def predict_proba(self, X): self._check_method("predict_proba") + meta = self.predict_proba_meta + if isinstance(X, da.Array): + if meta is None: + meta = _get_output_dask_ar_meta_for_estimator( + _predict_proba, self._postfit_estimator, X + ) # XXX: multiclass return X.map_blocks( _predict_proba, estimator=self._postfit_estimator, - dtype="float", + meta=meta, chunks=(X.chunks[0], len(self._postfit_estimator.classes_)), ) elif isinstance(X, dd._Frame): - return X.map_partitions(_predict_proba, estimator=self._postfit_estimator) + if meta is None: + meta = dd.core.no_default + return X.map_partitions( + _predict_proba, estimator=self._postfit_estimator, meta=meta + ) else: return _predict_proba(X, estimator=self._postfit_estimator) @@ -494,13 +522,19 @@ def __init__( shuffle_blocks=True, random_state=None, assume_equal_chunks=True, - meta=None, + predict_meta=None, + predict_proba_meta=None, + transform_meta=None, ): self.shuffle_blocks = shuffle_blocks self.random_state = random_state self.assume_equal_chunks = assume_equal_chunks super(Incremental, self).__init__( - estimator=estimator, scoring=scoring, meta=meta + estimator=estimator, + scoring=scoring, + predict_meta=predict_meta, + predict_proba_meta=predict_proba_meta, + transform_meta=transform_meta, ) @property @@ -589,3 +623,36 @@ def _predict_proba(part, estimator): def _transform(part, estimator): return estimator.transform(part) + + +def _get_output_dask_ar_meta_for_estimator(model_fn, estimator, input_dask_ar): + """ + Returns the output metadata array + for the model function (predict, transform etc) + by running the appropriate function on dummy data + of shape (1, n_features) + + Parameters + ---------- + + model_fun: Model function + _predict, _transform etc + + estimator : Estimator + The underlying estimator that is fit. + + input_dask_ar: The input dask_array + + Returns + ------- + metadata: metadata of output dask array + + """ + # sklearn fails if input array has size size + # It requires at least 1 sample to run successfully + ar = np.zeros( + shape=(1, input_dask_ar.shape[1]), + dtype=input_dask_ar.dtype, + like=input_dask_ar._meta, + ) + return model_fn(ar, estimator) diff --git a/docs/source/hyper-parameter-search.rst b/docs/source/hyper-parameter-search.rst index 6f3cfc0bf..b5e120214 100644 --- a/docs/source/hyper-parameter-search.rst +++ b/docs/source/hyper-parameter-search.rst @@ -469,7 +469,7 @@ to use post-estimation features like scoring or prediction, we recommend using from dask_ml.wrappers import ParallelPostFit params = {'estimator__alpha': loguniform(1e-2, 1e0), 'estimator__l1_ratio': uniform(0, 1)} - est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0), meta=np.empty(1, dtype=np.int64)) + est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0) search = HyperbandSearchCV(est, params, max_iter=9, random_state=0) search.fit(X_train, y_train, classes=[0, 1]); search.score(X_test, y_test) diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index 9b3583008..aeee55058 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -57,7 +57,7 @@ between machines. X estimator = SGDClassifier(random_state=10, max_iter=100) - clf = Incremental(estimator, meta=np.empty(1, dtype=np.int64)) + clf = Incremental(estimator) clf.fit(X, y, classes=[0, 1]) In this example, we make a (small) random Dask Array. It has 100 samples, diff --git a/docs/source/meta-estimators.rst b/docs/source/meta-estimators.rst index c3b0980b2..04642dd6e 100644 --- a/docs/source/meta-estimators.rst +++ b/docs/source/meta-estimators.rst @@ -49,7 +49,7 @@ copying over learned attributes, that's all that ``ParallelPostFit`` does. .. ipython:: python - clf = ParallelPostFit(estimator=GradientBoostingClassifier(),meta=np.empty(1,dtype=np.int32)) + clf = ParallelPostFit(estimator=GradientBoostingClassifier()) clf.fit(X, y) This class is useful for predicting for or transforming large datasets. diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index 2c3378772..44b701f0f 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -60,7 +60,7 @@ async def test_basic(c, s, a, b, array_type, library, max_iter): } model = SGDClassifier(tol=-np.inf, penalty="elasticnet", random_state=42, eta0=0.1) if library == "dask-ml": - model = Incremental(model, meta=np.empty(1, dtype=np.int64)) + model = Incremental(model) params = {"estimator__" + k: v for k, v in params.items()} elif library == "ConstantFunction": model = ConstantFunction() @@ -259,9 +259,7 @@ async def test_correct_params(c, s, a, b): est = ConstantFunction() X, y = make_classification(n_samples=10, n_features=4, chunks=10) params = {"value": np.linspace(0, 1)} - search = HyperbandSearchCV( - est, params, max_iter=9, meta=np.empty(1, dtype=np.int64) - ) + search = HyperbandSearchCV(est, params, max_iter=9) base = { "estimator", @@ -280,7 +278,9 @@ async def test_correct_params(c, s, a, b): search_keys = set(search.get_params().keys()) # we remove meta because thats dask specific attribute - search_keys.remove("meta") + search_keys.remove("predict_meta") + search_keys.remove("predict_proba_meta") + search_keys.remove("transform_meta") assert search_keys == base.union({"aggressiveness"}) meta = search.metadata diff --git a/tests/model_selection/test_incremental.py b/tests/model_selection/test_incremental.py index 04582c415..9c0ae64df 100644 --- a/tests/model_selection/test_incremental.py +++ b/tests/model_selection/test_incremental.py @@ -350,13 +350,7 @@ def score(self, *args, **kwargs): model = ConstantClassifier() search = IncrementalSearchCV( - model, - params, - n_initial_parameters=10, - patience=5, - tol=0, - max_iter=10, - meta=np.empty(1, dtype=np.int64), + model, params, n_initial_parameters=10, patience=5, tol=0, max_iter=10, ) await search.fit(X, y, classes=[0, 1]) @@ -472,9 +466,7 @@ async def test_small(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": [0.1, 0.5, 0.75, 1.0]} - search = IncrementalSearchCV( - model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64) - ) + search = IncrementalSearchCV(model, params, n_initial_parameters="grid") await search.fit(X, y, classes=[0, 1]) X_ = await c.compute(X) search.predict(X_) @@ -486,9 +478,7 @@ async def test_smaller(c, s, a, b): X, y = make_classification(n_samples=100, n_features=5, chunks=(10, 5)) model = SGDClassifier(tol=1e-3, penalty="elasticnet") params = {"alpha": [0.1, 0.5]} - search = IncrementalSearchCV( - model, params, n_initial_parameters="grid", meta=np.empty(1, dtype=np.int64) - ) + search = IncrementalSearchCV(model, params, n_initial_parameters="grid") await search.fit(X, y, classes=[0, 1]) X_ = await c.compute(X) search.predict(X_) diff --git a/tests/test_incremental.py b/tests/test_incremental.py index 32422db54..ca4caa828 100644 --- a/tests/test_incremental.py +++ b/tests/test_incremental.py @@ -51,7 +51,7 @@ def test_incremental_basic(scheduler, dataframes): est1 = SGDClassifier(random_state=0, tol=1e-3, average=True) est2 = clone(est1) - clf = Incremental(est1, random_state=0, meta=np.empty(1, dtype=np.int64)) + clf = Incremental(est1, random_state=0) result = clf.fit(X, y, classes=[0, 1]) assert result is clf @@ -95,9 +95,7 @@ def test_incremental_basic(scheduler, dataframes): def test_in_gridsearch(scheduler, xy_classification): X, y = xy_classification - clf = Incremental( - SGDClassifier(random_state=0, tol=1e-3), meta=np.empty(1, dtype=np.int64) - ) + clf = Incremental(SGDClassifier(random_state=0, tol=1e-3)) param_grid = {"estimator__alpha": [0.1, 10]} gs = sklearn.model_selection.GridSearchCV(clf, param_grid, cv=3) @@ -117,9 +115,7 @@ def test_scoring(scheduler, xy_classification, scoring=dask_ml.metrics.accuracy_ def test_scoring_string(scheduler, xy_classification, scoring): X, y = xy_classification with scheduler() as (s, [a, b]): - clf = Incremental( - SGDClassifier(tol=1e-3), scoring=scoring, meta=np.empty(1, dtype=np.int64) - ) + clf = Incremental(SGDClassifier(tol=1e-3), scoring=scoring) assert callable(check_scoring(clf, scoring=scoring)) clf.fit(X, y, classes=np.unique(y)) clf.score(X, y) @@ -145,7 +141,7 @@ def test_score_ndarrays(): y = np.ones(10) sgd = SGDClassifier(tol=1e-3) - inc = Incremental(sgd, scoring="accuracy", meta=np.empty(1, dtype=np.int64)) + inc = Incremental(sgd, scoring="accuracy") inc.partial_fit(X, y, classes=[0, 1]) inc.fit(X, y, classes=[0, 1]) @@ -163,9 +159,7 @@ def test_score(xy_classification): X, y = xy_classification inc = Incremental( - SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), - scoring="accuracy", - meta=np.empty(1, dtype=np.int64), + SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), scoring="accuracy", ) with client: diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 0e767dcf2..7b2aaae25 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -14,7 +14,7 @@ def test_it_works(): - clf = ParallelPostFit(GradientBoostingClassifier(), meta=np.empty(1, np.int64)) + clf = ParallelPostFit(GradientBoostingClassifier()) X, y = make_classification(n_samples=1000, chunks=100) X_, y_ = dask.compute(X, y) @@ -40,7 +40,7 @@ def test_no_method_raises(): def test_laziness(): - clf = ParallelPostFit(LinearRegression(), meta=np.empty(1, dtype=np.float64)) + clf = ParallelPostFit(LinearRegression()) X, y = make_classification(chunks=50) clf.fit(X, y) @@ -49,34 +49,31 @@ def test_laziness(): assert 0 < x.compute() < 1 -def test_predict_raise_warning(): - X, y = make_classification(chunks=100) - X_ddf = dd.from_dask_array(X) +def test_predict_meta_override(): + pass - base = LinearRegression(n_jobs=1) - base.fit(X, y) - wrap = ParallelPostFit(base) - expect_warn = ( - "No meta provided to `ParallelPostFit.predict`. Defaulting meta to np.empty" - ) +def test_score_meta_override(): + pass + - with pytest.warns(FutureWarning, match=expect_warn): - wrap.predict(X_ddf) +def transform_meta_overide(): + pass -def test_predict_with_meta(): +def test_predict_correct_output_dtype(): X, y = make_classification(chunks=100) X_ddf = dd.from_dask_array(X) base = LinearRegression(n_jobs=1) base.fit(X, y) - meta_ar = np.empty(1, dtype=np.float64) - wrap = ParallelPostFit(base, meta=meta_ar) + wrap = ParallelPostFit(base) + + base_output = base.predict(X_ddf.compute()) wrap_output = wrap.predict(X_ddf) - assert wrap_output.dtype == np.float64 + assert wrap_output.dtype == base_output.dtype @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) @@ -92,7 +89,6 @@ def test_predict(kind): base = LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs") wrap = ParallelPostFit( LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs"), - meta=np.empty(1, dtype=np.int64), ) base.fit(*dask.compute(X, y)) @@ -142,10 +138,7 @@ def test_multiclass(): y = da.from_array(y, chunks=50) clf = ParallelPostFit( - LogisticRegression( - random_state=0, n_jobs=1, solver="lbfgs", multi_class="auto" - ), - meta=np.empty(1, dtype=np.int64), + LogisticRegression(random_state=0, n_jobs=1, solver="lbfgs", multi_class="auto") ) clf.fit(*dask.compute(X, y)) @@ -167,9 +160,7 @@ def test_multiclass(): def test_auto_rechunk(): - clf = ParallelPostFit( - GradientBoostingClassifier(), meta=np.empty(1, dtype=np.int32) - ) + clf = ParallelPostFit(GradientBoostingClassifier()) X, y = make_classification(n_samples=1000, n_features=20, chunks=100) X = X.rechunk({0: 100, 1: 10}) clf.fit(X, y) From 433cd0868a03a0ba626b3ddbfd6e8a536145be3e Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 16 Nov 2021 00:14:53 -0800 Subject: [PATCH 14/17] fix docstring in Incremental --- dask_ml/wrappers.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index edaf8dc5d..436c2a0f1 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -480,11 +480,23 @@ class Incremental(ParallelPostFit): of the Dask arrays (default), or to fit in sequential order. This does not control shuffle between blocks or shuffling each block. - meta: pd.Series, pd.DataFrame, np.array, iterable, tuple, optional, deafult: None + predict_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + predict_proba_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) + An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output + type of the estimators ``predict_proba`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` + + transform_meta: pd.Series, pd.DataFrame, np.array deafult: None(infer) An empty ``pd.Series``, ``pd.DataFrame``, ``np.array`` that matches the output - type of the ``predict``, ``transform`` calls. - This meta is necessary for some ``predict``, ``transform`` calls - for some estimators to work with ``dask.dataframe`` and ``dask.array`` . + type of the estimators ``transform`` call. + This meta is necessary for for some estimators to work with + ``dask.dataframe`` and ``dask.array`` Attributes ---------- From 6cb28a63ddf0fde48263811d5e2f7d062c5200c5 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 16 Nov 2021 00:19:41 -0800 Subject: [PATCH 15/17] remove minor typo in hyper-parameter-search.rst --- docs/source/hyper-parameter-search.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/hyper-parameter-search.rst b/docs/source/hyper-parameter-search.rst index b5e120214..41db9d107 100644 --- a/docs/source/hyper-parameter-search.rst +++ b/docs/source/hyper-parameter-search.rst @@ -469,7 +469,7 @@ to use post-estimation features like scoring or prediction, we recommend using from dask_ml.wrappers import ParallelPostFit params = {'estimator__alpha': loguniform(1e-2, 1e0), 'estimator__l1_ratio': uniform(0, 1)} - est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0) + est = ParallelPostFit(SGDClassifier(tol=1e-3, random_state=0)) search = HyperbandSearchCV(est, params, max_iter=9, random_state=0) search.fit(X_train, y_train, classes=[0, 1]); search.score(X_test, y_test) From 05091cb4695dfb1859689a370088dfd83268a378 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 16 Nov 2021 00:21:51 -0800 Subject: [PATCH 16/17] removed comma from tests/test_incremental.py --- tests/test_incremental.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_incremental.py b/tests/test_incremental.py index ca4caa828..9a9a063b1 100644 --- a/tests/test_incremental.py +++ b/tests/test_incremental.py @@ -159,7 +159,7 @@ def test_score(xy_classification): X, y = xy_classification inc = Incremental( - SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), scoring="accuracy", + SGDClassifier(max_iter=1000, random_state=0, tol=1e-3), scoring="accuracy" ) with client: From 25bbfca96223cab3e60966a87fced57b12f1c101 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 16 Nov 2021 11:15:45 -0800 Subject: [PATCH 17/17] Added meta tests for predict, predict_proba, transform --- tests/test_parallel_post_fit.py | 68 ++++++++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 7b2aaae25..95356fee0 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -2,11 +2,14 @@ import dask.array as da import dask.dataframe as dd import numpy as np +import pandas as pd import pytest import sklearn.datasets from sklearn.decomposition import PCA from sklearn.ensemble import GradientBoostingClassifier from sklearn.linear_model import LinearRegression, LogisticRegression +from sklearn.naive_bayes import CategoricalNB +from sklearn.preprocessing import OneHotEncoder from dask_ml.datasets import make_classification from dask_ml.utils import assert_eq_ar, assert_estimator_equal @@ -50,15 +53,70 @@ def test_laziness(): def test_predict_meta_override(): - pass + X = pd.DataFrame({"c_0": [1, 2, 3, 4]}) + y = np.array([1, 2, 3, 4]) + base = CategoricalNB() + base.fit(pd.DataFrame(X), y) -def test_score_meta_override(): - pass + dd_X = dd.from_pandas(X, npartitions=2) + dd_X._meta = pd.DataFrame({"c_0": [5]}) + # Failure when not proving predict_meta + # because of value dependent model + wrap = ParallelPostFit(base) + with pytest.raises(ValueError): + wrap.predict(dd_X) + + # Success when providing meta over-ride + wrap = ParallelPostFit(base, predict_meta=np.array([1])) + result = wrap.predict(dd_X) + expected = base.predict(X) + assert_eq_ar(result, expected) + + +def test_predict_proba_meta_override(): + X = pd.DataFrame({"c_0": [1, 2, 3, 4]}) + y = np.array([1, 2, 3, 4]) -def transform_meta_overide(): - pass + base = CategoricalNB() + base.fit(pd.DataFrame(X), y) + + dd_X = dd.from_pandas(X, npartitions=2) + dd_X._meta = pd.DataFrame({"c_0": [5]}) + + # Failure when not proving predict_proba_meta + # because of value dependent model + wrap = ParallelPostFit(base) + with pytest.raises(ValueError): + wrap.predict_proba(dd_X) + + # Success when providing meta over-ride + wrap = ParallelPostFit(base, predict_proba_meta=np.array([[0.0, 0.1, 0.8, 0.1]])) + result = wrap.predict_proba(dd_X) + expected = base.predict_proba(X) + assert_eq_ar(result, expected) + + +def test_transform_meta_override(): + X = pd.DataFrame({"cat_s": ["a", "b", "c", "d"]}) + dd_X = dd.from_pandas(X, npartitions=2) + + base = OneHotEncoder(sparse=False) + base.fit(pd.DataFrame(X)) + + # Failure when not proving transform_meta + # because of value dependent model + wrap = ParallelPostFit(base) + with pytest.raises(ValueError): + wrap.transform(dd_X) + + wrap = ParallelPostFit( + base, transform_meta=np.array([[0, 0, 0, 0]], dtype=np.float64) + ) + result = wrap.transform(dd_X) + expected = base.transform(X) + assert_eq_ar(result, expected) def test_predict_correct_output_dtype():