
Deadlock when emerging from closing_gracefully #6867

Closed
crusaderky opened this issue Aug 10, 2022 · 8 comments · Fixed by #6878
Assignees: crusaderky
Labels: deadlock (The cluster appears to not make any progress)

Comments

crusaderky (Collaborator) commented Aug 10, 2022

This line at the top of Worker.execute is wrong:

if self.status in {Status.closing, Status.closed, Status.closing_gracefully}:
    return None

The problem is that closing_gracefully is reversible. This normally doesn't happen; however, there are legitimate use cases where Scheduler.retire_workers can give up and revert a worker from closing_gracefully back to running - namely, when there are no longer any peer workers that can accept its unique in-memory tasks.

This can cause a rather extreme race condition where

  1. The worker receives a {op: compute-task} command followed, within the same batched-send packet, by {op: worker-status-change, status: closing_gracefully}.
  2. This causes a Worker.execute asyncio task to be spawned which, as soon as it gets its turn on the event loop, returns None.
  3. The task is now stuck in running state forever. This is not a problem for closing and closed, as we're irreversibly tearing down everything anyway.
  4. However, later on the scheduler decides to resuscitate the worker: {op: worker-status-change, status: running}.
  5. The scheduler and the WorkerState now both think that the task is running, but it's not.

The fix for this issue is trivial (just remove closing_gracefully from the line above); a deterministic reproducer is probably going to be very ugly.
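For reference, a minimal sketch of what the fixed guard could look like (only the check quoted at the top changes; the rest of Worker.execute is elided):

# Sketch of the proposed one-line fix: only bail out for the irreversible
# statuses. closing_gracefully is dropped from the set because the scheduler
# may revert it, and the task must then actually run instead of being
# silently abandoned.
if self.status in {Status.closing, Status.closed}:
    return None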

This issue interacts with #3761.

FYI @fjetter

crusaderky self-assigned this Aug 10, 2022
crusaderky added the deadlock label Aug 10, 2022
fjetter (Member) commented Aug 10, 2022

This line at the top of Worker.execute is wrong:

Let's take a step back and discuss the issue

How is the scheduler reverting something like this? I don't think that "reverting" a close is a sane process, and the same is true for a close_gracefully. This is not just about a change of status attributes, but rather about close(-gracefully) triggering a coroutine on the worker side that cleans up and closes the server for good, i.e. closes RPC pools, connections, listeners, etc.
I don't think this is something we can or should just walk back from; instead, something should restart a new worker if the scheduler made the wrong decision.

fjetter (Member) commented Aug 10, 2022

We encoded the assumption that the server lifecycle is unidirectional in a couple of places. Neither start nor close behaves sanely if we allow for restarts.

jacobtomlinson (Member) commented

I agree with @fjetter. I'm surprised that a worker in closing_gracefully can go to anything other than closing and closed.

fjetter (Member) commented Aug 10, 2022

If anything, I think we should review the worker-status-change handlers here, since starting/stopping is much more complicated than this makes it look. This is OK for pause/unpause, but beyond that it can be dangerous.

crusaderky (Collaborator, author) commented Aug 10, 2022

The state closing_gracefully has nothing to do with Server.close, except for the fact that it typically transitions to the closing state.

closing_gracefully means:

  • The Active Memory Manager is busy flushing data out of the worker
  • Tasks that are currently executing, long-running, or in flight can continue to do so until completion
  • Tasks that are in ready, constrained, or fetch state will not progress.

On the worker side, the status is a variant of paused; in fact there is no difference between the two statuses in WorkerState, only a running boolean flag which is flipped by PauseEvent and UnpauseEvent:

@ServerNode.status.setter  # type: ignore
def status(self, value: Status) -> None:
    """Override Server.status to notify the Scheduler of status changes.
    Also handles pausing/unpausing.
    """
    prev_status = self.status
    ServerNode.status.__set__(self, value)  # type: ignore
    stimulus_id = f"worker-status-change-{time()}"
    self._send_worker_status_change(stimulus_id)
    if prev_status == Status.running and value != Status.running:
        self.handle_stimulus(PauseEvent(stimulus_id=stimulus_id))
    elif value == Status.running and prev_status in (
        Status.paused,
        Status.closing_gracefully,
    ):
        self.handle_stimulus(UnpauseEvent(stimulus_id=stimulus_id))

The two different statuses are necessary

  • because closing_gracefully can be reverted from the scheduler side, while paused can't, and
  • to inhibit the WorkerMemoryManager from unpausing.

How is the scheduler reverting something like this?

if policy.no_recipients:
    # Abort retirement. This time we don't need to worry about race
    # conditions and we can wait for a scheduler->worker->scheduler
    # round-trip.
    self.stream_comms[ws.address].send(
        {
            "op": "worker-status-change",
            "status": prev_status.name,
            "stimulus_id": stimulus_id,
        }
    )
    return None, {}

This code path is thoroughly covered by unit tests.
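To make the round trip concrete, here is a self-contained toy model (not distributed code; PauseEvent/UnpauseEvent are reduced to flipping the running flag) of the behaviour described by the status setter and the scheduler-side revert above:

from enum import Enum, auto

# Toy model only -- this is not distributed code. It condenses the status
# setter quoted above: paused and closing_gracefully both pause execution
# (PauseEvent), and a later "running" status change from the scheduler
# resumes it (UnpauseEvent).

class Status(Enum):
    running = auto()
    paused = auto()
    closing_gracefully = auto()
    closing = auto()
    closed = auto()

class ToyWorker:
    def __init__(self) -> None:
        self._status = Status.running
        self.running = True  # stands in for the WorkerState flag

    @property
    def status(self) -> Status:
        return self._status

    @status.setter
    def status(self, value: Status) -> None:
        prev, self._status = self._status, value
        if prev == Status.running and value != Status.running:
            self.running = False  # PauseEvent in the real state machine
        elif value == Status.running and prev in (
            Status.paused,
            Status.closing_gracefully,
        ):
            self.running = True  # UnpauseEvent in the real state machine

w = ToyWorker()
w.status = Status.closing_gracefully  # scheduler starts retiring the worker
assert not w.running
w.status = Status.running             # scheduler aborts the retirement
assert w.running                      # the worker must resume, not stay stuck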

gjoseph92 (Collaborator) commented

I think this points out a lot of issues, and there are a number of ways we could go here.

  1. closing_gracefully is probably not the right name for the state. Perhaps it should have been called draining or something like that. The closing part of the name is misleading, as @crusaderky points out.

  2. The worker receives a {op: compute-task} command followed, within the same batched-send packet, by {op: worker-status-change, status: closing_gracefully}.

    That feels a bit like a scheduler bug to me. The scheduler has just asked the worker to close gracefully—why should it then send it more tasks to run? I know it's because right now, we just ask the worker to retire gracefully:

    self.stream_comms[ws.address].send(
        {
            "op": "worker-status-change",
            "status": ws.status.name,
            "stimulus_id": stimulus_id,
        }
    )
    and don't update its state until it echoes back to us that its state has changed. But on the scheduler, I think we should be updating some state eagerly. We should probably immediately remove the worker from self.running, or something.

    Basically, just like how remove_worker immediately removes the worker from self.workers even if it's still running and even connected (see #6390, "Eliminate partially-removed-worker state on scheduler (comms open, state removed)"), preventing any further scheduling onto that worker, there should probably be a Scheduler.drain_worker function which does whatever's appropriate to immediately prevent further tasks from being scheduled there (a rough sketch is at the end of this comment).

  3. The task is now stuck in running state forever

    This feels like a bug in the worker to me. execute just returning None here is leaving the task in an incorrect state. We get away with it for closing and closed, since "forever" won't last much longer, but even for those, it's technically incorrect. I feel like this should return an event—maybe even a form of AlreadyCancelledEvent, since you could consider the worker closing/draining to be a form of cancellation?

    Or alternatively, perhaps the status change to closing_gracefully should cancel all tasks?

  4. Finally, the ability to reverse closing_gracefully is actually critical to the implementation of retire_workers right now, even if it doesn't happen often. That's the way we guarantee that "any key that is in memory exclusively on the retired workers is replicated somewhere else". retire_workers doesn't do any sort of clever binpacking or satisfiability to guarantee that the set of workers not retired will be able to hold all the necessary data—it just picks some decent-looking ones to retire with a heuristic (workers_to_close) and tries it. If it turns out that it picked too many, or some others have paused, then it asks some retirees to rejoin the workforce. Without that, you'd be forced to retire workers you should have kept around, and have to recompute all their keys.


I know retire_workers isn't actually used that commonly yet. So I think the priority of addressing this is rather low.

But I think the fix first has to happen on the scheduler side, to prevent new tasks from being sent to draining workers. Otherwise, even if we handle the task well on the worker side (don't leave it in executing forever), the scheduler will still always think it's processing—AFAIK we don't currently have a way for a worker to tell the scheduler, "hey I just cancelled this task even though you didn't tell me to".
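For what it's worth, a rough, hypothetical sketch of the drain_worker idea from point 2 above. Scheduler.drain_worker does not exist; this is a toy model whose attribute names merely mirror the snippets quoted in this thread:

from dataclasses import dataclass, field

# Hypothetical sketch only: Scheduler.drain_worker does not exist in
# distributed. The point is "update scheduler state eagerly": the scheduler
# stops treating the worker as a scheduling target before the worker has
# acknowledged the status change, so no new tasks can race with the drain.

@dataclass
class ToyWorkerState:
    address: str
    status: str = "running"

@dataclass
class ToyScheduler:
    running: set = field(default_factory=set)   # addresses eligible for tasks
    outbox: list = field(default_factory=list)  # stands in for stream_comms

    def drain_worker(self, ws: ToyWorkerState, stimulus_id: str) -> None:
        ws.status = "closing_gracefully"  # or a dedicated "draining" status
        self.running.discard(ws.address)  # eager: no round trip needed
        self.outbox.append(
            {
                "op": "worker-status-change",
                "status": ws.status,
                "stimulus_id": stimulus_id,
            }
        )

s = ToyScheduler(running={"tcp://10.0.0.1:1234"})
ws = ToyWorkerState("tcp://10.0.0.1:1234")
s.drain_worker(ws, "retire-workers-1")
assert ws.address not in s.running  # no further tasks scheduled there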

fjetter (Member) commented Aug 10, 2022

Since we do not want this situation to be too simple, there is yet another pseudo-retirement code path: Worker.close_gracefully, which is triggered when the lifetime times out.

async def close_gracefully(self, restart=None):
    """Gracefully shut down a worker

    This first informs the scheduler that we're shutting down, and asks it
    to move our data elsewhere. Afterwards, we close as normal
    """
    if self.status in (Status.closing, Status.closing_gracefully):
        await self.finished()

    if self.status == Status.closed:
        return

    if restart is None:
        restart = self.lifetime_restart

    logger.info("Closing worker gracefully: %s", self.address)
    # Wait for all tasks to leave the worker and don't accept any new ones.
    # Scheduler.retire_workers will set the status to closing_gracefully and push it
    # back to this worker.
    await self.scheduler.retire_workers(
        workers=[self.address],
        close_workers=False,
        remove=False,
        stimulus_id=f"worker-close-gracefully-{time()}",
    )
    await self.close(nanny=not restart)

This will close the worker for good. If that is the only broken code path, that's good news.
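For context, this path is normally reached through the worker lifetime options. A rough usage sketch follows; the lifetime, lifetime_stagger and lifetime_restart keyword names are quoted from memory of the Worker constructor / dask-worker CLI, so treat them as an assumption rather than a spec:

import asyncio
from distributed import Worker

# Assumed usage that eventually triggers Worker.close_gracefully(): once the
# lifetime elapses, the worker retires itself via the code path quoted above.
# The keyword names below are an assumption, not verified against a specific
# distributed version.

async def run_worker(scheduler_address: str) -> None:
    async with Worker(
        scheduler_address,
        lifetime="1 hour",          # close_gracefully() fires after this
        lifetime_stagger="5 minutes",
        lifetime_restart=True,      # used when restart=None in the code above
    ) as worker:
        await worker.finished()

# asyncio.run(run_worker("tcp://scheduler-host:8786"))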


Taking a step back again.

What is clearly happening here is that people are looking at different areas of the code: we have different versions of the code around, and different versions of it in our memory.

For instance, just browsing the code I can find...

  • A method Worker.close_gracefully that is invoked when the lifetime times out, but is otherwise dead code, assuming no user calls it.
  • A method Nanny.close_gracefully
  • An RPC handler Nanny.handler['close_gracefully'] that calls into the method
  • A state called clos**ing**_gracefully
  • This state is set on the nanny and on the worker under different circumstances
    • on a worker if the scheduler is calling retire_workers
    • on the nanny if the worker is closing
  • To make this entire thing a bit more confusing, it is part of the same Status enum that is used to control the actual server lifecycle

I think the problem here is that we should clearly distinguish between a server status and a state machine / task status.

Server statuses

  • Init
  • Starting
  • Running
  • Closing / alt: stopping
  • Closed / alt: stopped
  • Closing gracefully (for the nanny; indicates that the worker is shutting down intentionally and that the nanny should terminate after the worker is closed)
  • Failed

State machine / task statuses

  • Active
  • Paused
  • Draining
  • ?

These two sets are entangled, but they have very different state diagrams: the server-side one is acyclic, while the latter isn't.

That entanglement already caused a lot of complexity and problems when writing Server.close; most notably, we had zombie workers in the past because these states were intertwined. I believe the very first step here is to break them apart.
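To make the proposed split concrete, a hypothetical sketch (none of these enum names exist in distributed today; they only restate the two lists above):

from enum import Enum, auto

# Hypothetical sketch of the proposed split -- nothing here exists in
# distributed today. The server lifecycle is one-way, while the execution
# status can legitimately move back and forth.

class ServerStatus(Enum):
    init = auto()
    starting = auto()
    running = auto()
    closing = auto()             # alt: stopping
    closed = auto()              # alt: stopped
    closing_gracefully = auto()  # nanny only: worker is shutting down on purpose
    failed = auto()

class ExecutionStatus(Enum):     # state machine / task status
    active = auto()
    paused = auto()
    draining = auto()

# Server lifecycle is acyclic: init -> starting -> running -> closing -> closed.
# Execution status is not: active <-> paused and active <-> draining are both
# legal round trips.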

crusaderky (Collaborator, author) commented

  1. closing_gracefully is probably not the right name for the state. Perhaps it should have been called draining or something like that. The closing part of the name is misleading, as @crusaderky points out.

Agreed.

  2. The worker receives a {op: compute-task} command followed, within the same batched-send packet, by {op: worker-status-change, status: closing_gracefully}.

    That feels a bit like a scheduler bug to me. The scheduler has just asked the worker to close gracefully—why should it then send it more tasks to run?

It is the other way around: first the scheduler asks to run a task, and then it asks the worker to shut down. This happens when either the client or the adaptive scaler asks it to, and the scheduler can predict neither.

I know it's because right now, we just ask the worker to retire gracefully:

self.stream_comms[ws.address].send(
    {
        "op": "worker-status-change",
        "status": ws.status.name,
        "stimulus_id": stimulus_id,
    }
)

and don't update its state until it echoes back to us that its state has changed. But on the scheduler, I think we should be updating some state eagerly.

What you describe is already happening, 3 lines above the snippet you posted:

# Change Worker.status to closing_gracefully. Immediately set
# the same on the scheduler to prevent race conditions.
prev_status = ws.status
ws.status = Status.closing_gracefully
self.running.discard(ws)

This feels like a bug to me in the worker. execute just returning None here is leaving the task in an incorrect state. We get away with it for closing and closed, since "forever" won't last much longer, but even for those, it's technically incorrect. I feel like this should return an event—maybe even a form of AlreadyCancelledEvent, since you could consider the worker closing/draining to be a form of cancellation?

This, however, means that _handle_already_cancelled would need to deal with the task not being cancelled.
Which actually makes me realise that there is another deadly race condition in these few lines of code: #6869 🥴

Or alternatively, perhaps the status change to closing_gracefully should cancel all tasks?

This would open a huge can of worms if the scheduler reverts the decision. Related: #3761
