diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4649dc205d8..369fd070668 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2899,7 +2899,7 @@ def is_rootish(self, ts: TaskState) -> bool: return ( len(tg) > self.total_nthreads * 2 and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 + and sum(map(len, tg.dependencies)) < max(5, len(tg) * 0.01) ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d0703361d39..dccdfcd8d93 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4477,3 +4477,18 @@ async def test_scatter_creates_ts(c, s, a, b): await a.close() assert await x2 == 2 assert s.tasks["x"].run_spec is not None + + +@gen_cluster(client=True) +async def test_rootish_for_many_tasks(c, s, a, b): + def f(x, y=None): + pass + + base = c.map(inc, range(10)) + derived = c.map(f, range(2000), y=base) + + while derived[0].key not in s.tasks: + await asyncio.sleep(0.01) + + ts = s.tasks[derived[0].key] + assert s.is_rootish(s.tasks[derived[0].key])