Skip to content

Commit 0957ef6

Browse files
ishandhananizixuanzhang226
authored andcommitted
feat: receive kvmetrics from sglang scheduler (#1789)
Co-authored-by: zixuanzhang226 <zixuanzhang@bytedance.com>
1 parent 644a68d commit 0957ef6

File tree

1 file changed

+51
-40
lines changed

1 file changed

+51
-40
lines changed

examples/sglang/components/worker.py

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111

1212
import sglang as sgl
1313
import uvloop
14+
import zmq
1415
from sglang.srt.server_args import ServerArgs
15-
from sglang.srt.utils import get_ip
16+
from sglang.srt.utils import get_ip, get_zmq_socket
1617
from utils.protocol import DisaggPreprocessedRequest
1718
from utils.sgl_utils import parse_sglang_args_inc
1819

@@ -45,6 +46,9 @@ def __init__(
4546
self.component = component
4647
self.metrics_publisher = WorkerMetricsPublisher()
4748

49+
self.zmq_context = zmq.asyncio.Context() # type: ignore
50+
self.receive_metrics_from_scheduler = None
51+
4852
if server_args.disaggregation_mode != "null":
4953
self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info()
5054
if decode_client is None:
@@ -59,67 +63,74 @@ def __init__(
5963
logging.info("Request handler initialized")
6064

6165
def setup_metrics(self):
62-
"""Set up metrics publisher - call this after handler creation"""
66+
"""Set up metrics publisher"""
67+
self.receive_metrics_from_scheduler = get_zmq_socket(
68+
self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
69+
)
70+
71+
self.init_publish()
72+
asyncio.create_task(self._receive_and_publish_metrics_loop())
73+
74+
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
75+
task.add_done_callback(
76+
lambda _: logging.debug("metrics publisher endpoint created")
77+
)
78+
79+
def init_publish(self):
80+
"""Publish initial set of warmup metrics"""
6381
worker_stats = WorkerStats(
6482
request_active_slots=0,
6583
request_total_slots=1024,
6684
num_requests_waiting=0,
67-
data_parallel_rank=None,
85+
data_parallel_rank=0,
6886
)
6987

7088
kv_stats = KvStats(
7189
kv_active_blocks=0,
7290
kv_total_blocks=1024,
73-
gpu_cache_usage_perc=0.0,
74-
gpu_prefix_cache_hit_rate=0.0,
91+
gpu_cache_usage_perc=0,
92+
gpu_prefix_cache_hit_rate=0,
7593
)
7694

7795
metrics = ForwardPassMetrics(
7896
worker_stats=worker_stats,
7997
kv_stats=kv_stats,
8098
spec_decode_stats=None,
8199
)
100+
82101
self.metrics_publisher.publish(metrics)
83-
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
84-
task.add_done_callback(
85-
lambda _: logging.debug("metrics publisher endpoint created")
86-
)
87102

88103
async def create_metrics_publisher_endpoint(self):
89104
logging.debug("Creating metrics publisher endpoint")
90105
await self.metrics_publisher.create_endpoint(self.component)
91106

92-
def _update_metrics(self):
93-
"""Update metrics with current engine state"""
94-
# TODO: remove this once the following upstream changes are merged:
95-
# • sgl-project/sglang#6721 – "Expose runtime KV-cache & request metrics"
96-
logging.warning(
97-
"Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
98-
)
99-
100-
worker_stats = WorkerStats(
101-
request_active_slots=0,
102-
request_total_slots=1024,
103-
num_requests_waiting=0,
104-
data_parallel_rank=None,
105-
)
106-
107-
kv_stats = KvStats(
108-
kv_active_blocks=random.randint(0, 500),
109-
kv_total_blocks=1000,
110-
gpu_cache_usage_perc=random.uniform(0.1, 0.8),
111-
gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5),
112-
)
113-
114-
# TODO: get spec_dec_stats from sglang once real engine metrics are available
115-
spec_dec_stats = None
107+
async def _receive_and_publish_metrics_loop(self):
108+
"""Receive metrics from SGL scheduler and publish them"""
109+
while True:
110+
try:
111+
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore
112+
worker_stats = WorkerStats(
113+
request_active_slots=kv_metrics.request_active_slots,
114+
request_total_slots=kv_metrics.request_total_slots,
115+
num_requests_waiting=kv_metrics.num_requests_waiting,
116+
data_parallel_rank=kv_metrics.data_parallel_rank, # Note: 0 means it's either 0 or None from sglang
117+
)
118+
kv_stats = KvStats(
119+
kv_active_blocks=kv_metrics.kv_active_blocks,
120+
kv_total_blocks=kv_metrics.kv_total_blocks,
121+
gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc,
122+
gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate,
123+
)
124+
spec_dec_stats = None
125+
metrics = ForwardPassMetrics(
126+
worker_stats=worker_stats,
127+
kv_stats=kv_stats,
128+
spec_decode_stats=spec_dec_stats,
129+
)
116130

117-
metrics = ForwardPassMetrics(
118-
worker_stats=worker_stats,
119-
kv_stats=kv_stats,
120-
spec_decode_stats=spec_dec_stats,
121-
)
122-
self.metrics_publisher.publish(metrics)
131+
self.metrics_publisher.publish(metrics)
132+
except Exception:
133+
logging.exception("Failed to recieve or publish metrics")
123134

124135
def _get_bootstrap_info(self):
125136
"""Bootstrap info from tokenizer manager"""
@@ -332,7 +343,7 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs):
332343
else:
333344
handler = RequestHandler(engine, server_args, component)
334345

335-
# Set up metrics in background
346+
# Set up the engine metrics reciever
336347
handler.setup_metrics()
337348

338349
# Set up ZMQ kv event publisher

0 commit comments

Comments
 (0)