Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements/test.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pytest-forked
pytest-asyncio
pytest-rerunfailures
pytest-shard
pytest-timeout

# testing utils
awscli
Expand Down
3 changes: 3 additions & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ pytest==8.3.3
# pytest-mock
# pytest-rerunfailures
# pytest-shard
# pytest-timeout
pytest-asyncio==0.24.0
# via -r requirements/test.in
pytest-forked==1.6.0
Expand All @@ -454,6 +455,8 @@ pytest-rerunfailures==14.0
# via -r requirements/test.in
pytest-shard==0.1.2
# via -r requirements/test.in
pytest-timeout==2.3.1
# via -r requirements/test.in
python-dateutil==2.9.0.post0
# via
# botocore
Expand Down
41 changes: 41 additions & 0 deletions tests/v1/engine/test_engine_core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import asyncio
import time
import uuid
from threading import Thread
from typing import Optional

import psutil
import pytest
from transformers import AutoTokenizer

Expand Down Expand Up @@ -245,3 +247,42 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch):
await core_client.call_utility_async("echo", None, "help!")

assert str(e_info.value) == "Call to echo method failed: help!"


@pytest.mark.timeout(10)
def test_startup_failure(monkeypatch: pytest.MonkeyPatch):

with monkeypatch.context() as m, pytest.raises(Exception) as e_info:
m.setenv("VLLM_USE_V1", "1")

engine_args = EngineArgs(model=MODEL_NAME)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class = Executor.get_class(vllm_config)

# Start another thread to wait for engine core process to start
# and kill it - simulate fatal uncaught process exit.
this_proc = psutil.Process()
children_before = set(this_proc.children())

def kill_first_child():
while True:
time.sleep(0.5)
children = set(this_proc.children()) - children_before
if children:
child = children.pop()
print("Killing child core process", child.pid)
child.kill()
break

Thread(target=kill_first_child, daemon=True).start()

_core_client = EngineCoreClient.make_client(
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=True,
)

assert "Engine core initialization failed" in str(e_info.value)
23 changes: 14 additions & 9 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,21 @@ def _wait_for_engine_startup(self):

# Wait for engine core process(es) to send ready messages.
identities = set(eng.index for eng in self.resources.core_engines)
poller = zmq.Poller()
poller.register(sync_input_socket, zmq.POLLIN)
for eng in self.resources.core_engines:
poller.register(eng.proc_handle, zmq.POLLIN)
while identities:
while not sync_input_socket.poll(timeout=STARTUP_POLL_PERIOD_MS):
logger.info("Waiting for %d core engine proc(s) to start: %s",
len(identities), identities)
events = poller.poll(STARTUP_POLL_PERIOD_MS)
if not events:
logger.debug("Waiting for %d core engine proc(s) to start: %s",
len(identities), identities)
continue
if len(events) > 1 or events[0][0] != sync_input_socket:
# One of the core processes exited.
raise RuntimeError("Engine core initialization failed. "
"See root cause above.")

eng_id_bytes, msg = sync_input_socket.recv_multipart()
eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
if eng_id not in identities:
Expand All @@ -424,12 +435,6 @@ def _wait_for_engine_startup(self):
logger.info("Core engine process %d ready.", eng_id)
identities.discard(eng_id)

# Double check that the process are running.
for engine in self.resources.core_engines:
proc = engine.proc_handle.proc
if proc.exitcode is not None:
raise RuntimeError(f"Engine proc {proc.name} not running")

def _init_core_engines(
self,
vllm_config: VllmConfig,
Expand Down
13 changes: 8 additions & 5 deletions vllm/v1/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# SPDX-License-Identifier: Apache-2.0

import multiprocessing
import os
import weakref
from collections import defaultdict
from collections.abc import Sequence
from multiprocessing import Process
from typing import (TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar,
Union, overload)

Expand Down Expand Up @@ -112,20 +112,23 @@ def __init__(
process_kwargs["output_path"] = output_path

# Run busy loop in background process.
self.proc = context.Process(target=target_fn,
kwargs=process_kwargs,
name=process_name)
self.proc: Process = context.Process(target=target_fn,
kwargs=process_kwargs,
name=process_name)
self._finalizer = weakref.finalize(self, shutdown, self.proc,
input_path, output_path)
self.proc.start()

def fileno(self):
return self.proc.sentinel

def shutdown(self):
self._finalizer()


# Note(rob): shutdown function cannot be a bound method,
# else the gc cannot collect the object.
def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str):
def shutdown(proc: Process, input_path: str, output_path: str):
# Shutdown the process.
if proc.is_alive():
proc.terminate()
Expand Down