From e31c8641b9108ef0f5b200538c8fd3bf20a82338 Mon Sep 17 00:00:00 2001 From: Miles Date: Thu, 8 Jun 2023 12:59:47 +0200 Subject: [PATCH] Dashboard: Fix Fine Perf. Metrics mis-aligned ColumnData lengths (#7893) --- distributed/dashboard/components/scheduler.py | 30 ++++++--- .../dashboard/tests/test_scheduler_bokeh.py | 63 +++++++++++-------- 2 files changed, 59 insertions(+), 34 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index b66d25f466e..5431ea38ae0 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -3426,6 +3426,27 @@ def update(self): for k, v in self.scheduler.cumulative_worker_metrics.items() if isinstance(k, tuple) ) + + activities = { + activity + for (*_, activity, _), _ in items + if activity not in self.task_activities + } + if activities: + self.substantial_change = True + self.task_activities.extend(activities) + + functions = { + func + for (ctx, func, *_), _ in items + if ctx == "execute" and func not in self.task_exec_data["functions"] + } + if functions: + self.substantial_change = True + self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions) + self.function_selector.options.extend(functions) + self.task_exec_data["functions"].extend(functions) + for (context, *parts), val in items: if context == "get-data": activity, unit = parts @@ -3452,15 +3473,6 @@ def update(self): # note append doesn't work here self.unit_selector.options += [unit] - if activity not in self.task_activities: - self.substantial_change = True - self.task_activities.append(activity) - - if prefix not in self.task_exec_data["functions"]: - self.substantial_change = True - self.function_selector.options.append(prefix) - self.task_exec_data["functions"].append(prefix) - self.task_exec_data["timestamp"].append(datetime.utcnow()) idx = self.task_exec_data["functions"].index(prefix) # Some function/activity combos missing, so need to keep columns aligned diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 3905f41f986..77f910ddcac 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -53,6 +53,7 @@ from distributed.dashboard.scheduler import applications from distributed.diagnostics.task_stream import TaskStreamPlugin from distributed.metrics import time +from distributed.spans import span from distributed.utils import format_dashboard_link from distributed.utils_test import ( block_on_event, @@ -329,38 +330,50 @@ async def test_WorkersMemory(c, s, a, b): assert all(d["width"]) -@gen_cluster(client=True) +@gen_cluster( + client=True, + config={ + "distributed.worker.memory.target": 0.7, + "distributed.worker.memory.spill": False, + "distributed.worker.memory.pause": False, + }, + worker_kwargs={"memory_limit": 100}, +) async def test_FinePerformanceMetrics(c, s, a, b): cl = FinePerformanceMetrics(s) - futures = c.map(slowinc, range(10), delay=0.001) - await wait(futures) - await asyncio.sleep(1) # wait for metrics to arrive + # execute on default span; multiple tasks in same TaskGroup + x0 = c.submit(inc, 0, key="x-0", workers=[a.address]) + x1 = c.submit(inc, 1, key="x-1", workers=[a.address]) - assert not cl.task_exec_data + # execute with spill (output size is individually larger than target) + w = c.submit(lambda: "x" * 75, key="w", workers=[a.address]) - cl.update() - assert cl.task_exec_data - assert cl.task_exec_data["functions"] == ["slowinc"] + await wait([x0, x1, w]) + a.data.evict() + a.data.evict() + assert not a.data.fast + assert set(a.data.slow) == {"x-0", "x-1", "w"} -@gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) -async def test_FinePerformanceMetrics_simulated_spill_no_crash(c, s, a, b): - metrics = { - ("execute", "inc", "disk-read", "seconds"): 1.0, - ("execute", "inc", "disk-read", "count"): 16.0, - ("execute", "inc", "disk-read", "bytes"): 2059931767.0, - ("execute", "inc", "disk-write", "seconds"): 0.12, - ("execute", "inc", "disk-write", "count"): 2.0, - ("execute", "inc", "disk-write", "bytes"): 268435938.0, - } - s.cumulative_worker_metrics.clear() - s.cumulative_worker_metrics.update(metrics) - http_client = AsyncHTTPClient() - response = await http_client.fetch( - f"http://localhost:{s.http_server.port}/individual-fine-performance-metrics" - ) - assert response.code == 200 + with span("foo"): + # execute on named span, with unspill + y0 = c.submit(inc, x0, key="y-0", workers=[a.address]) + # get_data with unspill + gather_dep + execute on named span + y1 = c.submit(inc, x1, key="y-1", workers=[b.address]) + + # execute on named span (duplicate name, different id) + with span("foo"): + z = c.submit(inc, 3, key="z") + await wait([y0, y1, z]) + + await a.heartbeat() + await b.heartbeat() + + assert not cl.task_exec_data + cl.update() + assert cl.task_exec_data + assert set(cl.task_exec_data["functions"]) == {"w", "x", "y", "z"} @gen_cluster(client=True)