Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shape inference for dask predict. #5989

Merged
merged 3 commits into from
Aug 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions python-package/xgboost/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ def dispatched_predict(worker_id):
predt = booster.predict(data=local_x,
validate_features=local_x.num_row() != 0,
*args)
ret = (delayed(predt), order)
columns = 1 if len(predt.shape) == 1 else predt.shape[1]
ret = ((delayed(predt), columns), order)
predictions.append(ret)
return predictions

Expand Down Expand Up @@ -775,8 +776,10 @@ async def map_function(func):
# See https://docs.dask.org/en/latest/array-creation.html
arrays = []
for i, shape in enumerate(shapes):
arrays.append(da.from_delayed(results[i], shape=(shape[0], ),
dtype=numpy.float32))
arrays.append(da.from_delayed(
results[i][0], shape=(shape[0],)
if results[i][1] == 1 else (shape[0], results[i][1]),
dtype=numpy.float32))
predictions = await da.concatenate(arrays, axis=0)
return predictions

Expand Down Expand Up @@ -978,6 +981,7 @@ def client(self):
def client(self, clt):
self._client = clt


@xgboost_model_doc("""Implementation of the Scikit-Learn API for XGBoost.""",
['estimators', 'model'])
class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
Expand Down Expand Up @@ -1032,9 +1036,6 @@ def predict(self, data):
['estimators', 'model']
)
class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
# pylint: disable=missing-docstring
_client = None

async def _fit_async(self, X, y,
sample_weights=None,
eval_set=None,
Expand Down
50 changes: 37 additions & 13 deletions tests/python/test_with_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
import json
import asyncio
from sklearn.datasets import make_classification

if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows", allow_module_level=True)
Expand Down Expand Up @@ -36,7 +37,7 @@ def generate_array():


def test_from_dask_dataframe():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
X, y = generate_array()

Expand Down Expand Up @@ -74,7 +75,7 @@ def test_from_dask_dataframe():


def test_from_dask_array():
with LocalCluster(n_workers=5, threads_per_worker=5) as cluster:
with LocalCluster(n_workers=kWorkers, threads_per_worker=5) as cluster:
with Client(cluster) as client:
X, y = generate_array()
dtrain = DaskDMatrix(client, X, y)
Expand Down Expand Up @@ -104,8 +105,28 @@ def test_from_dask_array():
assert np.all(single_node_predt == from_arr.compute())


def test_dask_predict_shape_infer():
    """Check that the lazily reported prediction shape matches the real one.

    Trains a 3-class ``multi:softprob`` model on data produced by
    ``make_classification`` and verifies that the shape advertised by the
    not-yet-computed result of ``xgb.dask.predict`` agrees with the shape
    of the materialized array, with one column per class.
    """
    n_classes = 3
    with LocalCluster(n_workers=kWorkers) as cluster:
        with Client(cluster) as client:
            X, y = make_classification(n_samples=1000, n_informative=5,
                                       n_classes=n_classes)
            X_ = dd.from_array(X, chunksize=100)
            y_ = dd.from_array(y, chunksize=100)
            dtrain = xgb.dask.DaskDMatrix(client, data=X_, label=y_)

            model = xgb.dask.train(
                client,
                {"objective": "multi:softprob", "num_class": n_classes},
                dtrain=dtrain
            )

            preds = xgb.dask.predict(client, model, dtrain)
            # Materialize once and reuse the result, so the prediction
            # graph is not evaluated separately for every assertion.
            computed = preds.compute()

            # Comparing whole tuples also catches a rank mismatch, which
            # per-axis comparisons would surface only as an IndexError.
            assert preds.shape == computed.shape
            assert preds.ndim == 2
            assert preds.shape[1] == n_classes


def test_dask_missing_value_reg():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
X_0 = np.ones((20 // 2, kCols))
X_1 = np.zeros((20 // 2, kCols))
Expand Down Expand Up @@ -156,7 +177,7 @@ def test_dask_missing_value_cls():


def test_dask_regressor():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
X, y = generate_array()
regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
Expand All @@ -178,7 +199,7 @@ def test_dask_regressor():


def test_dask_classifier():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
X, y = generate_array()
y = (y * 10).astype(np.int32)
Expand All @@ -188,7 +209,7 @@ def test_dask_classifier():
classifier.fit(X, y, eval_set=[(X, y)])
prediction = classifier.predict(X)

assert prediction.ndim == 1
assert prediction.ndim == 2
assert prediction.shape[0] == kRows

history = classifier.evals_result()
Expand All @@ -211,14 +232,14 @@ def test_dask_classifier():
assert classifier.n_classes_ == 10
prediction = classifier.predict(X_d)

assert prediction.ndim == 1
assert prediction.ndim == 2
assert prediction.shape[0] == kRows


@pytest.mark.skipif(**tm.no_sklearn())
def test_sklearn_grid_search():
from sklearn.model_selection import GridSearchCV
with LocalCluster(n_workers=4) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
X, y = generate_array()
reg = xgb.dask.DaskXGBRegressor(learning_rate=0.1,
Expand Down Expand Up @@ -292,7 +313,9 @@ def _check_outputs(out, predictions):
evals=[(dtrain, 'validation')],
num_boost_round=2)
predictions = xgb.dask.predict(client=client, model=out,
data=dtrain).compute()
data=dtrain)
assert predictions.shape[1] == n_classes
predictions = predictions.compute()
_check_outputs(out, predictions)

# train has more rows than evals
Expand All @@ -315,15 +338,15 @@ def _check_outputs(out, predictions):
# environment and Exact doesn't support it.

def test_empty_dmatrix_hist():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
parameters = {'tree_method': 'hist'}
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)


def test_empty_dmatrix_approx():
with LocalCluster(n_workers=5) as cluster:
with LocalCluster(n_workers=kWorkers) as cluster:
with Client(cluster) as client:
parameters = {'tree_method': 'approx'}
run_empty_dmatrix_reg(client, parameters)
Expand Down Expand Up @@ -384,7 +407,7 @@ async def run_dask_classifier_asyncio(scheduler_address):
await classifier.fit(X, y, eval_set=[(X, y)])
prediction = await classifier.predict(X)

assert prediction.ndim == 1
assert prediction.ndim == 2
assert prediction.shape[0] == kRows

history = classifier.evals_result()
Expand All @@ -407,8 +430,9 @@ async def run_dask_classifier_asyncio(scheduler_address):
assert classifier.n_classes_ == 10
prediction = await classifier.predict(X_d)

assert prediction.ndim == 1
assert prediction.ndim == 2
assert prediction.shape[0] == kRows
assert prediction.shape[1] == 10


def test_with_asyncio():
Expand Down