
Fix Dask XGBoost hanging on rabit initialization during training with multi-GPU multi-nodes #6677

Closed

Conversation

@elaineejiang (Author)

This helped me resolve the issue in #6649. Please let me know if it makes sense.

@trivialfis (Member) commented Feb 4, 2021

Did you figure out why the worker didn't receive data?

@@ -818,6 +818,8 @@ def dispatched_train(
     '''
     LOGGER.debug('Training on %s', str(worker_addr))
+    # Initialize rabit without workers first
+    rabit.init()
@trivialfis (Member)

> Please let me know if it makes sense.

I don't quite understand why this helps.

@@ -839,6 +841,11 @@ def dispatched_train(
             LOGGER.info(msg)
         else:
             local_param[p] = worker.nthreads
+
+    # If worker did not receive input data, return without failing
+    if local_dtrain.num_row() == 0:
@trivialfis (Member)

You should let training continue; otherwise the cluster can hang waiting on this exited worker for synchronization.

@elaineejiang (Author)

If I let training continue when the worker did not receive input data, I get this error in learner.cc: Check failed: mparam_.num_feature != 0 (0 vs. 0) : 0 feature is supplied. Are you using raw Booster interface?

@trivialfis (Member) commented Feb 4, 2021

@elaineejiang Thanks for the explanation. In that case, the synchronization between DMatrix objects is failing. During construction, the DMatrix on each worker synchronizes the number of columns in the input data; see

rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);

So as long as one of the workers has any data, it shouldn't be 0.
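
As a conceptual illustration of that Allreduce (plain Python rather than rabit; the per-worker column counts below are made up):

per_worker_num_col = [0, 0, 28, 28]                  # two workers received no partitions
num_col_after_allreduce = max(per_worker_num_col)    # Allreduce with a Max op
assert num_col_after_allreduce == 28                 # every worker sees 28, not 0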

@elaineejiang (Author) commented Feb 4, 2021

I see, thanks @trivialfis. Just to be clear, I'm only seeing this error when I try to run multiple XGBoost learners in parallel via

dask_client.submit(xgb.dask.DaskXGBRegressor([...]).fit(dd, xcols, ycol, wcol))

Is this something that's not recommended or not supported?

@trivialfis (Member)

I think the correct version should be something similar to:

classifier_future = client.submit(classifier.fit, X, y, sample_weight=w, eval_set=[(X, y)])

This should work.

@elaineejiang (Author) commented Feb 4, 2021

Ah so sorry -- I typed that out on the fly and got the syntax wrong. I am seeing

Check failed: mparam_.num_feature != 0 (0 vs. 0) : 0 feature is supplied. Are you using raw Booster interface?

with your version.

If that's supposed to work, I guess it's just my network issue?

@trivialfis (Member)

To me, the issue you encountered seems to be a network issue. That's quite problematic, as each cluster has its own quirks and I don't know what exactly is happening on yours. If I were you, I would start by trying to figure out why the connection is refused.

@elaineejiang (Author)

> To me, the issue you encountered seems to be a network issue. That's quite problematic, as each cluster has its own quirks and I don't know what exactly is happening on yours. If I were you, I would start by trying to figure out why the connection is refused.

Agreed that this likely has to do with my specific cluster configuration. I have seen similar issues posted (#6604) that haven't been closed yet and was wondering if the initial call to rabit.init() can help resolve those issues.

@trivialfis (Member)

> initial call to rabit.init() can help resolve those issues.

It shouldn't. init is supposed to receive all worker information.
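
For context, a rough sketch of the kind of worker information init receives from the tracker; the DMLC-style entries mirror how the dask integration builds its rabit arguments, and the addresses and counts here are made up:

import xgboost as xgb

# Environment-style rabit arguments naming the tracker and the expected
# number of workers; every worker passes these to init().
rabit_args = [
    b"DMLC_NUM_WORKER=4",
    b"DMLC_TRACKER_URI=10.0.0.1",
    b"DMLC_TRACKER_PORT=9091",
]
xgb.rabit.init(rabit_args)   # blocks until all 4 workers have connected to the tracker
xgb.rabit.finalize()         # tear down the rabit network when training is done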

@trivialfis (Member)

I think I can reproduce the issue now by setting pure to False. Will investigate. So far it hangs on a Python thread mutex; I'm not sure where it's held.

@trivialfis (Member)

Hmm, my error seems to be different: dask is wrongly identifying the data as being created by another client. It doesn't apply here.

@elaineejiang (Author)

I see. I saw that the GPU tests (https://github.com/dmlc/xgboost/blob/master/tests/distributed/distributed_gpu.py) call the rabit initializer:

# Always call this before using distributed module
xgb.rabit.init()

which is why I decided to add it, just to see what would happen. However, I don't know why calling rabit.init() without worker args prevents the subsequent call to rabit.init(args) from hanging. But it's the only thing that's worked on my cluster.
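
For reference, the pattern in those distributed tests looks roughly like the following sketch; the data path, parameters, and sharding scheme are hypothetical:

import xgboost as xgb

# Always call this before using distributed module
xgb.rabit.init()
try:
    # each rank loads its own shard of the data (hypothetical file naming)
    dtrain = xgb.DMatrix("train.part-%d.libsvm" % xgb.rabit.get_rank())
    booster = xgb.train({"tree_method": "gpu_hist"}, dtrain, num_boost_round=10)
finally:
    xgb.rabit.finalize()   # tear down the rabit network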

@trivialfis (Member)

> But it's the only thing that's worked on my cluster.

It might not be working correctly, I think. Better tests to look at would be tests/python-gpu/test_gpu_with_dask.py and tests/python/test_with_dask.py.

I'm looking into parallel model training; it might take some time, as I vaguely recall there's a similar issue in the Spark package. For now, please train one model at a time.
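
In the meantime, a minimal sketch of the one-model-at-a-time workaround with the Dask interface; the local cluster, random data, and parameter grid below are placeholders:

import dask.array as da
import xgboost as xgb
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)            # stand-in for the real cluster
client = Client(cluster)

X = da.random.random((1000, 10), chunks=(250, 10))
y = da.random.random(1000, chunks=250)

param_grid = [{"n_estimators": 10}, {"n_estimators": 20}]   # hypothetical params
models = []
for params in param_grid:
    reg = xgb.dask.DaskXGBRegressor(**params)
    reg.client = client                        # bind the estimator to the client
    reg.fit(X, y)                              # one distributed fit at a time
    models.append(reg)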

@elaineejiang (Author)

@trivialfis sounds good, thanks for all the help debugging!

Here's a summary for anyone who might be following. The two issues are:

1. Rabit initialization hangs during training on multi-node, multi-GPU

  • My workaround was to add rabit.init(), and I was able to see logs such as:
[02/04 19:01:13.564 INFO [xgboost.dask] dask.py:414] Training on tcp://10.101.37.135:30030
[02/04 19:01:13.862 INFO [xgboost.dask] dask.py:414] Training on tcp://10.98.13.197:30030
[02/04 19:01:13.973 INFO [xgboost.dask] dask.py:414] Training on tcp://10.103.98.134:30030
[02/04 19:01:19.461 INFO [xgboost.dask] dask.py:414] Training on tcp://10.99.9.197:30030
  • I also checked that the GPUs were being utilized
  • @trivialfis mentioned it still might not be working properly, so I'll test some more. I also can't explain why calling rabit.init() would make a difference, so I'll continue to investigate.

2. Synchronization between DMatrix objects is failing, resulting in Check failed: mparam_.num_feature != 0 (0 vs. 0) : 0 feature is supplied. Are you using raw Booster interface?

  • This only occurs when I'm training multiple models in parallel via classifier_future = client.submit(classifier.fit, X, y, sample_weight=w, eval_set=[(X, y)])

@trivialfis (Member) commented Feb 4, 2021

If you are willing to patch xgboost, here is something you can try:

Change the fit method of DaskXGBClassifier/DaskXGBRegressor into:

        _assert_dask_support()
        args = {k: v for k, v in locals().items() if k != 'self'}
        if self._client is None:   # Don't set the client yourself
            try:
                with distributed.worker_client() as client:
                    self.client = client
                    return self.client.sync(self._fit_async, **args)
            except ValueError:
                # not running inside a dask worker; fall back to self.client below
                pass
        return self.client.sync(self._fit_async, **args)

This workaround uses the client from worker_client instead of the global client. I tested it with classification in a small example, and it works fine so far:

import dask.dataframe as dd
import xgboost as xgb
from distributed import Client


def test_parallel_submits(client: "Client"):
    from sklearn.datasets import load_digits
    futures = []
    for i in range(10):
        X_, y_ = load_digits(return_X_y=True)
        X_ += 1.0
        X = client.submit(dd.from_array, X_, chunksize=32)
        y = client.submit(dd.from_array, y_, chunksize=32)
        cls = xgb.dask.DaskXGBClassifier(
            verbosity=1, n_estimators=30, eval_metric="merror"
        )
        f = client.submit(cls.fit, X, y, pure=False)
        futures.append(f)
    classifiers = client.gather(futures)
    assert len(classifiers) == 10
    for cls in classifiers:
        assert cls.get_booster().num_boosted_rounds() == 30

@trivialfis (Member)

Or, equivalently, you can set the client inside a local function and launch training from there:

from distributed import worker_client

def my_training_func(cls, X, y):
    with worker_client() as client:
        cls.client = client
        return cls.fit(X, y)

client.submit(my_training_func, cls, X, y)

@elaineejiang (Author)

I've tried the latter approach (getting the worker client before launching training) but I still hit the 0-feature error.

@trivialfis (Member)

Thanks for the reply. I think for now one will have to train one model at a time. XGBoost relies on an MPI-like communication framework; if one of the workers is scheduled behind the others, training will hang.
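
A toy illustration of why a missing participant stalls an MPI-style collective; this is a pure-Python analogy, not xgboost or rabit code:

import threading
import time

barrier = threading.Barrier(parties=3)       # training expects 3 workers

def worker(rank):
    print(f"worker {rank} reached the synchronization point")
    barrier.wait()                            # blocks until all 3 arrive
    print(f"worker {rank} proceeded")         # never printed in this demo

# Only 2 of the 3 expected workers are ever scheduled, so both block forever
# in wait(), mirroring a cluster that hangs because one worker never joined.
for rank in range(2):
    threading.Thread(target=worker, args=(rank,), daemon=True).start()

time.sleep(1)                                 # the "proceeded" lines never appear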

@elaineejiang (Author)

Thanks @trivialfis for looking into this. Would it be possible to add this request to the roadmap?

@trivialfis (Member)

I will add it to the roadmap, but I'm not promising it will be delivered. It seems to be a lot of work.

@elaineejiang (Author)

Thanks @trivialfis! Is there an expected release deadline for 1.4.0?
