Forget erred tasks // Fix deadlocks on worker #4784
Conversation
distributed/tests/test_worker.py
f.key: "memory", # expected to be gone | ||
g.key: "memory", # expected to be gone | ||
res.key: "error", |
This state is actually never left, so I do consider this a bug. Eventually the tasks should be released (I commented out the wait below).
# A was instructed to compute this result and we're still holding a ref via `f`
f.key: "memory",
# This was fetched from another worker. While we hold a ref via `g`, the scheduler
g.key: "memory",
I would expect `g` to be released on worker A since it was only on worker A as a dependency. However, it is kept after the task erred. Do we consider this sane behaviour? Do we intentionally keep this dependency in memory to allow for retries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reference, the scheduler decides in this case that the keys should be kept, or rather, it doesn't decide to release them. I am questioning whether this is the correct decision.
f.key: "memory", | ||
g.key: "memory", |
Even after explicit refs to `f` and `g` are released, we still keep holding on to them since there is the dependency `res` which erred. I'm wondering what the expected behaviour should be here.
For reference, the scheduler forgot all tasks at this point
@property
def erred_on(self):
    return self._erred_on
One of the reasons why the erred task state is never forgotten is that keys are only "remembered" in `processing_on` (processing) or `who_has` (memory), depending on the state of the task. This would be the equivalent for erred tasks to allow us to tell the worker to forget the task.
@jakirkham you have been involved in the scheduler state machine a lot recently. Just pinging in case you have thoughts about adding more state here or if you see other options
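To make the idea concrete, here is a minimal sketch of the bookkeeping (simplified, hypothetical classes and helper; only the attribute names `who_has`, `processing_on` and `erred_on` and the free-keys message come from the discussion above):

class SchedulerTaskStateSketch:
    def __init__(self, key):
        self.key = key
        self.who_has = set()        # workers holding the result ("memory")
        self.processing_on = None   # worker computing the task ("processing")
        self.erred_on = set()       # workers on which the task erred ("erred")

def forget_erred_task(scheduler, ts):
    # Hypothetical release path: with erred_on we know exactly which workers
    # still remember the failed task and can tell them to drop it.
    for worker_addr in ts.erred_on:
        scheduler.worker_send(worker_addr, {"op": "free-keys", "keys": [ts.key]})
    ts.erred_on.clear()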
tg = s.task_groups[y.name]
assert tg.states["memory"] == 5

assert s.task_groups[y.name].dependencies == {s.task_groups[x.name]}

await c.replicate(y)
# TODO: Are we supposed to track replicated memory here? See also Scheduler.add_keys
assert tg.nbytes_in_memory == y.nbytes
I ran into some issues with the nbytes counting and wondered if this is correct. While the test states this explicitly, I'm a bit surprised and believe there are code areas where this is treated differently
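To pin down the question, here is a hypothetical numerical example of the two possible accounting semantics (the numbers are made up):

# Hypothetical numbers, purely to illustrate the ambiguity in the TODO above.
y_nbytes = 80        # total size of y's results
n_replicas = 2       # after c.replicate(y) each key lives on two workers

# What the test asserts: replicas are not counted again, the task group's
# nbytes_in_memory tracks the logical payload only once.
nbytes_if_deduplicated = y_nbytes                    # 80

# The alternative interpretation (count every in-memory copy) would give:
nbytes_if_counted_per_copy = y_nbytes * n_replicas   # 160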
To not release too many tasks while still releasing those which are free, I ended up introducing additional state on the scheduler. This additional reference count is required because in the past, when tasks and dependencies were clearly separated, this reference count was implicit: tasks were always referred to by the scheduler, while dependencies were just a side product and could be forgotten asap. You will therefore notice that all "compute-task" TaskState objects have this attribute set and dependencies don't. Tests look promising, and while this is not the end of the story for the stability issues (I still have reproducing tests which deadlock), this is an important step. Gentle ping to @jrbourbeau and @gforsyth for review.
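As a rough sketch of that reference count (simplified, not the actual Worker/TaskState implementation; only the scheduler_holds_ref name and the release condition are taken from this PR):

class WorkerTaskStateSketch:
    def __init__(self, key, scheduler_holds_ref=False):
        self.key = key
        # True for tasks the scheduler explicitly asked this worker to compute
        # via "compute-task"; False for keys that were merely fetched as
        # dependencies and can therefore be forgotten asap.
        self.scheduler_holds_ref = scheduler_holds_ref

def maybe_release(tasks, key, release_key):
    ts = tasks.get(key)
    # Only keys the scheduler holds no reference to may be dropped eagerly.
    if ts is not None and not ts.scheduler_holds_ref:
        release_key(key)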
Hey @fjetter -- this looks good to me. I flagged one small typo in the scheduler attributes but that doesn't need to block this.
Hitting an issue in the py3.8 Ubuntu test which might be connected, although I'm not sure yet.
This also triggers a similar error as reported in #4800; at least the very first assert could be related.
I assume the above is connected since all three failures (Ubuntu 3.8 and the two OSX failures) are the same failing test. However, it's one of those incredibly rare events which is hard to reproduce locally.
All tests green 🎉
Friendly ping to @gforsyth and @jrbourbeau for another review. What changed since the last time I asked for reviews are the handlers which control task releases on the worker, and how and when the scheduler invokes them. I changed the names for all of them since I consider "release" to be much too ambiguous, but maybe the new names are equally confusing. Happy to give everything different names if somebody comes up with a suggestion.
quick first pass -- I'll try to find time to dig in a bit deeper
@jrbourbeau and @gforsyth can you please indicate whether I can expect a review? If you are too busy, that's fine. I just would like to know if I am waiting for something to happen. Any further work on the worker is kind of blocked by this PR, and I am confident that this improves stability.
The OSX test failure is unrelated; it's connected to an unclosed RPC after startup/teardown of an adaptive cluster.
I began reviewing the changes here yesterday, but didn't have much dedicated time to focus on this today. If you're confident in the changes here, then feel free to merge them in. Otherwise, I will have time to review tomorrow.
A couple of minor nits, and one genuine question :)
if ts and not ts.scheduler_holds_ref:
    self.release_key(key, reason=f"delete data: {reason}", report=False)

logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
I'm curious about why there should be multiple methods for this. In principle I think of our current approach as the scheduler saying "I don't need you to have this data". The worker though, as you say, may disagree and hold on to it. I don't imagine a situation where the scheduler would override the worker, and so I don't see a reason to have multiple methods for this signal. Thoughts?
First of all, I think this particular signal I called "superfluous data" should not exist and I believe this is what you're saying as well.
This signal is issued here (link to main branch)
distributed/distributed/scheduler.py
Lines 6276 to 6284 in ddec3c9
if ts is not None and ts._state == "memory":
    if ts not in ws._has_what:
        ws._nbytes += ts.get_nbytes()
        ws._has_what[ts] = None
        ts._who_has.add(ws)
else:
    self.worker_send(
        worker, {"op": "delete-data", "keys": [key], "report": False}
    )
I am actually a bit fuzzy myself on how this particular race condition occurs, but I think something like the following sequence causes a race condition which results in a deadlock:
Task A (Worker A) ----> Task C (Worker B)
                            ^
                           /
Task B (Worker B) --------/

(pardon the awful ASCII art; C depends on A and B)
- Worker B tries to execute C but needs to fetch Task A to do so
- Worker A submits the data to B
- The data arrives at B and is being deserialized, or is waiting for the event loop
- Worker A dies and the scheduler notices
- The scheduler thinks all is lost, transitions Task A to released, and tries to reschedule it on Worker B
- In the meantime, Worker B finishes deserializing, puts the key in memory, and notifies the scheduler using `add-keys`
- Worker B receives the `compute-task` signal from the scheduler and thinks it's lucky since it already has the data. It sends the scheduler a `task-finished` signal
- The scheduler receives the `add-keys` message but thinks "this task is not in memory any more, please forget it" and sends a `delete-data` signal (what I call superfluous data)
- Worker B receives the `delete-data` signal and forgets everything (this is where my superfluous-data signal has a safeguard)
- The scheduler receives the `task-finished` signal from before, thinks everything is OK, and proceeds as before; however, Worker B does not have the data
I assume this is only a problem due to another bug in how the worker corrects missing data (which I am also trying to fix), which would eventually heal the cluster, but I'm not entirely certain.
Ha, that's a fun comedy of errors.
Yeah, so I agree that this feels like a different bug. In principle it'd be nice to fix that bug rather than add this signal, but I can imagine that it might be best to do that in future work.
    client=True,
    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2), ("127.0.0.1", 3)],
)
async def test_worker_same_host_replicas_missing(c, s, a, b, x):
This is a test case for the last deadlock we were made aware of. It requires a bit of patching since it is otherwise not possible to trigger the same-host-with-multiple-who_has edge case this is hitting.
This happens if there are multiple who_has entries such that the scheduler doesn't retrigger a clean computation, but the worker always picks the same erroneous worker. If the worker doesn't forget about that wrong worker after the first try, we're running in a loop.
distributed/distributed/worker.py
Lines 2089 to 2093 in ddec3c9
local = [w for w in workers if get_address_host(w) == host]
if local:
    worker = random.choice(local)
else:
    worker = random.choice(list(workers))
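For illustration, a hedged sketch of how forgetting the erroneous worker breaks the retry loop described above (the helper and its arguments are hypothetical, not the actual Worker API; only the same-host preference mirrors the snippet above):

import random

def choose_worker(who_has, host, failed_workers, get_address_host):
    # Drop workers that already failed to serve the key; otherwise the
    # same-host preference can keep picking the same broken worker forever
    # when several replicas are (supposedly) available.
    candidates = [w for w in who_has if w not in failed_workers]
    if not candidates:
        return None  # nothing left to try; ask the scheduler for fresh who_has
    local = [w for w in candidates if get_address_host(w) == host]
    return random.choice(local or candidates)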
@gen_cluster(client=True, timeout=None)
async def test_handle_superfluous_data(c, s, a, b):
This test is rather close to the implementation since we somehow need to wait for worker B to actually start deserializing, and we do not have any sane hooks exposing this at the moment; therefore I'm watching the connection pool to make this work.
see #4784 (comment)
This test proves the race condition (replace handle-superfluous-data with handle-free-keys and it is stuck), but I couldn't prove that we require any handler like this at all. I kept this path to preserve "previous functionality" and added a really stupid test below which constructs the situation without any actual work, just to have the code path covered. I will try further to reconstruct a case for this, but I don't want this to block the PR (see test_forget_data_not_supposed_to_have).
Since the test failures appear to be unrelated, I will go ahead and merge this now. Feel free to leave review comments afterwards; I'll address everything that comes up.
This is a follow-up to #4784 and reduces the complexity of Worker.release_key significantly. There is one non-trivial behavioural change regarding erred tasks. The current main branch holds on to dependencies of an erred task on a worker and implements a release mechanism once that erred task is released. I implemented this recently trying to capture the status quo, but I'm not convinced any longer that this is the correct behaviour. It treats the erred case specially, which introduces a lot of complexity. The only place where this might be of interest is if an erred task wants to be recomputed locally: not forgetting the data keys until the erred task is released would speed up this process. However, we'd still need to potentially compute some keys, and I'm inclined to strike this feature in favour of reduced complexity.
I need some feedback because the cluster behaves differently than I would expect and I would like to know for sure what we expect to happen.
The test case constructs a very simple graph consisting of three tasks, F, G and Res, where F and G are simple dependencies executed on different workers while Res is a task which will fail. I pinned the workers down explicitly to ensure determinism, but otherwise this is not relevant.
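For reference, a hedged sketch of the graph described (function names, keys and the worker address variables are illustrative, not the test's actual code):

def one():
    return 1

def two():
    return 2

def raise_error(f, g):
    raise RuntimeError("boom")

# Pin the two dependencies to different workers; the failing task runs on worker A.
f = client.submit(one, key="F", workers=[worker_a_address])
g = client.submit(two, key="G", workers=[worker_b_address])
res = client.submit(raise_error, f, g, key="Res", workers=[worker_a_address])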
What happens is that the task errs, which is expected. We keep all tasks in memory, still expected. Afterwards, two things happen which surprised me, and I would like to know if those are bugs or expected behaviour I misunderstood:
1. I release F and G and would expect them to be released, i.e. the memory is freed, because Res is done and we no longer hold a reference to the dependencies F and G. Instead, both stay in memory. Is this expected?
2. I release Res and would expect the cluster to reset, i.e. forget about all tasks. This isn't happening; instead, F and G are released and Res stays erred. The tasks are never forgotten.

cc @jrbourbeau
Edit:
While investigating the freeing of erred tasks, I found two deadlock situations which were more likely to be triggered in this branch than in others. This increased the scope of this PR.