From daf0b9c83cffe9430a3691e046f949ab1f133640 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Tue, 21 Nov 2017 10:43:16 -0500
Subject: [PATCH] Remove use of underscored methods

Also allow other workers to take task
---
 dask_xgboost/core.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/dask_xgboost/core.py b/dask_xgboost/core.py
index b0516da..dea1c7e 100644
--- a/dask_xgboost/core.py
+++ b/dask_xgboost/core.py
@@ -9,7 +9,7 @@ from toolz import first, assoc
 from tornado import gen
 
 from dask import delayed
-from distributed.client import _wait, default_client
+from distributed.client import wait, default_client
 from distributed.utils import sync
 
 import xgboost as xgb
@@ -105,7 +105,7 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
     # Arrange parts into pairs. This enforces co-locality
     parts = list(map(delayed, zip(data_parts, label_parts)))
     parts = client.compute(parts)  # Start computation in the background
-    yield _wait(parts)
+    yield wait(parts)
 
     # Because XGBoost-python doesn't yet allow iterative training, we need to
     # find the locations of all chunks and map them to particular Dask workers
@@ -119,19 +119,20 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
 
     # Start the XGBoost tracker on the Dask scheduler
     host, port = parse_host_port(client.scheduler.address)
-    env = yield client._run_on_scheduler(start_tracker,
-                                         host.strip('/:'),
-                                         len(worker_map))
+    env = yield client.run_on_scheduler(start_tracker,
+                                        host.strip('/:'),
+                                        len(worker_map))
 
     # Tell each worker to train on the chunks/parts that it has locally
     futures = [client.submit(train_part, env,
                              assoc(params, 'nthreads', ncores[worker]),
                              list_of_parts, workers=worker,
+                             allow_other_workers=True,
                              dmatrix_kwargs=dmatrix_kwargs, **kwargs)
                for worker, list_of_parts in worker_map.items()]
 
     # Get the results, only one will be non-None
-    results = yield client._gather(futures)
+    results = yield client.gather(futures)
     result = [v for v in results if v][0]
 
     raise gen.Return(result)
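
Reviewer note, not part of the patch: the sketch below shows the public calls
this change moves to. `wait`, `Client.gather`, and `Client.run_on_scheduler`
work both synchronously and when yielded inside a tornado coroutine (as in
`_train` above), so the private underscored variants are unnecessary. The
helper functions `double` and `count_workers` and the in-process `Client`
setup are hypothetical, chosen only to keep the snippet small and runnable.

    from distributed import Client, wait

    def double(x):
        return 2 * x

    def count_workers(dask_scheduler):
        # distributed injects the scheduler when the function accepts a
        # `dask_scheduler` argument
        return len(dask_scheduler.workers)

    if __name__ == '__main__':
        client = Client(processes=False)  # small in-process cluster

        # Any worker address works here; we only need a placement preference
        worker = list(client.scheduler_info()['workers'])[0]

        # workers= expresses a preference; allow_other_workers=True (the
        # behavior this patch enables for the training tasks) lets the
        # scheduler run the task elsewhere if that worker is busy or lost
        future = client.submit(double, 21, workers=worker,
                               allow_other_workers=True)

        wait([future])                # public replacement for _wait
        print(client.gather(future))  # public replacement for _gather: 42
        print(client.run_on_scheduler(count_workers))  # replaces _run_on_scheduler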