This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

support sample weights #29

Open · wants to merge 3 commits into base: master

Conversation

@tomlaube (Contributor) commented Oct 12, 2018

Basic support for sample weights; please don't merge yet.

@tomlaube changed the title from "[WIP] support sample weights" to "support sample weights" on Oct 12, 2018
@tomlaube (Contributor, Author)

There seem to be some issues with the tests using the Tornado loop. Is this somehow connected to the fix mentioned in the tests?

@mrocklin (Member) commented Oct 12, 2018 via email

@mrocklin (Member)

This error goes away if I remove this line from setup.cfg:

-addopts = -rsx -v -n 1 --boxed

If memory serves, @TomAugspurger added this because xgboost didn't play nicely when it was started and stopped repeatedly within a process.

I tried finding a version of dask-distributed that worked with these tests and wasn't able to find anything. Perhaps the pytest-xdist package changed? I'm at a loss here.

@mrocklin (Member)

I think that ideally it would be nice to find out what we're doing that makes XGBoost sad when we run many tests sequentially in the same process (by eliminating that line in setup.cfg entirely).

@TomAugspurger (Member)

I tried in dmlc/xgboost#3656 and dmlc/xgboost#2796, but didn't really make any progress.

@tomlaube (Contributor, Author)

I've tried reproducing dmlc/xgboost#3656 on several versions of xgboost (0.80, 0.72.1, 0.7.1, 0.7.post4) and I cannot reproduce it (running kernel 4.15.0-36-generic). So what I would suggest is to remove the line from setup.cfg and add a version bound of xgboost>=0.7. Regarding dmlc/xgboost#2796, the internal state of the rabit engine is not directly visible via the Python bindings, so you cannot really tell whether init has already been called.

@tomlaube (Contributor, Author)

I also have an unrelated question: is there a plan to merge dask-xgboost into dask-ml, so that the LabelEncoder is part of the code and the conversion of labels to ints doesn't need to be handled externally?

@mrocklin (Member)

I've tried reproducing dmlc/xgboost#3656 on several versions of xgboost (0.80, 0.72.1, 0.7.1, 0.7.post4) and I cannot reproduce it (running kernel 4.15.0-36-generic). So what I would suggest is to remove the line from setup.cfg and add a version bound of xgboost>=0.7. Regarding dmlc/xgboost#2796, the internal state of the rabit engine is not directly visible via the Python bindings, so you cannot really tell whether init has already been called.

If you're able to resolve this, either here or in a separate PR, that would be very welcome. Pinning above version 0.7 sounds fine to me.

@tomlaube (Contributor, Author)

I've provided a fix using monkey patching and fixtures, where I use a context manager to manage the lifetime of the rabit instance. All tests seem to pass; only the code formatting check failed, for some reason.
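Roughly, the idea is a context manager that pairs rabit init with finalize, wrapped in a pytest fixture. The sketch below is only illustrative (the name rabit_context, the empty argument list, and the use of the xgboost.rabit interface of that era are assumptions, not the actual patch):

import contextlib

import pytest
import xgboost as xgb


@contextlib.contextmanager
def rabit_context(args=()):
    """Initialize rabit on entry and always finalize on exit."""
    xgb.rabit.init([a.encode() if isinstance(a, str) else a for a in args])
    try:
        yield
    finally:
        xgb.rabit.finalize()


@pytest.fixture
def rabit():
    # Each test gets a clean rabit init/finalize cycle.
    with rabit_context():
        yield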

@mrocklin (Member)

Interesting. I'm curious: is this something that we should change within the dask-xgboost code itself instead of within the tests? When should we be calling init/finalize? (I'm not very familiar with rabit.)

@tomlaube (Contributor, Author)

I don't think that's practical, mostly because rabit assumes gang allocation rather than the incremental allocation that Dask does.

Imagine that in the constructor you init rabit on all of the workers and in __del__ you call finalize. If a worker joins in the middle, the task will fail on that node, since rabit was never initialized there. I assume there are better ways to do this in Dask, like adding a RabitService to each of the workers, or via the actors API (which would cause problems with passing actor handles and with the actors' own lifetime, the same as in Ray: https://ray.readthedocs.io/en/latest/actors.html#current-actor-limitations).

@mrocklin (Member) commented Oct 16, 2018

Some more questions about Rabit (sorry for going off topic):

When you say "gang allocation" I assume that you mean that they all have to arrive at the same time. If so, how do they know when their peers have arrived? We don't tell them how many to expect.

I assume there are better ways to do this in Dask, like adding a RabitService to each of the workers, or via the actors API

This might be a decent approach: https://distributed.dask.org/en/latest/api.html#distributed.Client.register_worker_callbacks
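A rough sketch of how that could look (the init_rabit callback and its hard-coded args are hypothetical; a real implementation would still need to ship the tracker's env to the workers somehow):

import xgboost as xgb
from dask.distributed import Client

client = Client()  # assumes an existing cluster, or starts a local one


def init_rabit():
    # Hypothetical: these args would really have to come from the tracker's env
    args = [b'DMLC_NUM_WORKER=4']
    xgb.rabit.init(args)


# Runs on every current worker and on any worker that joins later
client.register_worker_callbacks(init_rabit)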

But from what you say above it sounds like we don't want to do this; calling init on new workers is not preferable.

Let's consider the following situation:

  1. Scale cluster up to four workers
  2. Call dask-xgboost train
  3. Finish that call
  4. Two workers die (two still alive), add ten more workers
  5. Call dask-xgboost train

Is there some set of init and finalize calls that makes this workflow feasible? I'm happy to figure out the infrastructure on the Dask side to let us call init/finalize at arbitrary points. I genuinely don't know when to call them, though, to be safe.

@tomlaube (Contributor, Author) commented Oct 16, 2018

Yes, by gang allocation I mean that all the workers need to run at the same time. And we do pass the number of workers; it's right here:

rabit = RabitTracker(hostIP=host, nslave=n_workers)

You just don't give it to the rabit instances but to the tracker, which in turn gives you the env that you use to init rabit on each worker, right here:
args = [('%s=%s' % item).encode() for item in env.items()]
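In other words, each worker turns the tracker's env dict into key=value byte strings and hands them to rabit init. A simplified sketch of that worker-side step (the function name is illustrative, not the actual dask_xgboost code):

import xgboost as xgb


def run_with_rabit(env):
    # env comes from start_tracker, e.g.
    # {'DMLC_NUM_WORKER': 4, 'DMLC_TRACKER_URI': ..., 'DMLC_TRACKER_PORT': ...}
    args = [('%s=%s' % item).encode() for item in env.items()]
    xgb.rabit.init(args)        # connect this worker to the tracker
    try:
        pass                    # ... build the DMatrix and call xgb.train here ...
    finally:
        xgb.rabit.finalize()    # tell the tracker this worker is done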

The issue of transitioning from incremental to gang allocation is usually addressed by some synchronization barrier, as for example in Spark's Project Hydrogen: https://vimeo.com/274267107
So, technically, the correct solution would be to change the Dask scheduler to support this sort of barrier, which would be used to init all the workers, do the work, and finalize.

@mrocklin (Member)

Ah! Indeed we do! Thank you for the pointers. It's been a while since I looked at that code.

Barriers are pretty easy to do. I suspect that we already have all of the mechanisms we need today. It looks like the tracker on the scheduler side quits after all the workers shut down, which I assume happens during the finalize call. Given this, I'm curious why things failed before. We start a tracker on the scheduler, init a bunch of workers, train, finalize a bunch of workers, presumably the tracker finishes up. What stops us from starting a new tracker and init-ing again?

@tomlaube (Contributor, Author) commented Oct 17, 2018

That's a good point. But looking at the tracker code more closely, you can see that the join method itself just tries to join another thread in a loop:

def join(self):

so theoretically it should kill itself. Is there some way we can track this? Is there a way to register things at the scheduler and then, at the end of train, see if they actually shut down?

@tomlaube (Contributor, Author)

Isn't the problem actually much simpler? We run two tasks concurrently, on the same thread, that both call rabit init, which can be called only once per thread. Looking at the C++ code, the init is thread-local, so it cannot be two threads.
https://github.com/dmlc/rabit/blob/edc403fb2c8c21039c8d57cf93057dfe248f1717/src/engine.cc#L47

I admit that I don't know much about Tornado and the IOLoop, but it seems like the order is something like this:

coroutine1 gets allocated on Thread-1, calls init, and yields execution
coroutine2 gets allocated on Thread-1, calls init, and crashes

In other words, the problem could be the traditional anti-pattern of using thread-local storage with a thread pool.
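Just to illustrate the anti-pattern I mean (this is not what dask-xgboost actually does, see the reply below; it's a contrived example of a once-per-thread init shared by two coroutines on one thread, using asyncio rather than Tornado for brevity):

import asyncio
import threading

_state = threading.local()


def init():
    # Pretend this is rabit: it may only be initialized once per thread.
    if getattr(_state, "initialized", False):
        raise RuntimeError("init() already called on this thread")
    _state.initialized = True


async def task():
    init()                  # both coroutines run on the event loop's single thread
    await asyncio.sleep(0)  # yield control back to the loop


async def main():
    # The second task crashes because the thread-local flag is already set.
    await asyncio.gather(task(), task())


# asyncio.run(main())  # -> RuntimeError from the second init()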

@mrocklin (Member)

Is there some way we can track this? Is there a way to register things at the scheduler and then, at the end of train, see if they actually shut down?

Yes, if the function passed to run_on_scheduler has a magical dask_scheduler parameter, then that parameter will be filled with the scheduler itself. So the following diff should attach the tracker thread to the scheduler and then wait on it once training finishes.

diff --git a/dask_xgboost/core.py b/dask_xgboost/core.py
index 6bf29d7..c843a00 100644
--- a/dask_xgboost/core.py
+++ b/dask_xgboost/core.py
@@ -34,7 +34,7 @@ def parse_host_port(address):
     return host, port
 
 
-def start_tracker(host, n_workers):
+def start_tracker(host, n_workers, dask_scheduler=None):
     """ Start Rabit tracker """
     env = {'DMLC_NUM_WORKER': n_workers}
     rabit = RabitTracker(hostIP=host, nslave=n_workers)
@@ -45,6 +45,7 @@ def start_tracker(host, n_workers):
     thread = Thread(target=rabit.join)
     thread.daemon = True
     thread.start()
+    dask_scheduler.xgboost_thread = thread
     return env
 
 
@@ -155,6 +156,13 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
     num_class = params.get("num_class")
     if num_class:
         result.set_attr(num_class=str(num_class))
+
+    def wait_on_tracker_thread(dask_scheduler):
+        dask_scheduler.xgboost_thread.join()
+        del dask_scheduler.xgboost_thread
+
+    yield client.run_on_scheduler(wait_on_tracker_thread)
+
     raise gen.Return(result)

You could also add other operations in the wait_on_tracker_thread function, like printing out or returning state.

When I add this diff, my tests fail and pass just as before.
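As an example of the kind of state reporting mentioned above, a hypothetical variant of wait_on_tracker_thread might look like this (a sketch, not part of the diff):

def check_tracker_thread(dask_scheduler):
    # Join with a timeout and report back whether the tracker thread
    # actually shut down, instead of blocking forever.
    thread = getattr(dask_scheduler, 'xgboost_thread', None)
    if thread is None:
        return 'no tracker thread registered'
    thread.join(timeout=5)
    alive = thread.is_alive()
    if not alive:
        del dask_scheduler.xgboost_thread
    return 'tracker thread still alive: %s' % alive

# inside _train:
# status = yield client.run_on_scheduler(check_tracker_thread)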

@mrocklin (Member)

I admit that I don't know much about Tornado and the IOLoop, but it seems like the order is something like this:

coroutine1 gets allocated on Thread-1, calls init, and yields execution
coroutine2 gets allocated on Thread-1, calls init, and crashes

I believe that we only call init on the workers within tasks. These always run in separate threads, outside of the Tornado event loop. The Tornado event loop handles communication and administrative work, while the thread pool handles all user code.

@TomAugspurger (Member)

@mrocklin was your patch in #29 (comment) based off master? Applying it, I still see the test suite exiting early:

diff --git a/.circleci/config.yml b/.circleci/config.yml
index f1463079..72faf516 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -16,7 +16,7 @@ jobs:
             conda config --add channels conda-forge
             conda create -q -n test-environment python=${PYTHON}
             source activate test-environment
-            conda install -q coverage flake8 pytest pytest-cov pytest-xdist numpy pandas xgboost dask distributed scikit-learn sparse scipy
+            conda install -q coverage flake8 pytest pytest-cov numpy pandas xgboost dask distributed scikit-learn sparse scipy
             pip install -e .
             conda list test-environment
       - run:
diff --git a/dask_xgboost/core.py b/dask_xgboost/core.py
index 6bf29d78..c843a000 100644
--- a/dask_xgboost/core.py
+++ b/dask_xgboost/core.py
@@ -34,7 +34,7 @@ def parse_host_port(address):
     return host, port
 
 
-def start_tracker(host, n_workers):
+def start_tracker(host, n_workers, dask_scheduler=None):
     """ Start Rabit tracker """
     env = {'DMLC_NUM_WORKER': n_workers}
     rabit = RabitTracker(hostIP=host, nslave=n_workers)
@@ -45,6 +45,7 @@ def start_tracker(host, n_workers):
     thread = Thread(target=rabit.join)
     thread.daemon = True
     thread.start()
+    dask_scheduler.xgboost_thread = thread
     return env
 
 
@@ -155,6 +156,13 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
     num_class = params.get("num_class")
     if num_class:
         result.set_attr(num_class=str(num_class))
+
+    def wait_on_tracker_thread(dask_scheduler):
+        dask_scheduler.xgboost_thread.join()
+        del dask_scheduler.xgboost_thread
+
+    yield client.run_on_scheduler(wait_on_tracker_thread)
+
     raise gen.Return(result)
 
 
diff --git a/setup.cfg b/setup.cfg
index 2348f495..11894603 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,4 +5,4 @@ universal=1
 exclude = tests/data,docs,benchmarks,scripts
 
 [tool:pytest]
-addopts = -rsx -v -n 1 --boxed
+addopts = -rsx -v

pytest output:

collected 12 items

dask_xgboost/tests/test_core.py::test_basic PASSED                                                                                                   [  8%]
dask_xgboost/tests/test_core.py::test_dmatrix_kwargs PASSED                                                                                          [ 16%]
dask_xgboost/tests/test_core.py::test_numpy ⏎ 

@mrocklin (Member) commented Nov 7, 2018

I'm not sure that the patch I provided ever fixed the problem. I think it was intended to get folks started on how to track things. I don't recall much here, though.

@TomAugspurger (Member) commented Nov 7, 2018 via email

@mrocklin (Member) commented Nov 7, 2018

To be honest, I haven't thought much about the mocking changes.
