Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submitting multiple dask.xgboost calls causes fewer worker utilization #7544

Closed
rudra0713 opened this issue Jan 7, 2022 · 3 comments
Closed

Comments

@rudra0713
Copy link

I have created a random classification dataset with 100,000 rows and 30 columns and I am training the distributed xgboost on this dataset. My system has 8 workers, therefore I created 8 partitions of my dataset. While running dask.xgboost only once on this dataset, each of the 8 workers get one part of the DaskDmatrix (total it has 8 parts). But when I submit multiple dask.xgboost calls, then the 8 partitions get randomly divided among a subset of the workers. Therefore, only those workers end up utilizing CPU and runtime becomes very high.

Here is a reproducible example,

from dask.distributed import Client
import xgboost as xgb
from sklearn.datasets import make_classification
import dask.array as da
from dask.distributed import get_client
import dask

n_samples, n_features, num_test_samples = 100000, 30, 100
dask.config.set({'distributed.worker.daemon': False})

def invoke_dis_xgboost(X, y, number_of_classes, number_of_estimators):
  client_xgboost = get_client()
  dtrain = xgb.dask.DaskDMatrix(client_xgboost, X, y)
  xgb_params = {
                  'n_estimators': number_of_estimators,
                  'num_class': number_of_classes,
                  ## all other xgb parameter
              }
output = xgb.dask.train(client_xgboost, xgb_params, dtrain, num_boost_round=100)
return


def main(client):
    print(f'n_samples={n_samples}, n_features={n_features}')
    X_local, y_local = make_classification(n_samples=n_samples, n_features=n_features, random_state=12345)
    number_of_classes = len(set(y_local))
    X = da.from_array(X_local, chunks=(n_samples//8,n_features), name='train_feature')
    y = da.from_array(y_local, chunks=(n_samples//8,), name='train_label')

    futures = []
    results = []
    
    for i in range(100, 105):
        f1 = client.submit(invoke_dis_xgboost, X, y, number_of_classes, i)
        futures.append(f1)
    for i, f in enumerate(futures):
        results.append(f.result())

return


if __name__ == '__main__':
    client = Client('127.0.0.1:8786')
    main(client)
    

Looking at this part of the source code of xgboost.dask.train

key_to_partition = {part.key: part for part in parts}
who_has = await client.scheduler.who_has(keys=[part.key for part in parts])
worker_map: Dict[str, "distributed.Future"] = defaultdict(list)
for key, workers in who_has.items():
     worker_map[next(iter(workers))].append(key_to_partition[key])

I was hoping to see each part will be distributed to one worker only. However, I am getting the following output,

('tuple-4d2a6701-77c1-41f6-9566-b5d360cde213', ('tcp://127.0.0.1:43719',), 
('tuple-f301e12c-b53d-4520-a764-383d3a5e1785', ('tcp://127.0.0.1:46133',), 
('tuple-4f5d4be3-4e51-4b49-87ea-ef4bf2460f21', ('tcp://127.0.0.1:32889',), 
('tuple-2ad89ca9-2e8a-4486-bfd4-5e36eed92576', ('tcp://127.0.0.1:32889',), 
('tuple-5856e611-f229-4c41-bf49-a39e4d3c6f9a', ('tcp://127.0.0.1:46133',), 
('tuple-d176cd30-4049-40c2-927e-49d06088dc8b', ('tcp://127.0.0.1:46133',), 
('tuple-aba3b2a6-d245-44a9-b3ec-9231b6637c88', ('tcp://127.0.0.1:43719',), 
('tuple-f82b4ee7-47f4-4ccf-a431-efe5b930bd34', ('tcp://127.0.0.1:32889',)

Since there are only 3 unique workers getting all the parititions, number of dispatched_train is 3, therefore only 3 workers are utilizing CPU according to the dask dashboard.

@rudra0713 rudra0713 changed the title ubmitting multiple dask.xgboost calls causes fewer worker utilization Submitting multiple dask.xgboost calls causes fewer worker utilization Jan 7, 2022
@trivialfis
Copy link
Member

trivialfis commented Jan 7, 2022

My system has 8 workers, therefore I created 8 partitions of my dataset.

Can you try to let dask decide the partitioning? Dask usually has a more fine-grained partitioning scheme and can saturate all the workers.

For xgboost, it doesn't move the data. XGBoost will take whatever dask provide and run on corresponding workers

@rudra0713
Copy link
Author

My system has 8 workers, therefore I created 8 partitions of my dataset.

Can you try to let dask decide the partitioning? Dask usually has a more fine-grained partitioning scheme and can saturate all the workers.

I tried with the default auto chunks of dask array. Runtime varied from 117 sec to 235 sec to more than 5 minutes. From the dashboard, I have sometimes seen only 2 workers actually handling the dispatch_train call and in those cases, overall CPU utilization was very low.

For xgboost, it doesn't move the data. XGBoost will take whatever dask provide and run on corresponding workers

Is there a way to make sure that dask uniformly partitions my data to each worker?

@trivialfis
Copy link
Member

There are some ongoing efforts on client.rebalance like dask/distributed#4906 and some other techniques like repartition that you have already been using. Feel free to participate in the discussion in dask/distributed.

I will close this issue now since it's about dask data management instead of XGBoost.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants