Ensure Nanny doesn't restart workers that fail to start, and joins subprocess #6427

Merged
merged 31 commits into from
Aug 5, 2022
Changes from 24 commits
Commits
31 commits
7d73e26
`WorkerProcess.start`: wait for terminate
gjoseph92 May 23, 2022
ba57395
Nanny: don't attempt to restart if `failed`
gjoseph92 May 23, 2022
0676b3b
Log failure to kill worker process
gjoseph92 May 23, 2022
2861e56
`AsyncProcess.join` should raise TimeoutError
gjoseph92 May 23, 2022
5e1d2ea
`WorkerProcess.kill` should join the process
gjoseph92 May 23, 2022
ea544e5
fix `test_worker_start_exception` to actually test
gjoseph92 May 23, 2022
3405583
failed is a valid status for `WorkerProcess.kill`
gjoseph92 May 24, 2022
4d61b38
reduce log spewing in nanny
gjoseph92 May 24, 2022
31db02a
add `raises_with_causes`
gjoseph92 Jun 8, 2022
de4daaf
use start_unsafe and mp.Value
gjoseph92 Jun 8, 2022
babfd00
remove raises_with_causes
gjoseph92 Jun 8, 2022
302d59e
Note `Server.start` should not be overridden
gjoseph92 Jun 8, 2022
00e277e
remove logs capture
gjoseph92 Jun 8, 2022
ee343aa
Merge remote-tracking branch 'upstream/main' into nanny-close-proc-on…
gjoseph92 Jun 8, 2022
bdb632a
Revert "remove logs capture"
gjoseph92 Jun 8, 2022
27e29b9
on_exit -> on_worker_exit
gjoseph92 Jun 8, 2022
cd1e235
no `asyncio.to_thread`
gjoseph92 Jun 10, 2022
fa50856
remove startup_attempts shared value
gjoseph92 Jun 10, 2022
8ed0062
Merge remote-tracking branch 'upstream/main' into nanny-close-proc-on…
gjoseph92 Jun 10, 2022
11aaca9
Nanny: kill subprocess if terminate fails
gjoseph92 Jun 10, 2022
68ec8df
fix test
gjoseph92 Jun 10, 2022
8decd1a
don't terminate, just kill
gjoseph92 Jun 10, 2022
bb9a35b
clarify await terminate
gjoseph92 Jun 10, 2022
8dc579e
it's slow
gjoseph92 Jun 10, 2022
6dd8215
brokenworker doesn't need init
gjoseph92 Jun 10, 2022
9e8e538
poll mp Event instead of waiting in thread
gjoseph92 Jun 11, 2022
3dbebad
Merge branch 'main' into nanny-close-proc-on-start-failure
gjoseph92 Jul 14, 2022
c976317
Merge branch 'main' into nanny-close-proc-on-start-failure
hendrikmakait Aug 1, 2022
1d8b9e4
use background tasks for _on_worker_exit_sync
hendrikmakait Aug 1, 2022
28c91a5
Create Event using mp_context
hendrikmakait Aug 1, 2022
88abaf2
Merge branch 'main' into nanny-close-proc-on-start-failure
hendrikmakait Aug 3, 2022
3 changes: 2 additions & 1 deletion distributed/core.py
@@ -15,7 +15,7 @@
from contextlib import suppress
from enum import Enum
from functools import partial
from typing import Callable, ClassVar, TypedDict, TypeVar
from typing import Callable, ClassVar, TypedDict, TypeVar, final

import tblib
from tlz import merge
@@ -309,6 +309,7 @@ async def start_unsafe(self):
await self.rpc.start()
return self

@final
Comment from the PR author (collaborator):

driveby: added so nobody in the future can try to override this
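A minimal, standalone sketch of what the @final decorator buys here (the classes below are illustrative stand-ins, not the real distributed.core.Server): type checkers such as mypy flag any subclass that overrides a @final method, while nothing changes at runtime.

from typing import final


class Server:
    # Stand-in for the real class: subclasses are meant to override
    # start_unsafe, never start itself.
    @final
    async def start(self):
        # shared startup logic (locking, error handling) wraps start_unsafe
        return await self.start_unsafe()

    async def start_unsafe(self):
        return self


class MyServer(Server):
    # mypy reports that a final attribute "start" cannot be overridden;
    # at runtime this still executes, only the type checker complains.
    async def start(self):
        ...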

async def start(self):
async with self._startup_lock:
if self.status == Status.failed:
60 changes: 35 additions & 25 deletions distributed/nanny.py
@@ -405,7 +405,7 @@ async def instantiate(self) -> Status:
self.process = WorkerProcess(
worker_kwargs=worker_kwargs,
silence_logs=self.silence_logs,
on_exit=self._on_exit_sync,
on_exit=self._on_worker_exit_sync,
Comment from the PR author (collaborator):

driveby: renamed for clarity

worker=self.Worker,
env=self.env,
config=self.config,
@@ -497,16 +497,17 @@ def is_alive(self):
def run(self, comm, *args, **kwargs):
return run(self, comm, *args, **kwargs)

def _on_exit_sync(self, exitcode):
self.loop.add_callback(self._on_exit, exitcode)
def _on_worker_exit_sync(self, exitcode):
self.loop.add_callback(self._on_worker_exit, exitcode)

@log_errors
async def _on_exit(self, exitcode):
async def _on_worker_exit(self, exitcode):
if self.status not in (
Status.init,
Status.closing,
Status.closed,
Status.closing_gracefully,
Status.failed,
):
try:
await self._unregister()
@@ -521,6 +522,7 @@ async def _on_exit(self, exitcode):
Status.closing,
Status.closed,
Status.closing_gracefully,
Status.failed,
):
logger.warning("Restarting worker")
await self.instantiate()
@@ -581,7 +583,7 @@ async def close(self, timeout=5):
if self.process is not None:
await self.kill(timeout=timeout)
except Exception:
pass
logger.exception("Error in Nanny killing Worker subprocess")
self.process = None
await self.rpc.close()
self.status = Status.closed
@@ -666,15 +668,15 @@ async def start(self) -> Status:
await self.process.start()
except OSError:
logger.exception("Nanny failed to start process", exc_info=True)
self.process.terminate()
# NOTE: doesn't wait for process to terminate, just for terminate signal to be sent
await self.process.terminate()
self.status = Status.failed
return self.status
try:
msg = await self._wait_until_connected(uid)
except Exception:
logger.exception("Failed to connect to process")
# NOTE: doesn't wait for process to terminate, just for terminate signal to be sent
await self.process.terminate()
self.status = Status.failed
self.process.terminate()
raise
if not msg:
return self.status
@@ -735,7 +737,12 @@ def mark_stopped(self):
async def kill(self, timeout: float = 2, executor_wait: bool = True):
"""
Ensure the worker process is stopped, waiting at most
*timeout* seconds before terminating it abruptly.
``timeout * 0.8`` seconds before killing it abruptly.

When `kill` returns, the worker process has been joined.

If the worker process does not terminate within ``timeout`` seconds,
even after being killed, `asyncio.TimeoutError` is raised.
"""
deadline = time() + timeout

@@ -744,32 +751,38 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True):
if self.status == Status.stopping:
await self.stopped.wait()
return
assert self.status in (Status.starting, Status.running)
assert self.status in (
Status.starting,
Status.running,
Status.failed, # process failed to start, but hasn't been joined yet
), self.status
self.status = Status.stopping
logger.info("Nanny asking worker to close")

process = self.process
assert self.process
wait_timeout = timeout * 0.8
self.child_stop_q.put(
{
"op": "stop",
"timeout": max(0, deadline - time()) * 0.8,
"timeout": wait_timeout,
"executor_wait": executor_wait,
}
)
await asyncio.sleep(0) # otherwise we get broken pipe errors
self.child_stop_q.close()

while process.is_alive() and time() < deadline:
await asyncio.sleep(0.05)
try:
await process.join(wait_timeout)
return
except asyncio.TimeoutError:
pass

if process.is_alive():
logger.warning(
f"Worker process still alive after {timeout} seconds, killing"
)
try:
await process.terminate()
Comment from the PR author (collaborator):

Change: we used to first call Worker.close (via child_stop_q.put({"op": "stop"})), then send SIGTERM to the process if it didn't stop in time, then just return to the caller as soon as the terminate signal was sent, regardless of whether the process actually stopped.

Now, we still do Worker.close, but then send SIGKILL (which, unlike SIGTERM, can't be blocked), because Worker.close can already be considered our SIGTERM-like request for a graceful close. We then wait for the process to actually terminate.
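A standalone, POSIX-only illustration (not part of this PR) of why escalating straight to SIGKILL is sound: a child that ignores SIGTERM still dies to kill(), which cannot be caught or blocked. (On Windows, multiprocessing's terminate() and kill() both fall back to TerminateProcess.)

import signal
import time
from multiprocessing import get_context


def stubborn(ready):
    # Refuse graceful termination, as a misbehaving worker might.
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    ready.set()
    while True:
        time.sleep(1)


if __name__ == "__main__":
    ctx = get_context("spawn")
    ready = ctx.Event()
    p = ctx.Process(target=stubborn, args=(ready,))
    p.start()
    ready.wait()   # the child has installed its SIGTERM handler
    p.terminate()  # SIGTERM: ignored by the child
    p.join(timeout=1)
    assert p.is_alive()
    p.kill()       # SIGKILL: cannot be ignored
    p.join()
    assert not p.is_alive()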

except Exception as e:
logger.error("Failed to kill worker process: %s", e)
logger.warning(
f"Worker process still alive after {wait_timeout} seconds, killing"
)
await process.kill()
await process.join(max(0, deadline - time()))
Comment from the PR author (collaborator):

I'm a little wary of this deadline on the join. I could imagine the default 2s timeout not being long enough for the process to actually shut down in CI. kill didn't use to raise an error if the timeout expired; now it does. I could suppress that, but I think it's really the caller's responsibility to decide what to do if the process doesn't shut down in time.

I think we should probably make the default timeout a bit longer.
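A hypothetical caller-side sketch of the new contract (shut_down and worker_process are illustrative names, not from this PR): since kill now joins the subprocess and lets asyncio.TimeoutError propagate, the caller decides what to do when the process outlives the deadline.

import asyncio
import logging

logger = logging.getLogger(__name__)


async def shut_down(worker_process, timeout: float = 5) -> None:
    try:
        # With this change, kill() only returns once the subprocess is joined.
        await worker_process.kill(timeout=timeout)
    except asyncio.TimeoutError:
        # The process outlived even SIGKILL for the whole deadline (e.g. stuck
        # in uninterruptible I/O); surface that rather than silently leaking it.
        logger.error("Worker subprocess did not exit within %s seconds", timeout)
        raise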


async def _wait_until_connected(self, uid):
while True:
@@ -787,9 +800,6 @@ async def _wait_until_connected(self, uid):
continue

if "exception" in msg:
logger.error(
"Failed while trying to start worker process: %s", msg["exception"]
)
raise msg["exception"]
else:
return msg
14 changes: 4 additions & 10 deletions distributed/process.py
@@ -17,7 +17,7 @@

import dask

from distributed.utils import TimeoutError, mp_context
from distributed.utils import mp_context

logger = logging.getLogger(__name__)

@@ -297,15 +297,9 @@ async def join(self, timeout=None):
assert self._state.pid is not None, "can only join a started process"
if self._state.exitcode is not None:
return
if timeout is None:
await self._exit_future
else:
try:
# Shield otherwise the timeout cancels the future and our
# on_exit callback will try to set a result on a canceled future
await asyncio.wait_for(asyncio.shield(self._exit_future), timeout)
except TimeoutError:
pass
# Shield otherwise the timeout cancels the future and our
# on_exit callback will try to set a result on a canceled future
await asyncio.wait_for(asyncio.shield(self._exit_future), timeout)

def close(self):
"""
3 changes: 2 additions & 1 deletion distributed/tests/test_asyncprocess.py
@@ -78,7 +78,8 @@ async def test_simple():
assert proc.exitcode is None

t1 = time()
await proc.join(timeout=0.02)
with pytest.raises(asyncio.TimeoutError):
await proc.join(timeout=0.02)
dt = time() - t1
assert 0.2 >= dt >= 0.001
assert proc.is_alive()
71 changes: 60 additions & 11 deletions distributed/tests/test_nanny.py
@@ -11,6 +11,8 @@
import psutil
import pytest

from distributed.diagnostics.plugin import WorkerPlugin

pytestmark = pytest.mark.gpu

from tlz import first, valmap
@@ -20,7 +22,7 @@
from dask.utils import tmpfile

from distributed import Nanny, Scheduler, Worker, profile, rpc, wait, worker
from distributed.compatibility import LINUX, WINDOWS
from distributed.compatibility import LINUX, WINDOWS, to_thread
from distributed.core import CommClosedError, Status
from distributed.diagnostics import SchedulerPlugin
from distributed.metrics import time
@@ -472,21 +474,32 @@ async def test_nanny_closed_by_keyboard_interrupt(ucx_loop, protocol):
assert "remove-worker" in str(s.events)


class StartException(Exception):
pass


class BrokenWorker(worker.Worker):
async def start(self):
raise StartException("broken")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

async def start_unsafe(self):
raise ValueError("broken")


@gen_cluster(nthreads=[])
async def test_worker_start_exception(s):
# make sure this raises the right Exception:
with raises_with_cause(RuntimeError, None, StartException, None):
async with Nanny(s.address, worker_class=BrokenWorker) as n:
pass
nanny = Nanny(s.address, worker_class=BrokenWorker)
gjoseph92 marked this conversation as resolved.
with captured_logger(logger="distributed.nanny", level=logging.WARNING) as logs:
with raises_with_cause(
RuntimeError,
"Nanny failed to start",
RuntimeError,
"BrokenWorker failed to start",
):
async with nanny:
pass
assert nanny.status == Status.failed
# ^ NOTE: `Nanny.close` sets it to `closed`, then `Server.start._close_on_failure` sets it to `failed`
assert nanny.process is None
assert "Restarting worker" not in logs.getvalue()
# Avoid excessive spewing. (It's also printed once extra within the subprocess, which is okay.)
assert logs.getvalue().count("ValueError: broken") == 1, logs.getvalue()
gjoseph92 marked this conversation as resolved.


@gen_cluster(nthreads=[])
@@ -569,6 +582,42 @@ async def test_restart_memory(c, s, n):
await asyncio.sleep(0.1)


class BlockClose(WorkerPlugin):
def __init__(self, close_happened) -> None:
self.close_happened = close_happened

async def teardown(self, worker):
# Never let the worker cleanly shut down, so it has to be killed
self.close_happened.set()
while True:
await asyncio.sleep(10)


@pytest.mark.slow
@gen_cluster(nthreads=[])
async def test_close_joins(s):
close_happened = mp.Event()

nanny = Nanny(s.address, plugins=[BlockClose(close_happened)])
async with nanny:
p = nanny.process
assert p
close_t = asyncio.create_task(nanny.close())
await to_thread(close_happened.wait)

assert not close_t.done()
assert nanny.status == Status.closing
assert nanny.process and nanny.process.status == Status.stopping

await close_t

assert nanny.status == Status.closed
assert not nanny.process

assert p.status == Status.stopped
assert not p.process


@gen_cluster(Worker=Nanny, nthreads=[("", 1)])
async def test_scheduler_crash_doesnt_restart(s, a):
# Simulate a scheduler crash by disconnecting it first