Task co-assignment logic is worst-case for binary operations like a + b #6597

Open · gjoseph92 opened this issue Jun 18, 2022 · 1 comment · Fixed by #6614

Comments

gjoseph92 (Collaborator) commented Jun 18, 2022

The root task co-assignment logic does the exact opposite of what it should do for operations that combine two different datasets, like a + b:

 x     x     x     x
 /\    /\    /\    /\
a  b  a  b  a  b  a  b
1  2  3  4  5  6  7  8  <-- priority

It assigns all the a's to one worker and all the b's to another. Each x then requires transferring an a or a b, so 50% of the input data gets transferred. This could have been 0% if we had co-assigned properly.

The reason for this is that co-assignment selects a worker to re-use per task group. So it goes something like this (recall that we're iterating through root tasks in priority order; a toy simulation of these steps follows the list):

  1. Assign a1. It has no last_worker set, so pick the least-busy worker: w1.
  2. Assign b1. It has no last_worker set, so pick the least-busy worker. w1 already has a task assigned to it (a1), so we pick w2.
  3. Assign the next three a's to w1 and the next three b's to w2 (they arrive interleaved, since they're interleaved in priority order).
  4. Time to pick a new worker for a5. Both workers are equally busy; say we pick w2.
  5. Time to pick a new worker for b5. We just made w2 slightly busier than w1, so pick w1.
  6. The pattern continues: each time a batch fills up, we flip-flop, sending each group to the opposite worker.
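
Here's a minimal, self-contained toy model of the per-task-group heuristic described above (not distributed's actual decide_worker; the batch_size formula and the tie-break are simplifying assumptions). It shows that every a_i/b_i pair ends up split across the two workers, so every x needs a transfer:

from itertools import chain

workers = {"w1": [], "w2": []}   # worker -> root tasks assigned to it
last_worker = {}                 # task group -> (worker, tasks left in its batch)
batch_size = 8 // len(workers)   # len(task_group) / total_nthreads = 4

# Root tasks arrive interleaved in priority order: a1, b1, a2, b2, ...
for task in chain.from_iterable((f"a{i}", f"b{i}") for i in range(1, 9)):
    group = task[0]  # "a" or "b"
    worker, remaining = last_worker.get(group, (None, 0))
    if remaining == 0:
        # Batch exhausted (or first task of the group): pick the least-busy
        # worker. The real tie-break at step 4 depends on occupancy and can
        # flip the batches between workers, but either way the pairs split.
        worker, remaining = min(workers, key=lambda w: len(workers[w])), batch_size
    workers[worker].append(task)
    last_worker[group] = (worker, remaining - 1)

print(workers)   # w1 gets all the a's, w2 gets all the b's
split = sum((f"a{i}" in workers["w1"]) != (f"b{i}" in workers["w1"]) for i in range(1, 9))
print(f"{split}/8 x tasks need a transfer")   # -> 8/8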

The last-used worker should be global state (well, global to a particular sequence of transitions caused by update_graph). Each subsequent task in priority order should re-use this worker until it's filled up, regardless of which task group the task belongs to.
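
For contrast, here's a rough sketch of what assignment with a single, group-agnostic last-used worker could look like if we did know the total number of root tasks up front. The helper name and signature are hypothetical, not actual scheduler code:

def assign_root_tasks(tasks_in_priority_order, worker_nthreads):
    """Fill one worker at a time (proportionally to its thread count),
    ignoring task-group boundaries entirely."""
    total_nthreads = sum(worker_nthreads.values())
    per_thread = -(-len(tasks_in_priority_order) // total_nthreads)  # ceil division
    assignments, queue = {}, list(tasks_in_priority_order)
    for worker, nthreads in worker_nthreads.items():
        batch, queue = queue[: per_thread * nthreads], queue[per_thread * nthreads :]
        for task in batch:
            assignments[task] = worker
    return assignments

tasks = [t for i in range(1, 9) for t in (f"a{i}", f"b{i}")]  # priority order
print(assign_root_tasks(tasks, {"w1": 1, "w2": 1}))
# a1-b4 land on w1 and a5-b8 on w2, so every a_i/b_i pair is co-assigned.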

The tricky part is calculating what "filled up" means. We currently use the size of the task group to decide how many root tasks there are in total, which we then divide by the cluster's total thread count to decide how many to assign per worker. But of course, that's not actually the total number of root tasks. I'm not sure yet how to figure out the total number of root tasks in constant time within decide_worker.
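
To make that concrete for the a + b example (2 workers with one thread each), assuming the current formula is roughly len(task_group) / total_nthreads:

total_nthreads = 2
len_group_a = 8                   # what the current logic sees (one group at a time)
total_root_tasks = 16             # a's + b's: what it would actually need

batch_current = len_group_a // total_nthreads      # 4 -> a new worker is picked after a4
batch_needed = total_root_tasks // total_nthreads  # 8 -> a1-b4 would fill w1 before moving on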

Broadly speaking, this stateful and somewhat hacky co-assignment logic is a pain to integrate into #6560. I've been able to do it, but maintaining good assignment while rebalancing tasks as workers are added and removed is difficult. Our co-assignment logic relies too much on statefulness and on getting to iterate through all the tasks at once in priority order; we can't actually re-co-assign things when workers change. If we had a data structure/mechanism to efficiently identify "which tasks are siblings of this one", or maybe even "which worker holds the task nearest in priority to this one", it might make solving both problems easier.
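
As one rough illustration of the second idea, a priority-sorted index over already-assigned root tasks would let us find the neighbor by bisection. This uses sortedcontainers (already a distributed dependency); the class and method names are invented, not an existing API:

from sortedcontainers import SortedList

class RootTaskIndex:
    """Sketch: track (priority, worker) pairs for assigned root tasks, kept
    sorted by priority so the nearest neighbor can be found by bisection."""

    def __init__(self):
        self._assigned = SortedList()

    def record(self, priority, worker):
        self._assigned.add((priority, worker))

    def worker_of_prior_neighbor(self, priority):
        """Worker holding the assigned root task that immediately precedes
        `priority`, or None if nothing precedes it."""
        i = self._assigned.bisect_left((priority, ""))
        return self._assigned[i - 1][1] if i else None

decide_worker could then prefer that neighbor's worker (when it has capacity) instead of relying on per-group state, and the index could be consulted again when workers are added or removed; whether that's efficient enough in practice is an open question.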


As a simple test that fails on main (each worker has transferred 4 keys):

from dask import delayed
from distributed.utils_test import gen_cluster


@gen_cluster(
    client=True,
    nthreads=[("", 1), ("", 1)],
)
async def test_decide_worker_coschedule_order_binary_op(c, s, a, b):
    # 8 independent pairs: each z_i needs only its own x_i and y_i, so proper
    # co-assignment would require zero transfers.
    xs = [delayed(i, name=f"x-{i}") for i in range(8)]
    ys = [delayed(i, name=f"y-{i}") for i in range(8)]
    zs = [x + y for x, y in zip(xs, ys)]

    await c.gather(c.compute(zs))

    assert not a.transfer_incoming_log, [l["keys"] for l in a.transfer_incoming_log]
    assert not b.transfer_incoming_log, [l["keys"] for l in b.transfer_incoming_log]

Note that this case occurs in @TomNicholas's example workload: #6571

cc @fjetter @mrocklin

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jun 18, 2022
When there were multiple root task groups, we were just re-using the last worker for every batch because it had nothing processing on it.

Unintentionally this also fixes dask#6597 in some cases (because the first task goes to processing, but we measure queued, so we pick the same worker for both task groups)
gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jun 22, 2022
gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jun 23, 2022
Bit of a hack, but closes dask#6597. I'd like to have a better metric for the batch size, but I think this is about as good as we can get. Any reasonably large number will do here.
fjetter (Member) commented Sep 2, 2022

#6985
