3030import socket
3131
3232import sglang as sgl
33+ import zmq
3334from components .decode_worker import SGLangDecodeWorker
34- from sglang .srt .utils import get_ip
35+ from sglang .srt .utils import get_ip , get_zmq_socket
3536from utils .protocol import DisaggPreprocessedRequest , PreprocessedRequest
3637from utils .sglang import parse_sglang_args
3738
38- from dynamo .llm import ModelType , register_llm
39+ from dynamo .llm import ModelType , WorkerMetricsPublisher , register_llm
3940from dynamo .sdk import async_on_start , depends , dynamo_context , endpoint , service
4041
4142logger = logging .getLogger (__name__ )
@@ -55,11 +56,17 @@ def __init__(self):
5556 class_name = self .__class__ .__name__
5657 self .engine_args = parse_sglang_args (class_name , "" )
5758 self .engine = sgl .Engine (server_args = self .engine_args )
59+ self .metrics_publisher = WorkerMetricsPublisher ()
5860
5961 logger .info ("SGLangWorker initialized" )
6062
6163 @async_on_start
6264 async def async_init (self ):
65+ context = zmq .asyncio .Context ()
66+ self .receive_metrics_from_scheduler = get_zmq_socket (
67+ context , zmq .PULL , self .engine .port_args .metrics_ipc_name , True
68+ )
69+ asyncio .create_task (self ._receive_and_publish_metrics_loop ())
6370 runtime = dynamo_context ["runtime" ]
6471 logger .info ("Registering LLM for discovery" )
6572 comp_ns , comp_name = SGLangWorker .dynamo_address () # type: ignore
@@ -80,6 +87,24 @@ async def async_init(self):
8087 .client ()
8188 )
8289
async def _receive_and_publish_metrics_loop(self):
    """Forward metrics objects from the scheduler socket to the publisher.

    Runs until the task is cancelled. Each iteration awaits one object from
    ``self.receive_metrics_from_scheduler`` and hands its fields to
    ``self.metrics_publisher.publish``. Any ``Exception`` raised while
    receiving or publishing is logged with its traceback and the loop keeps
    going after a short pause.
    """
    # Pause between retries so a persistently failing socket (e.g. one that
    # has been closed) cannot turn this loop into a busy spin that floods the
    # log and starves the event loop.
    retry_delay_s = 1.0

    while True:
        try:
            # recv_pyobj() unpickles the received payload, so the peer on
            # this socket must be trusted.
            # NOTE(review): presumably only the local scheduler writes to
            # this IPC endpoint -- confirm.
            kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
            self.metrics_publisher.publish(
                kv_metrics.request_active_slots,
                kv_metrics.request_total_slots,
                kv_metrics.kv_active_blocks,
                kv_metrics.kv_total_blocks,
                kv_metrics.num_requests_waiting,
                kv_metrics.gpu_cache_usage_perc,
                kv_metrics.gpu_prefix_cache_hit_rate,
                kv_metrics.data_parallel_rank,
            )
        except Exception:
            # Cancellation is not an ``Exception`` subclass on Python 3.8+,
            # so cancelling the task still terminates the loop.
            logger.exception("Failed to receive or publish metrics")
            await asyncio.sleep(retry_delay_s)
107+
83108 def _get_bootstrap_info (self ):
84109 """
85110 Bootstrap info is stored in the worker's tokenizer manager. We use it to
0 commit comments