Track reason of workers closing and restarting (#7166)
hendrikmakait authored Oct 25, 2022
1 parent 9fa952b commit 0983731
Showing 8 changed files with 103 additions and 56 deletions.
4 changes: 3 additions & 1 deletion distributed/diagnostics/plugin.py
@@ -307,7 +307,9 @@ async def setup(self, worker):
if self.restart and worker.nanny and not await self._is_restarted(worker):
logger.info("Restarting worker to refresh interpreter.")
await self._set_restarted(worker)
worker.loop.add_callback(worker.close_gracefully, restart=True)
worker.loop.add_callback(
worker.close_gracefully, restart=True, reason=f"{self.name}-setup"
)

@abc.abstractmethod
def install(self) -> None:
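
For context, a minimal sketch (not part of this commit) of how a worker plugin could use the new reason argument added above: a plugin that wants a restart schedules close_gracefully on the worker's event loop and tags the close with "<plugin name>-setup", which then shows up in the worker and nanny logs. The class and plugin name below are hypothetical.

from distributed.diagnostics.plugin import WorkerPlugin

class InterpreterRefreshPlugin(WorkerPlugin):
    # Hypothetical plugin; the name is purely illustrative.
    name = "refresh-interpreter"
    restart = True

    async def setup(self, worker):
        # Only restart when a nanny is around to bring the worker back up.
        if self.restart and worker.nanny:
            # Schedule the graceful close instead of awaiting it so setup()
            # returns promptly; the reason string ends up in the close logs.
            worker.loop.add_callback(
                worker.close_gracefully, restart=True, reason=f"{self.name}-setup"
            )
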
64 changes: 41 additions & 23 deletions distributed/nanny.py
@@ -15,7 +15,7 @@
from inspect import isawaitable
from queue import Empty
from time import sleep as sync_sleep
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, Literal

from toolz import merge
from tornado import gen
@@ -110,7 +110,7 @@ class Nanny(ServerNode):
"""

_instances: ClassVar[weakref.WeakSet[Nanny]] = weakref.WeakSet()
process = None
process: WorkerProcess | None
memory_manager: NannyMemoryManager

env: dict[str, str]
@@ -162,6 +162,7 @@ def __init__( # type: ignore[no-untyped-def]
stacklevel=2,
)

self.process = None
self._setup_logging(logger)
self.loop = self.io_loop = IOLoop.current()

@@ -256,8 +257,8 @@ def __init__( # type: ignore[no-untyped-def]
"instantiate": self.instantiate,
"kill": self.kill,
"restart": self.restart,
# cannot call it 'close' on the rpc side for naming conflict
"get_logs": self.get_logs,
# cannot call it 'close' on the rpc side for naming conflict
"terminate": self.close,
"close_gracefully": self.close_gracefully,
"run": self.run,
@@ -364,7 +365,7 @@ async def start_unsafe(self):
response = await self.instantiate()

if response != Status.running:
await self.close()
await self.close(reason="nanny-start-failed")
return

assert self.worker_address
@@ -373,7 +374,7 @@

return self

async def kill(self, timeout=2):
async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None:
"""Kill the local worker process
Blocks until both the process is down and the scheduler is properly
@@ -383,7 +384,7 @@ async def kill(self, timeout=2):
return

deadline = time() + timeout
await self.process.kill(timeout=0.8 * (deadline - time()))
await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))

async def instantiate(self) -> Status:
"""Start a local worker process
@@ -430,15 +431,17 @@ async def instantiate(self) -> Status:
self,
self.scheduler_addr,
)
await self.close(timeout=self.death_timeout)
await self.close(
timeout=self.death_timeout, reason="nanny-instantiate-timeout"
)
raise

else:
try:
result = await self.process.start()
except Exception:
logger.error("Failed to start process", exc_info=True)
await self.close()
await self.close(reason="nanny-instantiate-failed")
raise
return result

@@ -464,7 +467,7 @@ async def plugin_add(self, plugin=None, name=None):
msg = error_message(e)
return msg
if getattr(plugin, "restart", False):
await self.restart()
await self.restart(reason=f"nanny-plugin-{name}-restart")

return {"status": "OK"}

@@ -483,10 +486,12 @@ async def plugin_remove(self, name=None):

return {"status": "OK"}

async def restart(self, timeout=30):
async def restart(
self, timeout: float = 30, reason: str = "nanny-restart"
) -> Literal["OK", "timed out"]:
async def _():
if self.process is not None:
await self.kill()
await self.kill(reason=reason)
await self.instantiate()

try:
@@ -525,7 +530,7 @@ async def _on_worker_exit(self, exitcode):
except OSError:
logger.exception("Failed to unregister")
if not self.reconnect:
await self.close()
await self.close(reason="nanny-unregister-failed")
return

try:
@@ -538,7 +543,7 @@ async def _on_worker_exit(self, exitcode):
logger.warning("Restarting worker")
await self.instantiate()
elif self.status == Status.closing_gracefully:
await self.close()
await self.close(reason="nanny-close-gracefully")

except Exception:
logger.error(
@@ -553,15 +558,20 @@ def _close(self, *args, **kwargs):
warnings.warn("Worker._close has moved to Worker.close", stacklevel=2)
return self.close(*args, **kwargs)

def close_gracefully(self):
def close_gracefully(self, reason: str = "nanny-close-gracefully") -> None:
"""
A signal that we shouldn't try to restart workers if they go away
This is used as part of the cluster shutdown process.
"""
self.status = Status.closing_gracefully
logger.info(
"Closing Nanny gracefully at %r. Reason: %s", self.address_safe, reason
)

async def close(self, timeout=5):
async def close(
self, timeout: float = 5, reason: str = "nanny-close"
) -> Literal["OK"]:
"""
Close the worker process, stop all comms.
"""
@@ -573,10 +583,7 @@ async def close(self, timeout=5):
return "OK"

self.status = Status.closing
logger.info(
"Closing Nanny at %r.",
self.address_safe,
)
logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason)

for preload in self.preloads:
await preload.teardown()
@@ -592,7 +599,7 @@ async def close(self, timeout=5):
self.stop()
try:
if self.process is not None:
await self.kill(timeout=timeout)
await self.kill(timeout=timeout, reason=reason)
except Exception:
logger.exception("Error in Nanny killing Worker subprocess")
self.process = None
@@ -615,6 +622,7 @@ class WorkerProcess:
running: asyncio.Event
stopped: asyncio.Event

process: AsyncProcess | None
env: dict[str, str]
pre_spawn_env: dict[str, str]

@@ -744,6 +752,7 @@ def pid(self):

def mark_stopped(self):
if self.status != Status.stopped:
assert self.process is not None
r = self.process.exitcode
assert r is not None
if r != 0:
@@ -764,7 +773,12 @@ def mark_stopped(self):
if self.on_exit is not None:
self.on_exit(r)

async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None:
async def kill(
self,
timeout: float = 2,
executor_wait: bool = True,
reason: str = "workerprocess-kill",
) -> None:
"""
Ensure the worker process is stopped, waiting at most
``timeout * 0.8`` seconds before killing it abruptly.
@@ -787,7 +801,7 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None:
Status.failed, # process failed to start, but hasn't been joined yet
), self.status
self.status = Status.stopping
logger.info("Nanny asking worker to close")
logger.info("Nanny asking worker to close. Reason: %s", reason)

process = self.process
assert process
@@ -799,6 +813,7 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None:
"op": "stop",
"timeout": wait_timeout,
"executor_wait": executor_wait,
"reason": reason,
}
)
await asyncio.sleep(0) # otherwise we get broken pipe errors
@@ -871,12 +886,15 @@ def _run(
loop.make_current()
worker = Worker(**worker_kwargs)

async def do_stop(timeout=5, executor_wait=True):
async def do_stop(
timeout=5, executor_wait=True, reason="workerprocess-stop"
):
try:
await worker.close(
nanny=False,
executor_wait=executor_wait,
timeout=timeout,
reason=reason,
)
finally:
loop.stop()
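
Taken together, the nanny-side changes thread one reason string through the whole shutdown path: Nanny.close(reason=...) forwards it to Nanny.kill(), which passes it to WorkerProcess.kill(), which includes it in the {"op": "stop", ...} message so that do_stop() can hand it to Worker.close(). A rough usage sketch, assuming a locally started scheduler and nanny (the reason value is illustrative):

import asyncio

from distributed import Nanny, Scheduler

async def main():
    async with Scheduler(dashboard_address=":0") as s:
        async with Nanny(s.address, nthreads=1) as n:
            # Logged as "Closing Nanny at ... Reason: example-manual-close" and
            # forwarded to WorkerProcess.kill() and the worker's stop handler.
            await n.close(reason="example-manual-close")

asyncio.run(main())
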
13 changes: 9 additions & 4 deletions distributed/scheduler.py
@@ -3981,7 +3981,7 @@ async def log_errors(func):
if not comm.closed():
# This closes the Worker and ensures that if a Nanny is around,
# it is closed as well
comm.send({"op": "close"})
comm.send({"op": "close", "reason": "scheduler-close"})
comm.send({"op": "close-stream"})
# ^ TODO remove? `Worker.close` will close the stream anyway.
with suppress(AttributeError):
@@ -4729,7 +4729,7 @@ def close_worker(self, worker: str) -> None:

logger.info("Closing worker %s", worker)
self.log_event(worker, {"action": "close-worker"})
self.worker_send(worker, {"op": "close"})
self.worker_send(worker, {"op": "close", "reason": "scheduler-close-worker"})

@log_errors
async def remove_worker(
@@ -4768,7 +4768,9 @@ async def remove_worker(
logger.info("Remove worker %s", ws)
if close:
with suppress(AttributeError, CommClosedError):
self.stream_comms[address].send({"op": "close"})
self.stream_comms[address].send(
{"op": "close", "reason": "scheduler-remove-worker"}
)

self.remove_resources(address)

@@ -5751,7 +5753,10 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True):
# FIXME does not raise if the process fails to shut down,
# see https://github.com/dask/distributed/pull/6427/files#r894917424
# NOTE: Nanny will automatically restart worker process when it's killed
nanny.kill(timeout=timeout),
nanny.kill(
reason="scheduler-restart",
timeout=timeout,
),
timeout,
)
for nanny in nannies
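
With these scheduler changes, closes initiated by the scheduler now carry a reason such as "scheduler-close", "scheduler-close-worker", "scheduler-remove-worker", or "scheduler-restart", so worker and nanny logs record why a worker went away. A hedged sketch of how that surfaces from the client side, mirroring the test_restart change further down (captured_logger comes from distributed.utils_test; the cluster setup is illustrative):

import asyncio

from distributed import Client, Nanny, Scheduler
from distributed.utils_test import captured_logger

async def main():
    async with Scheduler(dashboard_address=":0") as s:
        async with Nanny(s.address, nthreads=1):
            async with Client(s.address, asynchronous=True) as c:
                with captured_logger("distributed.nanny") as log:
                    await c.restart()
                # The nanny logs why it killed the worker process.
                assert "Reason: scheduler-restart" in log.getvalue()

asyncio.run(main())
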
12 changes: 7 additions & 5 deletions distributed/tests/test_nanny.py
@@ -113,12 +113,14 @@ async def test_no_hang_when_scheduler_closes(s, a, b):
Worker=Nanny, nthreads=[("127.0.0.1", 1)], worker_kwargs={"reconnect": False}
)
async def test_close_on_disconnect(s, w):
await s.close()
with captured_logger("distributed.nanny") as logger:
await s.close()

start = time()
while w.status != Status.closed:
await asyncio.sleep(0.05)
assert time() < start + 9
start = time()
while w.status != Status.closed:
await asyncio.sleep(0.05)
assert time() < start + 9
assert "Reason: scheduler-close" in logger.getvalue()


class Something(Worker):
9 changes: 5 additions & 4 deletions distributed/tests/test_scheduler.py
@@ -861,8 +861,9 @@ async def test_restart(c, s, a, b):
with captured_logger("distributed.scheduler") as caplog:
futures = c.map(inc, range(20))
await wait(futures)

await s.restart()
with captured_logger("distributed.nanny") as nanny_logger:
await s.restart()
assert "Reason: scheduler-restart" in nanny_logger.getvalue()

assert not s.computations
assert not s.task_prefixes
@@ -909,12 +910,12 @@ def __init__(self, *args, **kwargs):
self.kill_called = asyncio.Event()
super().__init__(*args, **kwargs)

async def kill(self, *, timeout):
async def kill(self, *, timeout, reason=None):
self.kill_called.set()
print("kill called")
await asyncio.wait_for(self.kill_proceed.wait(), timeout)
print("kill proceed")
return await super().kill(timeout=timeout)
return await super().kill(timeout=timeout, reason=reason)


@gen_cluster(client=True, Worker=SlowKillNanny, nthreads=[("", 1)] * 2)
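
Because Nanny.kill() now takes a reason keyword, subclasses that override it (like SlowKillNanny above) need to accept and forward the new argument. A minimal sketch of that pattern with a hypothetical subclass:

from distributed import Nanny

class LoggingNanny(Nanny):
    # Hypothetical subclass that records the reason before delegating.
    async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None:
        self.last_kill_reason = reason  # illustrative bookkeeping only
        await super().kill(timeout=timeout, reason=reason)
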
2 changes: 2 additions & 0 deletions distributed/tests/test_utils_test.py
@@ -839,6 +839,8 @@ async def test(s):

while a.status != Status.closed:
await asyncio.sleep(0.01)
method_name = "fail_sync" if sync else "fail_async"
assert f"worker-{method_name}-fail-hard" in logger.getvalue()

test_done = True

4 changes: 3 additions & 1 deletion distributed/tests/test_worker.py
@@ -1505,7 +1505,9 @@ async def test_close_gracefully(c, s, a, b):

assert any(ts for ts in b.state.tasks.values() if ts.state == "executing")

await b.close_gracefully()
with captured_logger("distributed.worker") as logger:
await b.close_gracefully(reason="foo")
assert "Reason: foo" in logger.getvalue()

assert b.status == Status.closed
assert b.address not in s.workers