
KeyError and Worker already exists #169

Closed
louisabraham opened this issue Oct 6, 2018 · 11 comments

@louisabraham
Contributor

I'm trying to set up dask with tpot.

My code looks like this:

from dask_jobqueue import LSFCluster
cluster = LSFCluster(cores=1, memory='3GB', job_extra=['-R rusage[mem=2048,scratch=8000]'],
                     local_directory='$TMPDIR',
                     walltime='12:00')

from dask.distributed import Client
client = Client(cluster)
cluster.scale(10)

from tpot import TPOTRegressor

reg = TPOTRegressor(max_time_mins=30, generations=20, population_size=96,
                    cv=5,
                    scoring='r2',
                    memory='auto', random_state=42, verbosity=10, use_dask=True)
reg.fit(X, y)

and I keep getting those annoying errors:

distributed.scheduler - ERROR - '74905774'
Traceback (most recent call last):
  File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
  File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74905774'

distributed.utils - ERROR - Worker already exists tcp://10.205.103.50:35780
Traceback (most recent call last):
  File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/utils.py", line 648, in log_errors
    yield
  File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1261, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://10.205.103.50:35780

I think there might be a problem with LSFCluster, because it puts a lot of jobs in cluster.finished_jobs that are still running according to bjobs and even to the dask.distributed web interface.

@guillaumeeb
Member

See #117: this error is seen when workers die and are restarted. This is often due to out-of-memory errors.

The message should be corrected on the master branch, but the underlying issue coming from your dask processes will remain.

Try increasing the memory per worker, and also use the dashboard to monitor your worker processes and see if you spot something wrong.
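
For example (just a sketch, the numbers are only illustrative; keep the LSF rusage request consistent with the memory argument, whose units depend on your LSF configuration):

from dask_jobqueue import LSFCluster

# Same cluster as in the report above, with more memory per worker.
cluster = LSFCluster(cores=1, memory='6GB',
                     job_extra=['-R rusage[mem=6144,scratch=8000]'],
                     local_directory='$TMPDIR',
                     walltime='12:00')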

@louisabraham
Contributor Author

louisabraham commented Oct 6, 2018

Thank you! Yes, I see the memory increasing a bit too much; I'll try increasing the limit in a few hours and hope it will solve my problem.

However, if a node unexpectedly encounters a problem that causes the worker to restart, shouldn't dask restart the computation as well? And shouldn't the Cluster object consider the jobs as running again?

@guillaumeeb
Member

Dask provides a mechanism that relaunches your failed tasks. I believe it tries to launch each task three times, but I'm not sure exactly in which cases. However, the memory problem in your case will probably show up every time, and eventually your computation should fail.
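
If I remember correctly, that retry count comes from the scheduler's allowed-failures setting (default 3). A rough sketch of raising it, assuming a distributed version that reads this key from dask.config:

import dask
from dask.distributed import Client

# Sketch only: more retries will not fix a task that always exceeds the
# memory budget; the computation will still fail eventually.
dask.config.set({'distributed.scheduler.allowed-failures': 10})
client = Client()  # a scheduler started after this picks up the setting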

Glad the problem is identified here; I will close this issue, but feel free to raise something upstream in dask or distributed if you believe there is a problem in the task retry mechanism!

@louisabraham
Contributor Author

louisabraham commented Oct 6, 2018

Yes, you were right: the used memory grows a lot before the KeyError happens.
It seems that tpot tries PolynomialFeatures on data with 800 columns :)
I think the problem comes from tpot, but at the same time I don't understand how it is handled when executing locally.
EDIT: it is handled here; if an exception happens without dask, the score is float('-inf'), but nothing handles an error coming from Dask, and it seems error_score=float('-inf') doesn't work.

Is there a way to signal that there is a memory error? Not a message but an exception or a special return type.

A good test to identify those issues is:

grep Restarting dask-worker.err

In my case it shows some

Worker exceeded 95% memory budget. Restarting

and exactly 4 of them, so I think you were right about the three restarts.

What is funny is that it will only crash after the 4 attempts, while executing other tasks in the meantime.

Thank you a lot for your help!

@guillaumeeb
Member

A pleasure to help!

Is there a way to signal that there is a memory error? Not a message but an exception or a special return type.

You should try to ask this upstream in distributed; I imagine there has been some thought put into this behavior.

@louisabraham
Contributor Author

louisabraham commented Oct 6, 2018

Yes, I can confirm it happens as well with a LocalCluster, so it has nothing to do with dask-jobqueue!

@mrocklin
Member

mrocklin commented Oct 7, 2018 via email

@louisabraham
Contributor Author

louisabraham commented Oct 7, 2018

I think the problem comes from tpot, which doesn't handle the memory errors.

The problem might come from dask_ml.model_selection._search.build_graph, because the argument error_score=float('-inf') was passed but didn't seem to have an effect.

https://github.com/EpistasisLab/tpot/blob/507b45db01e8f88651f4ce8e03b607a5b50146f5/tpot/gp_deap.py#L443-L454

A reproducible example with tpot that runs fast enough involves restricting the operators to force it to run PolynomialFeatures.
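
Something like this might do it (a rough sketch using TPOT's config_dict parameter; the operator set and hyperparameter values are only illustrative):

from tpot import TPOTRegressor

# Restrict TPOT to PolynomialFeatures plus a single cheap regressor, so every
# pipeline blows up the feature count on a wide dataset.
tpot_config = {
    'sklearn.preprocessing.PolynomialFeatures': {
        'degree': [2],
        'include_bias': [False],
        'interaction_only': [False],
    },
    'sklearn.linear_model.Ridge': {
        'alpha': [1.0],
    },
}

reg = TPOTRegressor(generations=2, population_size=8, cv=3,
                    config_dict=tpot_config, use_dask=True,
                    verbosity=2, random_state=42)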

I should be able to produce an example that causes an error in dask_ml. Maybe I should open an issue there?

However, I think the problem comes from dask.distributed, as the error also happens with the joblib backend provided by dask (see EpistasisLab/tpot#779).
My guess is that the memory errors do not raise exceptions.

@louisabraham
Contributor Author

I think I need some help to produce a reproducible example that runs without a cluster.

I am not sure how to trigger a memory error on my laptop. I fear that it will either use the swap memory or restart the computer.

I produced an error with a LocalCluster on a notebook started with LSF bsub with a soft memory limit.

Maybe using ulimit will work?

@mrocklin
Member

mrocklin commented Oct 7, 2018 via email

@louisabraham
Contributor Author

I think that setting up a dask-worker with the --memory-limit option will do the trick.

ulimit doesn't work at all on macOS and doesn't effectively limit memory on Linux.
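
For the record, a rough sketch of the programmatic counterpart (assuming a recent dask.distributed, where LocalCluster takes the same per-worker memory_limit as dask-worker --memory-limit; the sizes are arbitrary, just large enough to blow past the limit):

import numpy as np
from dask.distributed import Client, LocalCluster

# One worker with a deliberately small memory limit...
cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit='512MB')
client = Client(cluster)

def allocate_too_much():
    # ...and a task that allocates ~2 GB of float64, far above the 512MB budget,
    # so the nanny restarts the worker ("Worker exceeded 95% memory budget.").
    return np.ones((2_000_000, 128)).sum()

future = client.submit(allocate_too_much)
future.result()  # expected to fail only after several worker restarts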
