
Commit f7cf5b5

[Frontend] Add /collective_rpc API endpoint (#23075)
Signed-off-by: 22quinn <33176974+22quinn@users.noreply.github.com>
1 parent 03d4235 commit f7cf5b5

File tree: 4 files changed (+126 / -1 lines)

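In short, this commit adds a dev-mode POST /collective_rpc endpoint that takes a JSON body with a required `method` name plus optional `args`, `kwargs`, and `timeout`, broadcasts the call to the workers, and returns `{"results": [...]}` with one entry per worker. A minimal client sketch (the host and port are illustrative; it assumes a server launched with `VLLM_SERVER_DEV_MODE=1` and a `--worker-extension-cls` that defines `get_model_name`, as in the test file below):

import requests

# Hypothetical local deployment; adjust the base URL to your server.
BASE_URL = "http://localhost:8000"

# Invoke a worker method by name; the server returns one result per worker.
resp = requests.post(f"{BASE_URL}/collective_rpc",
                     json={"method": "get_model_name"})
resp.raise_for_status()
print(resp.json())  # e.g. {"results": ["Qwen/Qwen3-0.6B"]}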

.buildkite/test-pipeline.yaml

Lines changed: 2 additions & 1 deletion
@@ -126,7 +126,8 @@ steps:
   - tests/entrypoints/test_chat_utils
   commands:
   - export VLLM_WORKER_MULTIPROC_METHOD=spawn
-  - pytest -v -s entrypoints/openai --ignore=entrypoints/openai/test_chat_with_tool_reasoning.py --ignore=entrypoints/openai/test_oot_registration.py --ignore=entrypoints/openai/test_tensorizer_entrypoint.py --ignore=entrypoints/openai/correctness/
+  - PYTHONPATH=/vllm-workspace pytest -v -s entrypoints/openai/test_collective_rpc.py # PYTHONPATH is needed to import custom Worker extension
+  - pytest -v -s entrypoints/openai --ignore=entrypoints/openai/test_chat_with_tool_reasoning.py --ignore=entrypoints/openai/test_oot_registration.py --ignore=entrypoints/openai/test_tensorizer_entrypoint.py --ignore=entrypoints/openai/correctness/ --ignore=entrypoints/openai/test_collective_rpc.py
   - pytest -v -s entrypoints/test_chat_utils.py

 - label: Distributed Tests (4 GPUs) # 10min
tests/entrypoints/openai/test_collective_rpc.py (new file)

Lines changed: 88 additions & 0 deletions
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from typing import Any

import pytest
import requests

from tests.utils import RemoteOpenAIServer

MODEL_NAME = "Qwen/Qwen3-0.6B"


class TestWorkerExtension:

    def get_model_name(self) -> str:
        """Test non-pydantic return type."""
        return MODEL_NAME

    def echo_args_kwargs(self, *args, **kwargs) -> dict[str, Any]:
        """Echo back both args and kwargs."""
        return dict(
            args=list(args),
            kwargs=kwargs,
            total_items=len(args) + len(kwargs),
        )

    def return_none(self, *args, **kwargs) -> None:
        """Test method that does not return anything"""
        return


@pytest.fixture(scope="module")
def server():
    args = [
        "--max-model-len",
        "8192",
        "--max-num-seqs",
        "128",
        "--worker-extension-cls",
        "tests.entrypoints.openai.test_collective_rpc.TestWorkerExtension",
    ]
    with RemoteOpenAIServer(
            MODEL_NAME,
            args,
            env_dict={
                "VLLM_SERVER_DEV_MODE": "1",
                "CUDA_VISIBLE_DEVICES": "0"
            },
    ) as remote_server:
        yield remote_server


def test_get_model_name(server):
    """Test basic response"""
    response = requests.post(server.url_for("collective_rpc"),
                             json={"method": "get_model_name"})
    assert response.status_code == 200
    results = response.json()
    assert "results" in results
    assert results["results"] == [MODEL_NAME]


def test_return_none(server):
    """Test return none"""
    response = requests.post(server.url_for("collective_rpc"),
                             json={"method": "return_none"})
    assert response.status_code == 200
    results = response.json()
    assert results["results"] == [None]


def test_echo_args_kwargs(server):
    """Test args, kwargs, and dict response"""
    args = ["arg1", "arg2"]
    kwargs = {"key1": "value1", "key2": "value2"}
    response = requests.post(server.url_for("collective_rpc"),
                             json={
                                 "method": "echo_args_kwargs",
                                 "args": args,
                                 "kwargs": kwargs
                             })
    assert response.status_code == 200
    results = response.json()
    result = results["results"][0]
    assert result["args"] == args
    assert result["kwargs"] == kwargs
    assert result["total_items"] == len(args) + len(kwargs)
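Note that each assertion above expects a single-element `results` list because the fixture pins `CUDA_VISIBLE_DEVICES=0`; with N workers (e.g. under tensor parallelism), `collective_rpc` returns one entry per worker.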

vllm/engine/protocol.py

Lines changed: 8 additions & 0 deletions
@@ -329,3 +329,11 @@ async def scale_elastic_ep(self,
                                drain_timeout: int = 300) -> None:
         """Scale the engine"""
         raise NotImplementedError
+
+    async def collective_rpc(self,
+                             method: str,
+                             timeout: Optional[float] = None,
+                             args: tuple = (),
+                             kwargs: Optional[dict] = None):
+        """Perform a collective RPC call to the given path."""
+        raise NotImplementedError
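Concrete engine clients override this stub: the frontend forwards `method`, `args`, and `kwargs`, and each worker invokes the named method on itself, including methods mixed in via `--worker-extension-cls`. A minimal sketch of such an extension, following the pattern of `TestWorkerExtension` above (the class name, module path, and method are hypothetical):

# Hypothetical extension; load it with:
#   vllm serve ... --worker-extension-cls my_pkg.worker_ext.MyWorkerExtension
import torch


class MyWorkerExtension:

    def device_name(self) -> str:
        """Runs on each worker; /collective_rpc gathers one string per worker."""
        return torch.cuda.get_device_name()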

vllm/entrypoints/openai/api_server.py

Lines changed: 28 additions & 0 deletions
@@ -1044,6 +1044,34 @@ async def is_sleeping(raw_request: Request):
     is_sleeping = await engine_client(raw_request).is_sleeping()
     return JSONResponse(content={"is_sleeping": is_sleeping})

+@router.post("/collective_rpc")
+async def collective_rpc(raw_request: Request):
+    try:
+        body = await raw_request.json()
+    except json.JSONDecodeError as e:
+        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST.value,
+                            detail=f"JSON decode error: {e}") from e
+    method = body.get("method")
+    if method is None:
+        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST.value,
+                            detail="Missing 'method' in request body")
+    # For security reasons, only serialized string args/kwargs are passed.
+    # The user-defined `method` is responsible for deserialization if needed.
+    args: list[str] = body.get("args", [])
+    kwargs: dict[str, str] = body.get("kwargs", {})
+    timeout: Optional[float] = body.get("timeout")
+    results = await engine_client(raw_request).collective_rpc(
+        method=method, timeout=timeout, args=tuple(args), kwargs=kwargs)
+    if results is None:
+        return Response(status_code=200)
+    response: list[Any] = []
+    for result in results:
+        if result is None or isinstance(result, (dict, list)):
+            response.append(result)
+        else:
+            response.append(str(result))
+    return JSONResponse(content={"results": response})

 @router.post("/scale_elastic_ep",
              dependencies=[Depends(validate_json_request)],
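One design note on the response handling above: only `None`, `dict`, and `list` results pass through as JSON; any other return value is coerced with `str()`, so worker methods that need structured results back should return JSON-native types (or serialize explicitly). A hedged round-trip sketch reusing `echo_args_kwargs` from the test file (again assuming a local dev-mode server at an illustrative address):

import requests

# Assumes a server started with VLLM_SERVER_DEV_MODE=1 and the test's
# TestWorkerExtension loaded via --worker-extension-cls.
resp = requests.post("http://localhost:8000/collective_rpc",
                     json={"method": "echo_args_kwargs",
                           "args": ["a", "b"],
                           "kwargs": {"k": "v"}})
result = resp.json()["results"][0]
assert result["total_items"] == 3  # two positional args plus one kwarg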
