Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import contextlib
import multiprocessing
import queue
import sys
import uuid
Expand Down Expand Up @@ -476,6 +477,9 @@ def __init__(
# underlying data.
self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()

# Start monitoring engine core processes for unexpected failures
self.start_engine_core_monitor()

success = True
finally:
if not success:
Expand Down Expand Up @@ -505,6 +509,41 @@ def free_pending_messages(self):
def dp_engines_running(self) -> bool:
    """Return the client's current ``engines_running`` flag."""
    running = self.engines_running
    return running

def start_engine_core_monitor(self):
"""Start a monitor thread for engine core processes."""
engine_manager = self.resources.engine_manager
if (engine_manager is None or not hasattr(engine_manager, 'processes')
or not engine_manager.processes):
# No engine processes to monitor
return

engine_processes = engine_manager.processes
self_ref = weakref.ref(self)

# Monitor engine core process liveness. If any process dies unexpectedly
# (or the wait itself fails), log an error, mark the engine as dead and
# shut down the client. No failure callback is invoked from here.
def monitor_engine_cores():
    """Block until an engine core process exits, then shut the client down.

    Intended to run as the target of the monitor thread. It reads
    ``engine_processes`` and ``self_ref`` from the enclosing scope and does
    nothing if the client has been garbage collected or is already marked
    dead.
    """
    # Import the submodule explicitly: a bare ``import multiprocessing``
    # does not guarantee that ``multiprocessing.connection`` is loaded.
    from multiprocessing.connection import wait

    # Snapshot sentinel -> name once, so the lookup after wake-up does not
    # have to touch the process objects again.
    names_by_sentinel = {
        proc.sentinel: proc.name
        for proc in engine_processes
    }

    wait_error = None
    died = []
    try:
        died = wait(list(names_by_sentinel))
    except (OSError, ValueError) as e:
        # The sentinels can no longer be waited on (e.g. a handle became
        # invalid). Liveness cannot be monitored any more, so treat this
        # like an engine failure instead of letting the thread die.
        wait_error = e

    _self = self_ref()
    if not _self or _self.resources.engine_dead:
        return
    _self.resources.engine_dead = True

    if wait_error is not None:
        logger.error(
            "Engine core monitor failed (%s), "
            "shutting down client.", wait_error)
    else:
        proc_name = names_by_sentinel[died[0]]
        logger.error(
            "Engine core proc %s died unexpectedly, "
            "shutting down client.", proc_name)
    _self.shutdown()
    # Note: For MPClient, we don't have a failure callback mechanism
    # like MultiprocExecutor; only the engine_dead flag is set here.
    # NOTE(review): presumably later client operations check this flag
    # and raise EngineDeadError - confirm against the callers.
Comment on lines +526 to +541
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The monitor_engine_cores function lacks exception handling around the multiprocessing.connection.wait(sentinels) call. If this call is interrupted (e.g., by a signal) or raises an OSError, the monitor thread will terminate silently. This would leave the engine core processes unmonitored, defeating the purpose of this bugfix.

To make the monitoring more robust, this call should be wrapped in a try...except block. In case of an exception, it would be safest to log the error and proceed with shutting down the client.

        def monitor_engine_cores():
            sentinels = [proc.sentinel for proc in engine_processes]
            try:
                died = multiprocessing.connection.wait(sentinels)
            except BaseException as e:
                _self = self_ref()
                if not _self or _self.resources.engine_dead:
                    return
                logger.error("Error in engine core monitor: %s. Shutting down.", e)
                _self.resources.engine_dead = True
                _self.shutdown()
                return

            _self = self_ref()
            if not _self or _self.resources.engine_dead:
                return
            _self.resources.engine_dead = True
            proc_name = next(proc.name for proc in engine_processes
                             if proc.sentinel == died[0])
            logger.error(
                "Engine core proc %s died unexpectedly, "
                "shutting down client.", proc_name)
            _self.shutdown()
            # Note: For MPClient, we don't have a failure callback mechanism
            # like MultiprocExecutor, but we set engine_dead flag which will
            # cause subsequent operations to raise EngineDeadError


Thread(target=monitor_engine_cores,
daemon=True,
name="MPClientEngineMonitor").start()


def _process_utility_output(output: UtilityOutput,
utility_results: dict[int, AnyFuture]):
Expand Down Expand Up @@ -749,6 +788,8 @@ async def process_outputs_socket():
outputs_queue.put_nowait(outputs)
except Exception as e:
outputs_queue.put_nowait(e)
except asyncio.CancelledError:
outputs_queue.put_nowait(EngineDeadError())

resources.output_queue_task = asyncio.create_task(
process_outputs_socket(), name="EngineCoreOutputQueueTask")
Expand Down
45 changes: 43 additions & 2 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ def _init_executor(self) -> None:
finally:
if not success:
# Clean up the worker procs if there was a failure.
# Close death_writers first to signal workers to exit
for uw in unready_workers:
if uw.death_writer is not None:
uw.death_writer.close()
self._ensure_worker_termination(
[w.proc for w in unready_workers])
[uw.proc for uw in unready_workers])

