Add Client.restart_workers method #7154

Merged: 6 commits into dask:main, Oct 27, 2022

Conversation

@jrbourbeau (Member) commented Oct 18, 2022

Sometimes users want to restart an individual worker, or a subset of workers, in their cluster without restarting the entire cluster (with client.restart()). This PR adds a Client.restart_workers method for this, similar to the existing Client.retire_workers method we provide; a usage sketch follows below.

Noting that there is a relatively high level of engagement on #1823.

Closes #1823
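
As a rough usage sketch of the new method (the scheduler and worker addresses below are placeholders, and the call mirrors this PR's signature rather than any final API):

from distributed import Client

client = Client("tcp://scheduler-host:8786")  # placeholder address

# Restart only these two workers (they must be running under a Nanny);
# the rest of the cluster keeps running and the scheduler's state is untouched.
client.restart_workers(workers=["tcp://10.0.0.5:36789", "tcp://10.0.0.6:36789"])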

Comment on lines +3486 to +3491
info = self.scheduler_info()
for worker in workers:
    if info["workers"][worker]["nanny"] is None:
        raise ValueError(
            f"Restarting workers requires a nanny to be used. Worker {worker} has type {info['workers'][worker]['type']}."
        )

@jrbourbeau (Member, Author):

This works, but is a little clunky. If in the future we want similar behavior elsewhere, we might consider pushing this sort of logic down into Scheduler.broadcast directly. I've held off on doing so for the time being.

Reviewer (Member):

What happens if you do not handle this error here? I would expect broadcast to raise if we are selecting nannies but some are None.

@jrbourbeau (Member, Author):

Today I think it just ignores non-Nanny workers when nanny=True

results = await All(
    [send_message(address) for address in addresses if address is not None]
)

In that snippet, address is None for non-Nanny workers.
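
A tiny, self-contained illustration of that filtering behavior (the addresses are invented; this is not the scheduler's actual code):

# worker address -> nanny address; None means the worker was started without a nanny
workers = {
    "tcp://10.0.0.1:40000": "tcp://10.0.0.1:41000",
    "tcp://10.0.0.2:40000": None,
}

targets = [nanny for nanny in workers.values() if nanny is not None]
print(targets)  # only the nannied worker would get the "restart" message;
                # the other worker is skipped without any error being raised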

@github-actions bot (Contributor) commented Oct 18, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 6h 38m 10s ⏱️ (+33m 53s)
3 160 tests (+3): 3 068 passed (+3), 83 skipped (−1), 9 failed (+1)
23 380 runs (+24): 22 448 passed (+19), 902 skipped (+1), 30 failed (+4)

For more details on these failures, see this check.

Results for commit 1aea53f. ± Comparison against base commit 621994e.

♻️ This comment has been updated with latest results.

@jakirkham (Member):

Neat! This need comes up a lot.

cc @pentschev @quasiben (who may find this of interest)

@jrbourbeau (Member, Author):

Ah, grand. I'd be curious to hear why this comes up in a RAPIDS context -- at least I assume it's RAPIDS-related, based on the Ben / Peter pings : )


See Also
--------
Client.restart

Reviewer (Member):

I suggest cross-referencing this new method in Client.restart as well. I would also appreciate a sentence about the differences between the two methods.
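
For example, the cross-reference in the Client.restart docstring might look roughly like this (the wording here is illustrative, not the text that was merged):

See Also
--------
Client.restart_workers
    Restart only a given subset of workers. Unlike ``Client.restart``, this
    does not clear any state on the scheduler (keys are not released).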

Comment on lines 3492 to 3494
return self.sync(
    self.scheduler.broadcast, msg={"op": "restart"}, workers=workers, nanny=True
)

Reviewer (Member):

Nanny.restart can return "timed out", which should be handled here.

I don't think we should return the output of the broadcast. I doubt this is useful or a good user-facing API.
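
One way both points could be addressed, sketched here as a free-standing coroutine rather than the merged method (the timeout default and the exact broadcast payload are assumptions):

async def restart_some_workers(client, workers, timeout=30):
    resp = await client.scheduler.broadcast(
        msg={"op": "restart", "timeout": timeout}, workers=workers, nanny=True
    )
    # Per the comment above, Nanny.restart reports "timed out" for nannies whose
    # worker did not come back in time, so surface that as an error...
    timed_out = [addr for addr, status in resp.items() if status == "timed out"]
    if timed_out:
        raise TimeoutError(f"Workers {timed_out} did not restart within {timeout}s")
    # ...and deliberately return nothing instead of the raw broadcast output.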

@@ -3455,6 +3455,44 @@ def restart(self, timeout=no_default, wait_for_workers=True):
            self._restart, timeout=timeout, wait_for_workers=wait_for_workers
        )

    def restart_workers(self, workers: list[str]):

Reviewer (Member):

Nanny.restart takes a timeout argument. We should allow this to be provided here.
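
A minimal sketch of that suggestion (the parameter name and default are assumptions, not the merged signature):

from __future__ import annotations


def restart_workers(self, workers: list[str], timeout: int | float | None = None):
    ...  # pass `timeout` through to the nannies, e.g. inside the broadcast message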


@jacobtomlinson (Member) left a comment:

This looks really useful!

@pentschev (Member):

Thanks @jrbourbeau for the work on this and @jakirkham for the ping. This is something that I remember coming up a couple of times, mostly from people wanting to have a CPU worker that eventually turns into a GPU worker or vice-versa.

Jacob, you also had some use for that, didn't you? I can't remember what the exact use case was now, though.

@jacobtomlinson (Member):

> Jacob, you also had some use for that, didn't you? I can't remember what the exact use case was now, though.

Kinda. I have some plans around restarting workers with different config options, which is adjacent to this change. I'm interested in something more along the lines of re-deploying the cluster, but without losing HPC/cloud resources that have already been allocated.

See the dask-agent experimental repo I was playing with a while ago.

@jrbourbeau (Member, Author):

Thanks for the review @fjetter -- just pushed a commit that should handle your suggestions

@jakirkham (Member):

There are also times when workers become unusable or need to be refreshed. Currently we tell people to restart the full cluster, but maybe a less drastic option (like this one) would be useful.

@hendrikmakait (Member) left a comment:

Is there a specific reason we do not reuse the restarting logic within Scheduler.restart and instead implement Client.restart_workers with (what seem to be) slightly different post-conditions? I'd rather have the scheduler be responsible for restarting workers and avoid two functions that might drift even further apart in semantics in the future.

Restarting workers within Scheduler.restart

n_workers = len(self.workers)
nanny_workers = {
    addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny
}
# Close non-Nanny workers. We have no way to restart them, so we just let them go,
# and assume a deployment system is going to restart them for us.
await asyncio.gather(
    *(
        self.remove_worker(address=addr, stimulus_id=stimulus_id)
        for addr in self.workers
        if addr not in nanny_workers
    )
)

logger.debug("Send kill signal to nannies: %s", nanny_workers)
async with contextlib.AsyncExitStack() as stack:
    nannies = await asyncio.gather(
        *(
            stack.enter_async_context(
                rpc(nanny_address, connection_args=self.connection_args)
            )
            for nanny_address in nanny_workers.values()
        )
    )
    start = monotonic()
    resps = await asyncio.gather(
        *(
            asyncio.wait_for(
                # FIXME does not raise if the process fails to shut down,
                # see https://github.com/dask/distributed/pull/6427/files#r894917424
                # NOTE: Nanny will automatically restart worker process when it's killed
                nanny.kill(timeout=timeout),
                timeout,
            )
            for nanny in nannies
        ),
        return_exceptions=True,
    )
    # NOTE: the `WorkerState` entries for these workers will be removed
    # naturally when they disconnect from the scheduler.

    # Remove any workers that failed to shut down, so we can guarantee
    # that after `restart`, there are no old workers around.
    bad_nannies = [
        addr for addr, resp in zip(nanny_workers, resps) if resp is not None
    ]
    if bad_nannies:
        await asyncio.gather(
            *(
                self.remove_worker(addr, stimulus_id=stimulus_id)
                for addr in bad_nannies
            )
        )

        raise TimeoutError(
            f"{len(bad_nannies)}/{len(nannies)} nanny worker(s) did not shut down within {timeout}s"
        )

self.log_event([client, "all"], {"action": "restart", "client": client})

if wait_for_workers:
    while len(self.workers) < n_workers:
        # NOTE: if new (unrelated) workers join while we're waiting, we may return before
        # our shut-down workers have come back up. That's fine; workers are interchangeable.
        if monotonic() < start + timeout:
            await asyncio.sleep(0.2)
        else:
            msg = (
                f"Waited for {n_workers} worker(s) to reconnect after restarting, "
                f"but after {timeout}s, only {len(self.workers)} have returned. "
                "Consider a longer timeout, or `wait_for_workers=False`."
            )
            if (n_nanny := len(nanny_workers)) < n_workers:
                msg += (
                    f" The {n_workers - n_nanny} worker(s) not using Nannies were just shut "
                    "down instead of restarted (restart is only possible with Nannies). If "
                    "your deployment system does not automatically re-launch terminated "
                    "processes, then those workers will never come back, and `Client.restart` "
                    "will always time out. Do not use `Client.restart` in that case."
                )
            raise TimeoutError(msg) from None

Post conditions on Scheduler.restart

Workers without nannies are shut down, hoping an external deployment system
will restart them. Therefore, if not using nannies and your deployment system
does not automatically restart workers, ``restart`` will just shut down all
workers, then time out!
After `restart`, all connected workers are new, regardless of whether `TimeoutError`
was raised. Any workers that failed to shut down in time are removed, and
may or may not shut down on their own in the future.
?

@jrbourbeau (Member, Author):

It looks like Scheduler.restart restarts all workers and clears all the local state on the scheduler (this is where all the heavy lifting for Client.restart is done). This PR is for just restarting a specified set of workers without clearing the scheduler's state. Does that help clarify things?
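
In code, the contrast described above is roughly the following (assuming a connected Client named client; the worker address is a placeholder):

client.restart()  # restarts every worker and clears the scheduler's local state (all keys are released)
client.restart_workers(workers=["tcp://10.0.0.5:36789"])  # restarts only these workers; scheduler state is kept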

@hendrikmakait (Member) commented Oct 21, 2022

> Is there a specific reason we do not reuse the restarting logic within Scheduler.restart and instead implement Client.restart_workers with (what seem to be) slightly different post-conditions?

I was referring to the snippet from Scheduler.restart linked in the previous post, which is concerned with restarting nannies (or dropping non-nanny workers). My idea was that this could be factored out of Scheduler.restart into a method that could also be called by Client.restart_workers, which would enable code reuse and ensure similar (configurable) semantics; a rough sketch of that idea follows below. One main difference is how the two deal with workers that refuse to restart within the timeout: restart makes sure they are removed, whereas restart_workers works on a best-effort basis. This difference might be unintuitive for users.
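
A rough sketch of that factoring-out idea (the method name, signature, and remove_on_timeout flag are hypothetical; nothing like this is part of this PR):

class Scheduler:  # illustrative stub only, not the real distributed.Scheduler
    async def restart_nanny_workers(self, addresses, *, timeout, remove_on_timeout=True):
        """Kill the nannies at ``addresses`` and let them respawn their workers.

        With remove_on_timeout=True, workers whose nanny fails to shut down in
        time are removed (the strict post-condition Scheduler.restart wants);
        with remove_on_timeout=False, they are left alone (the best-effort
        behavior of Client.restart_workers).
        """
        ...  # the nanny-kill logic currently inlined in Scheduler.restart would move here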

@hendrikmakait (Member):

One conceptual issue I see with this PR is that tasks running on restarting workers will fail (or increment their suspicious count). At the very least, this should be documented in the docstring. From my perspective, this operation should probably be considered "safe", i.e., restarting workers should not impact the retry count of tasks currently running on them. We could leave this for a future PR that makes the logic safe and release Client.restart_workers with an appropriate warning in the docstring for users who know what they are doing.

Reproducer

from distributed import Event, Nanny
from distributed.utils_test import gen_cluster


@gen_cluster(
    client=True,
    Worker=Nanny,
    nthreads=[("", 1)],
    config={"distributed.scheduler.allowed-failures": 0},
)
async def test_restart_workers_fails_executing_task(c, s, a):
    ev_start = Event()
    ev_block = Event()

    def clog(ev_start, ev_block):
        ev_start.set()
        ev_block.wait()

    fut = c.submit(
        clog,
        ev_start=ev_start,
        ev_block=ev_block,
        key="wait",
    )
    await ev_start.wait()
    await c.restart_workers(workers=[a.worker_address])
    assert await fut.result()

@hendrikmakait (Member) commented Oct 26, 2022

@jrbourbeau: In #7184, I have started working on underlying changes to Client.restart that will enable us to make Client.restart and Client.restart_workers behave more similarly. As a suggestion for this PR, I would document that restarting workers is not safe with respect to task retries and leave the rest as is for now. In #7184, I plan to expose a scheduler-side call that can be used by Client.restart_workers, which should take care of my objections. I can rewire this in #7184, so there's no need to block this PR.

@jrbourbeau mentioned this pull request on Oct 26, 2022
@jrbourbeau (Member, Author):

Thanks @hendrikmakait -- I've included a note about this in the Client.restart_workers docstring. Let me know what you think. Otherwise, I think this PR should be good to go

@hendrikmakait (Member) left a comment:

LGTM, thanks @jrbourbeau. It looks like all the points from @fjetter are addressed as well.

will restart all workers and also reset local state on the cluster
(e.g. all keys are released).

Additionally, this method makes no safety guarantees for tasks that are

Reviewer (Member):

Suggested change:
- Additionally, this method makes no safety guarantees for tasks that are
+ Additionally, this method does not gracefully handle tasks that are

@jrbourbeau (Member, Author):

Thanks all for the feedback and @fjetter @hendrikmakait for reviewing! Merging this one in -- CI failures are unrelated

jrbourbeau merged commit 4210cc8 into dask:main on Oct 27, 2022
jrbourbeau deleted the restart-workers branch on October 27, 2022 at 16:18
Successfully merging this pull request may close these issues: Restart given worker(s) using client [help wanted]

6 participants