Skip to content

Commit bccc43c

Browse files
authored
[Bugfix]check health for engine core process exiting unexpectedly (#21728)
Signed-off-by: wuhang <wuhang6@huawei.com>
1 parent 1395dd9 commit bccc43c

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

vllm/v1/engine/core_client.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
33
import asyncio
44
import contextlib
5+
import multiprocessing
56
import queue
67
import sys
78
import uuid
@@ -476,6 +477,9 @@ def __init__(
476477
# underlying data.
477478
self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()
478479

480+
# Start monitoring engine core processes for unexpected failures
481+
self.start_engine_core_monitor()
482+
479483
success = True
480484
finally:
481485
if not success:
@@ -505,6 +509,41 @@ def free_pending_messages(self):
505509
def dp_engines_running(self) -> bool:
506510
return self.engines_running
507511

512+
def start_engine_core_monitor(self):
513+
"""Start a monitor thread for engine core processes."""
514+
engine_manager = self.resources.engine_manager
515+
if (engine_manager is None or not hasattr(engine_manager, 'processes')
516+
or not engine_manager.processes):
517+
# No engine processes to monitor
518+
return
519+
520+
engine_processes = engine_manager.processes
521+
self_ref = weakref.ref(self)
522+
523+
# Monitor engine core process liveness. If any die unexpectedly,
524+
# logs an error, shuts down the client and invokes the failure
525+
# callback to inform the engine.
526+
def monitor_engine_cores():
527+
sentinels = [proc.sentinel for proc in engine_processes]
528+
died = multiprocessing.connection.wait(sentinels)
529+
_self = self_ref()
530+
if not _self or _self.resources.engine_dead:
531+
return
532+
_self.resources.engine_dead = True
533+
proc_name = next(proc.name for proc in engine_processes
534+
if proc.sentinel == died[0])
535+
logger.error(
536+
"Engine core proc %s died unexpectedly, "
537+
"shutting down client.", proc_name)
538+
_self.shutdown()
539+
# Note: For MPClient, we don't have a failure callback mechanism
540+
# like MultiprocExecutor, but we set engine_dead flag which will
541+
# cause subsequent operations to raise EngineDeadError
542+
543+
Thread(target=monitor_engine_cores,
544+
daemon=True,
545+
name="MPClientEngineMonitor").start()
546+
508547

509548
def _process_utility_output(output: UtilityOutput,
510549
utility_results: dict[int, AnyFuture]):
@@ -749,6 +788,8 @@ async def process_outputs_socket():
749788
outputs_queue.put_nowait(outputs)
750789
except Exception as e:
751790
outputs_queue.put_nowait(e)
791+
except asyncio.CancelledError:
792+
outputs_queue.put_nowait(EngineDeadError())
752793

753794
resources.output_queue_task = asyncio.create_task(
754795
process_outputs_socket(), name="EngineCoreOutputQueueTask")

vllm/v1/executor/multiproc_executor.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,12 @@ def _init_executor(self) -> None:
104104
finally:
105105
if not success:
106106
# Clean up the worker procs if there was a failure.
107+
# Close death_writers first to signal workers to exit
108+
for uw in unready_workers:
109+
if uw.death_writer is not None:
110+
uw.death_writer.close()
107111
self._ensure_worker_termination(
108-
[w.proc for w in unready_workers])
112+
[uw.proc for uw in unready_workers])
109113

110114
# For pipeline parallel, we use a thread pool for asynchronous
111115
# execute_model.
@@ -282,6 +286,10 @@ def shutdown(self):
282286

283287
if workers := getattr(self, 'workers', None):
284288
for w in workers:
289+
# Close death_writer to signal child processes to exit
290+
if w.death_writer is not None:
291+
w.death_writer.close()
292+
w.death_writer = None
285293
w.worker_response_mq = None
286294
self._ensure_worker_termination([w.proc for w in workers])
287295

@@ -316,13 +324,15 @@ class UnreadyWorkerProcHandle:
316324
proc: BaseProcess
317325
rank: int
318326
ready_pipe: Connection
327+
death_writer: Optional[Connection] = None
319328

320329

321330
@dataclass
322331
class WorkerProcHandle:
323332
proc: BaseProcess
324333
rank: int
325334
worker_response_mq: MessageQueue # The worker process writes to this MQ
335+
death_writer: Optional[Connection] = None
326336

327337
@classmethod
328338
def from_unready_handle(
@@ -332,6 +342,7 @@ def from_unready_handle(
332342
proc=unready_handle.proc,
333343
rank=unready_handle.rank,
334344
worker_response_mq=worker_response_mq,
345+
death_writer=unready_handle.death_writer,
335346
)
336347

337348

@@ -396,13 +407,17 @@ def make_worker_process(
396407
# (reader, writer)
397408
reader, writer = context.Pipe(duplex=False)
398409

410+
# Create death pipe to detect parent process exit
411+
death_reader, death_writer = context.Pipe(duplex=False)
412+
399413
process_kwargs = {
400414
"vllm_config": vllm_config,
401415
"local_rank": local_rank,
402416
"rank": rank,
403417
"distributed_init_method": distributed_init_method,
404418
"input_shm_handle": input_shm_handle,
405419
"ready_pipe": (reader, writer),
420+
"death_pipe": death_reader,
406421
}
407422
# Run EngineCore busy loop in background process.
408423
proc = context.Process(target=WorkerProc.worker_main,
@@ -412,7 +427,9 @@ def make_worker_process(
412427

413428
proc.start()
414429
writer.close()
415-
return UnreadyWorkerProcHandle(proc, rank, reader)
430+
# Keep death_writer open in parent - when parent exits,
431+
# death_reader in child will get EOFError
432+
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)
416433

417434
@staticmethod
418435
def wait_for_ready(
@@ -483,6 +500,28 @@ def signal_handler(signum, frame):
483500
worker = None
484501
# tuple[Connection, Connection]
485502
reader, ready_writer = kwargs.pop("ready_pipe")
503+
death_pipe = kwargs.pop("death_pipe", None)
504+
505+
# Start death monitoring thread if death_pipe is provided
506+
if death_pipe is not None:
507+
508+
def monitor_parent_death():
509+
try:
510+
# This will block until parent process exits (pipe closes)
511+
death_pipe.recv()
512+
except EOFError:
513+
# Parent process has exited, terminate this worker
514+
logger.info("Parent process exited, terminating worker")
515+
# Send signal to self to trigger clean shutdown
516+
os.kill(os.getpid(), signal.SIGTERM)
517+
except Exception as e:
518+
logger.warning("Death monitoring error: %s", e)
519+
520+
death_monitor = Thread(target=monitor_parent_death,
521+
daemon=True,
522+
name="WorkerDeathMonitor")
523+
death_monitor.start()
524+
486525
try:
487526
reader.close()
488527
worker = WorkerProc(*args, **kwargs)
@@ -523,6 +562,8 @@ def signal_handler(signum, frame):
523562
finally:
524563
if ready_writer is not None:
525564
ready_writer.close()
565+
if death_pipe is not None:
566+
death_pipe.close()
526567
# Clean up once worker exits busy loop
527568
if worker is not None:
528569
worker.shutdown()

0 commit comments

Comments
 (0)