
Rewrite Dask interface. #4819

Merged 15 commits on Sep 25, 2019
20 changes: 0 additions & 20 deletions demo/dask/README.md

This file was deleted.

35 changes: 35 additions & 0 deletions demo/dask/cpu_training.py
@@ -0,0 +1,35 @@
import xgboost as xgb
from xgboost.dask import DaskDMatrix
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask import array as da


def main(client):
    n = 100
    m = 100000
    partition_size = 1000
    X = da.random.random((m, n), partition_size)
    y = da.random.random(m, partition_size)

    # DaskDMatrix acts like a normal DMatrix and works as a proxy for the
    # local DMatrix pieces scattered around the workers.
    dtrain = DaskDMatrix(client, X, y)

    # Use the train method from xgboost.dask instead of xgboost.  This
    # distributed version of train returns a dictionary containing the
    # resulting booster and the evaluation history obtained from
    # evaluation metrics.
    output = xgb.dask.train(client,
                            {'verbosity': 2,
                             'nthread': 1,
                             'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])
    bst = output['booster']
    history = output['history']

    prediction = xgb.dask.predict(client, bst, dtrain)
    print('Evaluation history:', history)
    return prediction


if __name__ == '__main__':
    # or use any other cluster
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    main(client)
42 changes: 0 additions & 42 deletions demo/dask/dask_gpu_demo.py

This file was deleted.

68 changes: 0 additions & 68 deletions demo/dask/dask_simple_demo.py

This file was deleted.

41 changes: 41 additions & 0 deletions demo/dask/gpu_training.py
@@ -0,0 +1,41 @@
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask import array as da
import xgboost as xgb
from xgboost.dask import DaskDMatrix


def main(client):
    n = 100
    m = 100000
    partition_size = 1000
    X = da.random.random((m, n), partition_size)
    y = da.random.random(m, partition_size)

    # DaskDMatrix acts like a normal DMatrix and works as a proxy for the
    # local DMatrix pieces scattered around the workers.
    dtrain = DaskDMatrix(client, X, y)

    # Use the train method from xgboost.dask instead of xgboost.  This
    # distributed version of train returns a dictionary containing the
    # resulting booster and the evaluation history obtained from
    # evaluation metrics.
    output = xgb.dask.train(client,
                            {'verbosity': 2,
                             'nthread': 1,
                             'tree_method': 'gpu_hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])
    bst = output['booster']
    history = output['history']

    prediction = xgb.dask.predict(client, bst, dtrain)
    print('Evaluation history:', history)
    return prediction


if __name__ == '__main__':
    # or use any other cluster
    cluster = LocalCUDACluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    main(client)
30 changes: 30 additions & 0 deletions demo/dask/sklearn_cpu_training.py
@@ -0,0 +1,30 @@
'''Dask interface demo:

Use scikit-learn regressor interface with CPU histogram tree method.'''
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask import array as da
import xgboost

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=2, silence_logs=False)  # or use any other cluster
    client = Client(cluster)

    n = 100
    m = 10000
    partition_size = 100
    X = da.random.random((m, n), partition_size)
    y = da.random.random(m, partition_size)

    regressor = xgboost.dask.DaskXGBRegressor(verbosity=2, n_estimators=2)
    regressor.set_params(tree_method='hist')
    regressor.client = client

    regressor.fit(X, y, eval_set=[(X, y)])
    prediction = regressor.predict(X)

    bst = regressor.get_booster()
    history = regressor.evals_result()

    print('Evaluation history:', history)
    assert isinstance(prediction, da.Array)
31 changes: 31 additions & 0 deletions demo/dask/sklearn_gpu_training.py
@@ -0,0 +1,31 @@
'''Dask interface demo:

Use scikit-learn regressor interface with GPU histogram tree method.'''

from dask.distributed import Client
# It's recommended to use dask_cuda for GPU assignment
from dask_cuda import LocalCUDACluster
from dask import array as da
import xgboost

if __name__ == '__main__':
    cluster = LocalCUDACluster()
    client = Client(cluster)

    n = 100
    m = 1000000
    partition_size = 10000
    X = da.random.random((m, n), partition_size)
    y = da.random.random(m, partition_size)

    regressor = xgboost.dask.DaskXGBRegressor(verbosity=2)
    regressor.set_params(tree_method='gpu_hist')
    regressor.client = client

    regressor.fit(X, y, eval_set=[(X, y)])
    prediction = regressor.predict(X)

    bst = regressor.get_booster()
    history = regressor.evals_result()

    print('Evaluation history:', history)
7 changes: 4 additions & 3 deletions doc/python/python_api.rst
@@ -80,9 +80,10 @@ Dask API
--------
.. automodule:: xgboost.dask

-.. autofunction:: xgboost.dask.run
+.. autofunction:: xgboost.dask.DaskDMatrix

-.. autofunction:: xgboost.dask.create_worker_dmatrix
+.. autofunction:: xgboost.dask.predict

-.. autofunction:: xgboost.dask.get_local_data
+.. autofunction:: xgboost.dask.DaskXGBClassifier

+.. autofunction:: xgboost.dask.DaskXGBRegressor
92 changes: 92 additions & 0 deletions doc/tutorials/dask.rst
@@ -0,0 +1,92 @@
#############################
Distributed XGBoost with Dask
#############################

`Dask <https://dask.org>`_ is a parallel computing library built on Python. Dask allows
easy management of distributed workers and excels at handling large distributed data
science workflows. The implementation in XGBoost originates from `dask-xgboost
<https://github.com/dask/dask-xgboost>`_ with some extended functionalities and a
different interface. Right now it is still under construction and may change (with
proper warnings) in the future.

************
Requirements
************

Dask is trivial to install using either pip or conda. `See here for official install
documentation <https://docs.dask.org/en/latest/install.html>`_. To accelerate XGBoost
with GPUs, `dask-cuda <https://github.com/rapidsai/dask-cuda>`_ is recommended for
creating GPU clusters.


********
Overview
********

From a user's perspective, a dask deployment has three components: a scheduler, a set
of workers, and one or more clients connected to the scheduler. To use XGBoost with
dask, one calls the XGBoost dask interface from the client side. A small example
illustrates the basic usage:

.. code-block:: python

    import xgboost as xgb
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    dtrain = xgb.dask.DaskDMatrix(client, X, y)  # X and y are dask dataframes or arrays

    output = xgb.dask.train(client,
                            {'verbosity': 2,
                             'nthread': 1,
                             'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])

Here we first create a cluster in single-node mode with ``distributed.LocalCluster``,
then connect a ``client`` to this cluster, setting up the environment for later
computation. As in the non-distributed interface, we create a ``DMatrix`` object and
pass it to ``train`` along with some other parameters. The difference is that, in the
dask interface, the client is an extra argument for carrying out the computation; when
set to ``None``, XGBoost will use the default client returned by dask.
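
For example, a minimal sketch that relies on the default client (assuming a ``Client``
has already been created as above):

.. code-block:: python

    # Passing None makes XGBoost fall back to dask's default client,
    # here the one connected to ``cluster`` above.
    output = xgb.dask.train(None,
                            {'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4)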

There are two sets of APIs implemented in XGBoost. The first set is the functional API
illustrated in the above example. Given the data and a set of parameters, the ``train``
function returns a model and the computation history as a Python dictionary:

.. code-block:: python

    {'booster': Booster,
     'history': dict}
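
The two entries can be unpacked directly, as the demos above do:

.. code-block:: python

    bst = output['booster']      # the trained Booster
    history = output['history']  # evaluation results, keyed by dataset name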

For prediction, pass the ``output`` returned by ``train`` into ``xgb.dask.predict``:

.. code-block:: python

    prediction = xgb.dask.predict(client, output, dtrain)

Or equivalently, pass ``output['booster']``:

.. code-block:: python

    prediction = xgb.dask.predict(client, output['booster'], dtrain)

Here ``prediction`` is a dask ``Array`` object containing predictions from the model.
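
Like any other dask ``Array`` it is evaluated lazily; a small sketch of materializing
the result locally:

.. code-block:: python

    result = prediction.compute()  # gather predictions into a local numpy array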

The other set of APIs is a Scikit-Learn wrapper, which mimics the stateful Scikit-Learn
interface with ``DaskXGBClassifier`` and ``DaskXGBRegressor``; a minimal sketch is shown
below, and ``xgboost/demo/dask`` contains more examples.
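
Here the arrays ``X`` and ``y`` and the ``client`` are carried over from the earlier
example; the sketch mirrors ``demo/dask/sklearn_cpu_training.py``:

.. code-block:: python

    regressor = xgb.dask.DaskXGBRegressor(verbosity=2, n_estimators=2)
    regressor.set_params(tree_method='hist')
    regressor.client = client  # attach the client created earlier

    regressor.fit(X, y, eval_set=[(X, y)])
    prediction = regressor.predict(X)  # a dask Array, as with the functional API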


***********
Limitations
***********

Basic functionalities, including training and generating predictions for regression and
classification, are implemented. But there are still some limitations we haven't
addressed yet:

- Label encoding for the Scikit-Learn classifier.
- Ranking.
- Callback functions are not tested.
- To use cross validation one needs to explicitly train different models instead of
  using a functional API like ``xgboost.cv``; a minimal sketch follows this list.
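
For example, a hand-rolled cross validation might look like the following sketch; the
fold count ``k`` is an assumption, and ``X``, ``y`` and ``client`` are carried over
from the earlier examples:

.. code-block:: python

    from dask import array as da

    k = 3                     # number of folds (assumption)
    fold = X.shape[0] // k    # rows per fold
    histories = []
    for i in range(k):
        # one contiguous slice for validation, the rest for training
        X_valid, y_valid = X[i * fold:(i + 1) * fold], y[i * fold:(i + 1) * fold]
        X_train = da.concatenate([X[:i * fold], X[(i + 1) * fold:]])
        y_train = da.concatenate([y[:i * fold], y[(i + 1) * fold:]])
        dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
        dvalid = xgb.dask.DaskDMatrix(client, X_valid, y_valid)
        output = xgb.dask.train(client, {'tree_method': 'hist'}, dtrain,
                                num_boost_round=4, evals=[(dvalid, 'valid')])
        histories.append(output['history'])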
1 change: 1 addition & 0 deletions doc/tutorials/index.rst
@@ -21,3 +21,4 @@ See `Awesome XGBoost <https://github.com/dmlc/xgboost/tree/master/demo>`_ for mo
param_tuning
external_memory
custom_metric_obj
+dask