
Remove all invocations of IOLoop.run_sync from CLI #6205

Merged: 36 commits, May 18, 2022
Commits (36):
263eea0
Add tests for termination of worker CLI script
hendrikmakait Apr 26, 2022
7c59270
Fix num of workers
hendrikmakait Apr 26, 2022
d196838
Gracefully close workers regardless of whether or not they're nannies…
hendrikmakait Apr 26, 2022
5295695
Rework tests for proper log checking without double close on stdout
hendrikmakait Apr 27, 2022
a2d530f
Enable tmate
hendrikmakait Apr 27, 2022
f54ad9d
Revert "Enable tmate"
hendrikmakait Apr 27, 2022
b8ca756
Use different signals for windows
hendrikmakait Apr 27, 2022
959aa7f
Use different tests for POSIX/Windows
hendrikmakait Apr 28, 2022
8fe7960
Add CTRL_BREAK_EVENT test
hendrikmakait Apr 28, 2022
09cc46c
Remove CTRL_C_EVENT test
hendrikmakait Apr 28, 2022
55aaaa7
Remove run_sync from dask_worker and add prelim. alternative form of …
hendrikmakait Apr 28, 2022
ef5a1d0
Minor
hendrikmakait Apr 28, 2022
c5fc3df
Minor
hendrikmakait Apr 28, 2022
d1a9225
Improve structure of graceful shutdown in dask_worker
hendrikmakait Apr 29, 2022
2c77b26
Implement graceful shutdown on scheduler while dropping run_sync
hendrikmakait Apr 29, 2022
53f5fb2
Pass desired signals into wait_for_signals
hendrikmakait Apr 29, 2022
05b94a9
Fix typing in 3.8
hendrikmakait Apr 29, 2022
5301af0
Improve function naming and documentation
hendrikmakait Apr 29, 2022
6891abf
Add tests for termination of scheduler CLI script
hendrikmakait Apr 29, 2022
bf10cab
Improve scheduler test by removing sleep
hendrikmakait Apr 29, 2022
b322e35
Adjust logging
hendrikmakait May 9, 2022
ef6070f
Linting
hendrikmakait May 9, 2022
1984423
Merge branch 'main' into remove-run_sync_from_cli
hendrikmakait May 11, 2022
11fbefc
Fix reaping of stray processes by not closing workers without nannies…
hendrikmakait May 11, 2022
1f0c80e
Remove Windows CTRL EVENT test due to difficulties making it work
hendrikmakait May 11, 2022
b328345
Increase timeout to avoid CI failures
hendrikmakait May 11, 2022
4c6feb7
Retrigger CI to check for flake
hendrikmakait May 11, 2022
ab8202b
Move check for returncode to front
hendrikmakait May 11, 2022
68d4a14
Fix race-condition with signal handler
hendrikmakait May 11, 2022
8adb9af
Fix race condition in scheduler as well and use context manager
hendrikmakait May 11, 2022
622feb5
Call scheduler and worker via python -m distributed.cli.dask_{schedul…
hendrikmakait May 11, 2022
159bc09
Fix timeout on startup
hendrikmakait May 11, 2022
ae3e254
Retrigger CI to check for flake
hendrikmakait May 11, 2022
17384a3
Fix annotation
hendrikmakait May 12, 2022
aaa7321
Re-raise exception in `dask_scheduler.py`
hendrikmakait May 12, 2022
edc6893
Code review feedback
hendrikmakait May 17, 2022
62 changes: 40 additions & 22 deletions distributed/cli/dask_scheduler.py
@@ -1,16 +1,18 @@
import asyncio
import atexit
import gc
import logging
import os
import re
import signal
import sys
import warnings

import click
from tornado.ioloop import IOLoop

from distributed import Scheduler
from distributed.cli.utils import install_signal_handlers
from distributed.cli.utils import wait_for_signals
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
@@ -183,33 +185,49 @@ def del_pid_file():
limit = max(soft, hard // 2)
resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))

loop = IOLoop.current()
logger.info("-" * 47)
async def run():
loop = IOLoop.current()
logger.info("-" * 47)

scheduler = Scheduler(
loop=loop,
security=sec,
host=host,
port=port,
dashboard=dashboard,
dashboard_address=dashboard_address,
http_prefix=dashboard_prefix,
**kwargs,
)
logger.info("-" * 47)

scheduler = Scheduler(
loop=loop,
security=sec,
host=host,
port=port,
dashboard=dashboard,
dashboard_address=dashboard_address,
http_prefix=dashboard_prefix,
**kwargs,
)
logger.info("-" * 47)
async def wait_for_scheduler_to_finish():
"""Wait for the scheduler to initialize and finish"""
await scheduler
await scheduler.finished()

install_signal_handlers(loop)
async def wait_for_signals_and_close():
"""Wait for SIGINT or SIGTERM and close the scheduler upon receiving one of those signals"""
await wait_for_signals([signal.SIGINT, signal.SIGTERM])
await scheduler.close()

async def run():
await scheduler
await scheduler.finished()
wait_for_signals_and_close_task = asyncio.create_task(
wait_for_signals_and_close()
)
wait_for_scheduler_to_finish_task = asyncio.create_task(
wait_for_scheduler_to_finish()
)

await asyncio.wait(
[wait_for_signals_and_close_task, wait_for_scheduler_to_finish_task],
return_when=asyncio.FIRST_COMPLETED,
)
logger.info("Stopped scheduler at %r", scheduler.address)

try:
loop.run_sync(run)
asyncio.run(run())
finally:
scheduler.stop()

logger.info("End scheduler at %r", scheduler.address)
logger.info("End scheduler")


if __name__ == "__main__":
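The hunk above swaps `install_signal_handlers` for `wait_for_signals` from `distributed.cli.utils`, but that helper itself is not part of this diff. As a rough, hypothetical sketch of what such a helper could look like on a POSIX event loop (the actual implementation in `distributed.cli.utils` may differ, for example to also support Windows), it boils down to installing handlers for the requested signals and waiting on an `asyncio.Event`:

```python
# Hypothetical sketch only; not the actual distributed.cli.utils implementation.
import asyncio
import signal


async def wait_for_signals(signals: list[signal.Signals]) -> None:
    """Return once any one of the given signals has been received."""
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    def handler(signum: int) -> None:
        # add_signal_handler callbacks run in the event loop thread,
        # so setting the event directly is safe here.
        event.set()

    for sig in signals:
        # add_signal_handler is not available on Windows event loops; a
        # portable version would need a fallback such as signal.signal()
        # combined with loop.call_soon_threadsafe().
        loop.add_signal_handler(sig, handler, sig)
    try:
        await event.wait()
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)
```

With a helper of this shape, the scheduler's `run()` coroutine can await the signals in one task and race it against the scheduler's lifetime in another, which is exactly the structure the diff introduces.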
94 changes: 52 additions & 42 deletions distributed/cli/dask_worker.py
@@ -20,7 +20,7 @@
from dask.system import CPU_COUNT

from distributed import Nanny
from distributed.cli.utils import install_signal_handlers
from distributed.cli.utils import wait_for_signals
from distributed.comm import get_address_host_port
from distributed.deploy.utils import nprocesses_nthreads
from distributed.preloading import validate_preload_argv
@@ -404,8 +404,6 @@ def del_pid_file():
else:
resources = None

loop = IOLoop.current()

worker_class = import_term(worker_class)

port_kwargs = _apportion_ports(worker_port, nanny_port, n_workers, nanny)
@@ -432,56 +430,68 @@
with suppress(TypeError, ValueError):
name = int(name)

nannies = [
t(
scheduler,
scheduler_file=scheduler_file,
nthreads=nthreads,
loop=loop,
resources=resources,
security=sec,
contact_address=contact_address,
host=host,
dashboard=dashboard,
dashboard_address=dashboard_address,
name=name
if n_workers == 1 or name is None or name == ""
else str(name) + "-" + str(i),
**kwargs,
**port_kwargs_i,
)
for i, port_kwargs_i in enumerate(port_kwargs)
]
signal_fired = False

async def close_all():
# Unregister all workers from scheduler
if nanny:
await asyncio.gather(*(n.close(timeout=2) for n in nannies))
async def run():
loop = IOLoop.current()

nannies = [
t(
scheduler,
scheduler_file=scheduler_file,
nthreads=nthreads,
loop=loop,
resources=resources,
security=sec,
contact_address=contact_address,
host=host,
dashboard=dashboard,
dashboard_address=dashboard_address,
name=name
if n_workers == 1 or name is None or name == ""
else str(name) + "-" + str(i),
**kwargs,
**port_kwargs_i,
)
for i, port_kwargs_i in enumerate(port_kwargs)
]

signal_fired = False
async def wait_for_nannies_to_finish():
"""Wait for all nannies to initialize and finish"""
await asyncio.gather(*nannies)
await asyncio.gather(*(n.finished() for n in nannies))

def on_signal(signum):
nonlocal signal_fired
signal_fired = True
if signum != signal.SIGINT:
logger.info("Exiting on signal %d", signum)
return asyncio.ensure_future(close_all())
async def wait_for_signals_and_close():
"""Wait for SIGINT or SIGTERM and close all nannies upon receiving one of those signals"""
nonlocal signal_fired
await wait_for_signals([signal.SIGINT, signal.SIGTERM])

async def run():
await asyncio.gather(*nannies)
await asyncio.gather(*(n.finished() for n in nannies))
signal_fired = True
if nanny:
# Unregister all workers from scheduler
await asyncio.gather(*(n.close(timeout=10) for n in nannies))

install_signal_handlers(loop, cleanup=on_signal)
wait_for_signals_and_close_task = asyncio.create_task(
wait_for_signals_and_close()
)
wait_for_nannies_to_finish_task = asyncio.create_task(
wait_for_nannies_to_finish()
)

done, _ = await asyncio.wait(
[wait_for_signals_and_close_task, wait_for_nannies_to_finish_task],
return_when=asyncio.FIRST_COMPLETED,
)
# Re-raise exceptions from done tasks
[task.result() for task in done]

try:
loop.run_sync(run)
except TimeoutError:
asyncio.run(run())
except (TimeoutError, asyncio.TimeoutError):
# We already log the exception in nanny / worker. Don't do it again.
if not signal_fired:
logger.info("Timed out starting worker")
sys.exit(1)
except KeyboardInterrupt: # pragma: no cover
pass
finally:
logger.info("End worker")

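The worker's new `run()` races "wait for the nannies to finish" against "wait for a signal and close", then re-raises exceptions from the finished tasks via `task.result()`. A minimal, self-contained illustration of that pattern, using placeholder coroutines rather than the actual worker code:

```python
import asyncio


async def serve() -> None:
    """Stand-in for the long-running work (e.g. awaiting the nannies)."""
    await asyncio.sleep(3600)


async def wait_for_shutdown_request() -> None:
    """Stand-in for wait_for_signals_and_close()."""
    await asyncio.sleep(1)  # pretend SIGINT/SIGTERM arrived after one second


async def main() -> None:
    serve_task = asyncio.create_task(serve())
    shutdown_task = asyncio.create_task(wait_for_shutdown_request())

    # Return as soon as either task finishes; the other is still pending and
    # gets cancelled by asyncio.run() when the loop shuts down.
    done, _ = await asyncio.wait(
        [serve_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED
    )

    # task.result() re-raises any exception stored on a finished task,
    # so a startup failure is not silently swallowed.
    for task in done:
        task.result()


if __name__ == "__main__":
    asyncio.run(main())
```

The same re-raise step is what allows the `except (TimeoutError, asyncio.TimeoutError)` branch around `asyncio.run(run())` to catch a worker that timed out during startup.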
31 changes: 29 additions & 2 deletions distributed/cli/tests/test_dask_scheduler.py
@@ -5,6 +5,8 @@

import os
import shutil
import signal
import subprocess
import sys
import tempfile
from time import sleep
Expand All @@ -17,7 +19,7 @@
import distributed
import distributed.cli.dask_scheduler
from distributed import Client, Scheduler
from distributed.compatibility import LINUX
from distributed.compatibility import LINUX, WINDOWS
from distributed.metrics import time
from distributed.utils import get_ip, get_ip_interface
from distributed.utils_test import (
@@ -414,9 +416,12 @@ def test_version_option():
def test_idle_timeout(loop):
start = time()
runner = CliRunner()
runner.invoke(distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"])
result = runner.invoke(
distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"]
)
stop = time()
assert 1 < stop - start < 10
assert result.exit_code == 0


def test_multiple_workers_2(loop):
@@ -453,3 +458,25 @@ def test_multiple_workers(loop):
while len(c.nthreads()) < 2:
sleep(0.1)
assert time() < start + 10


@pytest.mark.slow
@pytest.mark.skipif(WINDOWS, reason="POSIX only")
@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
def test_signal_handling(loop, sig):
with subprocess.Popen(
["python", "-m", "distributed.cli.dask_scheduler"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as scheduler:
# Wait for scheduler to start
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:
pass
scheduler.send_signal(sig)
stdout, stderr = scheduler.communicate()
logs = stdout.decode().lower()
assert stderr is None
assert scheduler.returncode == 0
assert f"signal {sig}" in logs
assert "scheduler closing" in logs
assert "end scheduler" in logs
57 changes: 55 additions & 2 deletions distributed/cli/tests/test_dask_worker.py
@@ -2,6 +2,8 @@

import asyncio
import os
import signal
import subprocess
import sys
from multiprocessing import cpu_count
from time import sleep
@@ -13,7 +15,7 @@

from distributed import Client
from distributed.cli.dask_worker import _apportion_ports, main
from distributed.compatibility import LINUX, to_thread
from distributed.compatibility import LINUX, WINDOWS, to_thread
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils_test import gen_cluster, popen, requires_ipv6
@@ -593,7 +595,7 @@ def test_worker_timeout(no_nanny):
if no_nanny:
args.append("--no-nanny")
result = runner.invoke(main, args)
assert result.exit_code != 0
assert result.exit_code == 1


def test_bokeh_deprecation():
@@ -682,3 +684,54 @@ def dask_setup(worker):
await c.wait_for_workers(1)
[foo] = (await c.run(lambda dask_worker: dask_worker.foo)).values()
assert foo == "setup"


@pytest.mark.slow
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_timeout(nanny):
worker = subprocess.run(
[
"python",
"-m",
"distributed.cli.dask_worker",
"192.168.1.100:7777",
nanny,
"--death-timeout=1",
],
text=True,
encoding="utf8",
capture_output=True,
)

assert "timed out starting worker" in worker.stderr.lower()
assert "end worker" in worker.stderr.lower()
assert worker.returncode == 1


@pytest.mark.skipif(WINDOWS, reason="POSIX only")
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
@gen_cluster(client=True, nthreads=[])
async def test_signal_handling(c, s, nanny, sig):
Review thread on this test:

Member Author: It looks like this test is flaky; my first guess is that the 2-second timeout on closing the nannies might lead to unreliable behavior on the CI VMs. Any suggestions on how to avoid this?

Member: Is this still flaky?

Member Author: Ran it locally a couple of times without an issue; retriggered CI to double-check.

Member Author: Flakiness should be resolved, with no issues locally and on two CI runs.
with subprocess.Popen(
["python", "-m", "distributed.cli.dask_worker", s.address, nanny],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as worker:
await c.wait_for_workers(1)

worker.send_signal(sig)
stdout, stderr = worker.communicate()
logs = stdout.decode().lower()
assert stderr is None
assert worker.returncode == 0
assert f"signal {sig}" in logs
if nanny == "--nanny":
assert "closing nanny" in logs
assert "stopping worker" in logs
else:
assert "nanny" not in logs
assert "end worker" in logs
assert "timed out" not in logs
assert "error" not in logs
assert "exception" not in logs