Skip to content

Commit 7098a22

Browse files
feat: receive kvmetrics from sglang scheduler
1 parent e8e728b commit 7098a22

File tree

1 file changed

+27
-2
lines changed

1 file changed

+27
-2
lines changed

examples/sglang/components/worker.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,16 @@
2828
import logging
2929
import random
3030
import socket
31+
import zmq
32+
import asyncio
3133

3234
import sglang as sgl
3335
from components.decode_worker import SGLangDecodeWorker
34-
from sglang.srt.utils import get_ip
36+
from sglang.srt.utils import get_ip, get_zmq_socket
3537
from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest
3638
from utils.sglang import parse_sglang_args
3739

38-
from dynamo.llm import ModelType, register_llm
40+
from dynamo.llm import ModelType, register_llm, WorkerMetricsPublisher
3941
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
4042

4143
logger = logging.getLogger(__name__)
@@ -55,11 +57,17 @@ def __init__(self):
5557
class_name = self.__class__.__name__
5658
self.engine_args = parse_sglang_args(class_name, "")
5759
self.engine = sgl.Engine(server_args=self.engine_args)
60+
self.metrics_publisher = WorkerMetricsPublisher()
5861

5962
logger.info("SGLangWorker initialized")
6063

6164
@async_on_start
6265
async def async_init(self):
66+
context = zmq.asyncio.Context()
67+
self.receive_metrics_from_scheduler = get_zmq_socket(
68+
context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
69+
)
70+
asyncio.create_task(self._receive_and_publish_metrics_loop())
6371
runtime = dynamo_context["runtime"]
6472
logger.info("Registering LLM for discovery")
6573
comp_ns, comp_name = SGLangWorker.dynamo_address() # type: ignore
@@ -80,6 +88,23 @@ async def async_init(self):
8088
.client()
8189
)
8290

91+
async def _receive_and_publish_metrics_loop(self):
92+
while True:
93+
try:
94+
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
95+
self.metrics_publisher.publish(
96+
kv_metrics.request_active_slots,
97+
kv_metrics.request_total_slots,
98+
kv_metrics.kv_active_blocks,
99+
kv_metrics.kv_total_blocks,
100+
kv_metrics.num_requests_waiting,
101+
kv_metrics.gpu_cache_usage_perc,
102+
kv_metrics.gpu_prefix_cache_hit_rate,
103+
kv_metrics.data_parallel_rank)
104+
105+
except Exception as e:
106+
logger.exception("Failed to receive or publish metrics")
107+
83108
def _get_bootstrap_info(self):
84109
"""
85110
Bootstrap info is stored in the worker's tokenizer manager. We use it to

0 commit comments

Comments
 (0)