From 09837312ccb3a9f1a14ea068ba5825963fec82cc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 25 Oct 2022 14:19:17 +0200 Subject: [PATCH] Track reason of workers closing and restarting (#7166) --- distributed/diagnostics/plugin.py | 4 +- distributed/nanny.py | 64 ++++++++++++++++++---------- distributed/scheduler.py | 13 ++++-- distributed/tests/test_nanny.py | 12 +++--- distributed/tests/test_scheduler.py | 9 ++-- distributed/tests/test_utils_test.py | 2 + distributed/tests/test_worker.py | 4 +- distributed/worker.py | 51 ++++++++++++++-------- 8 files changed, 103 insertions(+), 56 deletions(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index a8445feea19..f6c82f4c06b 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -307,7 +307,9 @@ async def setup(self, worker): if self.restart and worker.nanny and not await self._is_restarted(worker): logger.info("Restarting worker to refresh interpreter.") await self._set_restarted(worker) - worker.loop.add_callback(worker.close_gracefully, restart=True) + worker.loop.add_callback( + worker.close_gracefully, restart=True, reason=f"{self.name}-setup" + ) @abc.abstractmethod def install(self) -> None: diff --git a/distributed/nanny.py b/distributed/nanny.py index 43523694cc6..29e71f14c8e 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -15,7 +15,7 @@ from inspect import isawaitable from queue import Empty from time import sleep as sync_sleep -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Literal from toolz import merge from tornado import gen @@ -110,7 +110,7 @@ class Nanny(ServerNode): """ _instances: ClassVar[weakref.WeakSet[Nanny]] = weakref.WeakSet() - process = None + process: WorkerProcess | None memory_manager: NannyMemoryManager env: dict[str, str] @@ -162,6 +162,7 @@ def __init__( # type: ignore[no-untyped-def] stacklevel=2, ) + self.process = None self._setup_logging(logger) 
self.loop = self.io_loop = IOLoop.current() @@ -256,8 +257,8 @@ def __init__( # type: ignore[no-untyped-def] "instantiate": self.instantiate, "kill": self.kill, "restart": self.restart, - # cannot call it 'close' on the rpc side for naming conflict "get_logs": self.get_logs, + # cannot call it 'close' on the rpc side for naming conflict "terminate": self.close, "close_gracefully": self.close_gracefully, "run": self.run, @@ -364,7 +365,7 @@ async def start_unsafe(self): response = await self.instantiate() if response != Status.running: - await self.close() + await self.close(reason="nanny-start-failed") return assert self.worker_address @@ -373,7 +374,7 @@ async def start_unsafe(self): return self - async def kill(self, timeout=2): + async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None: """Kill the local worker process Blocks until both the process is down and the scheduler is properly @@ -383,7 +384,7 @@ async def kill(self, timeout=2): return deadline = time() + timeout - await self.process.kill(timeout=0.8 * (deadline - time())) + await self.process.kill(reason=reason, timeout=0.8 * (deadline - time())) async def instantiate(self) -> Status: """Start a local worker process @@ -430,7 +431,9 @@ async def instantiate(self) -> Status: self, self.scheduler_addr, ) - await self.close(timeout=self.death_timeout) + await self.close( + timeout=self.death_timeout, reason="nanny-instantiate-timeout" + ) raise else: @@ -438,7 +441,7 @@ async def instantiate(self) -> Status: result = await self.process.start() except Exception: logger.error("Failed to start process", exc_info=True) - await self.close() + await self.close(reason="nanny-instantiate-failed") raise return result @@ -464,7 +467,7 @@ async def plugin_add(self, plugin=None, name=None): msg = error_message(e) return msg if getattr(plugin, "restart", False): - await self.restart() + await self.restart(reason=f"nanny-plugin-{name}-restart") return {"status": "OK"} @@ -483,10 +486,12 @@ async def 
plugin_remove(self, name=None): return {"status": "OK"} - async def restart(self, timeout=30): + async def restart( + self, timeout: float = 30, reason: str = "nanny-restart" + ) -> Literal["OK", "timed out"]: async def _(): if self.process is not None: - await self.kill() + await self.kill(reason=reason) await self.instantiate() try: @@ -525,7 +530,7 @@ async def _on_worker_exit(self, exitcode): except OSError: logger.exception("Failed to unregister") if not self.reconnect: - await self.close() + await self.close(reason="nanny-unregister-failed") return try: @@ -538,7 +543,7 @@ async def _on_worker_exit(self, exitcode): logger.warning("Restarting worker") await self.instantiate() elif self.status == Status.closing_gracefully: - await self.close() + await self.close(reason="nanny-close-gracefully") except Exception: logger.error( @@ -553,15 +558,20 @@ def _close(self, *args, **kwargs): warnings.warn("Worker._close has moved to Worker.close", stacklevel=2) return self.close(*args, **kwargs) - def close_gracefully(self): + def close_gracefully(self, reason: str = "nanny-close-gracefully") -> None: """ A signal that we shouldn't try to restart workers if they go away This is used as part of the cluster shutdown process. """ self.status = Status.closing_gracefully + logger.info( + "Closing Nanny gracefully at %r. Reason: %s", self.address_safe, reason + ) - async def close(self, timeout=5): + async def close( + self, timeout: float = 5, reason: str = "nanny-close" + ) -> Literal["OK"]: """ Close the worker process, stop all comms. """ @@ -573,10 +583,7 @@ async def close(self, timeout=5): return "OK" self.status = Status.closing - logger.info( - "Closing Nanny at %r.", - self.address_safe, - ) + logger.info("Closing Nanny at %r. 
Reason: %s", self.address_safe, reason) for preload in self.preloads: await preload.teardown() @@ -592,7 +599,7 @@ async def close(self, timeout=5): self.stop() try: if self.process is not None: - await self.kill(timeout=timeout) + await self.kill(timeout=timeout, reason=reason) except Exception: logger.exception("Error in Nanny killing Worker subprocess") self.process = None @@ -615,6 +622,7 @@ class WorkerProcess: running: asyncio.Event stopped: asyncio.Event + process: AsyncProcess | None env: dict[str, str] pre_spawn_env: dict[str, str] @@ -744,6 +752,7 @@ def pid(self): def mark_stopped(self): if self.status != Status.stopped: + assert self.process is not None r = self.process.exitcode assert r is not None if r != 0: @@ -764,7 +773,12 @@ def mark_stopped(self): if self.on_exit is not None: self.on_exit(r) - async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None: + async def kill( + self, + timeout: float = 2, + executor_wait: bool = True, + reason: str = "workerprocess-kill", + ) -> None: """ Ensure the worker process is stopped, waiting at most ``timeout * 0.8`` seconds before killing it abruptly. @@ -787,7 +801,7 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None: Status.failed, # process failed to start, but hasn't been joined yet ), self.status self.status = Status.stopping - logger.info("Nanny asking worker to close") + logger.info("Nanny asking worker to close. 
Reason: %s", reason) process = self.process assert process @@ -799,6 +813,7 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None: "op": "stop", "timeout": wait_timeout, "executor_wait": executor_wait, + "reason": reason, } ) await asyncio.sleep(0) # otherwise we get broken pipe errors @@ -871,12 +886,15 @@ def _run( loop.make_current() worker = Worker(**worker_kwargs) - async def do_stop(timeout=5, executor_wait=True): + async def do_stop( + timeout=5, executor_wait=True, reason="workerprocess-stop" + ): try: await worker.close( nanny=False, executor_wait=executor_wait, timeout=timeout, + reason=reason, ) finally: loop.stop() diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9cae2088cce..cf240240cfb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3981,7 +3981,7 @@ async def log_errors(func): if not comm.closed(): # This closes the Worker and ensures that if a Nanny is around, # it is closed as well - comm.send({"op": "close"}) + comm.send({"op": "close", "reason": "scheduler-close"}) comm.send({"op": "close-stream"}) # ^ TODO remove? `Worker.close` will close the stream anyway. 
with suppress(AttributeError): @@ -4729,7 +4729,7 @@ def close_worker(self, worker: str) -> None: logger.info("Closing worker %s", worker) self.log_event(worker, {"action": "close-worker"}) - self.worker_send(worker, {"op": "close"}) + self.worker_send(worker, {"op": "close", "reason": "scheduler-close-worker"}) @log_errors async def remove_worker( @@ -4768,7 +4768,9 @@ async def remove_worker( logger.info("Remove worker %s", ws) if close: with suppress(AttributeError, CommClosedError): - self.stream_comms[address].send({"op": "close"}) + self.stream_comms[address].send( + {"op": "close", "reason": "scheduler-remove-worker"} + ) self.remove_resources(address) @@ -5751,7 +5753,10 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): # FIXME does not raise if the process fails to shut down, # see https://github.com/dask/distributed/pull/6427/files#r894917424 # NOTE: Nanny will automatically restart worker process when it's killed - nanny.kill(timeout=timeout), + nanny.kill( + reason="scheduler-restart", + timeout=timeout, + ), timeout, ) for nanny in nannies diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 1de2d758a18..0fe602c14aa 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -113,12 +113,14 @@ async def test_no_hang_when_scheduler_closes(s, a, b): Worker=Nanny, nthreads=[("127.0.0.1", 1)], worker_kwargs={"reconnect": False} ) async def test_close_on_disconnect(s, w): - await s.close() + with captured_logger("distributed.nanny") as logger: + await s.close() - start = time() - while w.status != Status.closed: - await asyncio.sleep(0.05) - assert time() < start + 9 + start = time() + while w.status != Status.closed: + await asyncio.sleep(0.05) + assert time() < start + 9 + assert "Reason: scheduler-close" in logger.getvalue() class Something(Worker): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index bd7a0c7c825..7df6796066c 100644 
--- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -861,8 +861,9 @@ async def test_restart(c, s, a, b): with captured_logger("distributed.scheduler") as caplog: futures = c.map(inc, range(20)) await wait(futures) - - await s.restart() + with captured_logger("distributed.nanny") as nanny_logger: + await s.restart() + assert "Reason: scheduler-restart" in nanny_logger.getvalue() assert not s.computations assert not s.task_prefixes @@ -909,12 +910,12 @@ def __init__(self, *args, **kwargs): self.kill_called = asyncio.Event() super().__init__(*args, **kwargs) - async def kill(self, *, timeout): + async def kill(self, *, timeout, reason=None): self.kill_called.set() print("kill called") await asyncio.wait_for(self.kill_proceed.wait(), timeout) print("kill proceed") - return await super().kill(timeout=timeout) + return await super().kill(timeout=timeout, reason=reason) @gen_cluster(client=True, Worker=SlowKillNanny, nthreads=[("", 1)] * 2) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 78f7ae7107c..5ff214bcc0e 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -839,6 +839,8 @@ async def test(s): while a.status != Status.closed: await asyncio.sleep(0.01) + method_name = "fail_sync" if sync else "fail_async" + assert f"worker-{method_name}-fail-hard" in logger.getvalue() test_done = True diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 25e11b9a9ca..aebf3b2b1e1 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1505,7 +1505,9 @@ async def test_close_gracefully(c, s, a, b): assert any(ts for ts in b.state.tasks.values() if ts.state == "executing") - await b.close_gracefully() + with captured_logger("distributed.worker") as logger: + await b.close_gracefully(reason="foo") + assert "Reason: foo" in logger.getvalue() assert b.status == Status.closed assert b.address not in 
s.workers diff --git a/distributed/worker.py b/distributed/worker.py index 2f3be51f358..2252021a09b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -170,6 +170,7 @@ def fail_hard(method: Callable[P, T]) -> Callable[P, T]: """ Decorator to close the worker if this method encounters an exception. """ + reason = f"worker-{method.__name__}-fail-hard" if iscoroutinefunction(method): @functools.wraps(method) @@ -180,7 +181,7 @@ async def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> Any: if self.status not in (Status.closed, Status.closing): self.log_event("worker-fail-hard", error_message(e)) logger.exception(e) - await _force_close(self) + await _force_close(self, reason) raise else: @@ -193,13 +194,13 @@ def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> T: if self.status not in (Status.closed, Status.closing): self.log_event("worker-fail-hard", error_message(e)) logger.exception(e) - self.loop.add_callback(_force_close, self) + self.loop.add_callback(_force_close, self, reason) raise return wrapper # type: ignore -async def _force_close(self): +async def _force_close(self, reason: str): """ Used with the fail_hard decorator defined above @@ -207,7 +208,10 @@ async def _force_close(self): 2. 
If it doesn't, log and kill the process """ try: - await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30) + await asyncio.wait_for( + self.close(nanny=False, executor_wait=False, reason=reason), + 30, + ) except (KeyboardInterrupt, SystemExit): # pragma: nocover raise except BaseException: # pragma: nocover @@ -831,7 +835,9 @@ def __init__( if lifetime: lifetime += (random.random() * 2 - 1) * lifetime_stagger - self.io_loop.call_later(lifetime, self.close_gracefully) + self.io_loop.call_later( + lifetime, self.close_gracefully, reason="worker-lifetime-reached" + ) self.lifetime = lifetime Worker._instances.add(self) @@ -1266,7 +1272,7 @@ async def handle_scheduler(self, comm: Comm) -> None: self.address, self.status, ) - await self.close() + await self.close(reason="worker-handle-scheduler-connection-broken") async def upload_file( self, filename: str, data: str | bytes, load: bool = True @@ -1467,6 +1473,7 @@ async def close( # type: ignore timeout: float = 30, executor_wait: bool = True, nanny: bool = True, + reason: str = "worker-close", ) -> str | None: """Close the worker @@ -1475,12 +1482,14 @@ async def close( # type: ignore Parameters ---------- - timeout : float, default 30 + timeout Timeout in seconds for shutting down individual instructions - executor_wait : bool, default True + executor_wait If True, shut down executors synchronously, otherwise asynchronously - nanny : bool, default True + nanny If True, close the nanny + reason + Reason for closing the worker Returns ------- @@ -1492,6 +1501,11 @@ async def close( # type: ignore # nanny+worker, the nanny must be notified first. ==> Remove kwarg # nanny, see also Scheduler.retire_workers if self.status in (Status.closed, Status.closing, Status.failed): + logger.debug( + "Attempted to close worker that is already %s.
Reason: %s", + self.status, + reason, + ) await self.finished() return None @@ -1509,9 +1523,9 @@ async def close( # type: ignore disable_gc_diagnosis() try: - logger.info("Stopping worker at %s", self.address) + logger.info("Stopping worker at %s. Reason: %s", self.address, reason) except ValueError: # address not available if already closed - logger.info("Stopping worker") + logger.info("Stopping worker. Reason: %s", reason) if self.status not in WORKER_ANY_RUNNING: logger.info("Closed worker has not yet started: %s", self.status) if not executor_wait: @@ -1540,7 +1554,7 @@ async def close( # type: ignore if nanny and self.nanny: with self.rpc(self.nanny) as r: - await r.close_gracefully() + await r.close_gracefully(reason=reason) setproctitle("dask worker [closing]") @@ -1633,7 +1647,9 @@ def _close(executor, wait): setproctitle("dask worker [closed]") return "OK" - async def close_gracefully(self, restart=None): + async def close_gracefully( + self, restart=None, reason: str = "worker-close-gracefully" + ): """Gracefully shut down a worker This first informs the scheduler that we're shutting down, and asks it @@ -1645,10 +1661,7 @@ async def close_gracefully(self, restart=None): if self.status == Status.closed: return - if restart is None: - restart = self.lifetime_restart - - logger.info("Closing worker gracefully: %s", self.address) + logger.info("Closing worker gracefully: %s. Reason: %s", self.address, reason) # Wait for all tasks to leave the worker and don't accept any new ones. # Scheduler.retire_workers will set the status to closing_gracefully and push it # back to this worker. @@ -1658,7 +1671,9 @@ async def close_gracefully(self, restart=None): remove=False, stimulus_id=f"worker-close-gracefully-{time()}", ) - await self.close(nanny=not restart) + if restart is None: + restart = self.lifetime_restart + await self.close(nanny=not restart, reason=reason) async def wait_until_closed(self): warnings.warn("wait_until_closed has moved to finished()")