Skip to content

Commit

Permalink
Add protection against repeated use of one worker in a quiet cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis committed Mar 29, 2021
1 parent 2c89ac3 commit 2e93b43
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
8 changes: 7 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2130,7 +2130,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
worker_pool = self._idle or self._workers
worker_pool_dv = cast(dict, worker_pool)
n_workers: Py_ssize_t = len(worker_pool_dv)
if n_workers < 20: # smart but linear in small case
# if all occupancies in worker pool of size less than 20
# sum to under 0.1 of 1ms; go to the else branch (a round
# robin) because the cluster is considered quiet.
if (
n_workers < 20
and sum(w.occupancy for w in worker_pool.values()) > 1.0e-04
): # smart but linear in small case
ws = min(worker_pool.values(), key=operator.attrgetter("occupancy"))
else: # dumb but fast in large case
ws = worker_pool.values()[self._n_tasks % n_workers]
Expand Down
8 changes: 8 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,14 @@ async def test_result_type(c, s, a, b):
assert "int" in s.tasks[x.key].type


@gen_cluster(client=True)
async def test_round_robin_dd(c, s, a, b):
await c.submit(inc, 1)
await c.submit(inc, 2)
await c.submit(inc, 3)
assert a.log and b.log


@gen_cluster()
async def test_close_workers(s, a, b):
await s.close(close_workers=True)
Expand Down

0 comments on commit 2e93b43

Please sign in to comment.