Dashboard: Fix Fine Perf. Metrics mis-aligned ColumnData lengths (#7893)
milesgranger authored Jun 8, 2023
1 parent fdeb6b3 commit e31c864
Showing 2 changed files with 59 additions and 34 deletions.
30 changes: 21 additions & 9 deletions distributed/dashboard/components/scheduler.py
@@ -3426,6 +3426,27 @@ def update(self):
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple)
)

activities = {
activity
for (*_, activity, _), _ in items
if activity not in self.task_activities
}
if activities:
self.substantial_change = True
self.task_activities.extend(activities)

functions = {
func
for (ctx, func, *_), _ in items
if ctx == "execute" and func not in self.task_exec_data["functions"]
}
if functions:
self.substantial_change = True
self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions)
self.function_selector.options.extend(functions)
self.task_exec_data["functions"].extend(functions)

for (context, *parts), val in items:
if context == "get-data":
activity, unit = parts
@@ -3452,15 +3473,6 @@ def update(self):
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.task_activities:
self.substantial_change = True
self.task_activities.append(activity)

if prefix not in self.task_exec_data["functions"]:
self.substantial_change = True
self.function_selector.options.append(prefix)
self.task_exec_data["functions"].append(prefix)
self.task_exec_data["timestamp"].append(datetime.utcnow())
idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
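A minimal sketch of the alignment idea behind this change, using a plain dict-of-lists stand-in for the dashboard's ColumnData (the names below are illustrative, not taken from the commit): register all new row keys first, extend every column once, and only then fill values by index, so the lists never drift to different lengths.

from datetime import datetime

task_exec_data = {"functions": [], "timestamp": []}  # one list per column

def add_functions(data, new_functions):
    # Extend every column in one pass so all lists keep the same length.
    fresh = [f for f in new_functions if f not in data["functions"]]
    data["functions"].extend(fresh)
    data["timestamp"].extend(datetime.now() for _ in fresh)
    n = len(data["functions"])
    # Pad any per-activity columns (e.g. "thread-cpu") with zeros up to the new row count.
    for col, values in data.items():
        if col not in ("functions", "timestamp"):
            values.extend(0.0 for _ in range(n - len(values)))

add_functions(task_exec_data, ["slowinc", "inc"])
assert len({len(v) for v in task_exec_data.values()}) == 1  # columns stay aligned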
63 changes: 38 additions & 25 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -53,6 +53,7 @@
from distributed.dashboard.scheduler import applications
from distributed.diagnostics.task_stream import TaskStreamPlugin
from distributed.metrics import time
from distributed.spans import span
from distributed.utils import format_dashboard_link
from distributed.utils_test import (
block_on_event,
@@ -329,38 +330,50 @@ async def test_WorkersMemory(c, s, a, b):
assert all(d["width"])


@gen_cluster(client=True)
@gen_cluster(
client=True,
config={
"distributed.worker.memory.target": 0.7,
"distributed.worker.memory.spill": False,
"distributed.worker.memory.pause": False,
},
worker_kwargs={"memory_limit": 100},
)
async def test_FinePerformanceMetrics(c, s, a, b):
cl = FinePerformanceMetrics(s)

futures = c.map(slowinc, range(10), delay=0.001)
await wait(futures)
await asyncio.sleep(1) # wait for metrics to arrive
# execute on default span; multiple tasks in same TaskGroup
x0 = c.submit(inc, 0, key="x-0", workers=[a.address])
x1 = c.submit(inc, 1, key="x-1", workers=[a.address])

assert not cl.task_exec_data
# execute with spill (output size is individually larger than target)
w = c.submit(lambda: "x" * 75, key="w", workers=[a.address])

cl.update()
assert cl.task_exec_data
assert cl.task_exec_data["functions"] == ["slowinc"]
await wait([x0, x1, w])

a.data.evict()
a.data.evict()
assert not a.data.fast
assert set(a.data.slow) == {"x-0", "x-1", "w"}

@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_FinePerformanceMetrics_simulated_spill_no_crash(c, s, a, b):
metrics = {
("execute", "inc", "disk-read", "seconds"): 1.0,
("execute", "inc", "disk-read", "count"): 16.0,
("execute", "inc", "disk-read", "bytes"): 2059931767.0,
("execute", "inc", "disk-write", "seconds"): 0.12,
("execute", "inc", "disk-write", "count"): 2.0,
("execute", "inc", "disk-write", "bytes"): 268435938.0,
}
s.cumulative_worker_metrics.clear()
s.cumulative_worker_metrics.update(metrics)
http_client = AsyncHTTPClient()
response = await http_client.fetch(
f"http://localhost:{s.http_server.port}/individual-fine-performance-metrics"
)
assert response.code == 200
with span("foo"):
# execute on named span, with unspill
y0 = c.submit(inc, x0, key="y-0", workers=[a.address])
# get_data with unspill + gather_dep + execute on named span
y1 = c.submit(inc, x1, key="y-1", workers=[b.address])

# execute on named span (duplicate name, different id)
with span("foo"):
z = c.submit(inc, 3, key="z")
await wait([y0, y1, z])

await a.heartbeat()
await b.heartbeat()

assert not cl.task_exec_data
cl.update()
assert cl.task_exec_data
assert set(cl.task_exec_data["functions"]) == {"w", "x", "y", "z"}


@gen_cluster(client=True)

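The rewritten test tags work with named spans via distributed.spans.span; a hedged client-side sketch of that pattern (assuming an ad-hoc local Client and a trivial inc function, both illustrative):

from distributed import Client
from distributed.spans import span

def inc(x):
    return x + 1

if __name__ == "__main__":
    client = Client()  # assumption: a local cluster is enough for illustration
    with span("preprocess"):
        # tasks submitted inside the block are attributed to the "preprocess" span
        fut = client.submit(inc, 1)
    print(fut.result())
    client.close()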