Withhold root tasks [no co assignment] #6614

Merged: 104 commits, Aug 31, 2022
Commits
afedccd
unused: `OrderedSet` collection
gjoseph92 Jun 16, 2022
6b6651b
Queue root tasks scheduler-side
gjoseph92 Jun 17, 2022
6225d1a
Show queued tasks with crosshatching on dashboard
gjoseph92 Jun 17, 2022
1496abb
improve reasonableness of task-state order
gjoseph92 Jun 17, 2022
7457865
Allow configurable oversaturation
gjoseph92 Jun 17, 2022
67e9bd2
Only support floats for `worker-oversaturation`
gjoseph92 Jun 17, 2022
2410a82
Push memory limits a little more in test
gjoseph92 Jun 17, 2022
49d5ddd
Queued tasks on info pages
gjoseph92 Jun 18, 2022
b546997
driveby: WIP color task graph by worker
gjoseph92 Jul 23, 2021
2b44820
Revert "driveby: WIP color task graph by worker"
gjoseph92 Jun 18, 2022
e494e87
Queued tasks on graph
gjoseph92 Jun 18, 2022
ad417ed
Redistribute queues when new worker joins
gjoseph92 Jun 18, 2022
b4c698e
Fix task_slots_available when queuing disabled
gjoseph92 Jun 18, 2022
aa4e531
Fix co-assignment logic to consider queued tasks
gjoseph92 Jun 18, 2022
b514e84
Revert "unused: `OrderedSet` collection"
gjoseph92 Jun 20, 2022
1835a89
Fix potential stale worker use in decide_worker
gjoseph92 Jun 20, 2022
db42c22
WIP identify root task families
gjoseph92 Jun 22, 2022
0f6603c
Withhold root tasks [no co-assignment]
gjoseph92 Jun 22, 2022
e10fdca
Factor out `_add_to_processing`
gjoseph92 Jun 22, 2022
3eb1d68
Factor out `_propagage_released`
gjoseph92 Jun 22, 2022
c685b3c
Update `check_idle_saturated`
gjoseph92 Jun 22, 2022
e1dda98
Fix `topk` for 0/negative values
gjoseph92 Jun 22, 2022
f811246
Tests for HeapSet.topk
gjoseph92 Jun 22, 2022
d347b32
fix mypy
gjoseph92 Jun 22, 2022
1990dd7
worker-oversaturation -> worker-saturation
gjoseph92 Jun 22, 2022
be1b9ca
fixup! Factor out `_add_to_processing`
gjoseph92 Jun 22, 2022
85f9120
fix `test_queued_tasks_rebalance`
gjoseph92 Jun 22, 2022
bb08c8d
Fix occupancy tests
gjoseph92 Jun 22, 2022
966d61f
Test releasing previously queued paused tasks
gjoseph92 Jun 23, 2022
15494f0
driveby: fix transition debug log end state
gjoseph92 Jun 23, 2022
546aa4a
Refactor scheduling when no workers are running
gjoseph92 Jun 23, 2022
ffbb53b
Don't send queued tasks to no-worker
gjoseph92 Jun 23, 2022
65735f8
Schedule rootish tasks when some workers are paused
gjoseph92 Jun 23, 2022
6bf710c
Rever less-intrusive all-paused handling
gjoseph92 Jun 23, 2022
25e6f3b
Decrease test_root_task_overproduction size
gjoseph92 Jun 23, 2022
988b0cf
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Jun 23, 2022
b86fe0f
Co-assignment when queuing is disabled
gjoseph92 Jun 23, 2022
7ebd1d9
Fix co-assignment for binary operations
gjoseph92 Jun 23, 2022
034f980
Turn withholding off by default
gjoseph92 Jun 24, 2022
0af53b4
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Aug 17, 2022
a63d25b
Remove redundant insert into `idle`
gjoseph92 Aug 17, 2022
5f7e7f1
Update `idle` docstring
gjoseph92 Aug 17, 2022
9aeecc9
fix `test_ready_remove_worker`
gjoseph92 Aug 17, 2022
dcb11e4
fix `test_saturation_factor`
gjoseph92 Aug 17, 2022
7dfc83e
remove debug dashboards from tests
gjoseph92 Aug 17, 2022
c99bbe8
fix `progress_stream`
gjoseph92 Aug 18, 2022
c1544f3
fix `test_prometheus_collect_task_states`
gjoseph92 Aug 18, 2022
349712f
fix config json schema
gjoseph92 Aug 18, 2022
594585e
fix retire workers
gjoseph92 Aug 18, 2022
b4f843d
update `validate_task_state`
gjoseph92 Aug 17, 2022
2db4db9
fix `test_saturation_factor` again
gjoseph92 Aug 17, 2022
8395ef4
ignore long-running in `task_slots_available`
gjoseph92 Aug 19, 2022
36a60a5
hackily consider queue in adaptive target
gjoseph92 Aug 19, 2022
da04438
update explanation in schema slightly
gjoseph92 Aug 19, 2022
c92236c
update comments in `decide_worker`
gjoseph92 Aug 19, 2022
e990b92
formalize `transition_queued_processing` assertion
gjoseph92 Aug 19, 2022
0d21c78
correct bulk_schedule comment
gjoseph92 Aug 19, 2022
b40cec1
private `_task_slots_available`
gjoseph92 Aug 23, 2022
12b94d0
`worker_saturated` -> `_worker_full`
gjoseph92 Aug 23, 2022
4c0e768
remove unused `_add_to_processing` return value
gjoseph92 Aug 23, 2022
14cc157
yaml schema fixes
gjoseph92 Aug 24, 2022
f5d7be4
`topk` -> `peekn`
gjoseph92 Aug 24, 2022
704b485
driveby: fix `saturated` docstring
gjoseph92 Aug 24, 2022
38e0598
driveby: clarify `finish2` name
gjoseph92 Aug 24, 2022
d47e80d
Split up `decide_worker`, remove recs
gjoseph92 Aug 24, 2022
2cc8631
remove no_worker->memory just to see what happens
gjoseph92 Aug 24, 2022
100118a
Render scheduler state graph with inline graphviz
gjoseph92 Aug 24, 2022
842ee71
Update scheduling state graph and docs slightly
gjoseph92 Aug 24, 2022
494fe48
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Aug 24, 2022
dd88b0d
apt package is graphviz, not dot
gjoseph92 Aug 24, 2022
e17c624
Revert "remove no_worker->memory just to see what happens"
gjoseph92 Aug 24, 2022
06d60fe
Revert "Render scheduler state graph with inline graphviz"
gjoseph92 Aug 25, 2022
96d59eb
Update scheduling state docs
gjoseph92 Aug 25, 2022
3240a43
`test_root_task_overproduction` adaptive data size
gjoseph92 Aug 25, 2022
9344dd9
Don't assert rootish in `decide_worker`
gjoseph92 Aug 25, 2022
aa8e1db
improve `test_queued_paused`
gjoseph92 Aug 26, 2022
ee1a754
`test_queued_paused_unpaused`
gjoseph92 Aug 26, 2022
f36a6ac
`bulk_schedule_after_adding_worker` docstring
gjoseph92 Aug 26, 2022
78353e1
`test_queued_remove_add_worker`
gjoseph92 Aug 26, 2022
18b7bb5
reorder and quicken `test_queued_tasks_rebalance`
gjoseph92 Aug 26, 2022
f3a66df
queued tasks cause scale-up from empty
gjoseph92 Aug 26, 2022
c5f2746
Improve adaptive target for queued tasks
gjoseph92 Aug 26, 2022
4b2a209
don't need that fail_func
gjoseph92 Aug 26, 2022
3cebe54
remove `test_oversaturation_multiple_task_groups`
gjoseph92 Aug 26, 2022
51dca31
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Aug 26, 2022
14dc850
Documentation suggestions
gjoseph92 Aug 27, 2022
b36064e
`test_graph_execution_width`
gjoseph92 Aug 27, 2022
5b2bc02
skip `test_root_task_overproduction` on windows
gjoseph92 Aug 27, 2022
1819a51
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Aug 29, 2022
2952f6b
few more type annotations since they're available
gjoseph92 Aug 29, 2022
d00ea54
`decide_worker_rootish_queuing_enabled` no task
gjoseph92 Aug 29, 2022
8ba4ced
fix `check_idle` for queued tasks
gjoseph92 Aug 29, 2022
63d863d
_remove_from_processing->_exit_processing_common
gjoseph92 Aug 30, 2022
b7704e3
remove `test_queued_tasks_rebalance`
gjoseph92 Aug 30, 2022
00b54e7
`scheduler.allowed-failures` instead of PIDs
gjoseph92 Aug 30, 2022
12207e6
test_root_task_overproduction->test_near_memory_limit_workload
gjoseph92 Aug 30, 2022
02c98b3
remove all sleeps from `graph_execution_width`
gjoseph92 Aug 31, 2022
2b3f6ae
remove `test_near_memory_limit_workload`
gjoseph92 Aug 31, 2022
5e4d53d
`handle_worker_status_change` in `retire_workers`
gjoseph92 Aug 31, 2022
333bbb2
Merge remote-tracking branch 'upstream/main' into withold-root-tasks-…
gjoseph92 Aug 31, 2022
acc524f
avoid flaky `test_graph_execution_width`
gjoseph92 Aug 31, 2022
ba336b9
fix test_decide_worker_coschedule_order_binary_op
gjoseph92 Aug 31, 2022
9d99d74
fixup! `handle_worker_status_change`
gjoseph92 Aug 31, 2022
093d7dc
Fix merge conflict of renaming transfer log
fjetter Aug 31, 2022
38 changes: 29 additions & 9 deletions distributed/dashboard/components/scheduler.py
@@ -2162,8 +2162,8 @@ def __init__(self, scheduler, **kwargs):

node_colors = factor_cmap(
"state",
- factors=["waiting", "processing", "memory", "released", "erred"],
- palette=["gray", "green", "red", "blue", "black"],
+ factors=["waiting", "queued", "processing", "memory", "released", "erred"],
+ palette=["gray", "yellow", "green", "red", "blue", "black"],
)

self.root = figure(title="Task Graph", **kwargs)
@@ -3051,7 +3051,7 @@ def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler

data = progress_quads(
- dict(all={}, memory={}, erred={}, released={}, processing={})
+ dict(all={}, memory={}, erred={}, released={}, processing={}, queued={})
)
self.source = ColumnDataSource(data=data)

@@ -3123,6 +3123,18 @@ def __init__(self, scheduler, **kwargs):
fill_alpha=0.35,
line_alpha=0,
)
+ self.root.quad(
+ source=self.source,
+ top="top",
+ bottom="bottom",
+ left="processing-loc",
+ right="queued-loc",
+ fill_color="gray",
+ hatch_pattern="/",
+ hatch_color="white",
+ fill_alpha=0.35,
+ line_alpha=0,
+ )
self.root.text(
source=self.source,
text="show-name",
@@ -3158,6 +3170,14 @@ def __init__(self, scheduler, **kwargs):
<span style="font-size: 14px; font-weight: bold;">All:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@all</span>
</div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Queued:</span>&nbsp;
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@queued</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Processing:</span>&nbsp;
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span>
+ </div>
<div>
<span style="font-size: 14px; font-weight: bold;">Memory:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@memory</span>
@@ -3166,10 +3186,6 @@ def __init__(self, scheduler, **kwargs):
<span style="font-size: 14px; font-weight: bold;">Erred:</span>&nbsp;
<span style="font-size: 10px; font-family: Monaco, monospace;">@erred</span>
</div>
- <div>
- <span style="font-size: 14px; font-weight: bold;">Ready:</span>&nbsp;
- <span style="font-size: 10px; font-family: Monaco, monospace;">@processing</span>
- </div>
""",
)
self.root.add_tools(hover)
@@ -3183,6 +3199,7 @@ def update(self):
"released": {},
"processing": {},
"waiting": {},
+ "queued": {},
}

for tp in self.scheduler.task_prefixes.values():
@@ -3193,6 +3210,7 @@ def update(self):
state["released"][tp.name] = active_states["released"]
state["processing"][tp.name] = active_states["processing"]
state["waiting"][tp.name] = active_states["waiting"]
+ state["queued"][tp.name] = active_states["queued"]

state["all"] = {k: sum(v[k] for v in state.values()) for k in state["memory"]}

@@ -3205,16 +3223,18 @@ def update(self):

totals = {
k: sum(state[k].values())
- for k in ["all", "memory", "erred", "released", "waiting"]
+ for k in ["all", "memory", "erred", "released", "waiting", "queued"]
}
totals["processing"] = totals["all"] - sum(
v for k, v in totals.items() if k != "all"
)

self.root.title.text = (
"Progress -- total: %(all)s, "
- "in-memory: %(memory)s, processing: %(processing)s, "
"waiting: %(waiting)s, "
+ "queued: %(queued)s, "
+ "processing: %(processing)s, "
+ "in-memory: %(memory)s, "
"erred: %(erred)s" % totals
)
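
The title arithmetic in this hunk can be checked in isolation. The sketch below is a standalone illustration rather than the dashboard code: `summarize` and the sample `state` mapping are hypothetical, and it assumes per-prefix counts are plain dicts. It shows that `processing` is derived as the remainder once every other state, now including `queued`, is subtracted from `all`.

```python
def summarize(state):
    """Return per-state totals, deriving 'processing' as the remainder.

    `state` maps a state name to {task_prefix: count} and must contain an
    "all" entry. Hypothetical helper, for illustration only.
    """
    totals = {
        k: sum(state[k].values())
        for k in ["all", "memory", "erred", "released", "waiting", "queued"]
    }
    # Whatever the other states do not account for is treated as processing.
    totals["processing"] = totals["all"] - sum(
        v for k, v in totals.items() if k != "all"
    )
    return totals


# Made-up counts for two task prefixes.
state = {
    "all": {"inc": 10, "add": 5},
    "memory": {"inc": 3, "add": 1},
    "erred": {"inc": 0, "add": 0},
    "released": {"inc": 1, "add": 0},
    "waiting": {"inc": 2, "add": 2},
    "queued": {"inc": 3, "add": 1},
}
totals = summarize(state)
title = (
    "Progress -- total: %(all)s, "
    "waiting: %(waiting)s, "
    "queued: %(queued)s, "
    "processing: %(processing)s, "
    "in-memory: %(memory)s, "
    "erred: %(erred)s" % totals
)
print(title)
```

If `"queued"` were missing from the list of summed states, queued tasks would be folded into the `processing` remainder, which is presumably why the list is extended here.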

51 changes: 33 additions & 18 deletions distributed/diagnostics/progress_stream.py
@@ -17,7 +17,7 @@ def counts(scheduler, allprogress):
{"all": valmap(len, allprogress.all), "nbytes": allprogress.nbytes},
{
state: valmap(len, allprogress.state[state])
- for state in ["memory", "erred", "released", "processing"]
+ for state in ["memory", "erred", "released", "processing", "queued"]
},
)

@@ -66,23 +66,29 @@ def progress_quads(msg, nrows=8, ncols=3):
... 'memory': {'inc': 2, 'dec': 0, 'add': 1},
... 'erred': {'inc': 0, 'dec': 1, 'add': 0},
... 'released': {'inc': 1, 'dec': 0, 'add': 1},
- ... 'processing': {'inc': 1, 'dec': 0, 'add': 2}}
+ ... 'processing': {'inc': 1, 'dec': 0, 'add': 2},
+ ... 'queued': {'inc': 1, 'dec': 0, 'add': 2}}

>>> progress_quads(msg, nrows=2) # doctest: +SKIP
- {'name': ['inc', 'add', 'dec'],
- 'left': [0, 0, 1],
- 'right': [0.9, 0.9, 1.9],
- 'top': [0, -1, 0],
- 'bottom': [-.8, -1.8, -.8],
- 'released': [1, 1, 0],
- 'memory': [2, 1, 0],
- 'erred': [0, 0, 1],
- 'processing': [1, 0, 2],
- 'done': ['3 / 5', '2 / 4', '1 / 1'],
- 'released-loc': [.2/.9, .25 / 0.9, 1],
- 'memory-loc': [3 / 5 / .9, .5 / 0.9, 1],
- 'erred-loc': [3 / 5 / .9, .5 / 0.9, 1.9],
- 'processing-loc': [4 / 5, 1 / 1, 1]}}
+ {'all': [5, 4, 1],
+ 'memory': [2, 1, 0],
+ 'erred': [0, 0, 1],
+ 'released': [1, 1, 0],
+ 'processing': [1, 2, 0],
+ 'queued': [1, 2, 0],
+ 'name': ['inc', 'add', 'dec'],
+ 'show-name': ['inc', 'add', 'dec'],
+ 'left': [0, 0, 1],
+ 'right': [0.9, 0.9, 1.9],
+ 'top': [0, -1, 0],
+ 'bottom': [-0.8, -1.8, -0.8],
+ 'color': ['#45BF6F', '#2E6C8E', '#440154'],
+ 'released-loc': [0.18, 0.225, 1.0],
+ 'memory-loc': [0.54, 0.45, 1.0],
+ 'erred-loc': [0.54, 0.45, 1.9],
+ 'processing-loc': [0.72, 0.9, 1.9],
+ 'queued-loc': [0.9, 1.35, 1.9],
+ 'done': ['3 / 5', '2 / 4', '1 / 1']}
"""
width = 0.9
names = sorted(msg["all"], key=msg["all"].get, reverse=True)
@@ -102,19 +108,28 @@ def progress_quads(msg, nrows=8, ncols=3):
d["memory-loc"] = []
d["erred-loc"] = []
d["processing-loc"] = []
+ d["queued-loc"] = []
d["done"] = []
- for r, m, e, p, a, l in zip(
- d["released"], d["memory"], d["erred"], d["processing"], d["all"], d["left"]
+ for r, m, e, p, q, a, l in zip(
+ d["released"],
+ d["memory"],
+ d["erred"],
+ d["processing"],
+ d["queued"],
+ d["all"],
+ d["left"],
):
rl = width * r / a + l
ml = width * (r + m) / a + l
el = width * (r + m + e) / a + l
pl = width * (p + r + m + e) / a + l
+ ql = width * (p + r + m + e + q) / a + l
done = "%d / %d" % (r + m + e, a)
d["released-loc"].append(rl)
d["memory-loc"].append(ml)
d["erred-loc"].append(el)
d["processing-loc"].append(pl)
+ d["queued-loc"].append(ql)
d["done"].append(done)

return d
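
The cumulative edge computation in this hunk can be tried on a single task prefix. This is a minimal sketch, not the library function: `segment_edges` and its keyword names are hypothetical, and only the arithmetic is taken from the diff. Each `*-loc` value is the right edge of one stacked segment, so the queued segment spans from `processing-loc` to `queued-loc`.

```python
def segment_edges(released, memory, erred, processing, queued, total, left, width=0.9):
    """Right edges of the stacked bar segments for one task prefix.

    Hypothetical standalone helper that mirrors the cumulative sums in the
    hunk above; `total` is the prefix's overall task count.
    """

    def edge(count):
        # Scale a cumulative count to bar width, offset by the bar's left edge.
        return width * count / total + left

    done = released + memory + erred
    return {
        "released-loc": edge(released),
        "memory-loc": edge(released + memory),
        "erred-loc": edge(done),
        "processing-loc": edge(done + processing),
        "queued-loc": edge(done + processing + queued),
        "done": "%d / %d" % (done, total),
    }


# The 'inc' column from the docstring example: 5 tasks, bar starting at x=0.
inc = segment_edges(released=1, memory=2, erred=0, processing=1, queued=1, total=5, left=0)
print(inc)
```

With these inputs the edges land at roughly 0.18, 0.54, 0.54, 0.72 and 0.9, matching the first entries of the updated docstring.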
6 changes: 6 additions & 0 deletions distributed/diagnostics/tests/test_progress_stream.py
@@ -18,6 +18,7 @@ def test_progress_quads():
"erred": {"inc": 0, "dec": 1, "add": 0},
"released": {"inc": 1, "dec": 0, "add": 1},
"processing": {"inc": 1, "dec": 0, "add": 2},
+ "queued": {"inc": 1, "dec": 0, "add": 2},
}

d = progress_quads(msg, nrows=2)
@@ -35,11 +36,13 @@ def test_progress_quads():
"memory": [2, 1, 0],
"erred": [0, 0, 1],
"processing": [1, 2, 0],
+ "queued": [1, 2, 0],
"done": ["3 / 5", "2 / 4", "1 / 1"],
"released-loc": [0.9 * 1 / 5, 0.25 * 0.9, 1.0],
"memory-loc": [0.9 * 3 / 5, 0.5 * 0.9, 1.0],
"erred-loc": [0.9 * 3 / 5, 0.5 * 0.9, 1.9],
"processing-loc": [0.9 * 4 / 5, 1 * 0.9, 1 * 0.9 + 1],
+ "queued-loc": [1 * 0.9, 1.5 * 0.9, 1 * 0.9 + 1],
}
assert d == expected

@@ -52,6 +55,7 @@ def test_progress_quads_too_many():
"erred": {k: 0 for k in keys},
"released": {k: 0 for k in keys},
"processing": {k: 0 for k in keys},
+ "queued": {k: 0 for k in keys},
}

d = progress_quads(msg, nrows=6, ncols=3)
Expand All @@ -78,6 +82,7 @@ async def test_progress_stream(c, s, a, b):
"memory": {"div": 9, "inc": 1},
"released": {"inc": 4},
"processing": {},
"queued": {},
}
assert set(nbytes) == set(msg["all"])
assert all(v > 0 for v in nbytes.values())
@@ -95,6 +100,7 @@ def test_progress_quads_many_functions():
"erred": {fn: 0 for fn in funcnames},
"released": {fn: 0 for fn in funcnames},
"processing": {fn: 0 for fn in funcnames},
+ "queued": {fn: 0 for fn in funcnames},
}

d = progress_quads(msg, nrows=2)
22 changes: 22 additions & 0 deletions distributed/distributed-schema.yaml
@@ -117,6 +117,28 @@ properties:
description: |
How frequently to balance worker loads

+ worker-saturation:
+ type: number
+ exclusiveMinimum: 0
+ description: |
+ Controls how many root tasks are sent to workers (like a `readahead`).
+
+ Up to worker-saturation * nthreads root tasks are sent to a
+ worker at a time. If `.inf`, all runnable tasks are immediately sent to workers.
+
+ Allowing oversaturation (> 1.0) means a worker may start running a new root task as
+ soon as it completes the previous, even if there is a higher-priority downstream task
+ to run. This reduces worker idleness, by letting workers do something while waiting for
+ further instructions from the scheduler, even if it's not the most efficient
+ thing to do.
+
+ This generally comes at the expense of increased memory usage. It leads to "wider"
+ (more breadth-first) execution of the graph.
+
+ Compute-bound workloads may benefit from oversaturation. Memory-bound workloads should
+ generally leave `worker-saturation` at 1.0, though 1.25-1.5 could slightly improve
+ performance if ample memory is available.
+
worker-ttl:
type:
- string
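
To make the "worker-saturation * nthreads" rule in the description concrete, here is a rough sketch of the per-worker limit it implies. `root_task_limit` is a hypothetical name, and two details are assumptions that the schema text does not state: the product is rounded up, and a worker always gets at least one slot. The scheduler's actual accounting may differ.

```python
import math


def root_task_limit(worker_saturation, nthreads):
    """Max root tasks sent to one worker at a time (illustrative only).

    Assumptions not stated in the schema: the product is rounded up, and a
    worker always gets at least one slot. Returns math.inf when
    worker_saturation is infinite, meaning no limit.
    """
    if worker_saturation <= 0:
        # The schema declares exclusiveMinimum: 0.
        raise ValueError("worker-saturation must be greater than 0")
    if math.isinf(worker_saturation):
        # math.ceil cannot convert infinity, so handle `.inf` separately.
        return math.inf
    return max(math.ceil(worker_saturation * nthreads), 1)


limits = {s: root_task_limit(s, nthreads=4) for s in (1.0, 1.25, 1.5, math.inf)}
print(limits)
```

Under these assumptions a 4-thread worker would hold at most 4, 5 or 6 root tasks at saturation 1.0, 1.25 or 1.5, and an unbounded number at `.inf`.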
1 change: 1 addition & 0 deletions distributed/distributed.yaml
@@ -22,6 +22,7 @@ distributed:
events-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
+ worker-saturation: .inf # Send this fraction of nthreads root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
10 changes: 9 additions & 1 deletion distributed/http/scheduler/tests/test_scheduler_http.py
@@ -137,7 +137,15 @@ async def fetch_metrics():
]
return active_metrics, forgotten_tasks

- expected = {"memory", "released", "processing", "waiting", "no-worker", "erred"}
+ expected = {
+ "memory",
+ "released",
+ "queued",
+ "processing",
+ "waiting",
+ "no-worker",
+ "erred",
+ }

# Ensure that we get full zero metrics for all states even though the
# scheduler did nothing, yet
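
The property this test checks, a metric for every state even when no tasks exist, can be illustrated with a zero-filled count. This is only a sketch of the idea: `task_state_counts` and `ALL_STATES` are hypothetical and do not reflect how the scheduler's Prometheus collector is implemented.

```python
from collections import Counter

# State names taken from the `expected` set in the test above.
ALL_STATES = ["memory", "released", "queued", "processing", "waiting", "no-worker", "erred"]


def task_state_counts(observed_states):
    """Count tasks per state, reporting 0 for states with no tasks.

    Hypothetical helper; `observed_states` is any iterable of state names.
    """
    counts = Counter(observed_states)
    return {state: counts.get(state, 0) for state in ALL_STATES}


idle = task_state_counts([])  # scheduler that has done nothing yet
busy = task_state_counts(["queued", "queued", "processing"])
print(idle)
print(busy)
```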