From f9403dbd0f8ababb8488ead99fa5dd2201a343e3 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 7 Jul 2025 05:51:40 +0000 Subject: [PATCH 01/13] go --- examples/sglang/components/worker.py | 48 +++++++++++++++++----------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 84ae62e213..211a0bef03 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -7,12 +7,13 @@ import signal import socket import sys +import zmq from typing import Any, Dict, Optional, Union import sglang as sgl import uvloop from sglang.srt.server_args import ServerArgs -from sglang.srt.utils import get_ip +from sglang.srt.utils import get_ip, get_zmq_socket from utils.protocol import DisaggPreprocessedRequest from utils.sgl_utils import parse_sglang_args_inc @@ -42,6 +43,9 @@ def __init__( self.component = component self.metrics_publisher = WorkerMetricsPublisher() + self.zmq_context = zmq.asyncio.Context() + self.receive_metrics_from_scheduler = None + if server_args.disaggregation_mode != "null": self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info() if decode_client is None: @@ -56,7 +60,13 @@ def __init__( logging.info("Request handler initialized") def setup_metrics(self): - """Set up metrics publisher - call this after handler creation""" + """Set up metrics publisher""" + self.receive_metrics_from_scheduler = get_zmq_socket( + self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True + ) + + asyncio.create_task(self._recieve_and_publish_metrics_loop()) + self.metrics_publisher.publish( request_active_slots=0, request_total_slots=1024, @@ -75,22 +85,24 @@ async def create_metrics_publisher_endpoint(self): logging.debug("Creating metrics publisher endpoint") await self.metrics_publisher.create_endpoint(self.component) - def _update_metrics(self): - """Update metrics with current engine state""" - # TODO: remove this once the following upstream changes are merged: - # • sgl-project/sglang#6721 – "Expose runtime KV-cache & request metrics" - logging.warning( - "Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands." - ) - self.metrics_publisher.publish( - request_active_slots=1, - request_total_slots=100, - kv_active_blocks=random.randint(0, 500), - kv_total_blocks=1000, - num_requests_waiting=0, - gpu_cache_usage_perc=random.uniform(0.1, 0.8), - gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5), - ) + async def _receive_and_publish_metrics_loop(self): + """Receive metrics from SGL scheduler and publish them""" + while True: + try: + kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() + self.metrics_publisher.publish( + request_active_slots=kv_metrics.request_active_slots, + request_total_slots=kv_metrics.request_total_slots, + kv_active_blocks=kv_metrics.kv_active_blocks, + kv_total_blocks=kv_metrics.kv_total_blocks, + num_requests_waiting=kv_metrics.num_requests_waiting, + gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc, + gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate, + data_parallel_rank=getattr(kv_metrics, 'data_parallel_rank', None), + ) + logging.debug(f"Published metrics: {kv_metrics}") + except Exception: + logging.exception("Failed to recieve or publish metrics") def _get_bootstrap_info(self): """Bootstrap info from tokenizer manager""" From 5d628140a7d06d3b6409bb204e1f7d6f3129f02a Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 7 Jul 2025 06:15:57 +0000 Subject: [PATCH 02/13] print --- examples/sglang/components/worker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 211a0bef03..4f3edf63fd 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -65,7 +65,7 @@ def setup_metrics(self): self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True ) - asyncio.create_task(self._recieve_and_publish_metrics_loop()) + asyncio.create_task(self._receive_and_publish_metrics_loop()) self.metrics_publisher.publish( request_active_slots=0, @@ -100,7 +100,6 @@ async def _receive_and_publish_metrics_loop(self): gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate, data_parallel_rank=getattr(kv_metrics, 'data_parallel_rank', None), ) - logging.debug(f"Published metrics: {kv_metrics}") except Exception: logging.exception("Failed to recieve or publish metrics") @@ -307,7 +306,7 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs): else: handler = RequestHandler(engine, server_args, component) - # Set up metrics in background + # Set up the engine metrics reciever handler.setup_metrics() # Set up ZMQ kv event publisher From a7dd1e7e3a437c74cc1392fbcc7ec7469bfe8ce9 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Sun, 6 Jul 2025 23:19:06 -0700 Subject: [PATCH 03/13] pre --- examples/sglang/components/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 4f3edf63fd..f1b1464769 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -7,11 +7,11 @@ import signal import socket import sys -import zmq from typing import Any, Dict, Optional, Union import sglang as sgl import uvloop +import zmq from sglang.srt.server_args import ServerArgs from sglang.srt.utils import get_ip, get_zmq_socket from utils.protocol import DisaggPreprocessedRequest @@ -98,7 +98,7 @@ async def _receive_and_publish_metrics_loop(self): num_requests_waiting=kv_metrics.num_requests_waiting, gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc, gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate, - data_parallel_rank=getattr(kv_metrics, 'data_parallel_rank', None), + data_parallel_rank=getattr(kv_metrics, "data_parallel_rank", None), ) except Exception: logging.exception("Failed to recieve or publish metrics") From 0a743c875e03add0d76728dad3104623099b37ee Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 7 Jul 2025 15:34:17 -0700 Subject: [PATCH 04/13] mypy --- examples/sglang/components/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index f1b1464769..ccb301ea75 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -43,7 +43,7 @@ def __init__( self.component = component self.metrics_publisher = WorkerMetricsPublisher() - self.zmq_context = zmq.asyncio.Context() + self.zmq_context = zmq.asyncio.Context() # type: ignore self.receive_metrics_from_scheduler = None if server_args.disaggregation_mode != "null": @@ -89,7 +89,7 @@ async def _receive_and_publish_metrics_loop(self): """Receive metrics from SGL scheduler and publish them""" while True: try: - kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() + kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore self.metrics_publisher.publish( request_active_slots=kv_metrics.request_active_slots, request_total_slots=kv_metrics.request_total_slots, From 09d36d362357155bc0911565815bf76f19f25192 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 7 Jul 2025 15:36:32 -0700 Subject: [PATCH 05/13] ok --- examples/sglang/components/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index ccb301ea75..2811ef0576 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -43,7 +43,7 @@ def __init__( self.component = component self.metrics_publisher = WorkerMetricsPublisher() - self.zmq_context = zmq.asyncio.Context() # type: ignore + self.zmq_context = zmq.asyncio.Context() # type: ignore self.receive_metrics_from_scheduler = None if server_args.disaggregation_mode != "null": @@ -89,7 +89,7 @@ async def _receive_and_publish_metrics_loop(self): """Receive metrics from SGL scheduler and publish them""" while True: try: - kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore + kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore self.metrics_publisher.publish( request_active_slots=kv_metrics.request_active_slots, request_total_slots=kv_metrics.request_total_slots, From bd38ed082daaa68728e7f9e617f112b4e7096069 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Tue, 8 Jul 2025 04:09:38 +0000 Subject: [PATCH 06/13] go --- examples/sglang/components/worker.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 1657e8bf42..cd1f69239c 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -117,16 +117,6 @@ async def _receive_and_publish_metrics_loop(self): except Exception: logging.exception("Failed to recieve or publish metrics") - # TODO: get spec_dec_stats from sglang once real engine metrics are available - spec_dec_stats = None - - metrics = ForwardPassMetrics( - worker_stats=worker_stats, - kv_stats=kv_stats, - spec_decode_stats=spec_dec_stats, - ) - self.metrics_publisher.publish(metrics) - def _get_bootstrap_info(self): """Bootstrap info from tokenizer manager""" inner_tm = self.engine.tokenizer_manager From 9ad7e63c16b4e718fd21d8ffded5157484a9e58d Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Tue, 8 Jul 2025 04:10:16 +0000 Subject: [PATCH 07/13] go --- examples/sglang/components/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index cd1f69239c..8ca8f82668 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -22,7 +22,6 @@ KvStats, ModelType, WorkerMetricsPublisher, - WorkerStats, ZmqKvEventPublisher, ZmqKvEventPublisherConfig, register_llm, From 66fcbfe870c8714f5edd7432730a3d4620bec0f1 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Tue, 8 Jul 2025 04:16:00 +0000 Subject: [PATCH 08/13] add metrics from bytedance team Co-authored-by: zixuanzhang226 --- examples/sglang/components/worker.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 8ca8f82668..69fe24d0e3 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -18,8 +18,6 @@ from utils.sgl_utils import parse_sglang_args_inc from dynamo.llm import ( - ForwardPassMetrics, - KvStats, ModelType, WorkerMetricsPublisher, ZmqKvEventPublisher, @@ -72,23 +70,12 @@ def setup_metrics(self): self.metrics_publisher.publish( request_active_slots=0, request_total_slots=1024, - num_requests_waiting=0, - data_parallel_rank=None, - ) - - kv_stats = KvStats( kv_active_blocks=0, kv_total_blocks=1024, + num_requests_waiting=0, gpu_cache_usage_perc=0.0, gpu_prefix_cache_hit_rate=0.0, ) - - metrics = ForwardPassMetrics( - worker_stats=worker_stats, - kv_stats=kv_stats, - spec_decode_stats=None, - ) - self.metrics_publisher.publish(metrics) task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task.add_done_callback( lambda _: logging.debug("metrics publisher endpoint created") From cfa647f176c5b3ea9e8d0a946d7002cfa8fdbf08 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 14 Jul 2025 17:51:20 +0000 Subject: [PATCH 09/13] add metrics from bytedance team Co-authored-by: zixuanzhang226 --- examples/sglang/components/worker.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 69fe24d0e3..a0d313ad43 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -18,8 +18,11 @@ from utils.sgl_utils import parse_sglang_args_inc from dynamo.llm import ( + ForwardPassMetrics, + KvStats, ModelType, WorkerMetricsPublisher, + WorkerStats, ZmqKvEventPublisher, ZmqKvEventPublisherConfig, register_llm, @@ -65,17 +68,27 @@ def setup_metrics(self): self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True ) - asyncio.create_task(self._receive_and_publish_metrics_loop()) - - self.metrics_publisher.publish( + # send initial set of warmup metrics + worker_stats = WorkerStats( request_active_slots=0, request_total_slots=1024, + num_requests_waiting=0, + data_parallel_rank=None, + ) + kv_stats = KvStats( kv_active_blocks=0, kv_total_blocks=1024, - num_requests_waiting=0, gpu_cache_usage_perc=0.0, gpu_prefix_cache_hit_rate=0.0, ) + metrics = ForwardPassMetrics( + worker_stats=worker_stats, + kv_stats=kv_stats, + spec_decode_stats=None, + ) + self.metrics_publisher.publish(metrics) + + asyncio.create_task(self._receive_and_publish_metrics_loop()) task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task.add_done_callback( lambda _: logging.debug("metrics publisher endpoint created") From d824025a68c58d7e90b0d80b15025cc0045a2797 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 14 Jul 2025 17:53:44 +0000 Subject: [PATCH 10/13] add metrics from bytedance team Co-authored-by: zixuanzhang226 --- examples/sglang/components/worker.py | 39 +++++++++++----------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index a0d313ad43..ddf375cd36 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -68,27 +68,8 @@ def setup_metrics(self): self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True ) - # send initial set of warmup metrics - worker_stats = WorkerStats( - request_active_slots=0, - request_total_slots=1024, - num_requests_waiting=0, - data_parallel_rank=None, - ) - kv_stats = KvStats( - kv_active_blocks=0, - kv_total_blocks=1024, - gpu_cache_usage_perc=0.0, - gpu_prefix_cache_hit_rate=0.0, - ) - metrics = ForwardPassMetrics( - worker_stats=worker_stats, - kv_stats=kv_stats, - spec_decode_stats=None, - ) - self.metrics_publisher.publish(metrics) - asyncio.create_task(self._receive_and_publish_metrics_loop()) + task = asyncio.create_task(self.create_metrics_publisher_endpoint()) task.add_done_callback( lambda _: logging.debug("metrics publisher endpoint created") @@ -103,16 +84,26 @@ async def _receive_and_publish_metrics_loop(self): while True: try: kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore - self.metrics_publisher.publish( + worker_stats = WorkerStats( request_active_slots=kv_metrics.request_active_slots, request_total_slots=kv_metrics.request_total_slots, + num_requests_waiting=kv_metrics.num_requests_waiting, + data_parallel_rank=kv_metrics.data_parallel_rank, # Note: 0 means it's either 0 or None from sglang + ) + kv_stats = KvStats( kv_active_blocks=kv_metrics.kv_active_blocks, kv_total_blocks=kv_metrics.kv_total_blocks, - num_requests_waiting=kv_metrics.num_requests_waiting, gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc, gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate, - data_parallel_rank=getattr(kv_metrics, "data_parallel_rank", None), ) + spec_dec_stats = None + metrics = ForwardPassMetrics( + worker_stats=worker_stats, + kv_stats=kv_stats, + spec_decode_stats=spec_dec_stats, + ) + + self.metrics_publisher.publish(metrics) except Exception: logging.exception("Failed to recieve or publish metrics") @@ -347,4 +338,4 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs): if __name__ == "__main__": uvloop.install() - asyncio.run(worker()) + asyncio.run(worker()) \ No newline at end of file From d3430d25eeb596cd0ffc20dc788ae63e7399a5f9 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 14 Jul 2025 17:56:39 +0000 Subject: [PATCH 11/13] bump --- examples/sglang/components/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index ddf375cd36..52da77af6a 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -338,4 +338,4 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs): if __name__ == "__main__": uvloop.install() - asyncio.run(worker()) \ No newline at end of file + asyncio.run(worker()) From 001b8eb45dae6bfb6805b65e99ab447086626fcd Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 14 Jul 2025 18:23:51 +0000 Subject: [PATCH 12/13] add metrics from bytedance team Co-authored-by: zixuanzhang226 --- examples/sglang/components/worker.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index 52da77af6a..c83dad2bcb 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -68,6 +68,7 @@ def setup_metrics(self): self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True ) + self.init_publish() asyncio.create_task(self._receive_and_publish_metrics_loop()) task = asyncio.create_task(self.create_metrics_publisher_endpoint()) @@ -75,6 +76,30 @@ def setup_metrics(self): lambda _: logging.debug("metrics publisher endpoint created") ) + def init_publish(self): + """Publish initial set of warmup metrics""" + worker_stats = WorkerStats( + request_active_slots=0, + request_total_slots=self.request_total_slots, + num_requests_waiting=0, + data_parallel_rank=self.dp_rank, + ) + + kv_stats = KvStats( + kv_active_blocks=0, + kv_total_blocks=self.num_gpu_block, + gpu_cache_usage_perc=0, + gpu_prefix_cache_hit_rate=0, + ) + + metrics = ForwardPassMetrics( + worker_stats=worker_stats, + kv_stats=kv_stats, + spec_decode_stats=None, + ) + + self.metrics_publisher.publish(metrics) + async def create_metrics_publisher_endpoint(self): logging.debug("Creating metrics publisher endpoint") await self.metrics_publisher.create_endpoint(self.component) From 8f300d1cc94068686d5cb8583abd2e2af9683ff6 Mon Sep 17 00:00:00 2001 From: ishandhanani Date: Mon, 14 Jul 2025 18:52:25 +0000 Subject: [PATCH 13/13] add metrics from bytedance team Co-authored-by: zixuanzhang226 --- examples/sglang/components/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py index c83dad2bcb..599a65070a 100644 --- a/examples/sglang/components/worker.py +++ b/examples/sglang/components/worker.py @@ -80,14 +80,14 @@ def init_publish(self): """Publish initial set of warmup metrics""" worker_stats = WorkerStats( request_active_slots=0, - request_total_slots=self.request_total_slots, + request_total_slots=1024, num_requests_waiting=0, - data_parallel_rank=self.dp_rank, + data_parallel_rank=0, ) kv_stats = KvStats( kv_active_blocks=0, - kv_total_blocks=self.num_gpu_block, + kv_total_blocks=1024, gpu_cache_usage_perc=0, gpu_prefix_cache_hit_rate=0, )