Skip to content

Commit

Permalink
Add dashboard component for size of open data transfers (#6982)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Sep 9, 2022
1 parent 77aeecb commit 1fd07f0
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 4 deletions.
124 changes: 120 additions & 4 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,11 @@ def __init__(self, scheduler, width=600, **kwargs):
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
self.root.toolbar_location = None

tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
self.root.add_tools(tap)

self.root.toolbar_location = None
self.root.yaxis.visible = False

hover = HoverTool(
point_policy="follow_mouse",
tooltips="""
Expand Down Expand Up @@ -632,6 +630,119 @@ def update(self):
update(self.source, d)


class WorkersTransferBytes(DashboardComponent):
"""Size of open data transfers from/to other workers per worker"""

@log_errors
def __init__(self, scheduler, width=600, **kwargs):
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"escaped_worker": [],
"transfer_incoming_bytes": [],
"transfer_outgoing_bytes": [],
"worker": [],
"y_incoming": [],
"y_outgoing": [],
}
)

self.root = figure(
title=f"Bytes transferring: {format_bytes(0)}",
tools="",
id="bk-workers-transfer-bytes-plot",
width=int(width / 2),
name="workers_transfer_bytes",
min_border_bottom=50,
**kwargs,
)

# transfer_incoming_bytes
self.root.hbar(
name="transfer_incoming_bytes",
y="y_incoming",
right="transfer_incoming_bytes",
line_color=None,
left=0,
height=0.5,
fill_color="red",
source=self.source,
)

# transfer_outgoing_bytes
self.root.hbar(
name="transfer_outgoing_bytes",
y="y_outgoing",
right="transfer_outgoing_bytes",
line_color=None,
left=0,
height=0.5,
fill_color="blue",
source=self.source,
)

self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
self.root.xaxis.minor_tick_line_alpha = 0
self.root.x_range = Range1d(start=0)
self.root.yaxis.visible = False
self.root.ygrid.visible = False
self.root.toolbar_location = None

tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
hover = HoverTool(
tooltips=[
("Worker", "@worker"),
("Incoming", "@transfer_incoming_bytes{0.00 b}"),
("Outgoing", "@transfer_outgoing_bytes{0.00 b}"),
],
point_policy="follow_mouse",
)
self.root.add_tools(hover, tap)

@without_property_validation
@log_errors
def update(self):
wss = self.scheduler.workers.values()

h = 0.1
y_incoming = [i + 0.75 + i * h for i in range(len(wss))]
y_outgoing = [i + 0.25 + i * h for i in range(len(wss))]

transfer_incoming_bytes = [
ws.metrics["transfer"]["incoming_bytes"] for ws in wss
]
transfer_outgoing_bytes = [
ws.metrics["transfer"]["outgoing_bytes"] for ws in wss
]
workers = [ws.address for ws in wss]
escaped_workers = [escape.url_escape(worker) for worker in workers]

if wss:
x_limit = max(
max(transfer_incoming_bytes),
max(transfer_outgoing_bytes),
max(ws.memory_limit for ws in wss),
)
else:
x_limit = 0
self.root.x_range.end = x_limit

result = {
"escaped_worker": escaped_workers,
"transfer_incoming_bytes": transfer_incoming_bytes,
"transfer_outgoing_bytes": transfer_outgoing_bytes,
"worker": workers,
"y_incoming": y_incoming,
"y_outgoing": y_outgoing,
}
self.root.title.text = (
f"Bytes transferring: {format_bytes(sum(transfer_incoming_bytes))}"
)
update(self.source, result)


class Hardware(DashboardComponent):
"""Occupancy (in time) per worker"""

Expand Down Expand Up @@ -4120,6 +4231,7 @@ def status_doc(scheduler, extra, doc):

if len(scheduler.workers) <= 100:
workers_memory = WorkersMemory(scheduler, sizing_mode="stretch_both")

processing = CurrentLoad(scheduler, sizing_mode="stretch_both")

processing_root = processing.processing_figure
Expand All @@ -4131,16 +4243,19 @@ def status_doc(scheduler, extra, doc):

current_load = CurrentLoad(scheduler, sizing_mode="stretch_both")
occupancy = Occupancy(scheduler, sizing_mode="stretch_both")
workers_transfer_bytes = WorkersTransferBytes(scheduler, sizing_mode="stretch_both")

cpu_root = current_load.cpu_figure
occupancy_root = occupancy.root

workers_memory.update()
workers_transfer_bytes.update()
processing.update()
current_load.update()
occupancy.update()

add_periodic_callback(doc, workers_memory, 100)
add_periodic_callback(doc, workers_transfer_bytes, 100)
add_periodic_callback(doc, processing, 100)
add_periodic_callback(doc, current_load, 100)
add_periodic_callback(doc, occupancy, 100)
Expand All @@ -4150,8 +4265,9 @@ def status_doc(scheduler, extra, doc):
tab1 = Panel(child=processing_root, title="Processing")
tab2 = Panel(child=cpu_root, title="CPU")
tab3 = Panel(child=occupancy_root, title="Occupancy")
tab4 = Panel(child=workers_transfer_bytes.root, title="Data Transfer")

proc_tabs = Tabs(tabs=[tab1, tab2, tab3], name="processing_tabs")
proc_tabs = Tabs(tabs=[tab1, tab2, tab3, tab4], name="processing_tabs")
doc.add_root(proc_tabs)

task_stream = TaskStream(
Expand Down
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
TaskStream,
WorkerNetworkBandwidth,
WorkersMemory,
WorkersTransferBytes,
WorkerTable,
events_doc,
exceptions_doc,
Expand Down Expand Up @@ -77,6 +78,7 @@
"/individual-group-progress": individual_doc(TaskGroupProgress, 200),
"/individual-workers-memory": individual_doc(WorkersMemory, 100),
"/individual-cluster-memory": individual_doc(ClusterMemory, 100),
"/individual-workers-transfer-bytes": individual_doc(WorkersTransferBytes, 100),
"/individual-cpu": individual_doc(CurrentLoad, 100, fig_attr="cpu_figure"),
"/individual-nprocessing": individual_doc(
CurrentLoad, 100, fig_attr="processing_figure"
Expand Down

0 comments on commit 1fd07f0

Please sign in to comment.