Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Queue root tasks on scheduler, not workers [with co-assignment] #6598

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion distributed/collections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import heapq
import itertools
import weakref
from collections import OrderedDict, UserDict
from collections.abc import Callable, Hashable, Iterator
Expand Down Expand Up @@ -56,6 +57,9 @@ def __repr__(self) -> str:
def __contains__(self, value: object) -> bool:
return value in self._data

def __bool__(self) -> bool:
    """True iff the set currently holds at least one element."""
    return len(self._data) > 0

def __len__(self) -> int:
return len(self._data)

Expand Down Expand Up @@ -93,6 +97,41 @@ def pop(self) -> T:
self._data.discard(value)
return value

def popright(self) -> T:
    """Remove and return an element from the large end of the heap.

    The result is one of the larger elements, though not necessarily the
    largest: the tail of the heap list is only guaranteed to be a leaf.

    Raises
    ------
    KeyError
        If the set is empty.
    """
    if not self._data:
        raise KeyError("popright from an empty set")
    # Keep discarding stale tail entries (dead weakrefs, or values already
    # removed from the set) until a live one turns up. Termination is
    # guaranteed: _data is non-empty, so at least one heap entry is live.
    while True:
        _, _, ref = self._heap.pop()
        candidate = ref()
        if candidate is None or candidate not in self._data:
            continue
        self._data.discard(candidate)
        return candidate

def topk(self, k: int) -> Iterator[T]:
# TODO confirm big-O values here
"Iterator over the largest K elements. This is O(k*logn) for k < n // 2, O(n*logn) otherwise."
k = min(k, len(self))
if k == 1:
yield self.peek()
elif k >= len(self) // 2:
return itertools.islice(self.sorted(), k)
else:
# FIXME though neat, with all the list mutation this is probably always slower than sorting inplace.
elems: list[tuple[Any, int, weakref.ref[T]]] = []
try:
while len(elems) < k:
elem = heapq.heappop(self._heap)
value = elem[-1]()
if value is not None and value in self._data:
# NOTE: we're in a broken state during iteration, since the value exists
# in the set but not the heap. As with all Python iterators, mutating
# while iterating is undefined.
elems.append(elem)
yield value
finally:
self._heap = elems + self._heap

def __iter__(self) -> Iterator[T]:
"""Iterate over all elements. This is a O(n) operation which returns the
elements in pseudo-random order.
Expand All @@ -104,7 +143,8 @@ def sorted(self) -> Iterator[T]:
elements in order, from smallest to largest according to the key and insertion
order.
"""
for _, _, vref in sorted(self._heap):
self._heap.sort() # A sorted list maintains the heap invariant
for _, _, vref in self._heap:
value = vref()
if value in self._data:
yield value
Expand Down
38 changes: 29 additions & 9 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2091,8 +2091,8 @@ def __init__(self, scheduler, **kwargs):

node_colors = factor_cmap(
"state",
factors=["waiting", "processing", "memory", "released", "erred"],
palette=["gray", "green", "red", "blue", "black"],
factors=["waiting", "queued", "processing", "memory", "released", "erred"],
palette=["gray", "yellow", "green", "red", "blue", "black"],
)

self.root = figure(title="Task Graph", **kwargs)
Expand Down Expand Up @@ -2980,7 +2980,7 @@ def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler

data = progress_quads(
dict(all={}, memory={}, erred={}, released={}, processing={})
dict(all={}, memory={}, erred={}, released={}, processing={}, queued={})
)
self.source = ColumnDataSource(data=data)

