Skip to content

Commit 04cef2c

Browse files
[Bugfix] Fix MQLLMEngine hanging (#9973)
Signed-off-by: rshaw@neuralmagic.com <rshaw@neuralmagic.com>
1 parent 6e056bc commit 04cef2c

File tree

3 files changed

+42
-23
lines changed

3 files changed

+42
-23
lines changed

vllm/engine/multiprocessing/client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ def __init__(self, ipc_path: str, engine_config: VllmConfig,
 
         # Stream for each individual request.
         self.output_queues: Dict[str, asyncio.Queue] = {}
-        self.output_loop = asyncio.create_task(self.run_output_handler_loop())
+
+        # Loop to handle output of the LLMEngine periodically.
+        # Started after the MQLLMEngine is ready so that we can
+        # build the Client in an executor to enable clean shutdown.
+        self.output_loop: Optional[asyncio.Task] = None
 
         # Loop to check health of the LLMEngine periodically.
         # Started after the MQLLMEngine is ready.
@@ -247,6 +251,9 @@ async def run_output_handler_loop(self):
     async def setup(self):
         """Setup the client before it starts sending server requests."""
 
+        # Start output_loop
+        self.output_loop = asyncio.create_task(self.run_output_handler_loop())
+
         with self.get_data_socket() as socket:
             # Wait until server is ready.
             response = await self._wait_for_server_rpc(socket)
@@ -265,7 +272,8 @@ def close(self):
         # Cancel background tasks.
         if self.health_loop is not None:
             self.health_loop.cancel()
-        self.output_loop.cancel()
+        if self.output_loop is not None:
+            self.output_loop.cancel()
 
     def _set_errored(self, e: BaseException):
         logger.exception(repr(e))

vllm/engine/multiprocessing/engine.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,16 +349,22 @@ def stop_profile(self) -> None:
         self.engine.model_executor._run_workers("stop_profile")
 
 
+def signal_handler(*_) -> None:
+    raise KeyboardInterrupt("MQLLMEngine terminated")
+
+
 def run_mp_engine(engine_args: AsyncEngineArgs, usage_context: UsageContext,
-                  ipc_path: str):
+                  ipc_path: str, engine_alive):
+    try:
+        engine = MQLLMEngine.from_engine_args(engine_args=engine_args,
+                                              usage_context=usage_context,
+                                              ipc_path=ipc_path)
 
-    def signal_handler(*_) -> None:
-        # Interrupt server on sigterm
-        raise KeyboardInterrupt("MQLLMEngine terminated")
+        signal.signal(signal.SIGTERM, signal_handler)
 
-    signal.signal(signal.SIGTERM, signal_handler)
+        engine.start()
 
-    engine = MQLLMEngine.from_engine_args(engine_args=engine_args,
-                                          usage_context=usage_context,
-                                          ipc_path=ipc_path)
-    engine.start()
+    except BaseException as e:
+        logger.exception(e)
+        engine_alive.value = False
+        raise e

vllm/entrypoints/openai/api_server.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -171,39 +171,44 @@ async def build_async_engine_client_from_engine_args(
         # so we need to spawn a new process
         context = multiprocessing.get_context("spawn")
 
+        # The Process can raise an exception during startup, which may
+        # not actually result in an exitcode being reported. As a result
+        # we use a shared variable to communicate the information.
+        engine_alive = multiprocessing.Value('b', True, lock=False)
         engine_process = context.Process(target=run_mp_engine,
                                          args=(engine_args,
                                                UsageContext.OPENAI_API_SERVER,
-                                               ipc_path))
+                                               ipc_path, engine_alive))
         engine_process.start()
         engine_pid = engine_process.pid
-        assert engine_pid is not None, "Engine process failed to start"
+        assert engine_pid is not None, "Engine process failed to start."
         logger.info("Started engine process with PID %d", engine_pid)
 
         # Build RPCClient, which conforms to EngineClient Protocol.
-        # NOTE: Actually, this is not true yet. We still need to support
-        # embedding models via RPC (see TODO above)
         engine_config = engine_args.create_engine_config()
-        mp_engine_client = MQLLMEngineClient(ipc_path, engine_config,
-                                             engine_pid)
-
+        build_client = partial(MQLLMEngineClient, ipc_path, engine_config,
+                               engine_pid)
+        mq_engine_client = await asyncio.get_running_loop().run_in_executor(
+            None, build_client)
         try:
             while True:
                 try:
-                    await mp_engine_client.setup()
+                    await mq_engine_client.setup()
                     break
                 except TimeoutError:
-                    if not engine_process.is_alive():
+                    if (not engine_process.is_alive()
+                            or not engine_alive.value):
                         raise RuntimeError(
-                            "Engine process failed to start") from None
+                            "Engine process failed to start. See stack "
+                            "trace for the root cause.") from None
 
-            yield mp_engine_client  # type: ignore[misc]
+            yield mq_engine_client  # type: ignore[misc]
         finally:
             # Ensure rpc server process was terminated
             engine_process.terminate()
 
             # Close all open connections to the backend
-            mp_engine_client.close()
+            mq_engine_client.close()
 
             # Wait for engine process to join
             engine_process.join(4)

0 commit comments

Comments
 (0)