From 76230db5eef3d8b0cbd73db376afa45e41862aa7 Mon Sep 17 00:00:00 2001
From: Zixuan Zhang
Date: Mon, 30 Jun 2025 21:53:04 +0000
Subject: [PATCH] feat: receive kvmetrics from sglang scheduler

---
 examples/sglang/components/worker.py | 41 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 40 insertions(+), 1 deletion(-)

diff --git a/examples/sglang/components/worker.py b/examples/sglang/components/worker.py
index 43006c7abd..b05a4e28f5 100644
--- a/examples/sglang/components/worker.py
+++ b/examples/sglang/components/worker.py
@@ -31,8 +31,10 @@
 from typing import Dict, Union
 
 import sglang as sgl
+import zmq
+import zmq.asyncio
 from components.decode_worker import SGLangDecodeWorker
-from sglang.srt.utils import get_ip
+from sglang.srt.utils import get_ip, get_zmq_socket
 from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest
 from utils.sgl_utils import parse_sglang_args
 
@@ -90,6 +92,16 @@ async def create_metrics_publisher_endpoint(self):
 
     @async_on_start
     async def async_init(self):
+        # Metrics arrive from the SGLang scheduler over a ZMQ PULL socket.
+        context = zmq.asyncio.Context()
+        self.receive_metrics_from_scheduler = get_zmq_socket(
+            context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
+        )
+        # Keep a strong reference: the event loop only holds weak references
+        # to tasks, so an unreferenced task may be garbage-collected mid-run.
+        self._metrics_task = asyncio.create_task(
+            self._receive_and_publish_metrics_loop()
+        )
         runtime = dynamo_context["runtime"]
         comp_ns, comp_name = SGLangWorker.dynamo_address()  # type: ignore
         endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
@@ -141,6 +153,33 @@ async def async_init(self):
             config=zmq_config,
         )
 
+    async def _receive_and_publish_metrics_loop(self):
+        """Forward KV metrics from the scheduler socket to the metrics publisher.
+
+        Runs until the task is cancelled. A failed receive or publish is
+        logged and followed by a short pause so that a persistent failure
+        (for example a closed socket) does not busy-spin and flood the log.
+        """
+        while True:
+            try:
+                # NOTE: recv_pyobj() unpickles the payload, so the IPC
+                # endpoint must only ever be written to by the trusted
+                # local scheduler process.
+                kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
+                self.metrics_publisher.publish(
+                    kv_metrics.request_active_slots,
+                    kv_metrics.request_total_slots,
+                    kv_metrics.kv_active_blocks,
+                    kv_metrics.kv_total_blocks,
+                    kv_metrics.num_requests_waiting,
+                    kv_metrics.gpu_cache_usage_perc,
+                    kv_metrics.gpu_prefix_cache_hit_rate,
+                    kv_metrics.data_parallel_rank,
+                )
+            except Exception:
+                logger.exception("Failed to receive or publish metrics")
+                await asyncio.sleep(1)  # back off before retrying
+
def _get_bootstrap_info(self): """ Bootstrap info is stored in the worker's tokenizer manager. We use it to