-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP co-assign related root-ish tasks #4899
Conversation
distributed/scheduler.py
Outdated
sum(map(len, group._dependencies)) < 5 # TODO what number | ||
) | ||
): | ||
group._last_worker_tasks_left -= 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I think that having a counter like this opens us up to failures in some complex situations.
For example, what happens when we lose a few workers and need to recompute these tasks and this count goes negative?
I would encourage us to keep thinking about comparing occupancy between the best and recent workers. That feels like a strictly local decision that might be more robust.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the problem is that duration_average is unknown that's fine, we should replace it with whatever is used to increment the worker's occupancy when the previous tasks were added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this will probably get thrown off by both new workers arriving and workers leaving. I'll think more about how to use something less stateful.
distributed/scheduler.py
Outdated
|
||
total_nthreads = sum( | ||
wws._nthreads for wws in candidates | ||
) # TODO get `self._total_threads` from scheduler? Though that doesn't account for worker restrictions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Common case, if len(candidates) < len(self.workers)
... else self._total_threads
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. More that this function isn't a method of SchedulerState
, so we'll have to pass in self._total_threads
to here (fine, just slightly awkward).
distributed/scheduler.py
Outdated
if ( | ||
ws is not None # there is a previous worker | ||
and group._last_worker_tasks_left > 0 # previous worker not fully assigned | ||
and ts._dependents # task has dependents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend skipping the dependents check. This can change in the future. Consider the following case:
df = dd.read_parquet(...).persist()
df.x.mean()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's cool (and the first non-performance-related advantage I see to this approach over finding relatives in the graph structure). So neighboring partitions of df
would be stored on the same workers, before we even know what we're going to do with them.
distributed/scheduler.py
Outdated
and ( # is a root-like task (task group depends on very few tasks) | ||
sum(map(len, group._dependencies)) < 5 # TODO what number | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that only groups of size N (=5)
are grouped together and everything above is distributed randomly?
If my understanding here is correct, we might want to choose 32 (+/-1?) since this is the default branching factor for tree reductions in dask, last time I checked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is restricing this behavior to task groups where the size of the task group is large, but the number of dependencies of that group is small. This is true in situations like the following:
df = dd.read_parquet(...) # thousands of tasks, but zero dependenies
x = da.from_zarr(...) # thousands of tasks, but one dependency (the zarr file)
But not in situations like the following:
y = x + 1 # thousands of tasks, but thousands of dependencies
@mrocklin here's the xarray/zarr issue I'm running into. Running this code on my branch behaves the same as on What I'm struggling to debug is that (from adding print statements to # test_zarr.py
import xarray as xr
import dask.array as da
from distributed import Client, LocalCluster
if __name__ == "__main__":
cluster = LocalCluster(
processes=True, n_workers=4, threads_per_worker=1, memory_limit=0
)
client = Client(cluster)
# Write a zarr array to disk (requires 100GB free disk space!)
# Comment this out once you've run it once.
data = da.zeros((12500000, 1000), chunks=(12500000, 1))
ds = xr.Dataset({"data": (("x", "y"), data)})
ds.to_zarr("test.zarr")
print("Saved zarr")
# Do the same array-sum example, but from zarr.
# This _should_ behave just as well, but on the dashboard we can see lots of transfers.
# In fact, the dashboard looks the same as running this test on main.
ds_zarr = xr.open_zarr("test.zarr")
ds_zarr.sum("y").data.data.compute()
# TODO where is `sum-` getting scheduled?? Doesn't seem to be going through `decide_worker` Here's the full (optimized) graph with order coloring. You can see that it looks just like the non-zarr one, except every sum task depends on the one zarr array task: mydask.pdf |
Unfortunately `ts._prefix._duration_average` == -1 for all the root tasks we care about, so this won't work.
This reverts commit e348a7c.
When a task is root-like and the previous worker is full, we don't want to use the normal `decide_worker` logic, since that only considers as candidates workers that have the deps of the dask. Since the task only has 1-5 deps, we'd only ever consider the same 1-5 workers.
4bfb165
to
0fbb75e
Compare
ws: WorkerState = group._last_worker | ||
|
||
if valid_workers is not None: | ||
total_nthreads = sum(wws._nthreads for wws in valid_workers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This walks through all workers for all tasks. We may not be able to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See below; I believe valid_workers is None
is the common case? Agreed that this isn't ideal though. But if there are worker restrictions, ignoring them and just using self._total_nthreads
could be wildly wrong (imagine 10 GPU workers and 100 CPU workers for a task group of 50 that needs to run on GPUs). Maybe there's a cheaper measurement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, grand.
distributed/scheduler.py
Outdated
if valid_workers is not None: | ||
total_nthreads = sum(wws._nthreads for wws in valid_workers) | ||
|
||
group_tasks_per_worker = len(group) / total_nthreads |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens here if valid_workers is None
? It looks like total_nthreads might be undefined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we use the self._total_nthreads
that SchedulerState.decide_worker
passed in (see https://github.com/dask/distributed/pull/4899/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R2376). So in the common case of no worker restrictions, we won't have to do the total_nthreads
calculation.
distributed/scheduler.py
Outdated
|
||
group._last_worker = ws | ||
group._last_worker_tasks_left = math.floor(group_tasks_per_worker) | ||
group._last_worker_priority = ts.priority |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to do this when we're not in a root-ish task situation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because we're also using this as our base case when group._last_worker
is not set. So for the first root-ish task, we fall through to here, and then need to store our decision for the next decide_worker
cycle. We could skip doing this if we've identified that the task is not root-like (which would save us from ever checking that again), though it might make the code a little more tangled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So for the first root-ish task, we fall through to here
We can maybe guard this code block with a condition that we're root-ish then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes—though we currently skip determining if we're root-ish when last_worker
is None. So we'd need some other sentinel value to distinguish "no last worker because we haven't checked yet if it's root-ish" from "no last worker because it's not root-ish".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently check the following:
and group_tasks_per_worker > 1 # group is larger than cluster
and ( # is a root-like task (task group is large, but depends on very few tasks)
sum(map(len, group._dependencies)) < 5 # TODO what number
)
I recommend that we define that as a variable at the top and then use it in a couple of places. Would that work?
Only compute `total_nthreads` when a new worker is needed, and only compute the number of dependencies once per task group. Overloads the meaning of `_last_worker` to indicate if we've decided in the past whether a TaskGroup is root-ish or not.
Alternate solution that uses occupancy here: #4922 |
In the above case, we want ``a`` and ``b`` to run on the same worker, | ||
and ``c`` and ``d`` to run on the same worker, reducing future | ||
data transfer. We can also ignore the location of ``X``, because | ||
as a common dependency, it will eventually get transferred everywhere. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<3 the ascii art
Comment/question: Do we want to explain all of this here? Historically I haven't put the logic behind heuristics in the code. This is a subjective opinion, and far from universal, but I find that heavily commented/documented logic makes it harder to understand the code at a glance. I really like that the current decide_worker implementation fits in a terminal window. I think that single-line comments are cool, but that long multi-line comments would better be written as documentation.
Thoughts? If you are not in disagreement then I would encourage us to write up a small docpage or maybe a blogpost and then link to that external resource from the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was also planning on updating https://distributed.dask.org/en/latest/scheduling-policies.html#choosing-workers, probably with this same ascii art. So just linking to that page in the docstring seems appropriate.
This reverts commit f2da0bc.
This reverts commit 0d1e2380b525b0ee7b4e60d9bee62f889ed5520b.
Maybe avoiding -1 is excessive; I just wanted it to still work if we changed to a Py_ssize_t
Arguably would be better to check that `ws` is in (valid_workers or all_workers); downside is that the common `all_workers` would require an O(n) search, since it's only `dict_values`, not `dict`.
Closing in favor of #4967, which is similar logic but more concise. It also removes |
In
decide_worker
, rather than spreading out root tasks as much as possible, schedule consecutive (by priority order) root(ish) tasks on the same worker. This ensures the dependencies of a reduction start out on the same worker, reducing future data transfer.black distributed
/flake8 distributed
/isort distributed