# For pipeline parallel, we use a thread pool for asynchronous
# execute_model.
Expand Down Expand Up @@ -282,6 +286,10 @@ def shutdown(self):

if workers := getattr(self, 'workers', None):
for w in workers:
# Close death_writer to signal child processes to exit
if w.death_writer is not None:
w.death_writer.close()
w.death_writer = None
w.worker_response_mq = None
self._ensure_worker_termination([w.proc for w in workers])

Expand Down Expand Up @@ -316,13 +324,15 @@ class UnreadyWorkerProcHandle:
proc: BaseProcess
rank: int
ready_pipe: Connection
death_writer: Optional[Connection] = None


@dataclass
class WorkerProcHandle:
proc: BaseProcess
rank: int
worker_response_mq: MessageQueue # The worker process writes to this MQ
death_writer: Optional[Connection] = None

@classmethod
def from_unready_handle(
Expand All @@ -332,6 +342,7 @@ def from_unready_handle(
proc=unready_handle.proc,
rank=unready_handle.rank,
worker_response_mq=worker_response_mq,
death_writer=unready_handle.death_writer,
)


Expand Down Expand Up @@ -396,13 +407,17 @@ def make_worker_process(
# (reader, writer)
reader, writer = context.Pipe(duplex=False)

# Create death pipe to detect parent process exit
death_reader, death_writer = context.Pipe(duplex=False)

process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"input_shm_handle": input_shm_handle,
"ready_pipe": (reader, writer),
"death_pipe": death_reader,
}
# Run EngineCore busy loop in background process.
proc = context.Process(target=WorkerProc.worker_main,
Expand All @@ -412,7 +427,9 @@ def make_worker_process(

proc.start()
writer.close()
return UnreadyWorkerProcHandle(proc, rank, reader)
# Keep death_writer open in parent - when parent exits,
# death_reader in child will get EOFError
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)

@staticmethod
def wait_for_ready(
Expand Down Expand Up @@ -483,6 +500,28 @@ def signal_handler(signum, frame):
worker = None
# tuple[Connection, Connection]
reader, ready_writer = kwargs.pop("ready_pipe")
death_pipe = kwargs.pop("death_pipe", None)

# Start death monitoring thread if death_pipe is provided
if death_pipe is not None:

def monitor_parent_death():
    """Wait on the death pipe and SIGTERM this process once it hits EOF.

    ``death_pipe`` comes from the enclosing scope. An ``EOFError`` from
    ``recv()`` is taken to mean the parent is gone; any other exception is
    only logged as a warning.
    """
    parent_gone = False
    try:
        # Blocks until data arrives or the write end of the pipe is closed.
        death_pipe.recv()
    except EOFError:
        parent_gone = True
    except Exception as e:
        logger.warning("Death monitoring error: %s", e)

    if not parent_gone:
        return

    # The parent process has exited, so terminate this worker.
    logger.info("Parent process exited, terminating worker")
    # Signal ourselves so the regular SIGTERM handling performs the
    # clean shutdown.
    os.kill(os.getpid(), signal.SIGTERM)

death_monitor = Thread(target=monitor_parent_death,
daemon=True,
name="WorkerDeathMonitor")
death_monitor.start()

try:
reader.close()
worker = WorkerProc(*args, **kwargs)
Expand Down Expand Up @@ -523,6 +562,8 @@ def signal_handler(signum, frame):
finally:
if ready_writer is not None:
ready_writer.close()
if death_pipe is not None:
death_pipe.close()
# Clean up once worker exits busy loop
if worker is not None:
worker.shutdown()
Expand Down