Task timeouts and retries #391

Open
mrocklin opened this issue Jul 27, 2016 · 7 comments

@mrocklin
Member

Sometimes we want to retry a task on another worker if it appears to be taking a long time. One approach would be to specify a timeout when submitting the task:

future = e.submit(func, *args, timeout=5)

Another would be for the function itself to raise a special exception:

def func(*args):
    ...
    raise RetryException()

The latter is somewhat attractive because it removes some administrative burden from the scheduler.

Pinging @minrk for thoughts or previous experiences.

@minrk
Contributor

minrk commented Jul 28, 2016

IPython doesn't allow retrying a task on timeout (that might be a little tricky, depending on how well you support aborting/invalidating results that are pending), but it does support task reassignment based on special exceptions. In IPython's case, it's an UnmetDependency exception, indicating that the worker cannot run the task (meant for missing packages, resources like memory, GPUs, etc.).
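
For reference, that pattern looks roughly like the sketch below. This is an illustration under assumptions, not a recipe from this thread: the import path ipyparallel.error.UnmetDependency is assumed, and has_gpu is a made-up capability check. Raising the exception tells the IPython scheduler to hand the task to another engine instead of recording a failure:

import shutil
from ipyparallel.error import UnmetDependency

def has_gpu():
    # Made-up capability check: here, just whether nvidia-smi is on PATH.
    return shutil.which("nvidia-smi") is not None

def func(*args):
    if not has_gpu():
        # Signal that this engine cannot run the task so it is reassigned
        # to a different engine rather than treated as a failure.
        raise UnmetDependency()
    # ... actual work that needs a GPU ...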

@bnaul
Contributor

bnaul commented Aug 16, 2018

Just curious, has anything changed on this front? In particular I was looking for a solution to:

Sometimes we want to retry a task on another worker if it appears to be taking a long time.

I often find that the last couple of tasks (out of many thousands) will hang for unknown reasons; the only solution I've come up with so far is to submit everything with retries and have some sort of loop in the client that checks for this situation and retries everything that's running:

c.retire_workers([k for k, v in c.processing().items() if v])

Far from ideal but it at least breaks out of the deadlock where the job will never finish.
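
Spelled out, that workaround looks something like the client-side watchdog sketched below. It is only a sketch under assumptions: a toy increment task stands in for the real work, and the stall timeout and polling interval are made-up numbers. Submitting with retries matters, because tasks lost when a worker is retired then get re-run automatically:

import time
from distributed import Client

client = Client()  # or Client("scheduler-address:8786") for an existing cluster
futures = client.map(lambda x: x + 1, range(10_000), retries=2)

stall_timeout = 300   # seconds without any new completions before intervening
poll_interval = 10

last_done, last_progress = 0, time.time()
while True:
    done = sum(f.status == "finished" for f in futures)
    if done == len(futures):
        break
    if done > last_done:
        last_done, last_progress = done, time.time()
    elif time.time() - last_progress > stall_timeout:
        # Same idea as the one-liner above: retire every worker that still
        # has tasks processing so the scheduler re-runs those tasks elsewhere.
        client.retire_workers(
            [w for w, tasks in client.processing().items() if tasks]
        )
        last_progress = time.time()
    time.sleep(poll_interval)

results = client.gather(futures)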

@louisabraham

@bnaul you might be interested in dask/dask#1183

@Hoeze

Hoeze commented Oct 27, 2020

Just curious, has anything changed on this front? In particular I was looking for a solution to:

Sometimes we want to retry a task on another worker if it appears to be taking a long time.

I often find that the last couple of tasks (out of many thousands) will hang for unknown reasons; the only solution I've come up with so far is to submit everything with retries and have some sort of loop in the client that checks for this situation and retries everything that's running:

c.retire_workers([k for k, v in c.processing().items() if v])

Far from ideal but it at least breaks out of the deadlock where the job will never finish.

Wow, this line is a life-saver.
I end up running into this problem in almost all of my jobs.

Executing this by hand also helped with my issue here:
https://stackoverflow.com/questions/64557212/dask-distributed-workers-stall-when-reaching-80-of-memory-limit

Is there a solution for this problem in the meantime?

@jrbourbeau
Member

FWIW, today there's a special distributed.Reschedule exception which tasks can raise to automatically be stopped and rescheduled. This doesn't guarantee that it will ultimately be run on a different worker, but it might still be useful for folks who are tracking this issue.
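
For example (a small sketch, not an official recipe: the bad-host check is made up, while distributed.Reschedule and Client.submit are real), a task can bail out and ask the scheduler to try again when it finds itself somewhere it doesn't want to run:

import socket
from distributed import Reschedule

BAD_HOSTS = {"flaky-node-01"}  # hypothetical blocklist, for illustration only

def careful_task(x):
    # Made-up condition: refuse to run on known-bad hosts and ask the
    # scheduler to reschedule (not guaranteed to land on another worker).
    if socket.gethostname() in BAD_HOSTS:
        raise Reschedule()
    return x + 1

# future = client.submit(careful_task, 41)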

@ljstrnadiii

This would be really useful, especially if it could be applied globally to all tasks. My specific use case would basically be to annotate any task in a graph with a timeout that triggers a retry. I often find that 99% of my tasks finish quickly, but for some reason one hangs, potentially indefinitely. I could use the exception mentioned by @jrbourbeau above, but that is hard if I am calling a function defined by a package that triggers many tasks.

One example is Xarray's to_zarr(). It would be great to be able to use a context manager for this, like

with dask.config.set(task_timeout=1000):
   dset.to_zarr(...)

Another use case where this would help is when I have a hypothesis that a dask distributed lock is stuck or something similar. I have many hypotheses, most of them unlikely to be true, about why a very small subset of tasks hang at the very end. A timeout is the only sweeping mechanism that could trigger a retry in all of those cases.

I have no clue what this would take, just giving my two cents on why this would be a nice-to-have.
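
There is no task_timeout configuration key today, so the snippet above is a proposal. For tasks you submit yourself (it unfortunately doesn't help with library-generated graphs like to_zarr), one partial workaround is to wrap the function with a thread-based timeout that raises the Reschedule exception mentioned above. A rough sketch, with a made-up with_timeout name and the caveat that the stuck helper thread keeps running in the background:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
from functools import wraps
from distributed import Reschedule

def with_timeout(seconds):
    # Made-up decorator: run the wrapped function in a helper thread and
    # raise Reschedule if it has not finished after `seconds`.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            pool = ThreadPoolExecutor(max_workers=1)
            try:
                return pool.submit(func, *args, **kwargs).result(timeout=seconds)
            except TimeoutError:
                # The helper thread cannot be killed; this only asks the
                # scheduler to run the task again (possibly elsewhere).
                raise Reschedule()
            finally:
                pool.shutdown(wait=False)
        return wrapper
    return decorator

# futures = client.map(with_timeout(1000)(load_chunk), chunks)  # placeholders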

@grandrew

grandrew commented Jun 4, 2023

I have another scenario of the same issue where raising anything from the task won't work: I have a cluster tuned for raw performance over stability, and tasks are allowed to rarely fail with unexpected consequences (typically segfaults) and thus can sometimes hang without any way to recover. I don't see any "infinite" hangs: when a task stops responding it will eventually trigger a reschedule "automatically", but the delay is huge, probably due to some internal timeout that saves the day, so I can safely keep the cluster running without supervision. The issue is, of course, degraded overall performance.

The line I'm seeing in logs after a long delay:

2023-06-04 13:20:28,448 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'run_sim_many-91117067be7edfb74fec26fd07754ed2': ('tcp://10.0.0.33:42059',)} 

So I need a way to trigger that internal timeout earlier. Ideally, some routine would learn the 99th percentile of single-task completion time and force-reschedule tasks that exceed it.

I will try playing with TCP keepalive and timeouts on the scheduler node to see if that helps.
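
A client-side approximation of that idea is sketched below. It is a heuristic under assumptions only: each task reports its own duration via a made-up timed wrapper, the 99th percentile is learned from completed tasks, and "no new completions for much longer than that percentile" is treated as "the stragglers are stuck", at which point the retire_workers trick from earlier in this thread forces a reschedule. run_sim_many here is a stand-in for the real simulation:

import time
from statistics import quantiles
from distributed import Client

def run_sim_many(seed):
    # Stand-in for the real simulation task.
    time.sleep(0.1)
    return seed

def timed(func):
    # Wrap a task so its result also carries its own wall-clock duration.
    def wrapper(*args, **kwargs):
        start = time.time()
        return func(*args, **kwargs), time.time() - start
    return wrapper

client = Client()  # assumption: your performance-tuned cluster
futures = client.map(timed(run_sim_many), range(10_000), retries=3)

last_done, last_progress = 0, time.time()
while True:
    finished = [f for f in futures if f.status == "finished"]
    if len(finished) == len(futures):
        break
    if len(finished) > last_done:
        last_done, last_progress = len(finished), time.time()
    elif len(finished) >= 100:
        # Learn what "slow but normal" means from tasks that did complete.
        p99 = quantiles([f.result()[1] for f in finished], n=100)[-1]
        if time.time() - last_progress > 3 * p99:
            # Nothing has finished for much longer than a slow task should
            # take: retire the workers holding the stragglers so the
            # scheduler re-runs those tasks elsewhere.
            client.retire_workers(
                [w for w, tasks in client.processing().items() if tasks]
            )
            last_progress = time.time()
    time.sleep(10)

results = [r for r, _ in client.gather(futures)]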
