Ensure client.restart waits for workers to leave and come back #6637

Closed

distributed/scheduler.py (10 changes: 8 additions & 2 deletions)

@@ -5083,11 +5083,15 @@ def clear_task_state(self):
         for collection in self._task_state_collections:
             collection.clear()

+    def _get_worker_ids(self) -> set[str]:

Collaborator: This seems kinda unnecessary to me as a Scheduler method, where else would it be used?

+        return set({ws.server_id for ws in self.workers.values()})
+
     @log_errors
     async def restart(self, client=None, timeout=30):
         """Restart all workers. Reset local state."""
         stimulus_id = f"restart-{time()}"
-        n_workers = len(self.workers)
+        initial_workers = self._get_worker_ids()
+        n_workers = len(initial_workers)

         logger.info("Send lost future signal to clients")
         for cs in self.clients.values():

@@ -5161,7 +5165,9 @@ async def restart(self, client=None, timeout=30):

         self.log_event([client, "all"], {"action": "restart", "client": client})
         start = time()
-        while time() < start + 10 and len(self.workers) < n_workers:
+        while time() < start + 10 and (
+            len(self.workers) < n_workers or initial_workers & self._get_worker_ids()

Collaborator: See this comment: https://github.com/dask/distributed/pull/6637/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R5108-R5109

Workers without nannies will be closed and not come back. So we shouldn't wait for them. We should just be waiting for nanny_workers.
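
(Illustrative sketch, not part of this PR: one way "waiting only for nanny_workers" could look inside Scheduler.restart. The nanny_workers mapping built from WorkerState.nanny and the use of self.running are assumptions about the surrounding code, anticipating the suggestion further down.)

    # Hypothetical fragment inside Scheduler.restart:
    # only workers started under a Nanny are expected to come back.
    nanny_workers = {
        addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny
    }
    start = time()
    while time() < start + 10 and len(self.running) < len(nanny_workers):
        await asyncio.sleep(0.01)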

@hendrikmakait (Member), Jul 7, 2022: The way I understood the intent of that comment and the code, we purposefully rely on the deployment system to bring the workers back up, and we therefore want to wait for that to happen. Otherwise, in a non-nanny environment, Scheduler.restart would just be a Scheduler.shutdown with extra steps. I might be wrong, though... and we should definitely improve the way we handle the case where the deployment system does not restart workers.

Collaborator: We have no guarantee that the deployment system will bring them back up. It's entirely possible it's just a Scheduler.shutdown!

With the 10s timeout, I guess it's okay to wait for all of them, even if they'll never come back. But it feels like a kind of odd assumption to make.

Member: This is related to Florian's comment here: #6637 (comment)

Collaborator: Isn't a restarted worker going to have a new server_id? The ID is coming from the Worker, not the Nanny instance:

    self.id = type(self).__name__ + "-" + str(uuid.uuid4())

    server_id=self.id,

So I don't know why we'd even expect those IDs to come back. Indeed, if they did come back that would be highly concerning.

I think this might be reasonable (though obviously not tolerant to new workers joining during the restart):

Suggested change:
-            len(self.workers) < n_workers or initial_workers & self._get_worker_ids()
+            len(self.running) < len(nanny_workers)
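
(Illustrative, runnable aside: a toy class reusing the id line quoted above, showing that every new instance mints a fresh uuid4-based id, so a restarted worker can never report the old server_id. This is a stand-in, not distributed.core.Server itself.)

    import uuid

    class Server:
        def __init__(self):
            # Same pattern as the quoted line: a brand-new id per instance.
            self.id = type(self).__name__ + "-" + str(uuid.uuid4())

    before = Server()  # "worker" before the restart
    after = Server()   # "worker" after the restart
    assert before.id != after.id  # the old server_id does not come back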

@hendrikmakait (Member), Jul 7, 2022: Is there a scenario where, due to some race condition, an old worker might somehow still be in self.workers or reconnecting before shutting down? I think that's what initial_workers & self._get_worker_ids() is intended to catch.

Collaborator: Yes, that's possible if we hit the not all(resp == "OK" for resp in resps) branch when telling Nanny workers to restart. If the nanny times out trying to kill the worker subprocess, that worker may still be connected.

    async def restart(self, timeout=30):
        async def _():
            if self.process is not None:
                await self.kill()
            await self.instantiate()

        try:
            await asyncio.wait_for(_(), timeout)
        except TimeoutError:
            logger.error(
                f"Restart timed out after {timeout}s; returning before finished"
            )
            return "timed out"

However, in that case, I think we'd want to ignore that worker in our count of workers we expect to come back. Since we didn't successfully shut it down, it's still around, and if/when it does shut down in the future, it's quite possible it won't come back. We can disregard the fact that we tried to restart that one; it's a lost cause.

Honestly, I wonder if we should call remove_worker on the workers that didn't respond "OK". Their state is a bit undefined, but it's probably kinda broken. Maybe we don't want to talk to them anymore. If we remove them, the scheduler will reject them the next time they try to talk to it, so it's safe.
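
(Illustrative sketch of that idea, not the merged code: it assumes resps lines up one-to-one with a nanny_workers mapping of worker addresses to nannies, and that Scheduler.remove_worker accepts these arguments.)

    # Hypothetical fragment inside Scheduler.restart: drop workers whose
    # nannies failed to restart them, so the scheduler rejects them if they
    # ever try to reconnect.
    for worker_addr, resp in zip(nanny_workers, resps):
        if resp != "OK":
            await self.remove_worker(address=worker_addr, stimulus_id=stimulus_id)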

But if we don't do that, then I think initial_workers & self._get_worker_ids() is too strict. Because the only reason some initial_workers would still be connected is that their nannies failed to shut down. If they failed to shut down, then they may never restart, so we shouldn't wait for them.

I'd then be torn on len(self.workers) < n_workers vs len(self.running) < n_workers. It's quite possible these broken workers aren't running anymore, and never will be. So just checking len(self.workers) means they'll still be counted. OTOH, it's nice to wait for new workers to be fully running instead of just connected. We could do

                if n_restart_failed := sum(resp != "OK" for resp in resps):
                    logger.error(
                        "Not all workers responded positively: %s",
                        resps,
                        exc_info=True,
                    )
                    n_workers -= n_restart_failed

Then checking len(self.running) < n_workers will ignore the workers we don't expect to restart. But I don't know if it's necessary.
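
(Putting the pieces above together, an illustrative sketch of what the wait could become under this proposal; again a fragment of Scheduler.restart, not what this PR merges.)

    # Hypothetical: discount workers whose nannies failed to restart them,
    # then wait for the replacements to be fully running, not just connected.
    n_workers -= sum(resp != "OK" for resp in resps)
    start = time()
    while time() < start + 10 and len(self.running) < n_workers:
        await asyncio.sleep(0.01)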

+        ):
             await asyncio.sleep(0.01)

         self.report({"op": "restart"})
distributed/tests/test_client.py (10 changes: 10 additions & 0 deletions)

@@ -3493,6 +3493,16 @@ async def test_Client_clears_references_after_restart(c, s, a, b):
     assert key not in c.refcount


+@pytest.mark.slow
+@gen_cluster(Worker=Nanny, client=True, nthreads=[("", 1)] * 5)
+async def test_restart_waits_for_new_workers(c, s, *workers):
+    initial_workers = set(s.workers)
+    await c.restart()

Member: Suggested change (assert the worker count immediately after the restart):
-    await c.restart()
+    await c.restart()
+    assert len(s.workers) == len(initial_workers)

+    assert len(s.workers) == len(initial_workers)
+    for w in workers:
+        assert w.address not in s.workers
+
+
 @gen_cluster(Worker=Nanny, client=True)
 async def test_restart_timeout_is_logged(c, s, a, b):
     with captured_logger(logging.getLogger("distributed.client")) as logger: