Skip to content

Commit

Permalink
Merge branch 'main' into flwr_enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster authored Oct 8, 2024
2 parents 38814d6 + 39da4f0 commit d4a9c61
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 391 deletions.
32 changes: 17 additions & 15 deletions nvflare/fuel/f3/streaming/byte_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,22 @@ def close(self):


class ByteReceiver:

received_stream_counter_pool = StatsPoolManager.add_counter_pool(
name="Received_Stream_Counters",
description="Counters of received streams",
counter_names=[COUNTER_NAME_RECEIVED],
)

received_stream_size_pool = StatsPoolManager.add_msg_size_pool(
"Received_Stream_Sizes", "Sizes of streams received (MBs)"
)

def __init__(self, cell: CoreCell):
self.cell = cell
self.cell.register_request_cb(channel=STREAM_CHANNEL, topic=STREAM_DATA_TOPIC, cb=self._data_handler)
self.registry = Registry()

self.received_stream_counter_pool = StatsPoolManager.add_counter_pool(
name="Received_Stream_Counters",
description="Counters of received streams",
counter_names=[COUNTER_NAME_RECEIVED],
scope=self.cell.my_info.fqcn,
)

self.received_stream_size_pool = StatsPoolManager.add_msg_size_pool(
"Received_Stream_Sizes", "Sizes of streams received (MBs)", scope=self.cell.my_info.fqcn
)

def register_callback(self, channel: str, topic: str, stream_cb: Callable, *args, **kwargs):
if not callable(stream_cb):
raise StreamError(f"specified stream_cb {type(stream_cb)} is not callable")
Expand All @@ -345,13 +345,15 @@ def _data_handler(self, message: Message):
task.stop(StreamError(f"{task} No callback is registered for {task.channel}/{task.topic}"))
return

self.received_stream_counter_pool.increment(
category=stream_stats_category(task.channel, task.topic, "stream"),
fqcn = self.cell.my_info.fqcn
ByteReceiver.received_stream_counter_pool.increment(
category=stream_stats_category(fqcn, task.channel, task.topic, "stream"),
counter_name=COUNTER_NAME_RECEIVED,
)

self.received_stream_size_pool.record_value(
category=stream_stats_category(task.channel, task.topic, "stream"), value=task.size / ONE_MB
ByteReceiver.received_stream_size_pool.record_value(
category=stream_stats_category(fqcn, task.channel, task.topic, "stream"),
value=task.size / ONE_MB,
)

stream_thread_pool.submit(self._callback_wrapper, task, callback)
Expand Down
Loading

0 comments on commit d4a9c61

Please sign in to comment.