
Task stealing regression in 2021-11-0+ (preventing task load balancing) #5564

Closed
arnaudsj opened this issue Dec 3, 2021 · 11 comments · Fixed by #5572
Labels
bug Something is broken

Comments

@arnaudsj

arnaudsj commented Dec 3, 2021

What happened: There seems to be an issue with task scheduling. When the cluster starts and the first worker grabs the first task, it appears to block the rest of the workers until that first task completes. This is particularly a problem when large clusters are spun up in the cloud, because the scheduler starts sending tasks before all the workers are ready. The minimal working example below also demonstrates how one worker keeps all the tasks assigned to it and only hands them out as needed over the course of the computation.

What you expected to happen: Workers should start processing tasks immediately and be able to steal tasks from the first worker. This appears to be a regression introduced in 2021.11.0+ (I confirmed it works as expected in 2021.9.2).

Minimal Complete Verifiable Example:

import time

import dask.bag as db
import numpy as np


def slow_function(x):  # renamed from `input` to avoid shadowing the builtin
    time.sleep(30)  # simulate a long-running task
    return x


# 1000 single-element inputs, one per partition, so every element becomes its own task
bag = db.from_sequence(np.random.rand(1000, 1), npartitions=1000)
bag.map(slow_function).compute()

Anything else we need to know?:

This is a minimized example of an issue our team ran into at scale on Coiled (cc @gjoseph92)

Environment:

  • Dask version: 2021.11.2
  • Python version: 3.9.7
  • Operating System: Linux / Coiled
  • Install method (conda, pip, source): conda
@jrbourbeau transferred this issue from dask/dask on Dec 3, 2021
@gjoseph92
Collaborator

Thanks @arnaudsj! I think this example is a little bit too minimized actually, so here's a different one:

  • Add in the distributed scheduler
  • Have the cluster scale up once computation has already been submitted (key)
  • Remove bag and just use client.map (simpler)
import time
import distributed


if __name__ == "__main__":

    def slow_function(x):
        time.sleep(10)

    with distributed.Client(n_workers=1, threads_per_worker=1) as client:
        input(f"Open dashboard at {client.dashboard_link}, then press enter to start")
        fs = client.map(slow_function, range(30))

        time.sleep(1)
        client.cluster.scale(10)
        client.wait_for_workers(10)
        print("Cluster has 10 workers now, tasks _should_ get balanced to the new workers, but don't")

        distributed.wait(fs)

If you watch the dashboard, you'll see that at first all 30 slow tasks get assigned to the single worker in the cluster. The cluster scales up to 10 workers before the first task finishes, but the scheduler doesn't load-balance the remaining tasks onto the new workers yet, even though they're idle:
[Screenshot: processing count is all on one worker, even though more workers are available]

It's not until one of the tasks finish that the tasks get load-balanced:
[Screenshot: one task has completed; processing count is now more balanced]

Now, the tasks run on all workers:
[Screenshot: tasks now running on all workers]

According to @arnaudsj, the regression here is that tasks used to get load-balanced even when none had completed yet. In his case, these slow tasks could take 5-10min, so having to wait that long with a cluster that's 99% idle before other workers pick up the work is costly.

cc @fjetter, @crusaderky

@gjoseph92 added the bug label on Dec 4, 2021
@arnaudsj
Author

arnaudsj commented Dec 4, 2021

Thank you @gjoseph92 for putting together a more detailed use case and better capturing what I was trying to report. Let me know if you need anything else!

@nuKs

nuKs commented Dec 6, 2021

I have the same issue (these are 4h+ tasks):

[Screenshot]

After killing the worker holding all the stalled tasks, they get redistributed:
[Screenshot]

@fjetter
Member

fjetter commented Dec 6, 2021

I don't think this is a regression. It has been a long-known problem that we need to wait for a task to finish at least once before we get any measurements in. See #3516.

There is a config option, distributed.scheduler.unknown-task-duration, to give the scheduler a hint about how long an unknown task execution might take; see #3642. It defaults to 500ms, which means the scheduler expects to work off that chunk of tasks in a few seconds and therefore doesn't bother with balancing.

There is also a config option, distributed.scheduler.default-task-durations, where one can set specific defaults per task prefix.
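
For example, a minimal sketch of setting both options (the 10s value is only an illustration, matching the sleep in the reproducer above; set these before the cluster is created so the scheduler picks them up):

import dask

# Hint for task prefixes that have never completed anywhere (default: 500ms)
dask.config.set({"distributed.scheduler.unknown-task-duration": "10s"})

# Per-prefix default; "slow_function" is the prefix from the reproducer above
dask.config.set({"distributed.scheduler.default-task-durations.slow_function": "10s"})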

I started a patch a while ago to get measurements in for not-yet-finished tasks (#4459) but didn't have the time to finish it up. That would likely allow us to get rid of all of this.

@arnaudsj what makes you think this is a regression? Have you seen a different behaviour on another version?

@mrocklin
Member

mrocklin commented Dec 6, 2021 via email

@gjoseph92
Collaborator

@arnaudsj what makes you think this is a regression? Have you seen a different behaviour on another version?

Yeah, prior to 2021.10.0 (the worker state machine refactor) he saw tasks get stolen before any in the group had completed.
From the OP:

I confirmed for sure it works as expected in 2021-9-2

There is a config option to give the scheduler a hint on how long an unknown task execution might take with distributed.scheduler.unknown-task-duration

I suggested distributed.scheduler.default-task-durations to @arnaudsj and he said it didn't help. But testing myself, it seems to solve the issue. Adding dask.config.set({"distributed.scheduler.default-task-durations.slow_function": "10s"}) before starting the cluster in my reproducer causes tasks to get balanced before the first one finishes. @arnaudsj and @nuKs, you should try out this setting.
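
For reference, a sketch of where that call goes relative to cluster startup (based on the reproducer above; the 10s value mirrors the sleep):

import time

import dask
import distributed

# Set before the Client/cluster is created so the scheduler picks it up
dask.config.set({"distributed.scheduler.default-task-durations.slow_function": "10s"})


def slow_function(x):
    time.sleep(10)


if __name__ == "__main__":
    with distributed.Client(n_workers=1, threads_per_worker=1) as client:
        fs = client.map(slow_function, range(30))
        client.cluster.scale(10)
        client.wait_for_workers(10)
        # With the duration hint in place, the new workers should start
        # picking up tasks before the first one finishes
        distributed.wait(fs)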

Interestingly, setting unknown-task-duration to 10s does not fix the problem though. Maybe that needs some investigation.

If any of the currently executing tasks have an unknown task duration then we tried to use the time that they had been executing in order to make stealing decisions.

I think this is the behavior we'd want, and why the current behavior feels unreasonable. If a task has been running for 2s, even if it's not done yet, we should guess that others will probably also take 2s, and balance accordingly.
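
To illustrate the idea, here is a standalone sketch of that heuristic (not distributed's actual code, just the reasoning described above):

from time import time


def estimate_prefix_duration(running_start_times, unknown_task_duration=0.5):
    # running_start_times: start timestamps of the currently executing tasks of
    # this prefix; fall back to the unknown-task-duration default (500ms) when
    # nothing is running yet.
    if not running_start_times:
        return unknown_task_duration
    # A task that has been running for 2s will take at least 2s, so use the
    # longest elapsed time as a lower-bound estimate for its siblings.
    return max(time() - start for start in running_start_times)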

Based on title alone, it seems very likely to me that #5392 is where this behavior changed.

@gjoseph92
Collaborator

Yeah, this fixes my reproducer:

diff --git a/distributed/stealing.py b/distributed/stealing.py
index e5fa43d7..0acb1d60 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -177,7 +177,7 @@ class WorkStealing(SchedulerPlugin):
         level: The location within a stealable list to place this value
         """
         split = ts.prefix.name
-        if split in fast_tasks or split in self.scheduler.unknown_durations:
+        if split in fast_tasks:
             return None, None
 
         if not ts.dependencies:  # no dependencies fast path

As a more reasonable change, perhaps once set_duration_estimate has run as part of the scheduler's reevaluate_occupancy loop, a task's duration should no longer be considered unknown?

@arnaudsj
Author

arnaudsj commented Dec 7, 2021

@gjoseph92 - I tested the workaround by setting distributed.scheduler.default-task-durations.slow_dask_task_name, and the regression did not appear: all tasks were redistributed immediately to all available workers. It is only a partial workaround, though, because for complex workflows it still requires pre-running the task graph to capture the task names that need to be flagged during cluster spin-up (and those names will change when the task graph is modified, if I am not mistaken).
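
As a rough way to discover those prefix names ahead of time without running the graph, something like the following should work (a sketch; key_split lives in dask.utils in recent versions and distributed.utils in older ones, and the exact prefix strings depend on the graph):

import dask.bag as db
import numpy as np
from dask.utils import key_split  # e.g. key_split("slow_function-1234abcd") -> "slow_function"


def slow_function(x):
    return x


bag = db.from_sequence(np.random.rand(1000, 1), npartitions=1000).map(slow_function)
# These prefixes are the names to use under distributed.scheduler.default-task-durations
print({key_split(k) for k in bag.__dask_graph__()})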

@fjetter, yes, I confirmed that this behaviour was not occurring with 2021.9.2 and prior (I did not test whether the 2021.10.x releases were still good). Our team switched to 2021.11.2 at the beginning of our sprint last week, and that's when we noticed the change (which for this particular section of our ETL is quite noticeable, as we have tasks that load lots of data into the worker's worker_data space before they can start running).

@crusaderky
Collaborator

crusaderky commented Dec 7, 2021

I don't think this is a regression. It has been a long-known problem that we need to wait for a task to finish at least once before we get any measurements in. See #3516.

There is a config option, distributed.scheduler.unknown-task-duration, to give the scheduler a hint about how long an unknown task execution might take; see #3642. It defaults to 500ms, which means the scheduler expects to work off that chunk of tasks in a few seconds and therefore doesn't bother with balancing.

There is also a config option, distributed.scheduler.default-task-durations, where one can set specific defaults per task prefix.

Something doesn't add up. With the default distributed.scheduler.unknown-task-duration = 500ms, in @gjoseph92's demo the scheduler should already believe that a single worker has 30 x 500ms = 15s worth of load on it while the rest have zero. Shouldn't that be sufficient to trigger work stealing?

@gjoseph92
Collaborator

gjoseph92 commented Dec 7, 2021

@crusaderky I believe the problem is

if split in fast_tasks or split in self.scheduler.unknown_durations:
    return None, None

The task is in scheduler.unknown_durations because it hasn't completed yet.

@fjetter
Member

fjetter commented Dec 8, 2021

I agree with @gjoseph92 that #5392 is likely the culprit. The interesting thing is that I did not classify it as a regression, since there were tests asserting that such tasks should not be allowed to be stolen.

I traced the test test_dont_steal_unknown_functions, which motivated me to introduce this behaviour, back to #278. That's one of the first PRs about stealing, and since it was based on the assumption that we simply didn't know the runtime of a task back then, I assume this can be revised.
