Skip to content
91 changes: 51 additions & 40 deletions examples/sglang/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

import sglang as sgl
import uvloop
import zmq
from sglang.srt.server_args import ServerArgs
from sglang.srt.utils import get_ip
from sglang.srt.utils import get_ip, get_zmq_socket
from utils.protocol import DisaggPreprocessedRequest
from utils.sgl_utils import parse_sglang_args_inc

Expand Down Expand Up @@ -45,6 +46,9 @@ def __init__(
self.component = component
self.metrics_publisher = WorkerMetricsPublisher()

self.zmq_context = zmq.asyncio.Context() # type: ignore
self.receive_metrics_from_scheduler = None

if server_args.disaggregation_mode != "null":
self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info()
if decode_client is None:
Expand All @@ -59,67 +63,74 @@ def __init__(
logging.info("Request handler initialized")

def setup_metrics(self):
    """Set up metrics publishing for this worker.

    Opens a ZMQ PULL socket on the engine's metrics IPC channel, publishes
    an initial set of warmup metrics, and starts background tasks that
    relay scheduler metrics and expose the publisher endpoint.

    Call this after handler creation, from within a running event loop
    (``asyncio.create_task`` requires one).
    """
    # PULL socket: the SGLang scheduler pushes KV/request metrics here.
    self.receive_metrics_from_scheduler = get_zmq_socket(
        self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
    )

    # Publish placeholder metrics immediately so consumers see this worker
    # before the first real scheduler update arrives.
    self.init_publish()
    asyncio.create_task(self._receive_and_publish_metrics_loop())

    task = asyncio.create_task(self.create_metrics_publisher_endpoint())
    task.add_done_callback(
        lambda _: logging.debug("metrics publisher endpoint created")
    )

def init_publish(self):
    """Publish an initial set of warmup metrics.

    Emits all-zero worker/KV stats so downstream consumers (e.g. a KV-aware
    router) discover this worker as soon as it starts, before the first real
    metrics arrive from the scheduler.
    """
    worker_stats = WorkerStats(
        request_active_slots=0,
        request_total_slots=1024,  # placeholder capacity until real metrics arrive
        num_requests_waiting=0,
        data_parallel_rank=0,
    )

    kv_stats = KvStats(
        kv_active_blocks=0,
        kv_total_blocks=1024,  # placeholder capacity until real metrics arrive
        gpu_cache_usage_perc=0,
        gpu_prefix_cache_hit_rate=0,
    )

    metrics = ForwardPassMetrics(
        worker_stats=worker_stats,
        kv_stats=kv_stats,
        spec_decode_stats=None,
    )

    self.metrics_publisher.publish(metrics)

async def create_metrics_publisher_endpoint(self):
    """Expose the metrics publisher as an endpoint on this component."""
    logging.debug("Creating metrics publisher endpoint")
    publisher = self.metrics_publisher
    await publisher.create_endpoint(self.component)

def _update_metrics(self):
    """Publish placeholder metrics for the current engine state.

    NOTE(review): the KV values are randomly generated stand-ins, not real
    engine metrics — see the TODO below for the upstream work that replaces
    them.
    """
    # TODO: remove this once the following upstream changes are merged:
    # • sgl-project/sglang#6721 – "Expose runtime KV-cache & request metrics"
    logging.warning(
        "Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
    )

    worker_stats = WorkerStats(
        request_active_slots=0,
        request_total_slots=1024,
        num_requests_waiting=0,
        data_parallel_rank=None,
    )

    kv_stats = KvStats(
        kv_active_blocks=random.randint(0, 500),
        kv_total_blocks=1000,
        gpu_cache_usage_perc=random.uniform(0.1, 0.8),
        gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5),
    )

    # TODO: get spec_dec_stats from sglang once real engine metrics are available
    spec_dec_stats = None

    # Fix: the stats were constructed but never published — assemble the
    # payload and emit it, matching the other publish paths in this class.
    metrics = ForwardPassMetrics(
        worker_stats=worker_stats,
        kv_stats=kv_stats,
        spec_decode_stats=spec_dec_stats,
    )
    self.metrics_publisher.publish(metrics)
async def _receive_and_publish_metrics_loop(self):
    """Receive metrics from the SGL scheduler and publish them.

    Runs forever: each iteration blocks on the scheduler's ZMQ PULL socket,
    converts the payload into WorkerStats/KvStats, and republishes it via
    the metrics publisher. Errors are logged and the loop keeps running so
    a single bad payload cannot kill metrics publishing.
    """
    while True:
        try:
            kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()  # type: ignore
            worker_stats = WorkerStats(
                request_active_slots=kv_metrics.request_active_slots,
                request_total_slots=kv_metrics.request_total_slots,
                num_requests_waiting=kv_metrics.num_requests_waiting,
                data_parallel_rank=kv_metrics.data_parallel_rank,  # Note: 0 means it's either 0 or None from sglang
            )
            kv_stats = KvStats(
                kv_active_blocks=kv_metrics.kv_active_blocks,
                kv_total_blocks=kv_metrics.kv_total_blocks,
                gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc,
                gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate,
            )
            # TODO: populate spec_decode_stats once sglang exposes them
            metrics = ForwardPassMetrics(
                worker_stats=worker_stats,
                kv_stats=kv_stats,
                spec_decode_stats=None,
            )
            self.metrics_publisher.publish(metrics)
        except Exception:
            # Best-effort: log and continue so the loop survives bad payloads.
            logging.exception("Failed to receive or publish metrics")

def _get_bootstrap_info(self):
"""Bootstrap info from tokenizer manager"""
Expand Down Expand Up @@ -332,7 +343,7 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs):
else:
handler = RequestHandler(engine, server_args, component)

# Set up metrics in background
# Set up the engine metrics receiver
handler.setup_metrics()

# Set up ZMQ kv event publisher
Expand Down
Loading