Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion examples/sglang/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
from typing import Dict, Union

import sglang as sgl
import zmq
from components.decode_worker import SGLangDecodeWorker
from sglang.srt.utils import get_ip
from sglang.srt.utils import get_ip, get_zmq_socket
from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest
from utils.sgl_utils import parse_sglang_args

Expand Down Expand Up @@ -90,6 +91,11 @@ async def create_metrics_publisher_endpoint(self):

@async_on_start
async def async_init(self):
context = zmq.asyncio.Context()
self.receive_metrics_from_scheduler = get_zmq_socket(
context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
)
asyncio.create_task(self._receive_and_publish_metrics_loop())
Comment on lines +94 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Background task is never tracked or cancelled

asyncio.create_task() is invoked but the returned task handle is discarded.
Untracked tasks hide exceptions and can leak on shutdown.

-asyncio.create_task(self._receive_and_publish_metrics_loop())
+self._metrics_task = asyncio.create_task(
+    self._receive_and_publish_metrics_loop()
+)

Add a framework-specific asynchronous shutdown hook that calls cancel() on the task and then awaits it. Do not use __del__ for this: it is synchronous and cannot await the task.


🏁 Script executed:

#!/bin/bash
# Verify untracked asyncio.create_task usage in worker.py
rg -n "asyncio.create_task" -C3 examples/sglang/components/worker.py

Length of output: 795


🏁 Script executed:

#!/bin/bash
# Locate class definition and constructor
rg -n "class SGLangWorker" -C2 examples/sglang/components/worker.py
rg -n "def __init__" -C3 examples/sglang/components/worker.py

# Check for destructor or shutdown/cleanup hooks
rg -n "__del__" -C2 examples/sglang/components/worker.py
rg -n "shutdown" -C2 examples/sglang/components/worker.py
rg -n "async def close" -C2 examples/sglang/components/worker.py
rg -n "def stop" -C2 examples/sglang/components/worker.py

Length of output: 759


Track and cancel background tasks on shutdown

Untracked calls to asyncio.create_task() (see lines 70 and 166 in examples/sglang/components/worker.py) will swallow exceptions and leak on process teardown. Capture each task handle and implement a cleanup hook to cancel and await them.

• File: examples/sglang/components/worker.py
– Line 70: metrics loop task
– Line 166: prefill generator task

Suggested diff:

--- a/examples/sglang/components/worker.py
+++ b/examples/sglang/components/worker.py
@@ -67,3 +67,6 @@
         context = zmq.asyncio.Context()
         self.receive_metrics_from_scheduler = get_zmq_socket(
             context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
         )
-        asyncio.create_task(self._receive_and_publish_metrics_loop())
+        self._metrics_task = asyncio.create_task(
+            self._receive_and_publish_metrics_loop()
+        )

@@ -164,3 +167,5 @@
             )
-            prefill_task = asyncio.create_task(self._prefill_generator(prefill))
+            self._prefill_task = asyncio.create_task(
+                self._prefill_generator(prefill)
+            )

+    async def close(self):
+        for t in (getattr(self, "_metrics_task", None),
+                  getattr(self, "_prefill_task", None)):
+            if t:
+                t.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await t

Ensure your application/framework invokes await worker.close() (or similar) on shutdown to properly cancel background loops.

🤖 Prompt for AI Agents
In examples/sglang/components/worker.py around lines 66 to 70, the
asyncio.create_task call that starts the metrics loop is not tracked, causing
potential unhandled exceptions and resource leaks on shutdown. Fix this by
storing the returned task handle as an instance variable, then implement an
async cleanup method (e.g., async close; not __del__, which cannot await) that cancels and awaits this task.
Also ensure the application calls this cleanup method on shutdown to properly
stop background tasks.

runtime = dynamo_context["runtime"]
comp_ns, comp_name = SGLangWorker.dynamo_address() # type: ignore
endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
Expand Down Expand Up @@ -141,6 +147,24 @@ async def async_init(self):
config=zmq_config,
)

async def _receive_and_publish_metrics_loop(self):
while True:
try:
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
self.metrics_publisher.publish(
kv_metrics.request_active_slots,
kv_metrics.request_total_slots,
kv_metrics.kv_active_blocks,
kv_metrics.kv_total_blocks,
kv_metrics.num_requests_waiting,
kv_metrics.gpu_cache_usage_perc,
kv_metrics.gpu_prefix_cache_hit_rate,
kv_metrics.data_parallel_rank,
)

except Exception:
logger.exception("Failed to receive or publish metrics")

def _get_bootstrap_info(self):
"""
Bootstrap info is stored in the worker's tokenizer manager. We use it to
Expand Down
Loading