Expand Down Expand Up @@ -3052,6 +3052,18 @@ def __init__(self, scheduler, **kwargs):
fill_alpha=0.35,
line_alpha=0,
)
self.root.quad(
source=self.source,
top="top",
bottom="bottom",
left="processing-loc",
right="queued-loc",
fill_color="gray",
hatch_pattern="/",
hatch_color="white",
fill_alpha=0.35,
line_alpha=0,
)
self.root.text(
source=self.source,
text="show-name",
Expand Down Expand Up @@ -3087,6 +3099,14 @@ def __init__(self, scheduler, **kwargs):
<span style="font-size: 14px; font-weight: bold;">All:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@all</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Queued:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@queued</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Processing:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Memory:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span>
Expand All @@ -3095,10 +3115,6 @@ def __init__(self, scheduler, **kwargs):
<span style="font-size: 14px; font-weight: bold;">Erred:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@erred</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Ready:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span>
</div>
""",
)
self.root.add_tools(hover)
Expand All @@ -3112,6 +3128,7 @@ def update(self):
"released": {},
"processing": {},
"waiting": {},
"queued": {},
}

for tp in self.scheduler.task_prefixes.values():
Expand All @@ -3122,6 +3139,7 @@ def update(self):
state["released"][tp.name] = active_states["released"]
state["processing"][tp.name] = active_states["processing"]
state["waiting"][tp.name] = active_states["waiting"]
state["queued"][tp.name] = active_states["queued"]

state["all"] = {k: sum(v[k] for v in state.values()) for k in state["memory"]}

Expand All @@ -3134,16 +3152,18 @@ def update(self):

totals = {
k: sum(state[k].values())
for k in ["all", "memory", "erred", "released", "waiting"]
for k in ["all", "memory", "erred", "released", "waiting", "queued"]
}
totals["processing"] = totals["all"] - sum(
v for k, v in totals.items() if k != "all"
)

self.root.title.text = (
"Progress -- total: %(all)s, "
"in-memory: %(memory)s, processing: %(processing)s, "
"waiting: %(waiting)s, "
"queued: %(queued)s, "
"processing: %(processing)s, "
"in-memory: %(memory)s, "
"erred: %(erred)s" % totals
)

Expand Down
49 changes: 32 additions & 17 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,29 @@ def progress_quads(msg, nrows=8, ncols=3):
... 'memory': {'inc': 2, 'dec': 0, 'add': 1},
... 'erred': {'inc': 0, 'dec': 1, 'add': 0},
... 'released': {'inc': 1, 'dec': 0, 'add': 1},
... 'processing': {'inc': 1, 'dec': 0, 'add': 2}}
... 'processing': {'inc': 1, 'dec': 0, 'add': 2},
... 'queued': {'inc': 1, 'dec': 0, 'add': 2}}

>>> progress_quads(msg, nrows=2) # doctest: +SKIP
{'name': ['inc', 'add', 'dec'],
'left': [0, 0, 1],
'right': [0.9, 0.9, 1.9],
'top': [0, -1, 0],
'bottom': [-.8, -1.8, -.8],
'released': [1, 1, 0],
'memory': [2, 1, 0],
'erred': [0, 0, 1],
'processing': [1, 0, 2],
'done': ['3 / 5', '2 / 4', '1 / 1'],
'released-loc': [.2/.9, .25 / 0.9, 1],
'memory-loc': [3 / 5 / .9, .5 / 0.9, 1],
'erred-loc': [3 / 5 / .9, .5 / 0.9, 1.9],
'processing-loc': [4 / 5, 1 / 1, 1]}}
{'all': [5, 4, 1],
'memory': [2, 1, 0],
'erred': [0, 0, 1],
'released': [1, 1, 0],
'processing': [1, 2, 0],
'queued': [1, 2, 0],
'name': ['inc', 'add', 'dec'],
'show-name': ['inc', 'add', 'dec'],
'left': [0, 0, 1],
'right': [0.9, 0.9, 1.9],
'top': [0, -1, 0],
'bottom': [-0.8, -1.8, -0.8],
'color': ['#45BF6F', '#2E6C8E', '#440154'],
'released-loc': [0.18, 0.225, 1.0],
'memory-loc': [0.54, 0.45, 1.0],
'erred-loc': [0.54, 0.45, 1.9],
'processing-loc': [0.72, 0.9, 1.9],
'queued-loc': [0.9, 1.35, 1.9],
'done': ['3 / 5', '2 / 4', '1 / 1']}
"""
width = 0.9
names = sorted(msg["all"], key=msg["all"].get, reverse=True)
Expand All @@ -100,19 +106,28 @@ def progress_quads(msg, nrows=8, ncols=3):
d["memory-loc"] = []
d["erred-loc"] = []
d["processing-loc"] = []
d["queued-loc"] = []
d["done"] = []
for r, m, e, p, a, l in zip(
d["released"], d["memory"], d["erred"], d["processing"], d["all"], d["left"]
for r, m, e, p, q, a, l in zip(
d["released"],
d["memory"],
d["erred"],
d["processing"],
d["queued"],
d["all"],
d["left"],
):
rl = width * r / a + l
ml = width * (r + m) / a + l
el = width * (r + m + e) / a + l
pl = width * (p + r + m + e) / a + l
ql = width * (p + r + m + e + q) / a + l
done = "%d / %d" % (r + m + e, a)
d["released-loc"].append(rl)
d["memory-loc"].append(ml)
d["erred-loc"].append(el)
d["processing-loc"].append(pl)
d["queued-loc"].append(ql)
d["done"].append(done)

return d
Expand Down
20 changes: 20 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,26 @@ properties:
description: |
How frequently to balance worker loads

worker-oversaturation:
type: number
description: |
Controls how many extra root tasks are sent to workers (like a `readahead`).

`floor(worker-oversaturation * worker.nthreads)` _extra_ tasks are sent to the worker
beyond its thread count. If `.inf`, all runnable tasks are immediately sent to workers.

Allowing oversaturation means a worker will start running a new root task as soon as
it completes the previous, even if there is a higher-priority downstream task to run.
This reduces worker idleness, by letting workers do something while waiting for further
instructions from the scheduler.

This generally comes at the expense of increased memory usage. It leads to "wider"
(more breadth-first) execution of the graph.

Compute-bound workloads benefit from oversaturation. Memory-bound workloads should
generally leave `worker-oversaturation` at 0, though 0.25-0.5 could slightly improve
performance if ample memory is available.

worker-ttl:
type:
- string
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ distributed:
events-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-oversaturation: 0.0 # Send this fraction of nthreads extra root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
Expand Down
2 changes: 2 additions & 0 deletions distributed/http/templates/worker-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<th> Memory </th>
<th> Memory use </th>
<th> Occupancy </th>
<th> Queued </th>
<th> Processing </th>
<th> In-memory </th>
<th> Services</th>
Expand All @@ -20,6 +21,7 @@
<td> {{ format_bytes(ws.memory_limit) if ws.memory_limit is not None else "" }} </td>
<td> <progress class="progress" value="{{ ws.metrics['memory'] }}" max="{{ ws.memory_limit }}"></progress> </td>
<td> {{ format_time(ws.occupancy) }} </td>
<td> {{ len(ws.queued) }} </td>
<td> {{ len(ws.processing) }} </td>
<td> {{ len(ws.has_what) }} </td>
{% if 'dashboard' in ws.services %}
Expand Down
15 changes: 15 additions & 0 deletions distributed/http/templates/worker.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ <h2 class="subtitle"> Processing </h2>
{% end %}
</table>
</div>
<div class="content">
<h2 class="subtitle"> Queued </h2>
<table class="table is-striped is-hoverable">
<tr>
<th> Task </th>
<th> Priority </th>
</tr>
{% for ts in ws.queued.sorted() %}
<tr>
<td> <a href="../task/{{ url_escape(ts.key) }}.html">{{ts.key}}</a></td>
<td> {{ts.priority }} </td>
</tr>
{% end %}
</table>
</div>
</div>
</div>

Expand Down
Loading