Add spans to Fine Performance Metrics bokeh dashboard
crusaderky committed Jun 14, 2023
1 parent 1d426b0 commit 22613f6
Showing 4 changed files with 143 additions and 3 deletions.
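For orientation: the span tags that the new dashboard filter operates on are attached from the client with the span context manager defined in distributed/spans.py. Below is a minimal sketch of how such tags come into existence; the cluster setup, tag names, and the top-level import path are illustrative assumptions rather than part of this commit:

from distributed import Client, span  # assumption: span is re-exported at top level; it lives in distributed.spans

client = Client()  # illustrative local cluster

with span("etl"):            # tasks submitted in this block are tagged "etl"
    with span("load"):       # nested span; full name becomes ("etl", "load")
        fut = client.submit(sum, [1, 2, 3])
        fut.result()         # 6

client.close()

The new "Filter by span tag" selector on the Fine Performance Metrics dashboard can then narrow the charts down to, e.g., the "etl" tag.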
40 changes: 37 additions & 3 deletions distributed/dashboard/components/scheduler.py
@@ -90,6 +90,7 @@
from distributed.diagnostics.task_stream import colors as ts_color_lookup
from distributed.metrics import time
from distributed.scheduler import Scheduler
from distributed.spans import SpansSchedulerExtension
from distributed.utils import Log, log_errors

if dask.config.get("distributed.dashboard.export-tool"):
@@ -3392,6 +3393,7 @@ class FinePerformanceMetrics(DashboardComponent):
visible_functions: list[str]
visible_activities: list[str]
function_selector: MultiChoice
span_tag_selector: MultiChoice
unit_selector: Select
task_exec_by_activity_chart: figure | None
task_exec_by_prefix_chart: figure | None
@@ -3418,12 +3420,19 @@ def __init__(self, scheduler: Scheduler, **kwargs: Any):
value=[],
options=[],
)
self.span_tag_selector = MultiChoice(
title="Filter by span tag",
placeholder="Select specific span tags",
value=[],
options=[],
)
self.unit_selector = Select(title="Unit selection", options=["seconds"])
self.unit_selector.value = "seconds"
self.unit_selector.on_change("value", self._handle_change_unit)

selectors_row = row(
children=[
self.span_tag_selector,
self.function_selector,
self.unit_selector,
],
@@ -3501,6 +3510,7 @@ def _update_selectors(self) -> None:
- self.unit_selector
- self.function_selector
- self.span_tag_selector
"""
units = set()
functions = set()
@@ -3529,6 +3539,18 @@ def _update_selectors(self) -> None:
f"Filter by function ({len(self.function_selector.options)}):"
)

spans_ext: SpansSchedulerExtension | None = self.scheduler.extensions.get(
"spans"
)
if spans_ext:
tags = set(spans_ext.spans_search_by_tag)
tags.difference_update(self.span_tag_selector.options)
if tags:
self.span_tag_selector.options.extend(tags)
self.span_tag_selector.title = (
f"Filter by span tag ({len(self.span_tag_selector.options)}):"
)

def _format(self, val: float) -> str:
unit = self.unit_selector.value
assert isinstance(unit, str)
@@ -3573,17 +3595,28 @@ def _build_data_sources(self) -> None:

function_sel = set(self.function_selector.value)

for k, v in self.scheduler.cumulative_worker_metrics.items():
if self.span_tag_selector.value:
spans_ext: SpansSchedulerExtension = self.scheduler.extensions["spans"]
metrics = spans_ext.merge_by_tags(
*self.span_tag_selector.value
).cumulative_worker_metrics
has_span_id = False
else:
metrics = self.scheduler.cumulative_worker_metrics
has_span_id = True

for k, v in metrics.items():
if not isinstance(k, tuple):
continue
# Only happens in global metrics
continue # type: ignore[unreachable]
context, *other, activity, unit = k
assert isinstance(unit, str)
assert self.unit_selector.value
if unit != self.unit_selector.value:
continue

if context == "execute":
_, function = other
function = other[1 if has_span_id else 0]
assert isinstance(function, str)
if not function_sel or function in function_sel:
# Custom metrics can provide any hashable as the label
@@ -3595,6 +3628,7 @@ def _build_data_sources(self) -> None:
execute[activity] += v

elif context == "get-data" and not function_sel:
# Note: this will always be empty when a span is selected
assert isinstance(activity, str)
visible_activities.add(activity)
get_data[activity] += v
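The has_span_id branch above exists because the two metrics mappings use different key layouts. A rough illustration, inferred from this diff (the span id and task prefix values are made up):

# Scheduler-global metrics: "execute" keys carry a span id
# ("execute", <span id>, <task prefix>, <activity>, <unit>) -> float
global_key = ("execute", "abc123", "x", "thread-noncpu", "seconds")

# Metrics of a (merged) Span: the same layout minus the span id
span_key = ("execute", "x", "thread-noncpu", "seconds")

# which is why the loop unpacks as follows
context, *other, activity, unit = global_key
function = other[1]  # other[0] when iterating span metrics (has_span_id=False)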
4 changes: 4 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -405,6 +405,10 @@ def f():
cl.update()
assert cl.visible_activities == orig_activities

cl.span_tag_selector.value = ["foo"]
cl.update()
assert sorted(cl.visible_functions) == ["y", "z"]


@gen_cluster(client=True)
async def test_ClusterMemory(c, s, a, b):
38 changes: 38 additions & 0 deletions distributed/spans.py
@@ -282,6 +282,16 @@ def cumulative_worker_metrics(self) -> defaultdict[tuple[str, ...], float]:
out[k] += v
return out

@staticmethod
def merge(*items: Span) -> Span:
"""Merge multiple spans into a synthetic one.
The input spans must not be related to each other.
"""
out = Span(name=("(merged)",), id_="(merged)", parent=None)
out.children.extend(items)
out.enqueued = min(child.enqueued for child in items)
return out


class SpansSchedulerExtension:
"""Scheduler extension for spans support"""
@@ -300,6 +310,11 @@ class SpansSchedulerExtension:
#: All spans, keyed by the individual tags that make up their name and sorted by
#: creation time.
#: This is a convenience helper structure to speed up searches.
#:
#: See Also
#: --------
#: find_by_tags
#: merge_by_tags
spans_search_by_tag: defaultdict[str, list[Span]]

def __init__(self, scheduler: Scheduler):
@@ -379,6 +394,29 @@ def _ensure_span(self, name: tuple[str, ...], ids: tuple[str, ...]) -> Span:

return span

def find_by_tags(self, *tags: str) -> Iterator[Span]:
"""Yield all spans that contain any of the given tags.
When a tag is shared both by a span and its (grand)children, only return the
parent.
"""
by_level = defaultdict(list)
for tag in tags:
for sp in self.spans_search_by_tag[tag]:
by_level[len(sp.name)].append(sp)

seen = set()
for _, level in sorted(by_level.items()):
seen.update(level)
for sp in level:
if sp.parent not in seen:
yield sp

def merge_by_tags(self, *tags: str) -> Span:
"""Return a synthetic Span which is the sum of all spans containing the given
tags
"""
return Span.merge(*self.find_by_tags(*tags))

def heartbeat(
self, ws: WorkerState, data: dict[str, dict[tuple[str, ...], float]]
) -> None:
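Taken together, the two new methods give the dashboard a one-call way to aggregate metrics across tags. A hedged usage sketch; the scheduler handle s and the tag names are illustrative, e.g. from inside a gen_cluster test:

ext = s.extensions["spans"]          # SpansSchedulerExtension

# Top-most spans carrying either tag; a span whose parent already matched
# is skipped, so nothing is counted twice
matched = list(ext.find_by_tags("foo", "bar"))

# Synthetic parent span summing everything tagged "foo" or "bar"
merged = ext.merge_by_tags("foo", "bar")
merged.enqueued                      # earliest enqueue time among the inputs
merged.cumulative_worker_metrics     # aggregated fine performance metrics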
64 changes: 64 additions & 0 deletions distributed/tests/test_spans.py
@@ -13,6 +13,7 @@
async_poll_for,
gen_cluster,
inc,
slowinc,
wait_for_state,
)

@@ -530,3 +531,66 @@ async def test_worker_metrics(c, s, a, b):
]
for k, v in cum_metrics.items():
assert v == sum(sp._cumulative_worker_metrics[k] for sp in ext.spans.values())


@gen_cluster(client=True)
async def test_merge_by_tags(c, s, a, b):
with span("foo") as foo1:
await c.submit(inc, 1, key="x1")
with span("bar") as bar1: # foo, bar
await c.submit(inc, 2, key="x2")
with span("foo") as foo2: # foo, bar, foo
await c.submit(inc, 3, key="x3")
with span("foo") as foo3: # foo, foo
await c.submit(inc, 4, key="x4")
with span("bar") as bar2: # bar
await c.submit(inc, 5, key="x5")

ext = s.extensions["spans"]
assert {s.id for s in ext.find_by_tags("foo")} == {foo1}
assert {s.id for s in ext.find_by_tags("foo", "bar")} == {foo1, bar2}
assert {s.id for s in ext.find_by_tags("bar", "foo")} == {foo1, bar2}
assert {s.id for s in ext.find_by_tags("bar")} == {bar1, bar2}

def tgnames(*tags):
return [tg.name for tg in ext.merge_by_tags(*tags).traverse_groups()]

assert tgnames("foo") == ["x1", "x2", "x3", "x4"]
assert tgnames("foo", "bar") == ["x1", "x2", "x3", "x4", "x5"]
assert tgnames("bar", "foo") == ["x5", "x1", "x2", "x3", "x4"]
assert tgnames("bar") == ["x5", "x2", "x3"]


@gen_cluster(client=True)
async def test_merge_by_tags_metrics(c, s, a, b):
with span("foo") as foo1:
await c.submit(slowinc, 1, delay=0.05, key="x-1")
await async_poll_for(lambda: not s.task_groups, timeout=5)

with span("foo") as foo2:
await c.submit(slowinc, 2, delay=0.06, key="x-2")
await async_poll_for(lambda: not s.task_groups, timeout=5)

with span("bar") as bar1:
await c.submit(slowinc, 3, delay=0.07, key="x-3")
await async_poll_for(lambda: not s.task_groups, timeout=5)

await a.heartbeat()
await b.heartbeat()

ext = s.extensions["spans"]
k = ("execute", "x", "thread-noncpu", "seconds")
t_foo = ext.merge_by_tags("foo").cumulative_worker_metrics[k]
t_bar = ext.merge_by_tags("bar").cumulative_worker_metrics[k]
t_foo1 = ext.spans[foo1]._cumulative_worker_metrics[k]
t_foo2 = ext.spans[foo2]._cumulative_worker_metrics[k]
t_bar1 = ext.spans[bar1]._cumulative_worker_metrics[k]
assert t_foo1 > 0
assert t_foo2 > 0
assert t_bar1 > 0
assert t_foo == t_foo1 + t_foo2
assert t_bar == t_bar1

assert ext.merge_by_tags("foo").enqueued == min(
ext.spans[foo1].enqueued, ext.spans[foo2].enqueued
)
