
[WIP] Co assignment groups #7141

Draft · fjetter wants to merge 6 commits into main
Conversation

@fjetter (Member) commented Oct 17, 2022

This is an implementation of an algorithm we discussed in an offline work session. It tries to combine tasks into groups based on whether or not they should be co-located, to reduce network traffic and RAM usage.

I called these groups "co-assignment groups", or "cogroups" for short. The idea is basically to lean on dask.order and use "jumps" in priority to detect branches; a rough sketch of the idea follows below. cc @eriknw, I would be very interested to know whether something like this could be returned directly from one of the dask.order functions.
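Roughly, the idea looks like this (a simplified sketch, not the implementation in this PR; it assumes tasks arrive as `(key, priority)` pairs already sorted by increasing dask.order priority):

```python
def cogroups_by_priority_jump(tasks):
    """tasks: (key, priority) pairs sorted by increasing priority."""
    groups, current, prev = [], [], None
    for key, prio in tasks:
        if prev is not None and prio != prev + 1:
            groups.append(current)  # a jump in priority closes the group
            current = []
        current.append(key)
        prev = prio
    if current:
        groups.append(current)
    return groups

# e.g. priorities 0,1,2 then 5,6 -> [["a", "b", "c"], ["d", "e"]]
print(cogroups_by_priority_jump([("a", 0), ("b", 1), ("c", 2), ("d", 5), ("e", 6)]))
```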

This builds on an earlier attempt at this in #7076.

This implementation is still incomplete. In particular, what's missing is:

  • Integration with task queuing
  • Handling of underutilized clusters
  • Various performance optimizations of the algorithm itself (e.g. there are still many sorts in there, but nothing that could not simply be refactored)
  • probably a couple of other things
Raw notes from the offline workshop (I'll open another issue shortly to summarize)

[Four photos of the whiteboard notes: IMG_20221014_163728, IMG_20221014_155805, IMG_20221014_163731, IMG_20221014_163752]

@fjetter fjetter requested a review from gjoseph92 October 17, 2022 13:24
Comment on lines +2073 to +2100
```python
def cogroup_objective(self, cogroup: int, ws: WorkerState) -> tuple:
    # Cogroups are not always connected subgraphs but if we assume they
    # were, only the top prio task would need a transfer
    tasks_in_group = self.cogroups[cogroup]
    # TODO: this could be made more efficient / we should remember max if it is required
    ts_top_prio = max(tasks_in_group, key=lambda ts: ts.priority)
    dts: TaskState
    comm_bytes: int = 0
    cotasks_on_worker = 0
    for ts in tasks_in_group:
        if ts in ws.processing or ws in ts.who_has:
            cotasks_on_worker += 1
    for dts in ts_top_prio.dependencies:
        if (
            # This is new compared to worker_objective
            (dts not in tasks_in_group or dts not in ws.processing)
            and ws not in dts.who_has
        ):
            nbytes = dts.get_nbytes()
            comm_bytes += nbytes

    stack_time: float = ws.occupancy / ws.nthreads
    start_time: float = stack_time + comm_bytes / self.bandwidth

    if ts_top_prio.actor:
        raise NotImplementedError("Cogroup assignment for actors not implemented")
    else:
        return (-cotasks_on_worker, start_time, ws.nbytes)
```
fjetter (Member, Author):

This is a very naive way to decide where to put the task. We could also use an approach similar to #7076, but this felt minimally invasive. (For how a tuple objective like this is typically consumed, see the sketch below.)
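A hedged sketch of the consumption side (`pick_worker` and `candidate_workers` are illustrative names, not part of this diff): the scheduler would pick the worker minimizing the objective tuple.

```python
from functools import partial

def pick_worker(scheduler, cogroup, candidate_workers):
    # Tuples compare lexicographically: more co-tasks already on the worker
    # wins first (hence the negated count), then earlier estimated start
    # time, then fewer stored bytes as the tiebreaker.
    return min(candidate_workers, key=partial(scheduler.cogroup_objective, cogroup))
```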

Comment on lines +2292 to +2298
```python
if ts.cogroup is not None:
    decider = self.decide_worker_cogroup
else:
    if not (ws := self.decide_worker_non_rootish(ts)):
        return {ts.key: "no-worker"}, {}, {}
    decider = self.decide_worker_non_rootish

if not (ws := decider(ts)):
    return {ts.key: "no-worker"}, {}, {}
```
fjetter (Member, Author):

As already stated, I haven't dealt with queuing yet. The structure of all the decide functions felt sufficiently confusing that I didn't know where to put the new logic. It should not be too difficult, but it will require some thought. I mostly wanted to verify the core logic quickly.

Comment on lines 4614 to 4615
```python
cogroups = coassignmnet_groups(sorted_tasks[::-1], start=start)
self.cogroups.update(cogroups)
```
fjetter (Member, Author):

TODO: Somewhere we'd need to handle cleanup of Scheduler.cogroups

Comment on lines 8485 to 8491
```python
while len(next.dependents) == 1:
    dep = list(next.dependents)[0]
    if len(dep.dependencies) != 1:
        # This algorithm has the shortcoming that groups may grow too large if the dependent of a group
        group_dependents_seen.add(dep)
        break
    next = dep
```
fjetter (Member, Author):

Two things where this deviates from the original whiteboard implementation:

  1. I ended up walking linear chains after all (see the sketch below). This may no longer be necessary after 2., I haven't checked.
  2. I'm breaking early by excluding any dependents of groups. This is a bit ugly but pragmatic.
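For illustration, walking a linear chain as in 1. could look like this (a sketch using a minimal stand-in Task type, not distributed's TaskState):

```python
from dataclasses import dataclass, field

@dataclass(eq=False)  # identity hash, so instances can live in sets
class Task:
    key: str
    dependencies: set = field(default_factory=set)
    dependents: set = field(default_factory=set)

def walk_linear_chain(ts: Task) -> list[Task]:
    """Follow single dependents until the chain branches or fans in."""
    chain = [ts]
    nxt = ts
    while len(nxt.dependents) == 1:
        (dep,) = nxt.dependents
        if len(dep.dependencies) != 1:
            break  # dep has other dependencies (fan-in); stop the chain here
        chain.append(dep)
        nxt = dep
    return chain
```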

nthreads=[("", 1)] * 6,
config={"distributed.scheduler.worker-saturation": 1.0},
)
async def test_utilization_over_co_assignment(c, s, *workers):
fjetter (Member, Author):

I copied these over from #7076, but they are not working yet.

@gjoseph92 (Collaborator):

@fjetter we had the same idea for a fun weekend project; I also put together a prototype of this on the train a couple of days ago. I think you've gotten further than me, but I'll push up my branch since we did some things a little differently and it might be interesting to compare.

Overall, I got as far as discovering that it didn't do well with widely-shared dependencies or with fan-in tasks in tree reductions. I may have missed something in the implementation, though. I'll show you a couple of tests that were failing; let's see if they work on your branch.

@github-actions bot (Contributor) commented Oct 17, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

14 files (-1) · 14 suites (-1) · ⏱️ 0s (-6h 17m 3s)
696 tests (-2 447): 650 ✔️ passed (-2 357), 21 💤 skipped (-64), 25 failed (-26)
2 332 runs (-20 919): 2 113 ✔️ passed (-20 174), 173 💤 skipped (-740), 46 failed (-5)

For more details on these failures, see this check.

Results for commit 1344f9c. ± Comparison against base commit 6002e72.

♻️ This comment has been updated with latest results.

distributed/scheduler.py (outdated; thread resolved)
@@ -8408,3 +8452,50 @@ def transition(
```python
        self.metadata[key] = ts.metadata
        self.state[key] = finish
        self.keys.discard(key)


def coassignmnet_groups(
```
Contributor:

Suggested change:
```diff
-def coassignmnet_groups(
+def coassignment_groups(
```

```python
while len(next.dependents) == 1:
    dep = list(next.dependents)[0]
    if len(dep.dependencies) != 1:
        # This algorithm has the shortcoming that groups may grow too large if the dependent of a group
```
Contributor:

Incomplete comment? "If the dependent of a group ..."

Contributor:

So we find a task, then walk its dependents recursively (as long as there is only one dependent) and add them to the group, until we find a task that has more than a single dependency?

fjetter (Member, Author):

I fixed this already but didn't push the commit... 🤦

distributed/system_monitor.py (outdated; thread resolved)
distributed/tests/test_coassignmnet_group.py (outdated; thread resolved)
```python
        break
    next = dep
max_prio = tasks.index(next) + 1
groups[group] = set(tasks[min_prio:max_prio])
```
Contributor:

Suggested change:
```diff
-groups[group] = set(tasks[min_prio:max_prio])
+tasks = set(tasks[min_prio:max_prio])
+for ts in tasks:
+    ts.cogroup = group
+groups[group] = tasks
```

Rationale: this connection between the TaskState and cogroup data structures must be maintained, and it is best to do so at construction time rather than having to remember to do it later.

fjetter (Member, Author):

I chose not to do this so that coassignment_groups stays a pure function, which is much easier to test and reason about (a sketch of the testing benefit is below). It might be slightly worse in performance, but I doubt this will be relevant.
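To illustrate the testing benefit (hypothetical helper names; this test is not in the branch):

```python
def test_grouping_is_pure(make_chain_tasks):
    # make_chain_tasks is a hypothetical fixture building a few TaskStates
    tasks = make_chain_tasks("a", "b", "c")
    before = [ts.cogroup for ts in tasks]
    groups = coassignmnet_groups(tasks, start=0)  # spelling as in this branch
    assert [ts.cogroup for ts in tasks] == before  # inputs are not mutated
    assert groups == coassignmnet_groups(tasks, start=0)  # deterministic output
```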

Comment on lines +4616 to +4618
```python
for gr_ix, tss in self.cogroups.items():
    for ts in tss:
        ts.cogroup = gr_ix
```
Contributor:

If one wants to go with a plain dict for maintaining cogroups, I think it would make more sense if this invariant were maintained in coassignment_groups (see below).

fjetter (Member, Author):

I haven't decided yet what to use to maintain this. Maintenance of this structure is not implemented yet (e.g. we're not cleaning it up again). For now, I am using a dict for simplicity. I'm also not set on gr_ix being an integer, for what it's worth.



```python
def coassignmnet_groups(
    tasks: Sequence[TaskState], start: int = 0
```
Contributor:

OK, so tasks is a list of TaskStates sorted in increasing priority order.

fjetter (Member, Author):

Yes. I wanted to add a TODO to verify this, but it is guaranteed in update_graph, so for this prototype it works.

distributed/scheduler.py (outdated; thread resolved)
distributed/scheduler.py (outdated; thread resolved)
@gjoseph92 (Collaborator):

@fjetter @wence- this was my implementation: https://github.com/gjoseph92/distributed/pull/6/files. Just in case it's useful for comparison.
