Ensure client.restart waits for workers to leave and come back #6637
Conversation
Unit Test Results
See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0   15 suites ±0   6h 49m 26s ⏱️ +10m 41s
For more details on these failures and errors, see this check.
Results for commit bd61588. ± Comparison against base commit 18a4642.
♻️ This comment has been updated with latest results.
    @gen_cluster(Worker=Nanny, client=True, nthreads=[("", 1)])
    async def test_restart_waits_for_new_workers(c, s, a):
        initial_workers = set(s.workers)
        await c.restart()
Suggested change:

    -await c.restart()
    +await c.restart()
    +assert len(s.workers) == len(initial_workers)
distributed/tests/test_client.py (outdated diff)

    async def test_restart_waits_for_new_workers(c, s, a):
        initial_workers = set(s.workers)
        await c.restart()
        assert set(s.workers) != initial_workers
Should we be more cautious and check that there is no overlap in the IDs at all (as we do in the implementation)?
Yes, the challenge here is not to test the implementation. We can, however, check that the workers we started with are no longer known to the scheduler.
I agree that we generally should not test the implementation, but I think in this case, it simply turns out that we can implement the functionality using the actual conditions we would like to test, so this should be fine.
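For illustration, a sketch of a test combining both checks discussed in this thread (based on the test shown above; the no-overlap assertion assumes restarted workers re-register under new addresses):

    from distributed import Nanny
    from distributed.utils_test import gen_cluster


    @gen_cluster(Worker=Nanny, client=True, nthreads=[("", 1)])
    async def test_restart_waits_for_new_workers(c, s, a):
        initial_workers = set(s.workers)
        await c.restart()
        # The same number of workers came back before restart() returned ...
        assert len(s.workers) == len(initial_workers)
        # ... and none of the original workers are still known to the scheduler.
        assert not initial_workers & set(s.workers)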
distributed/scheduler.py (outdated diff)

    @@ -5140,7 +5144,11 @@ async def restart(self, client=None, timeout=30):
             self.log_event([client, "all"], {"action": "restart", "client": client})
             start = time()
    -        while time() < start + 10 and len(self.workers) < n_workers:
    +        while (
    +            time() < start + 10
Do we still need this? I can only imagine that this was added to give the scheduler time to start dropping workers, for which we added conditions below.
Well, the "correct" way would be to time out if this condition is not met instead of silently returning. That would be a breaking change, and I'm not convinced it's worth it.
I'm not sure we're talking about the same thing. To be more specific, I was referring only to time() < start + 10.
Yes, and I think what we do is bad practice. We simply wait for 10 seconds and then return, regardless of whether the cluster is in the state we want it to be in. Instead, we should raise a TimeoutError, but that would be a breaking change.
My bad, I read that as an OR condition, which felt weird. Your explanation totally makes sense for an AND.
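For context, a minimal sketch of what failing loudly could look like, written as a standalone helper that polls the scheduler (wait_for_workers is an illustrative name, not an existing method; the actual code keeps the silent 10-second wait precisely because raising would be a breaking change):

    import asyncio
    from time import time


    async def wait_for_workers(scheduler, n_workers: int, timeout: float = 10) -> None:
        """Poll until n_workers are connected; raise instead of returning silently."""
        deadline = time() + timeout
        while len(scheduler.workers) < n_workers:
            if time() > deadline:
                raise TimeoutError(
                    f"Only {len(scheduler.workers)} out of {n_workers} workers "
                    f"reconnected within {timeout}s of restart"
                )
            await asyncio.sleep(0.01)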
The new test appears to be broken in 2 CI runs. Is that related, or is it some CI GC issue?
There are some weird failures that may be fixed by #6651.
Waiting to merge this until that one is merged and I can rebase this PR.
Force-pushed from 90931f0 to bd61588.
The new test is unfortunately not solid. It appears that the workers are timing out.
    @@ -5083,11 +5083,15 @@ def clear_task_state(self):
             for collection in self._task_state_collections:
                 collection.clear()

    +    def _get_worker_ids(self) -> set[str]:
This seems kinda unnecessary to me as a Scheduler method; where else would it be used?
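If restart stays the only caller, the set could presumably be built inline. A sketch, under the assumption that the helper merely collects a per-worker server ID from the scheduler's WorkerState objects (the helper's body is not shown in this hunk, so the attribute name below is a guess):

    # Hypothetical inline equivalent inside Scheduler.restart; `server_id` is an
    # assumed WorkerState attribute, not confirmed by the quoted diff.
    worker_ids = {ws.server_id for ws in self.workers.values()}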
    @@ -5161,7 +5165,9 @@ async def restart(self, client=None, timeout=30):
             self.log_event([client, "all"], {"action": "restart", "client": client})
             start = time()
    -        while time() < start + 10 and len(self.workers) < n_workers:
    +        while time() < start + 10 and (
    +            len(self.workers) < n_workers or initial_workers & self._get_worker_ids()
See this comment: https://github.com/dask/distributed/pull/6637/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R5108-R5109
Workers without nannies will be closed and not come back. So we shouldn't wait for them. We should just be waiting for nanny_workers.
The way I understood the intent of that comment and the code, we purposefully rely on the deployment system to bring the workers back up, and we therefore want to wait for that to happen. Otherwise, in a non-nanny environment, Scheduler.restart would just be a Scheduler.shutdown with extra steps. I might be wrong though... and we should definitely improve the way we handle the case where the deployment system does not restart workers.
We have no guarantee that the deployment system will bring them back up. It's entirely possible it's just a Scheduler.shutdown!
With the 10s timeout, I guess it's okay to wait for all of them, even if they'll never come back. But it feels like a kind of odd assumption to make.
This is related to Florian's comment here: #6637 (comment)
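To make the trade-off in this thread concrete, here is a sketch of the nanny-only variant, as it might look inside Scheduler.restart (it assumes WorkerState.nanny holds the nanny address and is empty for workers started without one, and it reuses the existing time/asyncio imports of scheduler.py):

    # Sketch only: wait just for workers that run under a nanny and are
    # therefore expected to come back on their own. Workers without a nanny
    # are closed and will not reconnect unless the deployment system
    # restarts them. nanny_workers is computed before the workers are told
    # to restart; the wait loop runs afterwards.
    nanny_workers = {
        addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny
    }
    ...
    start = time()
    while time() < start + 10 and len(self.workers) < len(nanny_workers):
        await asyncio.sleep(0.01)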
    @@ -5161,7 +5165,9 @@ async def restart(self, client=None, timeout=30):
             self.log_event([client, "all"], {"action": "restart", "client": client})
             start = time()
    -        while time() < start + 10 and len(self.workers) < n_workers:
    +        while time() < start + 10 and (
    +            len(self.workers) < n_workers or initial_workers & self._get_worker_ids()
Isn't a restarted worker going to have a new server_id? The ID is coming from the Worker, not the Nanny instance:

distributed/core.py, line 341 in d88c1d2:

    self.id = type(self).__name__ + "-" + str(uuid.uuid4())

distributed/worker.py, line 1113 in d88c1d2:

    server_id=self.id,
So I don't know why we'd even expect those IDs to come back. Indeed, if they did come back that would be highly concerning.
I think this might be reasonable (though obviously not tolerant to new workers joining during the restart):
Suggested change:

    -len(self.workers) < n_workers or initial_workers & self._get_worker_ids()
    +len(self.running) < len(nanny_workers)
Is there a scenario where, due to some race condition, an old worker might somehow still be in self.workers or reconnect before shutting down? I think that's what initial_workers & self._get_worker_ids() is intended to catch.
Yes, that's possible if we hit the not all(resp == "OK" for resp in resps) branch when telling Nanny workers to restart. If the nanny times out trying to kill the worker subprocess, that worker may still be connected.

distributed/nanny.py, lines 479 to 491 in f7f6501:

    async def restart(self, timeout=30):
        async def _():
            if self.process is not None:
                await self.kill()
            await self.instantiate()

        try:
            await asyncio.wait_for(_(), timeout)
        except TimeoutError:
            logger.error(
                f"Restart timed out after {timeout}s; returning before finished"
            )
            return "timed out"
However, in that case, I think we'd want to ignore that worker in our count of workers we expect to come back. Since we didn't successfully shut it down, it's still around, and if/when it does shut down in the future, it's quite possible it won't come back. We can disregard the fact that we tried to restart that one; it's a lost cause.
Honestly, I wonder if we should call remove_worker on the workers that didn't respond "OK". Their state is a bit undefined, but it's probably kinda broken. Maybe we don't want to talk to them anymore. If we remove them, the scheduler will reject them the next time they try to talk to it, so it's safe.
But if we don't do that, then I think initial_workers & self._get_worker_ids() is too strict, because the only reason some initial_workers would still be connected is that their nannies failed to shut down. If they failed to shut down, then they may never restart, so we shouldn't wait for them.
I'd then be torn on len(self.workers) < n_workers vs len(self.running) < n_workers. It's quite possible these broken workers aren't running anymore, and never will be. So just checking len(self.workers) means they'll still be counted. OTOH, it's nice to wait for new workers to be fully running instead of just connected. We could do:
    if n_restart_failed := sum(resp != "OK" for resp in resps):
        logger.error(
            "Not all workers responded positively: %s",
            resps,
            exc_info=True,
        )
        n_workers -= n_restart_failed
Then checking len(self.running) < n_workers will ignore the workers we don't expect to restart. But I don't know if it's necessary.
The client should block until the workers have been cycled through. At the very least since #6504, we're no longer removing workers immediately from the internal state tracking, and therefore client.restart would return immediately, before any workers were removed.

This has been an issue at least since #6504, possibly before. I have a strong suspicion that it is responsible for coiled/benchmarks#166.

I believe that in cases where individual nanny restarts time out, this was a problem even before #6504.
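For illustration, the user-facing expectation after this change, sketched against a placeholder cluster address (the scheduler address below is hypothetical, and the sketch assumes all workers run under nannies so they are expected to come back):

    from distributed import Client

    client = Client("tcp://scheduler-address:8786")  # placeholder address

    n_before = len(client.scheduler_info()["workers"])
    client.restart()
    # After this PR, restart() should only return once the old workers have
    # left and the nanny-managed workers have re-registered with the scheduler.
    n_after = len(client.scheduler_info()["workers"])
    assert n_after == n_before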