Restart given worker(s) using client [help wanted] #1823

Closed
ameetshah1983 opened this issue Mar 8, 2018 · 12 comments · Fixed by #7154

@ameetshah1983

Currently, client.restart() restarts all workers and the entire cluster. Is there a way to stop, start, or restart a single worker, or a list of workers, using the client APIs?

This would allow cleanup of specific workers only, without affecting others where additional tasks may be running.

ameetshah1983 changed the title from "Shutdown given worker(s) using client" to "Shutdown given worker(s) using client [help wanted]" on Mar 8, 2018
ameetshah1983 changed the title from "Shutdown given worker(s) using client [help wanted]" to "Restart given worker(s) using client [help wanted]" on Mar 9, 2018
@mrocklin
Member

mrocklin commented Mar 9, 2018

No, there is not currently an easy way to do this.

@sheridp

sheridp commented Mar 15, 2018

Is it possible to restart the scheduler? One thing I noticed is that if you have published datasets and then call client.restart, the datasets are still published in the scheduler. If another client then asks for the result from one of those datasets, the scheduler does not reissue the work to a worker. It seems like client.restart should also restart the scheduler or at least unpublish all datasets.
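
A minimal illustrative workaround for the published-datasets problem, assuming a connected Client and that dropping every published dataset before a restart is acceptable (the scheduler address below is a placeholder):

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder scheduler address

# Unpublish every dataset so stale entries don't outlive client.restart()
for name in client.list_datasets():
    client.unpublish_dataset(name)
client.restart()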

@martindurant
Member

Unpublishing all datasets when performing a restart would be easy to implement, e.g., by adding a restart() method to PublishExtension.
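
A hypothetical sketch of that suggestion; PublishExtension is real, but this restart() method and the assumption that Scheduler.restart would call it are illustrations, not actual distributed code:

class PublishExtension:
    """Scheduler extension holding published datasets (simplified)."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.datasets = {}

    def restart(self):
        # Hypothetical hook: drop all published datasets so nothing
        # stale survives a scheduler/cluster restart.
        self.datasets.clear()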

@mrocklin
Member

I recommend raising the published datasets topic as a separate issue.

@ameetshah1983
Author

ameetshah1983 commented Apr 4, 2018

The "Add retire_workers" PR looks like it helps with shutting down workers without removing them completely, but is there a way to start those workers again using an API?
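
For reference, a usage sketch of that API, assuming a connected Client (the worker address is a placeholder):

# Gracefully retire one worker; its unique data is first copied elsewhere
client.retire_workers(workers=["tcp://10.0.0.5:40001"])

As the rest of the thread confirms, there is no matching client call to start a retired worker again.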

mrocklin reopened this Apr 4, 2018
@mrocklin
Member

mrocklin commented Apr 4, 2018

You're right. That PR doesn't solve your issue.

@VMois
Contributor

VMois commented Nov 30, 2018

Any ideas or suggestions about this issue? I want to use worker restarts to refresh a local package cache. Does it make sense to open a PR for this?

@wkerzendorf

I would like to restart the worker after every task runs, as there seems to be a memory leak between dask and my tasks (which does not occur when a task is run locally). Any ideas, or should I open a new PR?
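
As an aside (an assumption about later releases, not something stated in this thread): newer dask-worker versions can restart workers on a schedule via lifetime flags, which can paper over slow leaks:

# Restart each worker roughly every hour; flag names may vary by version
dask-worker tcp://scheduler:8786 --lifetime "1 hour" --lifetime-restart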

@nmatare

nmatare commented Jul 7, 2019

I came across this requirement as well:

I solved it with the snippet below. It assumes your dask-worker(s) are overseen by nanny managers. This will throw a CommClosedError, since the scheduler can no longer communicate with the worker, but otherwise the worker should reboot, however hacky.

import os

my_worker = 'name-or-ip-address-to-worker'  # placeholder worker name/address
client.run(lambda: os._exit(0), workers=[my_worker])
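
For what it's worth, this works because the nanny supervises the worker process and respawns it once the process exits; the CommClosedError is just the client's connection to the killed worker being dropped.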

@asford
Contributor

asford commented Jan 31, 2021

I'm investigating this as a potential solution to #391 (comment), in which we occasionally see unresponsive workers during network-heavy aggregation operations.

Looking through the existing restart code, it appears that something akin to "retire workers" could be implemented that first makes an attempt to replicate data from the worker that is about to be restarted, via:

# Keys orphaned by retiring those workers
keys = set.union(*[w.has_what for w in workers])
keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}
other_workers = set(parent._workers_dv.values()) - workers
if keys:
    if other_workers:
        logger.info("Moving %d keys to other workers", len(keys))
        await self.replicate(
            keys=keys,
            workers=[ws._address for ws in other_workers],
            n=1,
            delete=False,
            lock=False,
        )
    else:
        return {}

and then issues a command to the nanny (if present) to restart the worker process:

https://github.com/dask/distributed/blob/1297b18ff09276f6ad1553d40ab3ce77acf0fc0e/distributed/scheduler.py#L4874-L4881

@mrocklin, would this make sense as an additional scheduler API?
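
A hypothetical shape for such a scheduler API; the method name, the helper, and the wiring are assumptions layered on the snippets above, not actual distributed code:

async def restart_worker(self, comm=None, address=None):
    ws = self.workers[address]
    # 1. Replicate keys held only by this worker to other workers,
    #    as in the snippet above (hypothetical helper).
    await self._replicate_orphaned_keys({ws})
    # 2. Ask the worker's nanny, if present, to restart the process.
    if ws.nanny:
        await self.rpc(ws.nanny).restart(close=True)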

@martindurant
Member

I have looked at retire_workers in the past, and I agree that it makes sense for the restart-a-particular-worker scenario. However, if the worker is unresponsive, what hope is there of being able to copy its data?

@ameetshah1983
Author

Tried the retire_workers API via cl.retire_workers: with close_workers=False nothing happens, and with the default settings the worker shuts down completely but doesn't come back up.

Is there any other way to restart a particular worker, perhaps via the nanny or some other method?
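
For reference, the change that eventually closed this issue (#7154) added a client API for exactly this. A usage sketch, assuming a distributed release recent enough to include Client.restart_workers and workers running under nannies (the address is a placeholder):

# Restart specific workers in place without touching the rest of the cluster
client.restart_workers(workers=["tcp://10.0.0.5:40001"])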
