diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index b14d85bbf8e9..acff5bf6823d 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -2,6 +2,7 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 import asyncio
 import contextlib
+import multiprocessing.connection
 import queue
 import sys
 import uuid
@@ -476,6 +477,9 @@ def __init__(
         # underlying data.
         self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()
 
+        # Start monitoring engine core processes for unexpected failures
+        self.start_engine_core_monitor()
+
         success = True
     finally:
         if not success:
@@ -505,6 +509,41 @@ def free_pending_messages(self):
     def dp_engines_running(self) -> bool:
         return self.engines_running
 
+    def start_engine_core_monitor(self):
+        """Start a monitor thread for engine core processes."""
+        engine_manager = self.resources.engine_manager
+        if (engine_manager is None or not hasattr(engine_manager, 'processes')
+                or not engine_manager.processes):
+            # No engine processes to monitor
+            return
+
+        engine_processes = engine_manager.processes
+        self_ref = weakref.ref(self)
+
+        # Monitor engine core process liveness. If any die unexpectedly,
+        # this logs an error and shuts down the client so that subsequent
+        # operations raise EngineDeadError.
+        def monitor_engine_cores():
+            sentinels = [proc.sentinel for proc in engine_processes]
+            died = multiprocessing.connection.wait(sentinels)
+            _self = self_ref()
+            if not _self or _self.resources.engine_dead:
+                return
+            _self.resources.engine_dead = True
+            proc_name = next(proc.name for proc in engine_processes
+                             if proc.sentinel == died[0])
+            logger.error(
+                "Engine core proc %s died unexpectedly, "
+                "shutting down client.", proc_name)
+            _self.shutdown()
+            # Note: For MPClient, we don't have a failure callback mechanism
+            # like MultiprocExecutor, but we set the engine_dead flag, which
+            # will cause subsequent operations to raise EngineDeadError.
+
+        Thread(target=monitor_engine_cores,
+               daemon=True,
+               name="MPClientEngineMonitor").start()
+
 
 def _process_utility_output(output: UtilityOutput,
                             utility_results: dict[int, AnyFuture]):
@@ -749,6 +788,8 @@ async def process_outputs_socket():
                     outputs_queue.put_nowait(outputs)
             except Exception as e:
                 outputs_queue.put_nowait(e)
+            except asyncio.CancelledError:
+                outputs_queue.put_nowait(EngineDeadError())
 
         resources.output_queue_task = asyncio.create_task(
             process_outputs_socket(), name="EngineCoreOutputQueueTask")
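The client-side monitor above leans on `multiprocessing` process sentinels: every `Process` exposes a `sentinel` handle that becomes ready exactly when the process exits, and `multiprocessing.connection.wait()` can block on a whole list of them from a single thread. Below is a minimal standalone sketch of that pattern; the `child`, `monitor`, and `core_N` names are illustrative stand-ins, not vLLM code:

```python
import multiprocessing
import multiprocessing.connection
import time
from threading import Thread


def child(idx: int) -> None:
    # Stand-in for an engine core that dies unexpectedly.
    time.sleep(0.2 * (idx + 1))
    raise SystemExit(1)


def main() -> None:
    ctx = multiprocessing.get_context("spawn")
    procs = [
        ctx.Process(target=child, args=(i,), name=f"core_{i}")
        for i in range(2)
    ]
    for p in procs:
        p.start()

    def monitor() -> None:
        # Blocks until at least one sentinel becomes ready, i.e. until
        # at least one child process has exited.
        sentinels = [p.sentinel for p in procs]
        died = multiprocessing.connection.wait(sentinels)
        name = next(p.name for p in procs if p.sentinel == died[0])
        print(f"{name} died unexpectedly, shutting down")

    Thread(target=monitor, daemon=True, name="EngineMonitor").start()
    for p in procs:
        p.join()


if __name__ == "__main__":
    main()
```

Because `wait()` returns the subset of objects that are ready, the monitor can name the specific process that died before flipping `engine_dead` and shutting down, which is what `monitor_engine_cores()` above does.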
diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index 993a90752bb2..897174c1599d 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -104,8 +104,12 @@ def _init_executor(self) -> None:
         finally:
             if not success:
                 # Clean up the worker procs if there was a failure.
+                # Close death_writers first to signal workers to exit
+                for uw in unready_workers:
+                    if uw.death_writer is not None:
+                        uw.death_writer.close()
                 self._ensure_worker_termination(
-                    [w.proc for w in unready_workers])
+                    [uw.proc for uw in unready_workers])
 
         # For pipeline parallel, we use a thread pool for asynchronous
         # execute_model.
@@ -282,6 +286,10 @@ def shutdown(self):
 
         if workers := getattr(self, 'workers', None):
             for w in workers:
+                # Close death_writer to signal child processes to exit
+                if w.death_writer is not None:
+                    w.death_writer.close()
+                    w.death_writer = None
                 w.worker_response_mq = None
             self._ensure_worker_termination([w.proc for w in workers])
 
@@ -316,6 +324,7 @@ class UnreadyWorkerProcHandle:
     proc: BaseProcess
     rank: int
     ready_pipe: Connection
+    death_writer: Optional[Connection] = None
 
 
 @dataclass
@@ -323,6 +332,7 @@ class WorkerProcHandle:
     proc: BaseProcess
     rank: int
     worker_response_mq: MessageQueue  # The worker process writes to this MQ
+    death_writer: Optional[Connection] = None
 
     @classmethod
     def from_unready_handle(
@@ -332,6 +342,7 @@ def from_unready_handle(
             proc=unready_handle.proc,
             rank=unready_handle.rank,
             worker_response_mq=worker_response_mq,
+            death_writer=unready_handle.death_writer,
         )
 
 
@@ -396,6 +407,9 @@ def make_worker_process(
         # (reader, writer)
         reader, writer = context.Pipe(duplex=False)
 
+        # Create death pipe to detect parent process exit
+        death_reader, death_writer = context.Pipe(duplex=False)
+
         process_kwargs = {
             "vllm_config": vllm_config,
             "local_rank": local_rank,
@@ -403,6 +417,7 @@
             "distributed_init_method": distributed_init_method,
             "input_shm_handle": input_shm_handle,
             "ready_pipe": (reader, writer),
+            "death_pipe": death_reader,
         }
         # Run EngineCore busy loop in background process.
         proc = context.Process(target=WorkerProc.worker_main,
@@ -412,7 +427,9 @@
         proc.start()
         writer.close()
 
-        return UnreadyWorkerProcHandle(proc, rank, reader)
+        # Keep death_writer open in parent - when parent exits,
+        # death_reader in child will get EOFError
+        return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)
 
     @staticmethod
     def wait_for_ready(
@@ -483,6 +500,28 @@ def signal_handler(signum, frame):
         worker = None
         # tuple[Connection, Connection]
         reader, ready_writer = kwargs.pop("ready_pipe")
+        death_pipe = kwargs.pop("death_pipe", None)
+
+        # Start death monitoring thread if death_pipe is provided
+        if death_pipe is not None:
+
+            def monitor_parent_death():
+                try:
+                    # This will block until parent process exits (pipe closes)
+                    death_pipe.recv()
+                except EOFError:
+                    # Parent process has exited, terminate this worker
+                    logger.info("Parent process exited, terminating worker")
+                    # Send signal to self to trigger clean shutdown
+                    os.kill(os.getpid(), signal.SIGTERM)
+                except Exception as e:
+                    logger.warning("Death monitoring error: %s", e)
+
+            death_monitor = Thread(target=monitor_parent_death,
+                                   daemon=True,
+                                   name="WorkerDeathMonitor")
+            death_monitor.start()
+
         try:
             reader.close()
             worker = WorkerProc(*args, **kwargs)
@@ -523,6 +562,8 @@ def signal_handler(signum, frame):
         finally:
             if ready_writer is not None:
                 ready_writer.close()
+            if death_pipe is not None:
+                death_pipe.close()
             # Clean up once worker exits busy loop
             if worker is not None:
                 worker.shutdown()
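The executor-side changes implement the complementary "death pipe" idiom: the parent creates a one-way pipe, hands only the read end to each worker, and keeps the write end open without ever sending on it. When the parent exits for any reason, the OS closes the write end and the worker's blocking `recv()` raises `EOFError`. A minimal sketch of the idiom, assuming a spawn start method so the child never inherits the write end; the `worker`/`main` names are illustrative, not vLLM code:

```python
import multiprocessing
import os
import signal
import time
from threading import Thread


def worker(death_reader) -> None:
    def monitor_parent_death() -> None:
        try:
            # Blocks until the last open write end of the pipe is closed.
            death_reader.recv()
        except EOFError:
            print("parent gone, terminating worker")
            os.kill(os.getpid(), signal.SIGTERM)

    Thread(target=monitor_parent_death, daemon=True).start()
    while True:  # stand-in for the worker busy loop
        time.sleep(0.1)


def main() -> None:
    ctx = multiprocessing.get_context("spawn")
    death_reader, death_writer = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=worker, args=(death_reader,))
    proc.start()
    death_reader.close()  # the parent only needs the write end

    time.sleep(0.5)
    # Simulate the parent dying: on a real crash the OS closes the
    # write end for us, with exactly the same effect in the child.
    death_writer.close()
    proc.join(timeout=5)
    print("worker exit code:", proc.exitcode)


if __name__ == "__main__":
    main()
```

One caveat the spawn assumption hides: with a fork start method the child would inherit the parent's copy of `death_writer` as well, and `recv()` could only see EOF once that inherited copy was also closed. Closing `death_writer` explicitly in `shutdown()` and `_init_executor()`, as the diff does, additionally lets workers exit promptly on clean shutdown rather than only on parent death.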