From b18081d539ef8986d725d27f3d5afe71f1e7f3d6 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 10 Mar 2023 16:55:49 +0000 Subject: [PATCH 1/2] Expose schedule idle via RPC and HTTP API --- distributed/http/scheduler/api.py | 17 ++++++++++ .../scheduler/tests/test_scheduler_http.py | 25 +++++++++++++++ distributed/scheduler.py | 32 ++++++++++--------- distributed/tests/test_scheduler.py | 17 ++++------ 4 files changed, 66 insertions(+), 25 deletions(-) diff --git a/distributed/http/scheduler/api.py b/distributed/http/scheduler/api.py index 0a710a58ff..1ee9339a18 100644 --- a/distributed/http/scheduler/api.py +++ b/distributed/http/scheduler/api.py @@ -63,9 +63,26 @@ def get(self): self.write(json.dumps({"Error": "Internal Server Error"})) +class CheckIdleHandler(RequestHandler): + def get(self): + self.set_header("Content-Type", "application/json") + scheduler = self.server + try: + idle_since = scheduler.check_idle() + response = { + "idle": idle_since is None, + "idle_since": idle_since, + } + self.write(json.dumps(response)) + except Exception as e: + self.set_status(500, str(e)) + self.write(json.dumps({"Error": "Internal Server Error"})) + + routes: list[tuple] = [ ("/api/v1", APIHandler, {}), ("/api/v1/retire_workers", RetireWorkersHandler, {}), ("/api/v1/get_workers", GetWorkersHandler, {}), ("/api/v1/adaptive_target", AdaptiveTargetHandler, {}), + ("/api/v1/check_idle", CheckIdleHandler, {}), ] diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index f766fae80d..f81fd7b842 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -481,3 +481,28 @@ async def test_adaptive_target(c, s, a, b): assert resp.headers["Content-Type"] == "application/json" num_workers = json.loads(await resp.text())["workers"] assert num_workers == 0 + + +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={ + "distributed.scheduler.http.routes": DEFAULT_ROUTES + + ["distributed.http.scheduler.api"] + }, +) +async def test_check_idle(c, s, a, b): + aiohttp = pytest.importorskip("aiohttp") + + async with aiohttp.ClientSession() as session: + async with session.get( + "http://localhost:%d/api/v1/check_idle" % s.http_server.port + ) as resp: + assert resp.status == 200 + assert resp.headers["Content-Type"] == "application/json" + response = json.loads(await resp.text()) + assert isinstance(response["idle"], bool) + assert ( + isinstance(response["idle_since"], int) + or response["idle_since"] is None + ) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 82e92ff7bb..b3aa9d349f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3643,6 +3643,7 @@ def __init__( "dump_cluster_state_to_url": self.dump_cluster_state_to_url, "benchmark_hardware": self.benchmark_hardware, "get_story": self.get_story, + "check_idle": self.check_idle, } connection_limit = get_fileno_limit() / 2 @@ -3675,9 +3676,8 @@ def __init__( pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000) self.periodic_callbacks["worker-ttl"] = pc - if self.idle_timeout: - pc = PeriodicCallback(self.check_idle, self.idle_timeout * 1000 / 4) - self.periodic_callbacks["idle-timeout"] = pc + pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4) + self.periodic_callbacks["idle-timeout"] = pc if extensions is None: extensions = DEFAULT_EXTENSIONS.copy() @@ -7708,15 +7708,14 @@ async def check_worker_ttl(self): ) await self.remove_worker(address=ws.address, stimulus_id=stimulus_id) - def check_idle(self): - assert self.idle_timeout + def check_idle(self) -> int | None: if self.status in (Status.closing, Status.closed): - return + return None if self.transition_counter != self._idle_transition_counter: self._idle_transition_counter = self.transition_counter self.idle_since = None - return + return None if ( self.queued @@ -7724,18 +7723,21 @@ def check_idle(self): or any([ws.processing for ws in self.workers.values()]) ): self.idle_since = None - return + return None if not self.idle_since: self.idle_since = time() + return self.idle_since - if time() > self.idle_since + self.idle_timeout: - assert self.idle_since - logger.info( - "Scheduler closing after being idle for %s", - format_time(self.idle_timeout), - ) - self._ongoing_background_tasks.call_soon(self.close) + if self.idle_timeout: + if time() > self.idle_since + self.idle_timeout: + assert self.idle_since + logger.info( + "Scheduler closing after being idle for %s", + format_time(self.idle_timeout), + ) + self._ongoing_background_tasks.call_soon(self.close) + return None def adaptive_target(self, target_duration=None): """Desired number of workers based on the current workload diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 64f31178bd..f469b4beff 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2288,6 +2288,8 @@ async def test_idle_timeout(c, s, a, b): pc.start() await future assert s.idle_since is None or s.idle_since > beginning + _idle_since = s.check_idle() + assert _idle_since == s.idle_since with captured_logger("distributed.scheduler") as logs: start = time() @@ -2317,19 +2319,16 @@ async def test_idle_timeout_no_workers(c, s): while not s.tasks: await asyncio.sleep(0.1) - s.check_idle() - assert not s.idle_since + assert not s.check_idle() for _ in range(10): await asyncio.sleep(0.01) - s.check_idle() - assert not s.idle_since + assert not s.check_idle() assert s.tasks async with Worker(s.address): await future - s.check_idle() - assert not s.idle_since + assert not s.check_idle() del future while s.tasks: @@ -2337,11 +2336,9 @@ async def test_idle_timeout_no_workers(c, s): # We only set idleness once nothing happened between two consecutive # check_idle calls - s.check_idle() - assert not s.idle_since + assert not s.check_idle() - s.check_idle() - assert s.idle_since + assert s.check_idle() @gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"}) From 04be19075da87cfcf577bcd3cd574a58fd67171a Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 13 Mar 2023 13:27:35 +0000 Subject: [PATCH 2/2] Fix test --- distributed/tests/test_scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f469b4beff..af1f66f52d 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2314,6 +2314,9 @@ async def test_idle_timeout(c, s, a, b): nthreads=[], ) async def test_idle_timeout_no_workers(c, s): + # Cancel the idle check periodic timeout so we can step through manually + s.periodic_callbacks["idle-timeout"].stop() + s.idle_timeout = 0.1 future = c.submit(inc, 1) while not s.tasks: