From 263eea0382d93cab527cac5b77ce3fdf46be910f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 26 Apr 2022 14:22:42 +0200 Subject: [PATCH 01/35] Add tests for termination of worker CLI script --- distributed/cli/dask_worker.py | 2 +- distributed/cli/tests/test_dask_worker.py | 43 +++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 376f2a1c62b..597bec169b2 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -475,7 +475,7 @@ async def run(): try: loop.run_sync(run) - except TimeoutError: + except (TimeoutError, asyncio.TimeoutError): # We already log the exception in nanny / worker. Don't do it again. if not signal_fired: logger.info("Timed out starting worker") diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 23080909e1c..996d4abf7a3 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -2,6 +2,7 @@ import asyncio import os +import signal import sys from multiprocessing import cpu_count from time import sleep @@ -682,3 +683,45 @@ def dask_setup(worker): await c.wait_for_workers(1) [foo] = (await c.run(lambda dask_worker: dask_worker.foo)).values() assert foo == "setup" + + +@pytest.mark.slow +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +def test_timeout(nanny): + with popen( + ["dask-worker", "192.168.1.100:7777", nanny, "--death-timeout=1"], + flush_output=False, + ) as worker: + logs = [worker.stdout.readline().decode().lower() for _ in range(15)] + + assert any("timed out starting worker" in log for log in logs) + assert any("end worker" in log for log in logs) + assert worker.returncode != 0 + + +@pytest.mark.slow +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_sigint(c, s, nanny): + with popen(["dask-worker", s.address, nanny], flush_output=False) as worker: + await c.wait_for_workers(2) + worker.send_signal(signal.SIGINT) + logs = [worker.stdout.readline().decode().lower() for _ in range(25)] + assert not any("timed out" in log for log in logs) + assert not any(f"signal {signal.SIGINT}" in log for log in logs) + assert any("end worker" in log for log in logs) + assert worker.returncode != 0 + + +@pytest.mark.slow +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_sigterm(c, s, nanny): + with popen(["dask-worker", s.address, nanny], flush_output=False) as worker: + await c.wait_for_workers(1) + worker.send_signal(signal.SIGTERM) + logs = [worker.stdout.readline().decode().lower() for _ in range(25)] + assert not any("timed out" in log for log in logs) + assert any(f"signal {signal.SIGTERM}" in log for log in logs) + assert any("end worker" in log for log in logs) + assert worker.returncode != 0 From 7c592704859fa6bc128916cd0a9d6c2264b17a74 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 26 Apr 2022 14:26:06 +0200 Subject: [PATCH 02/35] Fix num of workers --- distributed/cli/tests/test_dask_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 996d4abf7a3..9eb5ce5eea2 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -704,7 +704,7 @@ def test_timeout(nanny): @gen_cluster(client=True, nthreads=[]) async def 
test_sigint(c, s, nanny): with popen(["dask-worker", s.address, nanny], flush_output=False) as worker: - await c.wait_for_workers(2) + await c.wait_for_workers(1) worker.send_signal(signal.SIGINT) logs = [worker.stdout.readline().decode().lower() for _ in range(25)] assert not any("timed out" in log for log in logs) From d196838ac5180f5a23c1db6a0627f051011a1f38 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 26 Apr 2022 18:42:45 +0200 Subject: [PATCH 03/35] Gracefully close workers regardless of whether or not they're nannies or raw workers --- distributed/cli/dask_worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 597bec169b2..23cd4c5c669 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -455,8 +455,7 @@ def del_pid_file(): async def close_all(): # Unregister all workers from scheduler - if nanny: - await asyncio.gather(*(n.close(timeout=2) for n in nannies)) + await asyncio.gather(*(n.close(timeout=2) for n in nannies)) signal_fired = False From 529569527cb80e131c9619c44155d1df7756a784 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 27 Apr 2022 14:22:24 +0200 Subject: [PATCH 04/35] Rework tests for proper log checking without double close on stdout --- distributed/cli/tests/test_dask_worker.py | 74 +++++++++++++++++------ 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 9eb5ce5eea2..bd88bac649b 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -3,6 +3,7 @@ import asyncio import os import signal +import subprocess import sys from multiprocessing import cpu_count from time import sleep @@ -594,7 +595,7 @@ def test_worker_timeout(no_nanny): if no_nanny: args.append("--no-nanny") result = runner.invoke(main, args) - assert result.exit_code != 0 + assert result.exit_code == 1 def test_bokeh_deprecation(): @@ -688,40 +689,73 @@ def dask_setup(worker): @pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_timeout(nanny): - with popen( + worker = subprocess.run( ["dask-worker", "192.168.1.100:7777", nanny, "--death-timeout=1"], - flush_output=False, - ) as worker: - logs = [worker.stdout.readline().decode().lower() for _ in range(15)] + text=True, + encoding="utf8", + capture_output=True, + ) - assert any("timed out starting worker" in log for log in logs) - assert any("end worker" in log for log in logs) - assert worker.returncode != 0 + assert "timed out starting worker" in worker.stderr.lower() + assert "end worker" in worker.stderr.lower() + assert worker.returncode == 1 @pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_sigint(c, s, nanny): - with popen(["dask-worker", s.address, nanny], flush_output=False) as worker: + try: + worker = subprocess.Popen( + ["dask-worker", s.address, nanny], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) await c.wait_for_workers(1) worker.send_signal(signal.SIGINT) - logs = [worker.stdout.readline().decode().lower() for _ in range(25)] - assert not any("timed out" in log for log in logs) - assert not any(f"signal {signal.SIGINT}" in log for log in logs) - assert any("end worker" in log for log in logs) - assert worker.returncode != 0 + stdout, stderr = worker.communicate() + logs = stdout.decode().lower() + assert stderr is None 
+ if nanny == "--nanny": + assert "closing nanny" in logs + else: + assert "nanny" not in logs + assert "stopping worker" in logs + assert "end worker" in logs + assert "timed out" not in logs + assert f"signal {signal.SIGINT}" not in logs + assert "error" not in logs + assert "exception" not in logs + assert worker.returncode == 0 + finally: + worker.kill() @pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_sigterm(c, s, nanny): - with popen(["dask-worker", s.address, nanny], flush_output=False) as worker: + try: + worker = subprocess.Popen( + ["dask-worker", s.address, nanny], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) await c.wait_for_workers(1) worker.send_signal(signal.SIGTERM) - logs = [worker.stdout.readline().decode().lower() for _ in range(25)] - assert not any("timed out" in log for log in logs) - assert any(f"signal {signal.SIGTERM}" in log for log in logs) - assert any("end worker" in log for log in logs) - assert worker.returncode != 0 + stdout, stderr = worker.communicate() + logs = stdout.decode().lower() + assert stderr is None + assert f"signal {signal.SIGTERM}" in logs + if nanny == "--nanny": + assert "closing nanny" in logs + else: + assert "nanny" not in logs + assert "stopping worker" in logs + assert "end worker" in logs + assert "timed out" not in logs + assert "error" not in logs + assert "exception" not in logs + assert worker.returncode == 0 + finally: + worker.kill() From a2d530f712f3ed8fd8c6c0c4528e037ea82b64b2 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 27 Apr 2022 16:27:57 +0200 Subject: [PATCH 05/35] Enable tmate --- .github/workflows/tests.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 41d97885498..03707dc01eb 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -121,6 +121,7 @@ jobs: pytest distributed \ -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ + -k "test_sigint" \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ @@ -138,9 +139,9 @@ jobs: python continuous_integration/scripts/parse_stdout.py < reports/stdout > reports/pytest.xml fi - # - name: Debug with tmate on failure - # if: ${{ failure() }} - # uses: mxschmitt/action-tmate@v3 + - name: Debug with tmate on failure + if: ${{ failure() }} + uses: mxschmitt/action-tmate@v3 - name: Coverage uses: codecov/codecov-action@v1 From f54ad9d5a5d3684a89042cef0fafe5424bf1b6f0 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 27 Apr 2022 16:36:32 +0200 Subject: [PATCH 06/35] Revert "Enable tmate" This reverts commit 526b2b6acd860dddf63cd7e6ddc8fb0476f3bd79. 
--- .github/workflows/tests.yaml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 03707dc01eb..41d97885498 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -121,7 +121,6 @@ jobs: pytest distributed \ -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ - -k "test_sigint" \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ @@ -139,9 +138,9 @@ jobs: python continuous_integration/scripts/parse_stdout.py < reports/stdout > reports/pytest.xml fi - - name: Debug with tmate on failure - if: ${{ failure() }} - uses: mxschmitt/action-tmate@v3 + # - name: Debug with tmate on failure + # if: ${{ failure() }} + # uses: mxschmitt/action-tmate@v3 - name: Coverage uses: codecov/codecov-action@v1 From b8ca756fd0cafdc3e6d315236b192e1a74df34f2 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 27 Apr 2022 17:40:23 +0200 Subject: [PATCH 07/35] Use different signals for windows --- distributed/cli/tests/test_dask_worker.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index bd88bac649b..98b38cd8e2e 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -15,7 +15,7 @@ from distributed import Client from distributed.cli.dask_worker import _apportion_ports, main -from distributed.compatibility import LINUX, to_thread +from distributed.compatibility import LINUX, WINDOWS, to_thread from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils_test import gen_cluster, popen, requires_ipv6 @@ -706,13 +706,20 @@ def test_timeout(nanny): @gen_cluster(client=True, nthreads=[]) async def test_sigint(c, s, nanny): try: + kwargs = {} + if WINDOWS: + # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT + kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + worker = subprocess.Popen( ["dask-worker", s.address, nanny], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + **kwargs, ) await c.wait_for_workers(1) - worker.send_signal(signal.SIGINT) + sig = signal.CTRL_C_EVENT if WINDOWS else signal.SIGINT + worker.send_signal(sig) stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None @@ -736,17 +743,24 @@ async def test_sigint(c, s, nanny): @gen_cluster(client=True, nthreads=[]) async def test_sigterm(c, s, nanny): try: + kwargs = {} + if WINDOWS: + # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT + kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP worker = subprocess.Popen( ["dask-worker", s.address, nanny], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + **kwargs, ) await c.wait_for_workers(1) - worker.send_signal(signal.SIGTERM) + sig = signal.CTRL_C_EVENT if WINDOWS else signal.SIGINT + + worker.send_signal(sig) stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None - assert f"signal {signal.SIGTERM}" in logs + assert f"signal {sig}" in logs if nanny == "--nanny": assert "closing nanny" in logs else: From 959aa7f556109f2e08ef671afe79c3e1dad7594e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 11:57:03 +0200 Subject: [PATCH 08/35] Use different tests for POSIX/Windows --- distributed/cli/tests/test_dask_worker.py | 56 ++++++++++++++++------- 1 file changed, 40 insertions(+), 
16 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 98b38cd8e2e..912c786ce74 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -702,24 +702,18 @@ def test_timeout(nanny): @pytest.mark.slow +@pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_sigint(c, s, nanny): try: - kwargs = {} - if WINDOWS: - # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT - kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP - worker = subprocess.Popen( ["dask-worker", s.address, nanny], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - **kwargs, ) await c.wait_for_workers(1) - sig = signal.CTRL_C_EVENT if WINDOWS else signal.SIGINT - worker.send_signal(sig) + worker.send_signal(signal.SIGINT) stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None @@ -730,7 +724,7 @@ async def test_sigint(c, s, nanny): assert "stopping worker" in logs assert "end worker" in logs assert "timed out" not in logs - assert f"signal {signal.SIGINT}" not in logs + assert "signal" not in logs assert "error" not in logs assert "exception" not in logs assert worker.returncode == 0 @@ -739,34 +733,64 @@ async def test_sigint(c, s, nanny): @pytest.mark.slow +@pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_sigterm(c, s, nanny): try: kwargs = {} - if WINDOWS: - # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT - kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP worker = subprocess.Popen( ["dask-worker", s.address, nanny], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - **kwargs, ) await c.wait_for_workers(1) - sig = signal.CTRL_C_EVENT if WINDOWS else signal.SIGINT - worker.send_signal(sig) + worker.send_signal(signal.SIGTERM) + stdout, stderr = worker.communicate() + logs = stdout.decode().lower() + assert stderr is None + assert f"signal {signal.SIGTERM}" in logs + if nanny == "--nanny": + assert "closing nanny" in logs + else: + assert "nanny" not in logs + assert "stopping worker" in logs + assert "end worker" in logs + assert "timed out" not in logs + assert "error" not in logs + assert "exception" not in logs + assert worker.returncode == 0 + finally: + worker.kill() + + +@pytest.mark.slow +@pytest.mark.skipif(not WINDOWS, reason="Windows only") +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_ctrl_c_event(c, s, nanny): + try: + worker = subprocess.Popen( + ["dask-worker", s.address, nanny], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + await c.wait_for_workers(1) + + worker.send_signal(signal.CTRL_C_EVENT) stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None - assert f"signal {sig}" in logs if nanny == "--nanny": assert "closing nanny" in logs else: assert "nanny" not in logs assert "stopping worker" in logs assert "end worker" in logs + assert "signal" not in logs assert "timed out" not in logs assert "error" not in logs assert "exception" not in logs From 8fe7960a3e9ebc6ba522a443dc4683ce9d60dc1a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 12:55:18 +0200 Subject: [PATCH 09/35] Add 
CTRL_BREAK_EVENT test --- distributed/cli/tests/test_dask_worker.py | 34 +++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 912c786ce74..d4ae38cf47f 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -797,3 +797,37 @@ async def test_ctrl_c_event(c, s, nanny): assert worker.returncode == 0 finally: worker.kill() + + +@pytest.mark.slow +@pytest.mark.skipif(not WINDOWS, reason="Windows only") +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_ctrl_break_event(c, s, nanny): + try: + worker = subprocess.Popen( + ["dask-worker", s.address, nanny], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + await c.wait_for_workers(1) + + worker.send_signal(signal.CTRL_BREAK_EVENT) + stdout, stderr = worker.communicate() + logs = stdout.decode().lower() + assert stderr is None + if nanny == "--nanny": + assert "closing nanny" in logs + else: + assert "nanny" not in logs + assert "stopping worker" in logs + assert "end worker" in logs + assert "signal" not in logs + assert "timed out" not in logs + assert "error" not in logs + assert "exception" not in logs + assert worker.returncode == 0 + finally: + worker.kill() From 09cc46cca6eb39d66f6235c35b1f23a19d796cf2 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 13:55:59 +0200 Subject: [PATCH 10/35] Remove CTRL_C_EVENT test --- distributed/cli/tests/test_dask_worker.py | 34 ----------------------- 1 file changed, 34 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index d4ae38cf47f..1d426a92a00 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -765,40 +765,6 @@ async def test_sigterm(c, s, nanny): worker.kill() -@pytest.mark.slow -@pytest.mark.skipif(not WINDOWS, reason="Windows only") -@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -@gen_cluster(client=True, nthreads=[]) -async def test_ctrl_c_event(c, s, nanny): - try: - worker = subprocess.Popen( - ["dask-worker", s.address, nanny], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, - ) - await c.wait_for_workers(1) - - worker.send_signal(signal.CTRL_C_EVENT) - stdout, stderr = worker.communicate() - logs = stdout.decode().lower() - assert stderr is None - if nanny == "--nanny": - assert "closing nanny" in logs - else: - assert "nanny" not in logs - assert "stopping worker" in logs - assert "end worker" in logs - assert "signal" not in logs - assert "timed out" not in logs - assert "error" not in logs - assert "exception" not in logs - assert worker.returncode == 0 - finally: - worker.kill() - - @pytest.mark.slow @pytest.mark.skipif(not WINDOWS, reason="Windows only") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) From 55aaaa724d340a2d25d5fc11cfcbf50f517667d7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 15:22:25 +0200 Subject: [PATCH 11/35] Remove run_sync from dask_worker and add prelim. 
alternative form of install_signal_handlers --- distributed/cli/dask_worker.py | 74 +++++++++++++++++----------------- distributed/cli/utils.py | 25 +++++++++++- 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 23cd4c5c669..74f8f246546 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -20,7 +20,7 @@ from dask.system import CPU_COUNT from distributed import Nanny -from distributed.cli.utils import install_signal_handlers +from distributed.cli.utils import install_signal_handlers2 as install_signal_handlers from distributed.comm import get_address_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv @@ -404,8 +404,6 @@ def del_pid_file(): else: resources = None - loop = IOLoop.current() - worker_class = import_term(worker_class) port_kwargs = _apportion_ports(worker_port, nanny_port, n_workers, nanny) @@ -432,48 +430,50 @@ def del_pid_file(): with suppress(TypeError, ValueError): name = int(name) - nannies = [ - t( - scheduler, - scheduler_file=scheduler_file, - nthreads=nthreads, - loop=loop, - resources=resources, - security=sec, - contact_address=contact_address, - host=host, - dashboard=dashboard, - dashboard_address=dashboard_address, - name=name - if n_workers == 1 or name is None or name == "" - else str(name) + "-" + str(i), - **kwargs, - **port_kwargs_i, - ) - for i, port_kwargs_i in enumerate(port_kwargs) - ] + signal_fired = False + + async def run(): + loop = IOLoop.current() + + nannies = [ + t( + scheduler, + scheduler_file=scheduler_file, + nthreads=nthreads, + loop=loop, + resources=resources, + security=sec, + contact_address=contact_address, + host=host, + dashboard=dashboard, + dashboard_address=dashboard_address, + name=name + if n_workers == 1 or name is None or name == "" + else str(name) + "-" + str(i), + **kwargs, + **port_kwargs_i, + ) + for i, port_kwargs_i in enumerate(port_kwargs) + ] - async def close_all(): - # Unregister all workers from scheduler - await asyncio.gather(*(n.close(timeout=2) for n in nannies)) + async def close_all(): + # Unregister all workers from scheduler + await asyncio.gather(*(n.close(timeout=2) for n in nannies)) - signal_fired = False + def on_signal(signum): + nonlocal signal_fired + signal_fired = True + if signum != signal.SIGINT: + logger.info("Exiting on signal %d", signum) + return asyncio.ensure_future(close_all()) - def on_signal(signum): - nonlocal signal_fired - signal_fired = True - if signum != signal.SIGINT: - logger.info("Exiting on signal %d", signum) - return asyncio.ensure_future(close_all()) + install_signal_handlers(loop, cleanup=on_signal) - async def run(): await asyncio.gather(*nannies) await asyncio.gather(*(n.finished() for n in nannies)) - install_signal_handlers(loop, cleanup=on_signal) - try: - loop.run_sync(run) + asyncio.run(run()) except (TimeoutError, asyncio.TimeoutError): # We already log the exception in nanny / worker. Don't do it again. if not signal_fired: diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index a0191edaa90..e3da410b723 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -20,8 +20,31 @@ async def cleanup_and_stop(): await cleanup(sig) finally: loop.stop() + loop.add_callback_from_signal(lambda: cleanup(sig)) - loop.add_callback_from_signal(cleanup_and_stop) + # Restore old signal handler to allow for a quicker exit + # if the user sends the signal again. 
+ signal.signal(sig, old_handlers[sig]) + + for sig in [signal.SIGINT, signal.SIGTERM]: + old_handlers[sig] = signal.signal(sig, handle_signal) + + +def install_signal_handlers2(loop=None, cleanup=None): + """ + Install global signal handlers to halt the Tornado IOLoop in case of + a SIGINT or SIGTERM. *cleanup* is an optional callback called, + before the loop stops, with a single signal number argument. + """ + import signal + + loop = loop or IOLoop.current() + + old_handlers = {} + + def handle_signal(sig, frame): + if cleanup is not None: + loop.add_callback_from_signal(lambda: cleanup(sig)) # Restore old signal handler to allow for a quicker exit # if the user sends the signal again. signal.signal(sig, old_handlers[sig]) From ef5a1d0aa3cef9f89553d28a459930bc10855110 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 15:33:22 +0200 Subject: [PATCH 12/35] Minor --- distributed/cli/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index e3da410b723..d70da8387f0 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -20,7 +20,7 @@ async def cleanup_and_stop(): await cleanup(sig) finally: loop.stop() - loop.add_callback_from_signal(lambda: cleanup(sig)) + loop.add_callback_from_signal(cleanup_and_stop) # Restore old signal handler to allow for a quicker exit # if the user sends the signal again. From c5fc3df45aa11919545979bba413c9f532780fde Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 28 Apr 2022 15:34:40 +0200 Subject: [PATCH 13/35] Minor --- distributed/cli/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index d70da8387f0..ce94bc0601a 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -20,7 +20,8 @@ async def cleanup_and_stop(): await cleanup(sig) finally: loop.stop() - loop.add_callback_from_signal(cleanup_and_stop) + + loop.add_callback_from_signal(cleanup_and_stop) # Restore old signal handler to allow for a quicker exit # if the user sends the signal again. 
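The patches that follow (14 through 18) drop the Tornado-based install_signal_handlers helper in favor of a plain asyncio pattern: a coroutine installs SIGINT/SIGTERM handlers, blocks on an asyncio.Event, and the CLI entry point races that coroutine against its main serve task with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED). The sketch below is an illustrative, self-contained distillation of that pattern, not the PR's actual code; main_work() and the teardown step are stand-ins for the worker/scheduler serve logic.

import asyncio
import signal


async def wait_for_signals(signals):
    """Block until one of *signals* arrives."""
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    old_handlers = {}

    def handle_signal(signum, frame):
        # Restore the previous handler so a repeated signal exits quickly
        # instead of waiting for a possibly slow graceful shutdown.
        signal.signal(signum, old_handlers[signum])
        loop.call_soon_threadsafe(event.set)

    for sig in signals:
        old_handlers[sig] = signal.signal(sig, handle_signal)

    await event.wait()


async def main_work():
    # Stand-in for "serve the nannies / scheduler until they finish on their own".
    await asyncio.sleep(3600)


async def main():
    work = asyncio.create_task(main_work())
    interrupted = asyncio.create_task(
        wait_for_signals([signal.SIGINT, signal.SIGTERM])
    )
    done, _ = await asyncio.wait(
        {work, interrupted}, return_when=asyncio.FIRST_COMPLETED
    )
    if interrupted in done:
        # Graceful teardown goes here; the PR closes the nannies or the scheduler.
        work.cancel()


if __name__ == "__main__":
    asyncio.run(main())

Restoring the old handler inside handle_signal mirrors the comment in the diffs ("allow for quicker exit if the user sends the signal again"), and call_soon_threadsafe is used because the signal handler fires outside the event loop's normal control flow; the later patches refine this into wait_for_signals(signals) plus per-CLI wait_for_signals_and_close tasks.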
From d1a9225920d10b2ff5f42aab547c3a4db710a4b4 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 09:48:02 +0200 Subject: [PATCH 14/35] Improve structure of graceful shutdown in dask_worker --- distributed/cli/dask_worker.py | 27 ++++++++-------- distributed/cli/tests/test_dask_worker.py | 39 +++-------------------- distributed/cli/utils.py | 26 ++++++++++++++- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 74f8f246546..b27c3808a39 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -5,7 +5,6 @@ import gc import logging import os -import signal import sys import warnings from collections.abc import Iterator @@ -20,7 +19,7 @@ from dask.system import CPU_COUNT from distributed import Nanny -from distributed.cli.utils import install_signal_handlers2 as install_signal_handlers +from distributed.cli.utils import wait_for_signal from distributed.comm import get_address_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv @@ -456,21 +455,23 @@ async def run(): for i, port_kwargs_i in enumerate(port_kwargs) ] - async def close_all(): - # Unregister all workers from scheduler - await asyncio.gather(*(n.close(timeout=2) for n in nannies)) + async def wait_until_nannies_finish(): + await asyncio.gather(*(n.finished() for n in nannies)) - def on_signal(signum): - nonlocal signal_fired - signal_fired = True - if signum != signal.SIGINT: - logger.info("Exiting on signal %d", signum) - return asyncio.ensure_future(close_all()) + async def wait_and_finish(): + wait_for_signal_task = asyncio.create_task(wait_for_signal()) + wait_for_nannies_task = asyncio.create_task(wait_until_nannies_finish()) + _, pending = await asyncio.wait( + [wait_for_signal_task, wait_for_nannies_task], + return_when=asyncio.FIRST_COMPLETED, + ) - install_signal_handlers(loop, cleanup=on_signal) + if wait_for_nannies_task in pending: + # Unregister all workers from scheduler + await asyncio.gather(*(n.close(timeout=2) for n in nannies)) await asyncio.gather(*nannies) - await asyncio.gather(*(n.finished() for n in nannies)) + await wait_and_finish() try: asyncio.run(run()) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 1d426a92a00..1d25252a28f 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -704,8 +704,9 @@ def test_timeout(nanny): @pytest.mark.slow @pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) @gen_cluster(client=True, nthreads=[]) -async def test_sigint(c, s, nanny): +async def test_signal_handling(c, s, nanny, sig): try: worker = subprocess.Popen( ["dask-worker", s.address, nanny], @@ -713,44 +714,12 @@ async def test_sigint(c, s, nanny): stderr=subprocess.STDOUT, ) await c.wait_for_workers(1) - worker.send_signal(signal.SIGINT) - stdout, stderr = worker.communicate() - logs = stdout.decode().lower() - assert stderr is None - if nanny == "--nanny": - assert "closing nanny" in logs - else: - assert "nanny" not in logs - assert "stopping worker" in logs - assert "end worker" in logs - assert "timed out" not in logs - assert "signal" not in logs - assert "error" not in logs - assert "exception" not in logs - assert worker.returncode == 0 - finally: - worker.kill() - - -@pytest.mark.slow 
-@pytest.mark.skipif(WINDOWS, reason="POSIX only") -@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -@gen_cluster(client=True, nthreads=[]) -async def test_sigterm(c, s, nanny): - try: - kwargs = {} - worker = subprocess.Popen( - ["dask-worker", s.address, nanny], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - await c.wait_for_workers(1) - worker.send_signal(signal.SIGTERM) + worker.send_signal(sig) stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None - assert f"signal {signal.SIGTERM}" in logs + assert f"signal {sig}" in logs if nanny == "--nanny": assert "closing nanny" in logs else: diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index ce94bc0601a..9da151bbcfe 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,5 +1,11 @@ +import asyncio +import logging +import signal + from tornado.ioloop import IOLoop +logger = logging.getLogger(__name__) + def install_signal_handlers(loop=None, cleanup=None): """ @@ -22,7 +28,6 @@ async def cleanup_and_stop(): loop.stop() loop.add_callback_from_signal(cleanup_and_stop) - # Restore old signal handler to allow for a quicker exit # if the user sends the signal again. signal.signal(sig, old_handlers[sig]) @@ -52,3 +57,22 @@ def handle_signal(sig, frame): for sig in [signal.SIGINT, signal.SIGTERM]: old_handlers[sig] = signal.signal(sig, handle_signal) + + +async def wait_for_signal(): + loop = asyncio.get_running_loop() + event = asyncio.Event() + + old_handlers = {} + + def handle_signal(sig, frame): + # Restore old signal handler to allow for quicker exit + # if the user sends the signal again. + signal.signal(sig, old_handlers[sig]) + logger.info("Received signal %d", sig) + loop.call_soon_threadsafe(event.set) + + for sig in [signal.SIGINT, signal.SIGTERM]: + old_handlers[sig] = signal.signal(sig, handle_signal) + + await event.wait() From 2c77b26e01305cceb84a0ae38af35630dfb98134 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 10:38:22 +0200 Subject: [PATCH 15/35] Implement graceful shutdown on scheduler while dropping run_sync --- distributed/cli/dask_scheduler.py | 54 ++++++++++++++++++------------ distributed/cli/dask_worker.py | 8 ++--- distributed/cli/utils.py | 55 +------------------------------ 3 files changed, 38 insertions(+), 79 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index c7a18517876..d419ba32657 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -1,3 +1,4 @@ +import asyncio import atexit import gc import logging @@ -10,7 +11,7 @@ from tornado.ioloop import IOLoop from distributed import Scheduler -from distributed.cli.utils import install_signal_handlers +from distributed.cli.utils import wait_for_signal from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, @@ -183,33 +184,44 @@ def del_pid_file(): limit = max(soft, hard // 2) resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard)) - loop = IOLoop.current() - logger.info("-" * 47) + address = None - scheduler = Scheduler( - loop=loop, - security=sec, - host=host, - port=port, - dashboard=dashboard, - dashboard_address=dashboard_address, - http_prefix=dashboard_prefix, - **kwargs, - ) - logger.info("-" * 47) + async def run(): + nonlocal address + loop = IOLoop.current() + logger.info("-" * 47) + + scheduler = Scheduler( + loop=loop, + security=sec, + host=host, + port=port, + dashboard=dashboard, + 
dashboard_address=dashboard_address, + http_prefix=dashboard_prefix, + **kwargs, + ) + logger.info("-" * 47) - install_signal_handlers(loop) + async def wait_and_finish(): + wait_for_signal_task = asyncio.create_task(wait_for_signal()) + wait_for_scheduler_task = asyncio.create_task(scheduler.finished()) + _, pending = await asyncio.wait( + [wait_for_signal_task, wait_for_scheduler_task], + return_when=asyncio.FIRST_COMPLETED, + ) + + if wait_for_scheduler_task in pending: + await scheduler.close() - async def run(): await scheduler - await scheduler.finished() + address = scheduler.address + await wait_and_finish() try: - loop.run_sync(run) + asyncio.run(run()) finally: - scheduler.stop() - - logger.info("End scheduler at %r", scheduler.address) + logger.info("End scheduler at %r", address) if __name__ == "__main__": diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index b27c3808a39..501dc7a7f09 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -459,14 +459,16 @@ async def wait_until_nannies_finish(): await asyncio.gather(*(n.finished() for n in nannies)) async def wait_and_finish(): + nonlocal signal_fired wait_for_signal_task = asyncio.create_task(wait_for_signal()) wait_for_nannies_task = asyncio.create_task(wait_until_nannies_finish()) - _, pending = await asyncio.wait( + done, _ = await asyncio.wait( [wait_for_signal_task, wait_for_nannies_task], return_when=asyncio.FIRST_COMPLETED, ) - if wait_for_nannies_task in pending: + if wait_for_signal_task in done: + signal_fired = True # Unregister all workers from scheduler await asyncio.gather(*(n.close(timeout=2) for n in nannies)) @@ -480,8 +482,6 @@ async def wait_and_finish(): if not signal_fired: logger.info("Timed out starting worker") sys.exit(1) - except KeyboardInterrupt: # pragma: no cover - pass finally: logger.info("End worker") diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index 9da151bbcfe..712e5922376 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -2,64 +2,11 @@ import logging import signal -from tornado.ioloop import IOLoop - logger = logging.getLogger(__name__) -def install_signal_handlers(loop=None, cleanup=None): - """ - Install global signal handlers to halt the Tornado IOLoop in case of - a SIGINT or SIGTERM. *cleanup* is an optional callback called, - before the loop stops, with a single signal number argument. - """ - import signal - - loop = loop or IOLoop.current() - - old_handlers = {} - - def handle_signal(sig, frame): - async def cleanup_and_stop(): - try: - if cleanup is not None: - await cleanup(sig) - finally: - loop.stop() - - loop.add_callback_from_signal(cleanup_and_stop) - # Restore old signal handler to allow for a quicker exit - # if the user sends the signal again. - signal.signal(sig, old_handlers[sig]) - - for sig in [signal.SIGINT, signal.SIGTERM]: - old_handlers[sig] = signal.signal(sig, handle_signal) - - -def install_signal_handlers2(loop=None, cleanup=None): - """ - Install global signal handlers to halt the Tornado IOLoop in case of - a SIGINT or SIGTERM. *cleanup* is an optional callback called, - before the loop stops, with a single signal number argument. - """ - import signal - - loop = loop or IOLoop.current() - - old_handlers = {} - - def handle_signal(sig, frame): - if cleanup is not None: - loop.add_callback_from_signal(lambda: cleanup(sig)) - # Restore old signal handler to allow for a quicker exit - # if the user sends the signal again. 
- signal.signal(sig, old_handlers[sig]) - - for sig in [signal.SIGINT, signal.SIGTERM]: - old_handlers[sig] = signal.signal(sig, handle_signal) - - async def wait_for_signal(): + """Wait for SIGINT or SIGTERM by setting global signal handlers""" loop = asyncio.get_running_loop() event = asyncio.Event() From 53f5fb28c3cc9285ef8583c39fd4a89bb4e55dbe Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 11:35:00 +0200 Subject: [PATCH 16/35] Pass desired signals into wait_for_signals --- distributed/cli/dask_scheduler.py | 7 +++++-- distributed/cli/dask_worker.py | 7 +++++-- distributed/cli/utils.py | 15 ++++++++------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index d419ba32657..e805011a1f7 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -4,6 +4,7 @@ import logging import os import re +import signal import sys import warnings @@ -11,7 +12,7 @@ from tornado.ioloop import IOLoop from distributed import Scheduler -from distributed.cli.utils import wait_for_signal +from distributed.cli.utils import wait_for_signals from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, @@ -204,7 +205,9 @@ async def run(): logger.info("-" * 47) async def wait_and_finish(): - wait_for_signal_task = asyncio.create_task(wait_for_signal()) + wait_for_signal_task = asyncio.create_task( + wait_for_signals([signal.SIGINT, signal.SIGTERM]) + ) wait_for_scheduler_task = asyncio.create_task(scheduler.finished()) _, pending = await asyncio.wait( [wait_for_signal_task, wait_for_scheduler_task], diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 501dc7a7f09..2350a3a3649 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -5,6 +5,7 @@ import gc import logging import os +import signal import sys import warnings from collections.abc import Iterator @@ -19,7 +20,7 @@ from dask.system import CPU_COUNT from distributed import Nanny -from distributed.cli.utils import wait_for_signal +from distributed.cli.utils import wait_for_signals from distributed.comm import get_address_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv @@ -460,7 +461,9 @@ async def wait_until_nannies_finish(): async def wait_and_finish(): nonlocal signal_fired - wait_for_signal_task = asyncio.create_task(wait_for_signal()) + wait_for_signal_task = asyncio.create_task( + wait_for_signals([signal.SIGINT, signal.SIGTERM]) + ) wait_for_nannies_task = asyncio.create_task(wait_until_nannies_finish()) done, _ = await asyncio.wait( [wait_for_signal_task, wait_for_nannies_task], diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index 712e5922376..0c73e09c363 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,25 +1,26 @@ import asyncio import logging import signal +from typing import Any logger = logging.getLogger(__name__) -async def wait_for_signal(): - """Wait for SIGINT or SIGTERM by setting global signal handlers""" +async def wait_for_signals(signals: list[signal.Signals]): + """Wait for the passed signals by setting global signal handlers""" loop = asyncio.get_running_loop() event = asyncio.Event() - old_handlers = {} + old_handlers: dict[int, Any] = {} - def handle_signal(sig, frame): + def handle_signal(signum, frame): # Restore old signal handler to allow for quicker exit # if 
the user sends the signal again. - signal.signal(sig, old_handlers[sig]) - logger.info("Received signal %d", sig) + signal.signal(signum, old_handlers[signum]) + logger.info("Received signal %d", signum) loop.call_soon_threadsafe(event.set) - for sig in [signal.SIGINT, signal.SIGTERM]: + for sig in signals: old_handlers[sig] = signal.signal(sig, handle_signal) await event.wait() From 05b94a9d2e61bcdc174671fde51df8a3cadde4aa Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 12:16:24 +0200 Subject: [PATCH 17/35] Fix typing in 3.8 --- distributed/cli/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index 0c73e09c363..e4e97aa3283 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging import signal From 5301af0a31802888a63f42a19b3a08822b2fa406 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 12:35:04 +0200 Subject: [PATCH 18/35] Improve function naming and documentation --- distributed/cli/dask_scheduler.py | 17 +++++++++++------ distributed/cli/dask_worker.py | 15 ++++++++++----- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index e805011a1f7..8dbaa5ffcdd 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -204,22 +204,27 @@ async def run(): ) logger.info("-" * 47) - async def wait_and_finish(): + async def wait_until_shutdown(): + """Wait until the scheduler finishes or the script receives a SIGINT or SIGTERM + which triggers a graceful shutdown of the scheduler + """ wait_for_signal_task = asyncio.create_task( wait_for_signals([signal.SIGINT, signal.SIGTERM]) ) - wait_for_scheduler_task = asyncio.create_task(scheduler.finished()) - _, pending = await asyncio.wait( - [wait_for_signal_task, wait_for_scheduler_task], + wait_for_scheduler_to_finish_task = asyncio.create_task( + scheduler.finished() + ) + done, _ = await asyncio.wait( + [wait_for_signal_task, wait_for_scheduler_to_finish_task], return_when=asyncio.FIRST_COMPLETED, ) - if wait_for_scheduler_task in pending: + if wait_for_signal_task in done: await scheduler.close() await scheduler address = scheduler.address - await wait_and_finish() + await wait_until_shutdown() try: asyncio.run(run()) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 2350a3a3649..fb258bca9d6 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -456,17 +456,22 @@ async def run(): for i, port_kwargs_i in enumerate(port_kwargs) ] - async def wait_until_nannies_finish(): + async def wait_for_nannies_to_finish(): await asyncio.gather(*(n.finished() for n in nannies)) - async def wait_and_finish(): + async def wait_until_shutdown(): + """Wait until all nannies finished or the script receives a SIGINT or SIGTERM + which triggers a graceful shutdown of the worker + """ nonlocal signal_fired wait_for_signal_task = asyncio.create_task( wait_for_signals([signal.SIGINT, signal.SIGTERM]) ) - wait_for_nannies_task = asyncio.create_task(wait_until_nannies_finish()) + wait_for_nannies_to_finish_task = asyncio.create_task( + wait_for_nannies_to_finish() + ) done, _ = await asyncio.wait( - [wait_for_signal_task, wait_for_nannies_task], + [wait_for_signal_task, wait_for_nannies_to_finish_task], return_when=asyncio.FIRST_COMPLETED, ) @@ -476,7 +481,7 @@ async def wait_and_finish(): await 
asyncio.gather(*(n.close(timeout=2) for n in nannies)) await asyncio.gather(*nannies) - await wait_and_finish() + await wait_until_shutdown() try: asyncio.run(run()) From 6891abfaf5c136e0cd99a13312cfd9290ac1151d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 12:56:30 +0200 Subject: [PATCH 19/35] Add tests for termination of scheduler CLI script --- distributed/cli/tests/test_dask_scheduler.py | 32 ++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index d73905401ff..5a0b5331489 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -5,6 +5,8 @@ import os import shutil +import signal +import subprocess import sys import tempfile from time import sleep @@ -17,7 +19,7 @@ import distributed import distributed.cli.dask_scheduler from distributed import Client, Scheduler -from distributed.compatibility import LINUX +from distributed.compatibility import LINUX, WINDOWS from distributed.metrics import time from distributed.utils import get_ip, get_ip_interface from distributed.utils_test import ( @@ -414,9 +416,12 @@ def test_version_option(): def test_idle_timeout(loop): start = time() runner = CliRunner() - runner.invoke(distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"]) + result = runner.invoke( + distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"] + ) stop = time() assert 1 < stop - start < 10 + assert result.exit_code == 0 def test_multiple_workers_2(loop): @@ -453,3 +458,26 @@ def test_multiple_workers(loop): while len(c.nthreads()) < 2: sleep(0.1) assert time() < start + 10 + + +@pytest.mark.slow +@pytest.mark.skipif(WINDOWS, reason="POSIX only") +@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) +def test_signal_handling(sig): + try: + scheduler = subprocess.Popen( + ["dask-scheduler"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + sleep(1) + scheduler.send_signal(sig) + stdout, stderr = scheduler.communicate() + logs = stdout.decode().lower() + assert stderr is None + assert f"signal {sig}" in logs + assert "scheduler closing" in logs + assert "end scheduler" in logs + assert scheduler.returncode == 0 + finally: + scheduler.kill() From bf10cab8bee4e5ab10577aaebd4cca852fb4f239 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 29 Apr 2022 14:15:21 +0200 Subject: [PATCH 20/35] Improve scheduler test by removing sleep --- distributed/cli/tests/test_dask_scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 5a0b5331489..8b2d6734f2b 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -463,14 +463,16 @@ def test_multiple_workers(loop): @pytest.mark.slow @pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) -def test_signal_handling(sig): +def test_signal_handling(loop, sig): try: scheduler = subprocess.Popen( ["dask-scheduler"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) - sleep(1) + # Wait for scheduler to start + with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): + pass scheduler.send_signal(sig) stdout, stderr = scheduler.communicate() logs = stdout.decode().lower() From b322e35dd9440212b5bf18d26c2bf5438e73f874 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 9 
May 2022 16:50:13 +0200 Subject: [PATCH 21/35] Adjust logging --- distributed/cli/dask_scheduler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 8dbaa5ffcdd..773794a1690 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -185,10 +185,7 @@ def del_pid_file(): limit = max(soft, hard // 2) resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard)) - address = None - async def run(): - nonlocal address loop = IOLoop.current() logger.info("-" * 47) @@ -223,13 +220,14 @@ async def wait_until_shutdown(): await scheduler.close() await scheduler - address = scheduler.address await wait_until_shutdown() + logger.info("Stopped scheduler at %r", scheduler.address) + try: asyncio.run(run()) finally: - logger.info("End scheduler at %r", address) + logger.info("End scheduler") if __name__ == "__main__": From ef6070f5c66bcce3b42c02f6891b382b7324adec Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 9 May 2022 17:41:05 +0200 Subject: [PATCH 22/35] Linting --- distributed/cli/dask_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 773794a1690..641b5f503b0 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -223,7 +223,6 @@ async def wait_until_shutdown(): await wait_until_shutdown() logger.info("Stopped scheduler at %r", scheduler.address) - try: asyncio.run(run()) finally: From 11fbefce331460922a15a6b8ca8f5332a7f6919a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 10:54:25 +0200 Subject: [PATCH 23/35] Fix reaping of stray processes by not closing workers without nannies (same behavior as before) this PR --- distributed/cli/dask_worker.py | 5 +++-- distributed/cli/tests/test_dask_worker.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index fb258bca9d6..d99a5e08074 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -477,8 +477,9 @@ async def wait_until_shutdown(): if wait_for_signal_task in done: signal_fired = True - # Unregister all workers from scheduler - await asyncio.gather(*(n.close(timeout=2) for n in nannies)) + if nanny: + # Unregister all workers from scheduler + await asyncio.gather(*(n.close(timeout=2) for n in nannies)) await asyncio.gather(*nannies) await wait_until_shutdown() diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 1d25252a28f..ed1ed778d3a 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -722,9 +722,9 @@ async def test_signal_handling(c, s, nanny, sig): assert f"signal {sig}" in logs if nanny == "--nanny": assert "closing nanny" in logs + assert "stopping worker" in logs else: assert "nanny" not in logs - assert "stopping worker" in logs assert "end worker" in logs assert "timed out" not in logs assert "error" not in logs From 1f0c80e0345856e865b8b30534df207dfba87197 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 11:50:45 +0200 Subject: [PATCH 24/35] Remove Windows CTRL EVENT test due to difficulties making it work --- distributed/cli/tests/test_dask_worker.py | 34 ----------------------- 1 file changed, 34 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 
ed1ed778d3a..b1546aa3bc9 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -732,37 +732,3 @@ async def test_signal_handling(c, s, nanny, sig): assert worker.returncode == 0 finally: worker.kill() - - -@pytest.mark.slow -@pytest.mark.skipif(not WINDOWS, reason="Windows only") -@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -@gen_cluster(client=True, nthreads=[]) -async def test_ctrl_break_event(c, s, nanny): - try: - worker = subprocess.Popen( - ["dask-worker", s.address, nanny], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, - ) - await c.wait_for_workers(1) - - worker.send_signal(signal.CTRL_BREAK_EVENT) - stdout, stderr = worker.communicate() - logs = stdout.decode().lower() - assert stderr is None - if nanny == "--nanny": - assert "closing nanny" in logs - else: - assert "nanny" not in logs - assert "stopping worker" in logs - assert "end worker" in logs - assert "signal" not in logs - assert "timed out" not in logs - assert "error" not in logs - assert "exception" not in logs - assert worker.returncode == 0 - finally: - worker.kill() From b32834512c77750c880c05a8312e493f0f9fb1ab Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 11:53:00 +0200 Subject: [PATCH 25/35] Increase timeout to avoid CI failures --- distributed/cli/dask_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index d99a5e08074..ab84e631090 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -479,7 +479,7 @@ async def wait_until_shutdown(): signal_fired = True if nanny: # Unregister all workers from scheduler - await asyncio.gather(*(n.close(timeout=2) for n in nannies)) + await asyncio.gather(*(n.close(timeout=10) for n in nannies)) await asyncio.gather(*nannies) await wait_until_shutdown() From 4c6feb79966c9465a8048543e0fcbf24eee85944 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 13:10:32 +0200 Subject: [PATCH 26/35] Retrigger CI to check for flake From ab8202bec8624011a5f1d8fcf9444078586733a8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 14:01:08 +0200 Subject: [PATCH 27/35] Move check for returncode to front --- distributed/cli/tests/test_dask_scheduler.py | 2 +- distributed/cli/tests/test_dask_worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 8b2d6734f2b..b0a8f143f71 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -477,9 +477,9 @@ def test_signal_handling(loop, sig): stdout, stderr = scheduler.communicate() logs = stdout.decode().lower() assert stderr is None + assert scheduler.returncode == 0 assert f"signal {sig}" in logs assert "scheduler closing" in logs assert "end scheduler" in logs - assert scheduler.returncode == 0 finally: scheduler.kill() diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index b1546aa3bc9..6fdc149768c 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -719,6 +719,7 @@ async def test_signal_handling(c, s, nanny, sig): stdout, stderr = worker.communicate() logs = stdout.decode().lower() assert stderr is None + assert worker.returncode == 0 
assert f"signal {sig}" in logs if nanny == "--nanny": assert "closing nanny" in logs @@ -729,6 +730,5 @@ async def test_signal_handling(c, s, nanny, sig): assert "timed out" not in logs assert "error" not in logs assert "exception" not in logs - assert worker.returncode == 0 finally: worker.kill() From 68d4a145b5d1dae9c2a7e4e84778a08db2e519d1 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 15:19:36 +0200 Subject: [PATCH 28/35] Fix race-condition with signal handler --- distributed/cli/dask_worker.py | 41 +++++++++++------------ distributed/cli/tests/test_dask_worker.py | 14 +++----- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index ab84e631090..cdc7958ce1f 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -457,32 +457,31 @@ async def run(): ] async def wait_for_nannies_to_finish(): + """Wait for nannies to initialize and finish""" + await asyncio.gather(*nannies) await asyncio.gather(*(n.finished() for n in nannies)) - async def wait_until_shutdown(): - """Wait until all nannies finished or the script receives a SIGINT or SIGTERM - which triggers a graceful shutdown of the worker - """ + async def wait_for_signals_and_close(): + """Wait for SIGINT or SIGTERM and close nannies upon receiving one of those signals""" nonlocal signal_fired - wait_for_signal_task = asyncio.create_task( - wait_for_signals([signal.SIGINT, signal.SIGTERM]) - ) - wait_for_nannies_to_finish_task = asyncio.create_task( - wait_for_nannies_to_finish() - ) - done, _ = await asyncio.wait( - [wait_for_signal_task, wait_for_nannies_to_finish_task], - return_when=asyncio.FIRST_COMPLETED, - ) + await wait_for_signals([signal.SIGINT, signal.SIGTERM]) + + signal_fired = True + if nanny: + # Unregister all workers from scheduler + await asyncio.gather(*(n.close(timeout=10) for n in nannies)) - if wait_for_signal_task in done: - signal_fired = True - if nanny: - # Unregister all workers from scheduler - await asyncio.gather(*(n.close(timeout=10) for n in nannies)) + wait_for_signals_and_close_task = asyncio.create_task( + wait_for_signals_and_close() + ) + wait_for_nannies_to_finish_task = asyncio.create_task( + wait_for_nannies_to_finish() + ) - await asyncio.gather(*nannies) - await wait_until_shutdown() + await asyncio.wait( + [wait_for_signals_and_close_task, wait_for_nannies_to_finish_task], + return_when=asyncio.FIRST_COMPLETED, + ) try: asyncio.run(run()) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 6fdc149768c..547acf0e2b8 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -701,18 +701,16 @@ def test_timeout(nanny): assert worker.returncode == 1 -@pytest.mark.slow @pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) @gen_cluster(client=True, nthreads=[]) async def test_signal_handling(c, s, nanny, sig): - try: - worker = subprocess.Popen( - ["dask-worker", s.address, nanny], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) + with subprocess.Popen( + ["dask-worker", s.address, nanny], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as worker: await c.wait_for_workers(1) worker.send_signal(sig) @@ -730,5 +728,3 @@ async def test_signal_handling(c, s, nanny, sig): assert "timed out" not in logs assert "error" not in logs assert 
"exception" not in logs - finally: - worker.kill() From 8adb9af9e413f2422792d1d6328a514ac732d619 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 11 May 2022 15:32:52 +0200 Subject: [PATCH 29/35] Fix race condition in scheduler as well and use context manager --- distributed/cli/dask_scheduler.py | 41 ++++++++++---------- distributed/cli/dask_worker.py | 4 +- distributed/cli/tests/test_dask_scheduler.py | 15 +++---- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 641b5f503b0..94d82a44395 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -201,26 +201,27 @@ async def run(): ) logger.info("-" * 47) - async def wait_until_shutdown(): - """Wait until the scheduler finishes or the script receives a SIGINT or SIGTERM - which triggers a graceful shutdown of the scheduler - """ - wait_for_signal_task = asyncio.create_task( - wait_for_signals([signal.SIGINT, signal.SIGTERM]) - ) - wait_for_scheduler_to_finish_task = asyncio.create_task( - scheduler.finished() - ) - done, _ = await asyncio.wait( - [wait_for_signal_task, wait_for_scheduler_to_finish_task], - return_when=asyncio.FIRST_COMPLETED, - ) - - if wait_for_signal_task in done: - await scheduler.close() - - await scheduler - await wait_until_shutdown() + async def wait_for_scheduler_to_finish(): + """Wait for the scheduler to initialize and finish""" + await scheduler + await scheduler.finished() + + async def wait_for_signals_and_close(): + """Wait for SIGINT or SIGTERM and close the scheduler upon receiving one of those signals""" + await wait_for_signals([signal.SIGINT, signal.SIGTERM]) + await scheduler.close() + + wait_for_signals_and_close_task = asyncio.create_task( + wait_for_signals_and_close() + ) + wait_for_scheduler_to_finish_task = asyncio.create_task( + wait_for_scheduler_to_finish() + ) + + await asyncio.wait( + [wait_for_signals_and_close_task, wait_for_scheduler_to_finish_task], + return_when=asyncio.FIRST_COMPLETED, + ) logger.info("Stopped scheduler at %r", scheduler.address) try: diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index cdc7958ce1f..c24ec7ad97b 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -457,12 +457,12 @@ async def run(): ] async def wait_for_nannies_to_finish(): - """Wait for nannies to initialize and finish""" + """Wait for all nannies to initialize and finish""" await asyncio.gather(*nannies) await asyncio.gather(*(n.finished() for n in nannies)) async def wait_for_signals_and_close(): - """Wait for SIGINT or SIGTERM and close nannies upon receiving one of those signals""" + """Wait for SIGINT or SIGTERM and close all nannies upon receiving one of those signals""" nonlocal signal_fired await wait_for_signals([signal.SIGINT, signal.SIGTERM]) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index b0a8f143f71..a7b21224b30 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -464,14 +464,13 @@ def test_multiple_workers(loop): @pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) def test_signal_handling(loop, sig): - try: - scheduler = subprocess.Popen( - ["dask-scheduler"], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) + with subprocess.Popen( + ["dask-scheduler"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) 
From 8adb9af9e413f2422792d1d6328a514ac732d619 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 11 May 2022 15:32:52 +0200
Subject: [PATCH 29/35] Fix race condition in scheduler as well and use context manager

---
 distributed/cli/dask_scheduler.py            | 41 ++++++++++----------
 distributed/cli/dask_worker.py               |  4 +-
 distributed/cli/tests/test_dask_scheduler.py | 15 +++----
 3 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py
index 641b5f503b0..94d82a44395 100755
--- a/distributed/cli/dask_scheduler.py
+++ b/distributed/cli/dask_scheduler.py
@@ -201,26 +201,27 @@ async def run():
         )
         logger.info("-" * 47)

-        async def wait_until_shutdown():
-            """Wait until the scheduler finishes or the script receives a SIGINT or SIGTERM
-            which triggers a graceful shutdown of the scheduler
-            """
-            wait_for_signal_task = asyncio.create_task(
-                wait_for_signals([signal.SIGINT, signal.SIGTERM])
-            )
-            wait_for_scheduler_to_finish_task = asyncio.create_task(
-                scheduler.finished()
-            )
-            done, _ = await asyncio.wait(
-                [wait_for_signal_task, wait_for_scheduler_to_finish_task],
-                return_when=asyncio.FIRST_COMPLETED,
-            )
-
-            if wait_for_signal_task in done:
-                await scheduler.close()
-
-        await scheduler
-        await wait_until_shutdown()
+        async def wait_for_scheduler_to_finish():
+            """Wait for the scheduler to initialize and finish"""
+            await scheduler
+            await scheduler.finished()
+
+        async def wait_for_signals_and_close():
+            """Wait for SIGINT or SIGTERM and close the scheduler upon receiving one of those signals"""
+            await wait_for_signals([signal.SIGINT, signal.SIGTERM])
+            await scheduler.close()
+
+        wait_for_signals_and_close_task = asyncio.create_task(
+            wait_for_signals_and_close()
+        )
+        wait_for_scheduler_to_finish_task = asyncio.create_task(
+            wait_for_scheduler_to_finish()
+        )
+
+        await asyncio.wait(
+            [wait_for_signals_and_close_task, wait_for_scheduler_to_finish_task],
+            return_when=asyncio.FIRST_COMPLETED,
+        )
         logger.info("Stopped scheduler at %r", scheduler.address)

     try:
diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py
index cdc7958ce1f..c24ec7ad97b 100755
--- a/distributed/cli/dask_worker.py
+++ b/distributed/cli/dask_worker.py
@@ -457,12 +457,12 @@ async def run():
         ]

         async def wait_for_nannies_to_finish():
-            """Wait for nannies to initialize and finish"""
+            """Wait for all nannies to initialize and finish"""
             await asyncio.gather(*nannies)
             await asyncio.gather(*(n.finished() for n in nannies))

         async def wait_for_signals_and_close():
-            """Wait for SIGINT or SIGTERM and close nannies upon receiving one of those signals"""
+            """Wait for SIGINT or SIGTERM and close all nannies upon receiving one of those signals"""
             nonlocal signal_fired
             await wait_for_signals([signal.SIGINT, signal.SIGTERM])
diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py
index b0a8f143f71..a7b21224b30 100644
--- a/distributed/cli/tests/test_dask_scheduler.py
+++ b/distributed/cli/tests/test_dask_scheduler.py
@@ -464,14 +464,13 @@ def test_multiple_workers(loop):
 @pytest.mark.skipif(WINDOWS, reason="POSIX only")
 @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
 def test_signal_handling(loop, sig):
-    try:
-        scheduler = subprocess.Popen(
-            ["dask-scheduler"],
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-        )
+    with subprocess.Popen(
+        ["dask-scheduler"],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+    ) as scheduler:
         # Wait for scheduler to start
-        with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
+        with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:
             pass
         scheduler.send_signal(sig)
         stdout, stderr = scheduler.communicate()
@@ -481,5 +480,3 @@ def test_signal_handling(loop, sig):
         assert f"signal {sig}" in logs
         assert "scheduler closing" in logs
         assert "end scheduler" in logs
-    finally:
-        scheduler.kill()

From 622feb589def7921946b22ec47acd0f08349d529 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 11 May 2022 15:40:32 +0200
Subject: [PATCH 30/35] Call scheduler and worker via python -m distributed.cli.dask_{scheduler|worker}

---
 distributed/cli/tests/test_dask_scheduler.py | 2 +-
 distributed/cli/tests/test_dask_worker.py    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py
index a7b21224b30..3ae417dbf71 100644
--- a/distributed/cli/tests/test_dask_scheduler.py
+++ b/distributed/cli/tests/test_dask_scheduler.py
@@ -465,7 +465,7 @@ def test_multiple_workers(loop):
 @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
 def test_signal_handling(loop, sig):
     with subprocess.Popen(
-        ["dask-scheduler"],
+        ["python", "-m", "distributed.cli.dask_scheduler"],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
     ) as scheduler:
diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py
index 547acf0e2b8..5dc4fb12206 100644
--- a/distributed/cli/tests/test_dask_worker.py
+++ b/distributed/cli/tests/test_dask_worker.py
@@ -707,7 +707,7 @@ def test_timeout(nanny):
 @gen_cluster(client=True, nthreads=[])
 async def test_signal_handling(c, s, nanny, sig):
     with subprocess.Popen(
-        ["dask-worker", s.address, nanny],
+        ["python", "-m", "distributed.cli.dask_worker", s.address, nanny],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
     ) as worker:

From 159bc096b3d9206cd18ac391d8526c38015a4591 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 11 May 2022 16:37:38 +0200
Subject: [PATCH 31/35] Fix timeout on startup

---
 distributed/cli/dask_worker.py            | 4 +++-
 distributed/cli/tests/test_dask_worker.py | 9 ++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py
index c24ec7ad97b..0b22db6ae3c 100755
--- a/distributed/cli/dask_worker.py
+++ b/distributed/cli/dask_worker.py
@@ -478,10 +478,12 @@ async def wait_for_signals_and_close():
             wait_for_nannies_to_finish()
         )

-        await asyncio.wait(
+        done, _ = await asyncio.wait(
             [wait_for_signals_and_close_task, wait_for_nannies_to_finish_task],
             return_when=asyncio.FIRST_COMPLETED,
         )
+        # Re-raise exceptions from done tasks
+        [task.result() for task in done]

     try:
         asyncio.run(run())
diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py
index 5dc4fb12206..0e11ce0839b 100644
--- a/distributed/cli/tests/test_dask_worker.py
+++ b/distributed/cli/tests/test_dask_worker.py
@@ -690,7 +690,14 @@ def dask_setup(worker):
 @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
 def test_timeout(nanny):
     worker = subprocess.run(
-        ["dask-worker", "192.168.1.100:7777", nanny, "--death-timeout=1"],
+        [
+            "python",
+            "-m",
+            "distributed.cli.dask_worker",
+            "192.168.1.100:7777",
+            nanny,
+            "--death-timeout=1",
+        ],
         text=True,
         encoding="utf8",
         capture_output=True,
     )
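
The [task.result() for task in done] line added in PATCH 31 matters because asyncio.wait never raises on behalf of the tasks it waits on; any exception stays stored on the finished task until result() is called, so a failure during startup would otherwise be silently discarded. A toy example (not dask code; fails_fast and runs_forever are made-up names) demonstrates the behaviour:

    import asyncio


    async def fails_fast():
        raise TimeoutError("could not start worker")


    async def runs_forever():
        await asyncio.sleep(3600)


    async def main():
        tasks = [asyncio.create_task(fails_fast()), asyncio.create_task(runs_forever())]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # Without this loop the TimeoutError would be swallowed and the process
        # would exit as if nothing had gone wrong; with it, the error propagates.
        for task in done:
            task.result()


    try:
        asyncio.run(main())
    except TimeoutError as exc:
        print(f"propagated: {exc}")

PATCH 34 below applies the same idiom to the scheduler.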
From ae3e254f791a9ae435143d6c800a489503fccae0 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Wed, 11 May 2022 21:14:32 +0200
Subject: [PATCH 32/35] Retrigger CI to check for flake

From 17384a364ce48eb63dbd99b5780f0ab657c30a4d Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 12 May 2022 17:00:31 +0200
Subject: [PATCH 33/35] Fix annotation

Co-authored-by: Thomas Grainger
---
 distributed/cli/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py
index e4e97aa3283..083065c52db 100644
--- a/distributed/cli/utils.py
+++ b/distributed/cli/utils.py
@@ -8,7 +8,7 @@
 logger = logging.getLogger(__name__)


-async def wait_for_signals(signals: list[signal.Signals]):
+async def wait_for_signals(signals: list[signal.Signals]) -> None:
     """Wait for the passed signals by setting global signal handlers"""
     loop = asyncio.get_running_loop()
     event = asyncio.Event()

From aaa7321d5cf161da476fe13dabb6a0ff31b84e2d Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Thu, 12 May 2022 18:04:46 +0200
Subject: [PATCH 34/35] Re-raise exception in `dask_scheduler.py`

Co-authored-by: Thomas Grainger
---
 distributed/cli/dask_scheduler.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py
index 94d82a44395..700765ba6fe 100755
--- a/distributed/cli/dask_scheduler.py
+++ b/distributed/cli/dask_scheduler.py
@@ -218,10 +218,12 @@ async def wait_for_signals_and_close():
             wait_for_scheduler_to_finish()
         )

-        await asyncio.wait(
+        done, _ = await asyncio.wait(
             [wait_for_signals_and_close_task, wait_for_scheduler_to_finish_task],
             return_when=asyncio.FIRST_COMPLETED,
         )
+        # Re-raise exceptions from done tasks
+        [task.result() for task in done]
         logger.info("Stopped scheduler at %r", scheduler.address)

     try:

From edc6893d5dcce357a2da13b791dae274cc60cae7 Mon Sep 17 00:00:00 2001
From: Hendrik Makait
Date: Tue, 17 May 2022 20:12:54 +0200
Subject: [PATCH 35/35] Code review feedback

---
 distributed/cli/tests/test_dask_scheduler.py | 2 +-
 distributed/cli/tests/test_dask_worker.py    | 2 +-
 distributed/cli/utils.py                     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py
index 3ae417dbf71..1b01d1ad355 100644
--- a/distributed/cli/tests/test_dask_scheduler.py
+++ b/distributed/cli/tests/test_dask_scheduler.py
@@ -477,6 +477,6 @@ def test_signal_handling(loop, sig):
         logs = stdout.decode().lower()
         assert stderr is None
         assert scheduler.returncode == 0
-        assert f"signal {sig}" in logs
+        assert sig.name.lower() in logs
         assert "scheduler closing" in logs
         assert "end scheduler" in logs
diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py
index 0e11ce0839b..a6ba6d45aa6 100644
--- a/distributed/cli/tests/test_dask_worker.py
+++ b/distributed/cli/tests/test_dask_worker.py
@@ -725,7 +725,7 @@ async def test_signal_handling(c, s, nanny, sig):
         logs = stdout.decode().lower()
         assert stderr is None
         assert worker.returncode == 0
-        assert f"signal {sig}" in logs
+        assert sig.name.lower() in logs
         if nanny == "--nanny":
             assert "closing nanny" in logs
diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py
index 083065c52db..f25b7245c41 100644
--- a/distributed/cli/utils.py
+++ b/distributed/cli/utils.py
@@ -19,7 +19,7 @@ def handle_signal(signum, frame):
         # Restore old signal handler to allow for quicker exit
         # if the user sends the signal again.
         signal.signal(signum, old_handlers[signum])
-        logger.info("Received signal %d", signum)
+        logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum)
         loop.call_soon_threadsafe(event.set)

     for sig in signals:
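
For reference, the helper that the last two patches touch is only partially visible in the hunks above. An approximate, self-contained version (reconstructed for illustration; the real distributed/cli/utils.py may differ in details) shows the overall flow: each requested signal gets a handler that restores the previous handler, logs the signal by name, and wakes the event loop thread-safely, while the coroutine itself simply waits on an asyncio.Event.

    from __future__ import annotations

    import asyncio
    import logging
    import signal

    logger = logging.getLogger(__name__)


    async def wait_for_signals(signals: list[signal.Signals]) -> None:
        """Wait for the passed signals by setting global signal handlers"""
        loop = asyncio.get_running_loop()
        event = asyncio.Event()
        old_handlers = {}

        def handle_signal(signum, frame):
            # Restore the previous handler so that sending the signal a second
            # time bypasses the graceful path and exits immediately.
            signal.signal(signum, old_handlers[signum])
            logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum)
            # Signal handlers run outside normal event-loop scheduling, so hand
            # the wake-up back to the loop thread-safely.
            loop.call_soon_threadsafe(event.set)

        for sig in signals:
            old_handlers[sig] = signal.signal(sig, handle_signal)

        await event.wait()

A CLI entry point can then race this coroutine against its main task with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), exactly as the worker and scheduler scripts above do.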