diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index d89483472da..cd38f364c8f 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1783,7 +1783,7 @@ def __init__(self, scheduler, **kwargs): self.last = 0 self.source = ColumnDataSource( { - "time": [time() - 20, time()], + "time": [time() - 60, time()], "level": [0, 15], "color": ["white", "white"], "duration": [0, 0], @@ -1828,7 +1828,7 @@ def convert(self, msgs): """Convert a log message to a glyph""" total_duration = 0 for msg in msgs: - time, level, key, duration, sat, occ_sat, idl, occ_idl = msg + time, level, key, duration, sat, occ_sat, idl, occ_idl = msg[:8] total_duration += duration try: diff --git a/distributed/stealing.py b/distributed/stealing.py index 51cfd379a8c..f1539c886e6 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -237,18 +237,22 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non assert ts.processing_on ws = ts.processing_on compute_time = ws.processing[ts] - if compute_time < 0.005: # 5ms, just give up + + if not compute_time: + # occupancy/ws.proccessing[ts] is only allowed to be zero for + # long running tasks which cannot be stolen + assert ts in ws.long_running return None, None nbytes = ts.get_nbytes_deps() transfer_time = nbytes / self.scheduler.bandwidth + LATENCY cost_multiplier = transfer_time / compute_time - if cost_multiplier > 100: - return None, None level = int(round(log2(cost_multiplier) + 6)) if level < 1: level = 1 + elif level >= len(self.cost_multipliers): + return None, None return cost_multiplier, level diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 6f3874bc1b2..4a3751d6d91 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -10,6 +10,7 @@ from operator import mul from time import sleep +import numpy as np import pytest from tlz import sliding_window @@ -1350,3 +1351,28 @@ def test_steal_worker_state(ws_with_running_task): assert "x" not in ws.tasks assert "x" not in ws.data assert ws.available_resources == {"R": 1} + + +@pytest.mark.slow() +@gen_cluster(nthreads=[("", 1)] * 4, client=True) +async def test_steal_very_fast_tasks(c, s, *workers): + # Ensure that very fast tasks are allowed to be stolen + root = dask.delayed(lambda n: "x" * n)( + dask.utils.parse_bytes("1MiB"), dask_key_name="root" + ) + + @dask.delayed + def func(*args): + import time + + time.sleep(0.002) + + ntasks = 1000 + results = [func(root, i) for i in range(ntasks)] + futs = c.compute(results) + await c.gather(futs) + + ntasks_per_worker = np.array([len(w.data) for w in workers]) + ideal = ntasks / len(workers) + assert (ntasks_per_worker > ideal * 0.5).all(), (ideal, ntasks_per_worker) + assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker)