
Fix decide_worker_rootish_queuing_disabled assert #7065

Conversation

gjoseph92
Collaborator

Closes #7063

  • Tests added / passed
  • Passes pre-commit run --all-files

gjoseph92 self-assigned this Sep 23, 2022
@crusaderky
Collaborator

Is it possible to write a unit test that deterministically reproduces the problem? As of #7062, the failure rate is 1/162.

@github-actions
Contributor

github-actions bot commented Sep 24, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files (±0)   15 suites (±0)   wall time 6h 15m 2s ⏱️ (-7m 41s)

 3 161 tests (+1):  3 073 passed ✔️ (-2),   83 skipped 💤 (-1),  5 failed (+4)
23 388 runs  (+8): 22 481 passed ✔️ (+3),  900 skipped 💤 (±0),  7 failed (+5)

For more details on these failures, see this check.

Results for commit 5592da9. ± Comparison against base commit 9c8ff86.

♻️ This comment has been updated with latest results.

@gjoseph92
Collaborator Author

Yes, I can write one. It'll be a bit complicated though.

@fjetter
Member

fjetter commented Oct 4, 2022

@gjoseph92 what's the status on this?

@crusaderky
Collaborator

@gjoseph92 any update on this?

crusaderky closed this Oct 24, 2022
crusaderky reopened this Oct 24, 2022
gjoseph92 force-pushed the fix-decide_worker_rootish_queuing_disabled-assertion branch from f8b84c0 to 3d33954 on October 25, 2022 16:41
@gjoseph92
Collaborator Author

I'm finding it pretty hard to come up with a test for this. I imagine this case is happening when TaskGroup.last_worker is set to a worker that has just been put in closing_gracefully state, and TaskGroup.last_worker_tasks_left is non-zero. We go through a transitions cycle and pick this non-running last-used worker.

But I don't understand how this could actually happen.

A transitions cycle holds control of the event loop until it's done. So I don't think it's possible for a worker to switch from running to non-running in between two transitions.

And this line is intended to clear last_worker at the end of each transitions cycle:

tg.last_worker = (
    ws if tg.states["released"] + tg.states["waiting"] > 1 else None
)

My guess is that there's some edge case that that logic isn't catching. Generally though, if this rootish task is going waiting->processing, and other tasks in the group are currently released or waiting, I'd expect those tasks to also be recommended waiting->processing in the same transitions cycle. Then the last task would clear out last_worker.

It seems there's a case where that doesn't happen, so last_worker doesn't get cleared at the end of one transitions cycle, and then we try to reuse it in a different cycle after the worker has switched to closing_gracefully.

Bottom line is that I'm reluctant to spend more time trying to come up with a test for this edge case, especially when we're looking to move away from this code path anyway by turning queuing on by default, and by using a more sensible (and stateless) algorithm for co-assignment #7141.

Instead, I've switched this to just not re-use a previous worker if it's not running, which seems reasonable enough.
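
Concretely, the guard amounts to something like the following sketch (a hypothetical helper for illustration only; the names follow this thread, and the actual code in decide_worker_rootish_queuing_disabled is structured differently):

```python
from distributed.core import Status


def reusable_last_worker(tg):
    """Sketch only: return the task group's last-used worker if it can still be
    reused for co-assignment, else None so the caller picks a fresh worker.
    """
    lws = tg.last_worker
    if (
        lws is not None
        and tg.last_worker_tasks_left      # budget of tasks still to pack onto it
        and lws.status == Status.running   # new guard: skip e.g. closing_gracefully workers
    ):
        return lws
    return None
```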

@fjetter @crusaderky how do you feel about merging this without a specific test?

@crusaderky
Collaborator

A transitions cycle holds control of the event loop until it's done. So I don't think it's possible for a worker to switch from running to non-running in between two transitions.

This is correct.

My guess is that there's some edge case that that logic isn't catching.

I think this is what is happening:

  1. tasks (x, 0), (x, 1), (x, 2), (x, 3) transition to processing. tg.last_worker is set.
  2. The worker where (x, 2) is processing transitions to closing_gracefully, i.e. it is no longer running.
  3. Work stealing steals (x, 2) (by mere coincidence). This can happen because the worker is still flushing out its other tasks in memory and has not completely left the cluster.
  4. Task (x, 2) transitions back to released, which in turn kicks off another transition to processing.
  5. tg.last_worker from the previous transitions loop is still there.

@fjetter @crusaderky how do you feel about merging this without a specific test?

Personally I am quite unhappy to have half-dead, poorly tested code paths.

@gjoseph92
Collaborator Author

5. tg.last_worker from the previous transitions loop is still there

I like the work-stealing theory. But more fundamentally, how could last_worker be there from the last transitions loop? If the last transitions loop ran to completion, then you'd expect there to be exactly 1 released or waiting task in the last iteration of the loop, which would have cleared last_worker.

@crusaderky
Collaborator

crusaderky commented Oct 27, 2022

The good news: I pushed a deterministic reproducer (work stealing is innocent).
The bad news: the reproducer highlights an unrelated failure when queuing is active: #7204

# - TaskGroup(y) has more than 4 tasks (total_nthreads * 2)
# - TaskGroup(y) has less than 5 dependency groups
# - TaskGroup(y) has less than 5 dependency tasks
assert s.is_rootish(s.tasks["y-2"])
Collaborator Author

One thing to note: while is_rootish(s.tasks["y-2"]) is true here, it was not true when we actually submitted y-2. At that point, there were only 2 known tasks in the y TaskGroup.

The only reason the y tasks go down the rootish decide-worker code path is that decide_worker doesn't get to run until the xs complete.

I don't think it matters for this test, but this is just broadly an issue with using client.submit instead of dask delayed. There's a bit of logic in the scheduler that assumes graphs will be submitted in full up front, instead of incrementally with repeated submit or map calls (for example, dask.order, is_rootish, #7141).
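
For reference, the criteria listed in the test comment above correspond roughly to a group-level check like this sketch (a simplification; the real Scheduler.is_rootish also rules out tasks with worker/host/resource restrictions and may differ in detail):

```python
def is_rootish_sketch(ts, total_nthreads):
    """Sketch of the root-ish heuristic discussed here: a task is root-ish if
    its TaskGroup is large relative to the cluster and has few, small
    dependency groups.
    """
    tg = ts.group
    return (
        len(tg) > total_nthreads * 2            # "more than 4 tasks" on a 2x2-thread cluster
        and len(tg.dependencies) < 5            # fewer than 5 dependency groups
        and sum(map(len, tg.dependencies)) < 5  # fewer than 5 dependency tasks overall
    )
```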

# - TaskGroup(y) has less than 5 dependency tasks
assert s.is_rootish(s.tasks["y-2"])

await evx[0].set()
Collaborator Author

Okay, this is basically the issue with:

# Record `last_worker`, or clear it on the final task
tg.last_worker = (
    ws if tg.states["released"] + tg.states["waiting"] > 1 else None
)

The silly assumption this is making is effectively that all tasks in the TaskGroup have the same dependencies, so they're all runnable (and will all transition) at the same time. It's assuming that we're not going to have a subset of the group transition, then later transition another subset.

This is obviously not true, it just happens to pretty much always be the case in "common workloads" built with dask collections.

I've always really, really disliked the use of TaskGroups in the co-location logic in order to guess at things about graph structure. In this test, if you just change the names of some of the y tasks so they're in different groups, it'll probably pass. Task names should be opaque to dask and should not affect scheduling behavior. The combination of name-based heuristics with statefulness in the current implementation is even more problematic, and makes bugs like this possible.

To me, all this is just another argument for #7141. A static way of determining these groupings based purely on graph structure would eliminate the possibility of these kinds of bugs.

Furthermore, in the graph you've constructed, the y tasks shouldn't even be treated as one co-assignment group. There should be three groups. The fact that there's only one group is just an artifact of the naming scheme (which totally can happen in real usage).
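
To make the naming point concrete: which TaskGroup a task falls into is derived from its key's name prefix, not from its dependencies. A small illustration using dask.utils.key_split (the scheduler's internal grouping function may differ in edge cases, but it behaves this way for keys like these):

```python
from dask.utils import key_split

# All of these keys share the prefix "y", so they land in the same group
# regardless of whether they share dependencies:
assert key_split("y-0") == "y"
assert key_split("y-2") == "y"
assert key_split(("y", 3)) == "y"

# Renaming a subset of the y tasks (e.g. to "z-0", "z-1") would put them in a
# different group, and with it change co-assignment decisions, even though the
# graph structure is identical:
assert key_split("z-0") == "z"
```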

@gjoseph92
Collaborator Author

@crusaderky your new test is passing for me; that's expected, right? It seems like the `and lws.status == Status.running` check is solving the specific issue here, right?

@crusaderky
Collaborator

@crusaderky your new test is passing for me; that's expected, right? It seems like the `and lws.status == Status.running` check is solving the specific issue here, right?

Yes, my new test fails on main and passes on your branch with `no_queue`.
However, it fails with `queue` (#7204).

@gjoseph92
Collaborator Author

Sounds good, I'm looking into #7204 now.

So this is more or less ready to go in that it resolves #7063. We just can't merge it yet because the test fails on queuing CI due to #7204.

@gjoseph92
Collaborator Author

This actually seems to be (intermittently) failing distributed/tests/test_scheduler.py::test_include_communication_in_occupancy on macOS; looking into it.

Also maybe distributed/diagnostics/tests/test_task_stream.py::test_client_sync. I can't reproduce that locally though.

Also all the normal websockets stuff and #7208.

@gjoseph92
Collaborator Author

Including #7212 in here to get CI more greenish, but let's merge that PR before this one.

crusaderky merged commit e0669cd into dask:main Oct 28, 2022
gjoseph92 deleted the fix-decide_worker_rootish_queuing_disabled-assertion branch October 28, 2022 18:59

Successfully merging this pull request may close these issues.

decide_worker_rootish_queuing_disabled assertion fails when retiring worker