
Save a few cycles
Only compute `total_nthreads` when a new worker is needed, and compute the number of dependencies only once per task group. Overloads the meaning of `_last_worker` to also record whether we've already decided that a TaskGroup is root-ish.
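A minimal sketch of the sentinel trick this message describes - the classes below are bare stand-ins for illustration, not the real TaskGroup/WorkerState in distributed/scheduler.py:

class WorkerState:
    pass

NOT_ROOT_ISH = WorkerState()  # sentinel: "already decided: not root-ish"

class TaskGroup:
    def __init__(self):
        # None = undecided; NOT_ROOT_ISH = decided not root-ish;
        # any other WorkerState = root-ish, the last worker tasks were sent to
        self._last_worker = None

def is_root_ish(group: TaskGroup, looks_root_ish: bool) -> bool:
    ws = group._last_worker
    if ws is None:  # first task of the group: classify once and cache the verdict
        if not looks_root_ish:
            group._last_worker = NOT_ROOT_ISH
        return looks_root_ish
    return ws is not NOT_ROOT_ISH  # reuse the cached decision on later tasks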
gjoseph92 committed Jun 16, 2021
1 parent 0fbb75e commit f2da0bc
Showing 1 changed file with 63 additions and 43 deletions: distributed/scheduler.py
@@ -7508,6 +7508,9 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState):
            steal.put_key_in_stealable(ts)


# Sentinel stored in `TaskGroup._last_worker` to record that the group has
# already been classified as not root-ish.
NOT_ROOT_ISH = WorkerState()


@cfunc
@exceptval(check=False)
def decide_worker(
@@ -7537,54 +7540,67 @@ def decide_worker(
    group: TaskGroup = ts._group
    ws: WorkerState = group._last_worker

    ignore_deps_while_picking: bool = False

    group_tasks_per_worker: float
    if ws is None or (ws is not NOT_ROOT_ISH and group._last_worker_tasks_left == 0):
        # Calculate the ratio of tasks in the task group to threads in the cluster.
        # We only need to do this computation when 1) we're seeing a task group for the
        # first time, or 2) this is a root-ish task group and we've just filled up the
        # worker we were sending tasks to, so we need to pick a new one.
        if valid_workers is not None:
            total_nthreads = sum(wws._nthreads for wws in valid_workers)

        group_tasks_per_worker = len(group) / total_nthreads
    else:
        group_tasks_per_worker = float("nan")
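    # NaN is safe here: every path that actually uses `group_tasks_per_worker`
    # (the root-ish check for a brand-new group, and the `math.floor` after picking
    # a new worker) runs only right after the ratio was recomputed above, so the
    # NaN can reach nothing but the debug prints.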

    # Try to schedule sibling root-like tasks on the same workers, so subsequent
    # reduction tasks don't require data transfer. Assumes `decide_worker` is being
    # called in priority order.
    is_root_ish: bool
    if ws is None:
        # Very first task in the group - we haven't yet determined whether it's a root-ish task group
        if (
            group_tasks_per_worker > 1  # group is larger than cluster
            and (  # is a root-like task (task group is large, but depends on very few tasks)
                sum(map(len, group._dependencies)) < 5  # TODO what number
            )
        ):
            is_root_ish = True
        else:
            is_root_ish = False
            group._last_worker = NOT_ROOT_ISH
    else:
        # We've seen this task group before and already made the above determination
        is_root_ish = ws is not NOT_ROOT_ISH

    if is_root_ish and ws is not None and group._last_worker_tasks_left > 0:
        # Root-ish task and previous worker not fully assigned - reuse the previous worker.
        # (When the previous worker _is_ fully assigned, we fall through to the
        # pick-a-worker logic below.)
        if group._last_worker_priority >= ts.priority:
            print(
                f"decide_worker called out of priority order: {group._last_worker_priority} >= {ts.priority}.\n"
                f"{ts=}\n"
                f"{group._last_worker=}\n"
                f"{group._last_worker_tasks_left=}\n"
                f"{group_tasks_per_worker=}\n"
            )
        group._last_worker_priority = ts.priority
        group._last_worker_tasks_left -= 1
        print(f"reusing worker - {ts.group_key} -> {ws.name}")
        return ws

    # Pick a worker to run this task
    deps: set = ts._dependencies
    dts: TaskState
    candidates: set
    assert all([dts._who_has for dts in deps])
    if is_root_ish:
        # Previous worker is fully assigned (or unknown), so pick a new worker.
        # Since this is a root-like task, we should ignore the placement of its
        # dependencies while selecting workers. Every worker is going to end up running
        # this type of task eventually, and any dependencies will have to be transferred
        # to all workers, so there's no gain from only considering workers where the
        # dependencies already live. Indeed, we _must_ consider all workers, otherwise
        # we would keep picking the same "new" worker(s) every time, since only N
        # workers actually hold the dependency (where N <= n_deps).
        ignore_deps_while_picking = True
        print(
            f"{ts.group_key} is root-like but "
            f"{ws.name if ws is not None else '(no previous worker)'} is full - picking a new worker"
        )

    if ignore_deps_while_picking:
        candidates = valid_workers if valid_workers is not None else set(all_workers)
    else:
        # Restrict placement of this task to workers that hold its dependencies
        if ts._actor:
            candidates = set(all_workers)
        else:
@@ -7598,13 +7614,15 @@
                candidates = valid_workers
                if not candidates:
                    if ts._loose_restrictions:
                        ws = decide_worker(
                            ts, all_workers, None, objective, total_nthreads
                        )
                    return ws

    ncandidates: Py_ssize_t = len(candidates)
    if ncandidates == 0:
        print(f"no candidates - {ts.group_key}")
        return None
    elif ncandidates == 1:
        # NOTE: this is the ideal case: all the deps are already on the same worker.
        # We did a good job in previous `decide_worker`s!
@@ -7615,9 +7633,11 @@
        ws = min(candidates, key=objective)
        print(f"picked worker - {ts.group_key} -> {ws.name}")

    if is_root_ish:
        group._last_worker = ws
        group._last_worker_tasks_left = math.floor(group_tasks_per_worker)
        group._last_worker_priority = ts.priority

    return ws
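A worked example of the co-assignment arithmetic above, using made-up numbers (the cluster size and task count are hypothetical; only the variable names mirror the diff):

import math

# Hypothetical cluster: 10 workers x 4 threads each; a root-ish group of 1000 tasks.
total_nthreads = 10 * 4
group_len = 1000

group_tasks_per_worker = group_len / total_nthreads  # 25.0 -> group is "larger than cluster"
tasks_left = math.floor(group_tasks_per_worker)      # 25 sibling tasks pinned per chosen worker

# decide_worker thus sends 25 consecutive (priority-ordered) tasks to one worker
# before picking a new one, so neighboring tasks - and the reductions that
# consume them - land on the same machine and avoid data transfer.
print(group_tasks_per_worker, tasks_left)  # 25.0 25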


