From 2e93b4363ac99be845250fa7f641592f042a7c9f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 26 Mar 2021 14:47:12 -0400 Subject: [PATCH] Add protection against repeated use of one worker in a quiet cluster ref dask/distributed#4637 --- distributed/scheduler.py | 8 +++++++- distributed/tests/test_scheduler.py | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 190bc7c30c9..9600ef99a7c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2130,7 +2130,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState: worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) n_workers: Py_ssize_t = len(worker_pool_dv) - if n_workers < 20: # smart but linear in small case + # if all occupancies in worker pool of size less than 20 + # sum to under 0.1 of 1ms; go to the else branch (a round + # robin) because the cluster is considered quiet. + if ( + n_workers < 20 + and sum(w.occupancy for w in worker_pool.values()) > 1.0e-04 + ): # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 14cc084f5dc..ae595dc7705 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1690,6 +1690,14 @@ async def test_result_type(c, s, a, b): assert "int" in s.tasks[x.key].type +@gen_cluster(client=True) +async def test_round_robin_dd(c, s, a, b): + await c.submit(inc, 1) + await c.submit(inc, 2) + await c.submit(inc, 3) + assert a.log and b.log + + @gen_cluster() async def test_close_workers(s, a, b): await s.close(close_workers=True)