Skip to content

Commit 5db6b2c

Browse files
authored
[V1][BugFix] Fix remaining sync engine client shutdown errors/hangs (#13869)
Signed-off-by: Nick Hill <nhill@redhat.com>
1 parent 6247bae commit 5db6b2c

File tree

3 files changed, with 68 additions and 40 deletions.

tests/v1/engine/test_llm_engine.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,6 @@
1515
def _vllm_model(apc: bool, vllm_runner, monkeypatch):
1616
"""Set up VllmRunner instance."""
1717
monkeypatch.setenv("VLLM_USE_V1", "1")
18-
# TODO(nick): Single-proc to work around a ZMQ shutdown hang for now.
19-
monkeypatch.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", "0")
2018
return vllm_runner(
2119
MODEL,
2220
dtype=DTYPE,

vllm/utils.py

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -500,6 +500,10 @@ def get_open_zmq_ipc_path() -> str:
500500
return f"ipc://{base_rpc_path}/{uuid4()}"
501501

502502

503+
def get_open_zmq_inproc_path() -> str:
504+
return f"inproc://{uuid4()}"
505+
506+
503507
def get_open_port() -> int:
504508
"""
505509
Get an open port for the vLLM process to listen on.
@@ -2108,12 +2112,12 @@ def get_exception_traceback():
21082112
def make_zmq_socket(
21092113
ctx: Union[zmq.asyncio.Context, zmq.Context], # type: ignore[name-defined]
21102114
path: str,
2111-
type: Any,
2115+
socket_type: Any,
21122116
) -> Union[zmq.Socket, zmq.asyncio.Socket]: # type: ignore[name-defined]
21132117
"""Make a ZMQ socket with the proper bind/connect semantics."""
21142118

21152119
mem = psutil.virtual_memory()
2116-
socket = ctx.socket(type)
2120+
socket = ctx.socket(socket_type)
21172121

21182122
# Calculate buffer size based on system memory
21192123
total_mem = mem.total / 1024**3
@@ -2127,29 +2131,27 @@ def make_zmq_socket(
21272131
else:
21282132
buf_size = -1 # Use system default buffer size
21292133

2130-
if type == zmq.constants.PULL:
2134+
if socket_type == zmq.constants.PULL:
21312135
socket.setsockopt(zmq.constants.RCVHWM, 0)
21322136
socket.setsockopt(zmq.constants.RCVBUF, buf_size)
21332137
socket.connect(path)
2134-
elif type == zmq.constants.PUSH:
2138+
elif socket_type == zmq.constants.PUSH:
21352139
socket.setsockopt(zmq.constants.SNDHWM, 0)
21362140
socket.setsockopt(zmq.constants.SNDBUF, buf_size)
21372141
socket.bind(path)
21382142
else:
2139-
raise ValueError(f"Unknown Socket Type: {type}")
2143+
raise ValueError(f"Unknown Socket Type: {socket_type}")
21402144

21412145
return socket
21422146

21432147

21442148
@contextlib.contextmanager
2145-
def zmq_socket_ctx(
2146-
path: str,
2147-
type: Any) -> Iterator[zmq.Socket]: # type: ignore[name-defined]
2149+
def zmq_socket_ctx(path: str, socket_type: Any) -> Iterator[zmq.Socket]:
21482150
"""Context manager for a ZMQ socket"""
21492151

2150-
ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined]
2152+
ctx = zmq.Context() # type: ignore[attr-defined]
21512153
try:
2152-
yield make_zmq_socket(ctx, path, type)
2154+
yield make_zmq_socket(ctx, path, socket_type)
21532155

21542156
except KeyboardInterrupt:
21552157
logger.debug("Got Keyboard Interrupt.")

vllm/v1/engine/core_client.py

Lines changed: 56 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,8 @@
1818
from vllm.config import VllmConfig
1919
from vllm.logger import init_logger
2020
from vllm.lora.request import LoRARequest
21-
from vllm.utils import (get_open_zmq_ipc_path, kill_process_tree,
22-
make_zmq_socket)
21+
from vllm.utils import (get_open_zmq_inproc_path, get_open_zmq_ipc_path,
22+
kill_process_tree, make_zmq_socket)
2323
from vllm.v1.engine import (EngineCoreOutputs, EngineCoreRequest,
2424
EngineCoreRequestType, UtilityOutput)
2525
from vllm.v1.engine.core import EngineCore, EngineCoreProc
@@ -202,10 +202,11 @@ class BackgroundResources:
202202
"""Used as a finalizer for clean shutdown, avoiding
203203
circular reference back to the client object."""
204204

205-
ctx: Union[zmq.Context, zmq.asyncio.Context] = None
205+
ctx: Union[zmq.Context] = None
206206
output_socket: Union[zmq.Socket, zmq.asyncio.Socket] = None
207207
input_socket: Union[zmq.Socket, zmq.asyncio.Socket] = None
208208
proc_handle: Optional[BackgroundProcHandle] = None
209+
shutdown_path: Optional[str] = None
209210

210211
def __call__(self):
211212
"""Clean up background resources."""
@@ -218,8 +219,13 @@ def __call__(self):
218219
self.output_socket.close(linger=0)
219220
if self.input_socket is not None:
220221
self.input_socket.close(linger=0)
221-
if self.ctx is not None:
222-
self.ctx.destroy(linger=0)
222+
if self.shutdown_path is not None:
223+
# We must ensure that the sync output socket is
224+
# closed cleanly in its own thread.
225+
with self.ctx.socket(zmq.PAIR) as shutdown_sender:
226+
shutdown_sender.connect(self.shutdown_path)
227+
# Send shutdown signal.
228+
shutdown_sender.send(b'')
223229

224230

225231
class MPClient(EngineCoreClient):
@@ -261,28 +267,23 @@ def sigusr1_handler(signum, frame):
261267
self.decoder = MsgpackDecoder(EngineCoreOutputs)
262268

263269
# ZMQ setup.
264-
self.ctx = (
265-
zmq.asyncio.Context() # type: ignore[attr-defined]
266-
if asyncio_mode else zmq.Context()) # type: ignore[attr-defined]
270+
sync_ctx = zmq.Context()
271+
self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx
267272

268273
# This will ensure resources created so far are closed
269274
# when the client is garbage collected, even if an
270275
# exception is raised mid-construction.
271-
resources = BackgroundResources(ctx=self.ctx)
272-
self._finalizer = weakref.finalize(self, resources)
276+
self.resources = BackgroundResources(ctx=sync_ctx)
277+
self._finalizer = weakref.finalize(self, self.resources)
273278

274-
# Paths and sockets for IPC.
275-
output_path = get_open_zmq_ipc_path()
279+
# Paths for IPC.
280+
self.output_path = get_open_zmq_ipc_path()
276281
input_path = get_open_zmq_ipc_path()
277-
resources.output_socket = make_zmq_socket(self.ctx, output_path,
278-
zmq.constants.PULL)
279-
resources.input_socket = make_zmq_socket(self.ctx, input_path,
280-
zmq.constants.PUSH)
281282

282283
# Start EngineCore in background process.
283-
resources.proc_handle = BackgroundProcHandle(
284+
self.resources.proc_handle = BackgroundProcHandle(
284285
input_path=input_path,
285-
output_path=output_path,
286+
output_path=self.output_path,
286287
process_name="EngineCore",
287288
target_fn=EngineCoreProc.run_engine_core,
288289
process_kwargs={
@@ -291,8 +292,10 @@ def sigusr1_handler(signum, frame):
291292
"log_stats": log_stats,
292293
})
293294

294-
self.output_socket = resources.output_socket
295-
self.input_socket = resources.input_socket
295+
# Create input socket.
296+
self.resources.input_socket = make_zmq_socket(self.ctx, input_path,
297+
zmq.constants.PUSH)
298+
self.input_socket = self.resources.input_socket
296299
self.utility_results: dict[int, AnyFuture] = {}
297300

298301
def shutdown(self):
@@ -325,27 +328,48 @@ def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
325328

326329
# Ensure that the outputs socket processing thread does not have
327330
# a ref to the client which prevents gc.
328-
output_socket = self.output_socket
331+
ctx = self.ctx
332+
output_path = self.output_path
329333
decoder = self.decoder
330334
utility_results = self.utility_results
331335
outputs_queue = self.outputs_queue
332336

337+
shutdown_path = get_open_zmq_inproc_path()
338+
self.resources.shutdown_path = shutdown_path
339+
333340
def process_outputs_socket():
341+
shutdown_socket = ctx.socket(zmq.PAIR)
342+
shutdown_socket.bind(shutdown_path)
343+
out_socket = make_zmq_socket(ctx, output_path, zmq.constants.PULL)
334344
try:
345+
poller = zmq.Poller()
346+
poller.register(shutdown_socket)
347+
poller.register(out_socket)
335348
while True:
336-
(frame, ) = output_socket.recv_multipart(copy=False)
349+
socks = poller.poll()
350+
if not socks:
351+
continue
352+
if len(socks) == 2 or socks[0][0] == shutdown_socket:
353+
# shutdown signal, exit thread.
354+
break
355+
356+
(frame, ) = out_socket.recv_multipart(copy=False)
337357
outputs = decoder.decode(frame.buffer)
338358
if outputs.utility_output:
339359
_process_utility_output(outputs.utility_output,
340360
utility_results)
341361
else:
342362
outputs_queue.put_nowait(outputs)
343-
except zmq.error.ContextTerminated:
344-
# Expected when the class is GC'd / during process termination.
345-
pass
363+
finally:
364+
# Close sockets.
365+
shutdown_socket.close(linger=0)
366+
out_socket.close(linger=0)
346367

347368
# Process outputs from engine in separate thread.
348-
Thread(target=process_outputs_socket, daemon=True).start()
369+
self.output_queue_thread = Thread(target=process_outputs_socket,
370+
name="EngineCoreOutputQueueThread",
371+
daemon=True)
372+
self.output_queue_thread.start()
349373

350374
def get_output(self) -> EngineCoreOutputs:
351375
return self.outputs_queue.get()
@@ -424,10 +448,13 @@ async def _start_output_queue_task(self):
424448
# Perform IO in separate task to parallelize as much as possible.
425449
# Avoid task having direct reference back to the client.
426450
self.outputs_queue = asyncio.Queue()
427-
output_socket = self.output_socket
428451
decoder = self.decoder
429452
utility_results = self.utility_results
430453
outputs_queue = self.outputs_queue
454+
output_path = self.output_path
455+
output_socket = make_zmq_socket(self.ctx, output_path,
456+
zmq.constants.PULL)
457+
self.resources.output_socket = output_socket
431458

432459
async def process_outputs_socket():
433460
while True:
@@ -439,7 +466,8 @@ async def process_outputs_socket():
439466
else:
440467
outputs_queue.put_nowait(outputs)
441468

442-
self.queue_task = asyncio.create_task(process_outputs_socket())
469+
self.queue_task = asyncio.create_task(process_outputs_socket(),
470+
name="EngineCoreOutputQueueTask")
443471

444472
async def get_output_async(self) -> EngineCoreOutputs:
445473
if self.outputs_queue is None:

0 commit comments

Comments
 (0)