From 9f3a67c3f0521d12d77e3c9ec9566a43e025e513 Mon Sep 17 00:00:00 2001 From: Zixuan Zhang Date: Fri, 11 Jul 2025 01:45:46 +0000 Subject: [PATCH] feat: update metrics receiving part --- examples/sglang/components/worker.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 69fe24d0e3..52da77af6a 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -18,8 +18,11 @@ from utils.sgl_utils import parse_sglang_args_inc from dynamo.llm import ( + ForwardPassMetrics, + KvStats, ModelType, WorkerMetricsPublisher, + WorkerStats, ZmqKvEventPublisher, ZmqKvEventPublisherConfig, register_llm, @@ -67,15 +70,6 @@ def setup_metrics(self): asyncio.create_task(self._receive_and_publish_metrics_loop()) - self.metrics_publisher.publish( - request_active_slots=0, - request_total_slots=1024, - kv_active_blocks=0, - kv_total_blocks=1024, - num_requests_waiting=0, - gpu_cache_usage_perc=0.0, - gpu_prefix_cache_hit_rate=0.0, - ) task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task.add_done_callback( lambda _: logging.debug("metrics publisher endpoint created") @@ -90,16 +84,26 @@ async def _receive_and_publish_metrics_loop(self): while True: try: kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore - self.metrics_publisher.publish( + worker_stats = WorkerStats( request_active_slots=kv_metrics.request_active_slots, request_total_slots=kv_metrics.request_total_slots, + num_requests_waiting=kv_metrics.num_requests_waiting, + data_parallel_rank=kv_metrics.data_parallel_rank, # Note: 0 means it's either 0 or None from sglang + ) + kv_stats = KvStats( kv_active_blocks=kv_metrics.kv_active_blocks, kv_total_blocks=kv_metrics.kv_total_blocks, - num_requests_waiting=kv_metrics.num_requests_waiting, gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc, gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate, - data_parallel_rank=getattr(kv_metrics, "data_parallel_rank", None), ) + spec_dec_stats = None + metrics = ForwardPassMetrics( + worker_stats=worker_stats, + kv_stats=kv_stats, + spec_decode_stats=spec_dec_stats, + ) + + self.metrics_publisher.publish(metrics) except Exception: logging.exception("Failed to recieve or publish metrics")