
Add explicit fetch state to worker TaskState #4470

Merged
jrbourbeau merged 29 commits into dask:main from gforsyth:fetch_and_wait on Mar 18, 2021

Conversation

gforsyth
Contributor

Taking a pass at the new control flow that @fjetter laid out in #4413. Many thanks to him for the very detailed write-up and diagnosis of the issue.

To disambiguate a task that is to be executed vs. a task which is a data container (a dependency to be fetched from another worker), I've added a new state called `fetch`. The existing state `waiting` now only refers to tasks which will be executed on the worker.

I think the add_task function needs to be cleaned up, and I also agree with @fjetter that we should have a set of chained recommendations similar to what the scheduler has, but hopefully this is a step in that direction.

test_failed_workers.py::test_broken_worker_during_computation fails once every 30 runs or so on my machine; I'd like to fix that before this gets merged in, but the more explicit states make hunting down inconsistent state a fair bit easier.

This also needs documentation updates for the new state and the new worker flow.
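As a rough illustration of that split (a hypothetical sketch, not the PR's actual diff -- only the state names `fetch` and `waiting` and the notion of a `runspec` come from the worker):

def classify_incoming_task(ts, runspec=None):
    # Illustrative only: pick the initial state for a task arriving at a worker.
    if runspec is not None:
        ts.runspec = runspec
        return "waiting"  # the worker will execute this task once its deps are in memory
    return "fetch"        # pure data dependency, to be gathered from a peer worker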

@fjetter
Member

fjetter commented Feb 1, 2021

First pass looks good. Will give it a deeper review once you are more confident with the changes. If there are still failing tests, this may trigger a bunch more changes.
I highly recommend running the flaky tests in an IPython shell to speed up development. At least the gen_cluster-decorated tests can simply be imported and executed as ordinary functions.
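For example, something along these lines works in an IPython session (a sketch of that workflow, assuming a source checkout where the test module is importable; the loop count is arbitrary):

# Call a gen_cluster-decorated test like an ordinary function; the decorator
# spins up its own cluster and event loop on every call.
from distributed.tests.test_failed_workers import (
    test_broken_worker_during_computation,
)

for i in range(30):
    print(f"run {i}")
    test_broken_worker_during_computation()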

@gforsyth gforsyth marked this pull request as ready for review February 1, 2021 18:35
@gforsyth
Contributor Author

gforsyth commented Feb 1, 2021

Thanks for the initial pass, @fjetter -- I ran test_broken_worker_during_computation and test_who_has_clears_after_failed_connection 100x each without any issue, so I think this is stable and ready for a review.

I'd like to make the state validation methods a bit more detailed, but I think that will have to wait until we have the chained recommendation system in place, and I'm trying not to do too much in this PR.

Thanks again for all the work in diagnosing these issues!

@mrocklin
Member

mrocklin commented Feb 1, 2021 via email

@gforsyth
Contributor Author

gforsyth commented Feb 1, 2021

Any interest in rewriting this doc section? https://distributed.dask.org/en/latest/worker.html#internal-scheduling

yes, on my list

Comment on lines 1537 to 1538
if dep_ts.state in ("fetch", "flight"):
    ts.waiting_for_data.add(dep_ts.key)
Member

IIRC, `waiting_for_data` being empty is one of the conditions for the transition waiting -> executing, and I'm wondering if this condition shouldn't also include the waiting and executing states.

Contributor Author

I think that if I include waiting and executing here, it might cause weird transitions -- if the dependency is executing and then we add it to waiting_for_data while it is moving to memory, it will cause us to try to re-run this and we'll end up with a waiting -> memory transition.

Member

Sorry, I think my comment was wrong. This is an issue of semantics. I perceived waiting_for_data to be the same as "dependencies which are not yet in memory", but it is rather "dependencies which are not yet in memory and must be fetched from another worker". If this is true, your code changes make sense and I think there should be no other state in here.

"""
ts.runspec = None

def transition_constrained_waiting(self, ts):
Member

I'm not sure if this is possible. Constrained is more or less the twin state of a waiting task with the distinction that the constrained task has resource constraints to fulfil. If that's the case, this transition would mean we're changing the definition of the task by stripping the resource constraints

Contributor Author

This should only be called if a constrained task is stolen -- the current logic there is that stolen tasks revert to a state of waiting and are then released.
I think in this case we're only stripping the resource constraints because they're being enforced on a different worker.

Member

Even if it is stolen, the resource constraint of a task is defined at task initialisation, effectively by the user. Resource counting is done by the worker, but the task itself should not be modified.

it is managed here

for resource, quantity in ts.resource_restrictions.items():
    self.available_resources[resource] -= quantity

and here
for resource, quantity in ts.resource_restrictions.items():
    self.available_resources[resource] += quantity

but I would consider the field resource_restrictions to be immutable (if only that were easily possible in Python :) )

Therefore, if my reasoning is correct, this might be overshadowing another wrong transition (maybe the key is forgotten and improperly resubmitted?)

Member

the current logic there is that stolen tasks revert to a state of waiting and are then released.

Do you mean this line?

self.transition(ts, "waiting")

I think it is wrong... :( This should show up somewhere in a validate-task method. I think there should be something like

def validate_task_waiting(self, ts):
    ...
    assert ts.resource_restrictions is None  # or an empty dict, don't know
    ...

Contributor Author

That's a very good point re: resource_restrictions -- I'll look again at the stealing code.
I don't know why the transition to waiting was in there -- it seems like after we have confirmation that the task has been stolen, we should release the key on the victim side (along with whatever available_resources might have been allocated for use)

Contributor Author

OK, I'll push up changes in a sec -- I don't think we need the transition function called from steal_request.

Regarding the check for resource_restrictions, the current flow for a constrained task is

new -> waiting (gather any remote dependencies) -> constrained (because ts.resource_restrictions is not None) -> executing

so we don't want to check that it's None in validate_task_waiting.

I DO think we could improve available_resources tracking by keeping a set of all the tasks that are currently occupying available resources and periodically checking (in ensure_computing, maybe?) that all of the keys in the set are in the executing state, discarding them if not.
That might be best left for a separate PR.
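A rough sketch of that idea (entirely hypothetical -- the `resource_holders` set and the helper below are not part of this PR or of the worker's API):

def reconcile_resource_holders(worker, resource_holders):
    # Meant to run periodically, e.g. from ensure_computing: any key in the
    # tracking set that is no longer executing gives its resources back.
    for key in list(resource_holders):
        ts = worker.tasks.get(key)
        if ts is not None and ts.state == "executing":
            continue  # still legitimately occupying its resources
        if ts is not None:
            for resource, quantity in ts.resource_restrictions.items():
                worker.available_resources[resource] += quantity
        resource_holders.discard(key)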

Contributor Author

Also, it looks like steal_request doesn't need to worry about releasing available_resources because they aren't "taken" until the task transitions from ready -> executing, at which point it's no longer eligible to be stolen

@fjetter
Member

fjetter commented Feb 3, 2021

The constrained->waiting thing might connect to #4446

everything else LGTM

@gforsyth
Contributor Author

gforsyth commented Feb 3, 2021

Damn, moving to the non-transition steal_request flow led to a deadlock and I can't track it down locally.

@fjetter you added this in 1fc1b46 as a part of #4432 -- do you remember why that transition was there? Was it solely to remove the runspec? Or to handle dependents a little more gracefully?

@gforsyth
Contributor Author

gforsyth commented Feb 3, 2021

Hmm, conversely, it might have been a fluke? It looks like perhaps pytest test_client_executor.py::test_cancellation hung, which seems possibly unrelated

@fjetter
Member

fjetter commented Feb 4, 2021

@fjetter you added this in 1fc1b46 as a part of #4432 -- do you remember why that transition was there? Was it solely to remove the runspec? Or to handle dependents a little more gracefully?

Well, the runspec is reset a few lines above the release, and yes, I tried to map as much as possible via transitions instead of in-place mutations to ensure all connected state information is properly modified (dependents, dependencies, waiting for, etc.). I am still convinced that the way we're releasing tasks is also a bit buggy, or at the very least not very transparent; I added this commit trying to handle it more transparently. It is currently otherwise very difficult to know what the release actually does. For example, is the key still in self.tasks or not? That is determined by ts.dependents, but why do I need to check this attribute myself before calling that method? Sometimes you want only some of release_key and not all of it, but it's not very clear to me when we need what. Eventually we might want to implement this more clearly by having another virtual state like forgotten, similar to how the scheduler deals with this, but that's yet another change. At the very least we should fix this kind of leaky abstraction and make it "easier" to call release_key such that the caller knows for a fact what is going to happen. I'm drifting off...

Ultimately, I can't tell you which direction is the right one -- that's why my first PR escalated so badly :(


Damn, moving to the non-transition steal_request flow led to a deadlock and I can't track it down locally.

Do you have an idea which test deadlocked? I can try to run it on my machine and see if I can reproduce

It looks like perhaps pytest test_client_executor.py::test_cancellation hung, which seems possibly unrelated

What my last efforts debugging this taught me is that there is no such thing as an unrelated deadlock. It's all just a big ball of spaghetti ;)
I'll try to reproduce (if I can, I'll try the same on master and we'll see where we are)

@gforsyth
Contributor Author

gforsyth commented Feb 4, 2021

there is no such thing as an unrelated deadlock

oof, too true. I was thinking it might've been a known flaky test, but GitHub Actions doesn't hang on to logs for very long; I went looking for a previous similar timeout but didn't find one.

@gforsyth
Contributor Author

Hey @fjetter -- just rebased against master and should be up to date now. If you have the opportunity to run this in one of your bigger workloads and see how it performs, that would be very helpful. I've run the test-suite locally a few hundred times at this point (focusing on work-stealing, worker failure, and regular worker tests) and things seem to be stable.

Member

@fjetter fjetter left a comment

I'll test this on our workloads and will report back.

@fjetter
Member

fjetter commented Feb 23, 2021

Unfortunately, I hit #4538 when trying this so no results, yet

@fjetter
Member

fjetter commented Feb 23, 2021

I reverted the commit which likely caused #4538 in order to run some tests with this PR. Unfortunately, the very first try already hit a deadlock. I'm in an upscaling scenario where I start with one worker and add more and more, i.e. heavy work stealing. After some upscaling, the cluster distributes some of the tasks and works on them, but only on about half. The view from the scheduler (after it became stale) shows that the worker is supposedly still processing ~70k tasks and doesn't have any in memory. However, when asking the worker about its tasks, I can see that it has two tasks in memory. The state between scheduler and worker seems to have drifted significantly.

Asking the scheduler/worker for the story of one of the processing tasks, I get only a single entry, [("<key>", 'release-key')]. I couldn't find any stealing logs for the key I was looking at either.
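For reference, this kind of story query can be made from a client session (a sketch assuming an existing `client`; the key is a placeholder):

key = "<key>"  # one of the stuck "processing" keys

# Ask every worker for its log entries touching the key.
worker_stories = client.run(lambda dask_worker: dask_worker.story(key))

# Ask the scheduler for its transition-log entries for the same key.
scheduler_story = client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.story(key))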

@gforsyth
Contributor Author

Thanks for the report @fjetter -- and for the description of the failing test. I'll also try to work up a test case that follows that pattern and see if I can reproduce locally.

@fjetter
Member

fjetter commented Feb 23, 2021

I had another look and couldn't find any more useful logs, but I noticed that our scheduler was overloaded the entire time, for whatever reason. That at least explains why the work stealing was working so poorly, but not why the initial worker was lazy and didn't process any results, nor why everything was entirely out of sync. Regardless, what I've seen didn't look healthy. I'll try to perform a few more test runs later today/tomorrow.

@fjetter
Member

fjetter commented Feb 25, 2021

Had another test run which ran through but I received a lot of logs like

('fetch', 'memory')
Traceback (most recent call last):
  File "/.../lib/python3.6/site-packages/distributed/utils.py", line 655, in log_errors
    yield
  File "/.../lib/python3.6/site-packages/distributed/worker.py", line 2256, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "/.../lib/python3.6/site-packages/distributed/worker.py", line 1576, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')

@gforsyth
Contributor Author

gforsyth commented Mar 2, 2021

I hit the KeyError reliably when running tests/test_steal.py::test_steal_twice -- trying to figure out the root cause now, but it definitely occurs as a knock-on effect from a dependency being stolen, so that's encouraging.

@gforsyth
Contributor Author

gforsyth commented Mar 3, 2021

Changes:

  • If there's a knock-on release_key call but the task is in flight, just leave it alone; the worst case is an extra entry in the self.tasks dictionary (see the sketch after this list).
  • Reserve self.data_needed for tasks which will be executed locally (that is, only list the tasks for which we need to fetch data, don't include the tasks to-be-fetched)
  • Don't call release_key in a transition function, ever.
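A hedged sketch of the first bullet's guard (simplified signature, not the PR's actual diff):

def release_key(self, key, reason=None):
    ts = self.tasks.get(key)
    if ts is None:
        return
    if ts.state == "flight":
        # A transfer is already underway for this key; tearing it down now would
        # corrupt the in-flight bookkeeping. Worst case: a stale self.tasks entry.
        return
    ...  # normal release path continues here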

@gforsyth
Contributor Author

gforsyth commented Mar 3, 2021

@fjetter -- if you have an opportunity later this week to run your heavy stealing workload against this branch that would be a big help. Thanks!

@fjetter
Member

fjetter commented Mar 4, 2021

I am repeating myself but I believe we need to eventually rework the release mechanism. I have the feeling this is mostly educated guessing on when we must release a key, when we should and when we must not release it. Not sure how you feel about this. Either way, I'm glad if it works and reworking this is definitely out of scope for this PR

if you have an opportunity later this week to run your heavy stealing workload against this branch that would be a big help. Thanks!

Yes, I think I can find time for this later today

@gforsyth
Contributor Author

gforsyth commented Mar 4, 2021

I am repeating myself but I believe we need to eventually rework the release mechanism.

Oh, definitely. I think reworking the release mechanism and adding in a chained-transition system are the next steps once this is stable.

Member

@fjetter fjetter left a comment

I gave this another spin and couldn't find anything wrong with it. That doesn't necessarily mean a lot but it's a very good start.

I'd be willing to merge this now but there is a release scheduled for tomorrow and I'm wondering if we want to let this sit a while before releasing. Opinions? cc @gforsyth @jrbourbeau

@gforsyth
Contributor Author

gforsyth commented Mar 4, 2021

I gave this another spin and couldn't find anything wrong with it. That doesn't necessarily mean a lot but it's a very good start.

🎉 wooooooo!

I'll defer to @jrbourbeau on whether we want to try to sneak this in to the next release or let it sit on master for a little bit

@jrbourbeau
Member

I'd prefer to wait until after releasing to merge this in if that's alright

gforsyth and others added 18 commits March 4, 2021 12:33
  • Should only be set for `fetch` or `flight` -- it's possible, following failed workers, that a task will end up looking remote when it isn't, so we discard the key from `has_what` when a task is transitioned to `ready`.
  • Tasks can transition from `fetch` -> `waiting` or `flight` -> `waiting` if the worker that has the data dies before the transfer is completed. If that task is reassigned to a worker that was expecting just the data, add the `runspec` and clear out any references to where the data _was_ coming from, since it will now be executed locally (see the sketch after this list).
  • Trying this out as it might not be needed anymore (but can revert if tests complain).
  • There's a very real chance here for a task to be reassigned, and the extra task doesn't hurt anything.
  • E.g. don't add keys which will be fetched from other workers -- that information belongs in `TaskState.waiting_for_data`.
  • We shouldn't call `release_key` in transition functions at all; it is very likely to cause mix-ups. There's an outside chance this leads to worker memory problems over very long jobs, but I think that is better handled by a cleanup callback.
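A hedged sketch of the reassignment case in the second bullet above (attribute names are illustrative and may not match the worker's actual fields):

def transition_fetch_waiting(self, ts, runspec):
    # The task was only expected here as data, but its previous owner died and
    # the scheduler reassigned it to this worker for execution.
    ts.runspec = runspec  # it will now be computed locally
    # Drop any record of which peer was supposed to supply the data.
    for worker_addr in list(ts.who_has):
        self.has_what[worker_addr].discard(ts.key)
    ts.who_has.clear()
    ts.state = "waiting"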
@gforsyth
Contributor Author

gforsyth commented Mar 8, 2021

Hey @jrbourbeau -- are we good to merge this now?

Base automatically changed from master to main March 8, 2021 19:04
@gforsyth
Contributor Author

Just a gentle ping on this to see if anyone has time to review further and/or merge

@fjetter
Member

fjetter commented Mar 16, 2021

I would go ahead and merge this. I'm just a bit hesitant since I've not been very involved lately and don't know if this interferes with any other change. @jrbourbeau, can you give a quick "go ahead" / merge unless you are aware of any other big change which could interfere?

Member

@jrbourbeau jrbourbeau left a comment

Thanks @gforsyth and @fjetter for your work here -- apologies for the delayed reply. After a quick look this seems good to merge (there are a couple of TODOs where it'd be good to confirm if they are talking about future work or things we should include here)

Comment on lines +1486 to +1488
# TODO: move transition of `ts` to end of `add_task`
# This will require a chained recommendation transition system like
# the scheduler
Member

Does this need to be done in this PR or is this follow-up work?

ts.waiting_for_data.add(dep_ts.key)
self.waiting_for_data_count += 1
# check to ensure task wasn't already executed and partially released
# # TODO: make this less bad
Member

I'm not sure if this is bad aesthetically or from a logic perspective. Is there anything else we should do here in this PR?

@gforsyth
Contributor Author

Hey @jrbourbeau! Yeah, both of those TODOs are for the next PR, which is going to add a chained recommendation system for transitions.
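For context, a minimal sketch of what such a chained-recommendation loop could look like (modeled on the scheduler's transition machinery; names are illustrative, not the eventual worker implementation):

def transitions(self, recommendations):
    # Drain a dict of {key: finish_state}; each transition may recommend more.
    while recommendations:
        key, finish = recommendations.popitem()
        ts = self.tasks[key]
        new_recommendations = self.transition(ts, finish) or {}
        recommendations.update(new_recommendations)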

Member

@jrbourbeau jrbourbeau left a comment

@jrbourbeau jrbourbeau merged commit 98148cb into dask:main Mar 18, 2021
@gforsyth gforsyth deleted the fetch_and_wait branch March 18, 2021 15:09