
Commit

Co-assignment when queuing is disabled
gjoseph92 committed Jun 23, 2022
1 parent 988b0cf commit b86fe0f
Showing 2 changed files with 37 additions and 5 deletions.
35 changes: 32 additions & 3 deletions distributed/scheduler.py
@@ -804,6 +804,14 @@ class TaskGroup:
     #: The result types of this TaskGroup
     types: set[str]
 
+    #: The worker most recently assigned a task from this group, or None when the group
+    #: is not identified to be root-like by `SchedulerState.decide_worker`.
+    last_worker: WorkerState | None
+
+    #: If `last_worker` is not None, the number of times that worker should be assigned
+    #: subsequent tasks until a new worker is chosen.
+    last_worker_tasks_left: int
+
     prefix: TaskPrefix | None
     start: float
     stop: float
@@ -823,6 +831,8 @@ def __init__(self, name: str):
         self.start = 0.0
         self.stop = 0.0
         self.all_durations = defaultdict(float)
+        self.last_worker = None
+        self.last_worker_tasks_left = 0
 
     def add_duration(self, action: str, start: float, stop: float) -> None:
         duration = stop - start
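
As a quick illustration of what `last_worker_tasks_left` will hold (the batch size is computed in the `decide_worker` hunk further down, so the numbers here are hypothetical, not taken from the diff):

# Hypothetical example of the batch-size formula used below: a group of 1000
# root-ish tasks on a cluster with 40 threads in total; the chosen worker has
# 4 threads, so it receives floor(1000 / 40 * 4) = 100 consecutive tasks
# before a new worker is picked.
import math
assert math.floor((1000 / 40) * 4) == 100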
@@ -1331,7 +1341,7 @@ def __init__(
         self.unrunnable = unrunnable
         self.validate = validate
         self.workers = workers
-        self.running = {
+        self.running: set[WorkerState] = {
             ws for ws in self.workers.values() if ws.status == Status.running
         }
         self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins}
@@ -1791,13 +1801,32 @@ def decide_worker(
             and len(tg.dependencies) < 5
             and sum(map(len, tg.dependencies)) < 5
         ):
-            if math.isinf(self.WORKER_SATURATION):
+            if math.isinf(self.WORKER_SATURATION):  # no scheduler-side queuing
                 pool = self.idle.values() if self.idle else self.running
                 if not pool:
                     recommendations[ts.key] = "no-worker"
                     return None
 
-                return min(pool, key=lambda ws: len(ws.processing) / ws.nthreads)
+                lws = tg.last_worker
+                if not (
+                    lws
+                    and tg.last_worker_tasks_left
+                    and self.workers.get(lws.address) is lws
+                ):
+                    # Last-used worker is full or unknown; pick a new worker for the next few tasks
+                    ws = min(pool, key=partial(self.worker_objective, ts))
+                    tg.last_worker_tasks_left = math.floor(
+                        (len(tg) / self.total_nthreads) * ws.nthreads
+                    )
+                else:
+                    ws = lws
+
+                # Record `last_worker`, or clear it on the final task
+                tg.last_worker = (
+                    ws if tg.states["released"] + tg.states["waiting"] > 1 else None
+                )
+                tg.last_worker_tasks_left -= 1
+                return ws
 
         if not self.idle:
             # All workers busy? Task gets/stays queued.
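
For readers skimming the hunk above, here is a minimal standalone sketch of the co-assignment heuristic. Names such as Worker, Group, and assign_root_task are illustrative only and are not part of distributed; the real logic lives in SchedulerState.decide_worker and operates on WorkerState/TaskGroup with self.worker_objective as the tie-breaker.

from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Callable


@dataclass
class Worker:
    address: str
    nthreads: int


@dataclass
class Group:
    ntasks: int                          # total tasks in the group
    remaining: int                       # tasks not yet done (released + waiting)
    last_worker: Worker | None = None
    last_worker_tasks_left: int = 0


def assign_root_task(
    group: Group,
    pool: list[Worker],
    total_nthreads: int,
    objective: Callable[[Worker], float],
) -> Worker:
    """Assign one root-ish task, reusing the previous worker while its batch lasts."""
    lws = group.last_worker
    if not (lws and group.last_worker_tasks_left and lws in pool):
        # Previous batch exhausted (or worker gone): pick a new worker and size a
        # batch proportional to its share of the cluster's threads.
        ws = min(pool, key=objective)
        group.last_worker_tasks_left = math.floor(
            (group.ntasks / total_nthreads) * ws.nthreads
        )
    else:
        ws = lws

    # Remember the worker, or clear it when this is the group's final task.
    group.last_worker = ws if group.remaining > 1 else None
    group.last_worker_tasks_left -= 1
    group.remaining -= 1
    return ws

The batch size, floor(len(tg) / total_nthreads * ws.nthreads), aims to hand each worker a contiguous slice of the group proportional to its thread count, so neighboring root tasks land on the same worker and their downstream tasks find their inputs locally.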
7 changes: 5 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -134,7 +134,7 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
     assert x.key in a.data or x.key in b.data
 
 
-@pytest.mark.skip("Current queuing does not support co-assignment")
+# @pytest.mark.skip("Current queuing does not support co-assignment")
 @pytest.mark.parametrize("ndeps", [0, 1, 4])
 @pytest.mark.parametrize(
     "nthreads",
@@ -147,7 +147,10 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
     @gen_cluster(
         client=True,
         nthreads=nthreads,
-        config={"distributed.scheduler.work-stealing": False},
+        config={
+            "distributed.scheduler.work-stealing": False,
+            "distributed.scheduler.worker-saturation": float("inf"),
+        },
         scheduler_kwargs=dict(  # TODO remove
             dashboard=True,
             dashboard_address=":8787",
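
The re-enabled co-assignment test only makes sense with scheduler-side queuing off, so it pins worker-saturation to infinity via gen_cluster's config=. As a hedged illustration (assuming the value is read when the scheduler starts, as it is in this test), user code could do the same with dask's config API:

import dask

# Queuing disabled: decide_worker takes the co-assignment path guarded by
# math.isinf(self.WORKER_SATURATION) in the scheduler hunk above.
dask.config.set({"distributed.scheduler.worker-saturation": float("inf")})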
