Skip to content

Commit

Permalink
Allow very fast keys and very expensive transfers as stealing candida…
Browse files Browse the repository at this point in the history
…tes (#7022)

Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Hendrik Makait <hendrik@coiled.io>
  • Loading branch information
3 people authored Sep 9, 2022
1 parent f02d2f9 commit 77aeecb
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
4 changes: 2 additions & 2 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,7 @@ def __init__(self, scheduler, **kwargs):
self.last = 0
self.source = ColumnDataSource(
{
"time": [time() - 20, time()],
"time": [time() - 60, time()],
"level": [0, 15],
"color": ["white", "white"],
"duration": [0, 0],
Expand Down Expand Up @@ -1828,7 +1828,7 @@ def convert(self, msgs):
"""Convert a log message to a glyph"""
total_duration = 0
for msg in msgs:
time, level, key, duration, sat, occ_sat, idl, occ_idl = msg
time, level, key, duration, sat, occ_sat, idl, occ_idl = msg[:8]
total_duration += duration

try:
Expand Down
10 changes: 7 additions & 3 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,22 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non
assert ts.processing_on
ws = ts.processing_on
compute_time = ws.processing[ts]
if compute_time < 0.005: # 5ms, just give up

if not compute_time:
# occupancy/ws.proccessing[ts] is only allowed to be zero for
# long running tasks which cannot be stolen
assert ts in ws.long_running
return None, None

nbytes = ts.get_nbytes_deps()
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
cost_multiplier = transfer_time / compute_time
if cost_multiplier > 100:
return None, None

level = int(round(log2(cost_multiplier) + 6))
if level < 1:
level = 1
elif level >= len(self.cost_multipliers):
return None, None

return cost_multiplier, level

Expand Down
26 changes: 26 additions & 0 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from operator import mul
from time import sleep

import numpy as np
import pytest
from tlz import sliding_window

Expand Down Expand Up @@ -1350,3 +1351,28 @@ def test_steal_worker_state(ws_with_running_task):
assert "x" not in ws.tasks
assert "x" not in ws.data
assert ws.available_resources == {"R": 1}


@pytest.mark.slow()
@gen_cluster(nthreads=[("", 1)] * 4, client=True)
async def test_steal_very_fast_tasks(c, s, *workers):
# Ensure that very fast tasks are allowed to be stolen
root = dask.delayed(lambda n: "x" * n)(
dask.utils.parse_bytes("1MiB"), dask_key_name="root"
)

@dask.delayed
def func(*args):
import time

time.sleep(0.002)

ntasks = 1000
results = [func(root, i) for i in range(ntasks)]
futs = c.compute(results)
await c.gather(futs)

ntasks_per_worker = np.array([len(w.data) for w in workers])
ideal = ntasks / len(workers)
assert (ntasks_per_worker > ideal * 0.5).all(), (ideal, ntasks_per_worker)
assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker)

0 comments on commit 77aeecb

Please sign in to comment.