
Commit 29c5e1f

DarkLight1337 authored and xuebwang-amd committed
[Chore] Separate out vllm.utils.async_utils (vllm-project#26913)
Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
Signed-off-by: xuebwang-amd <xuebwang@amd.com>
1 parent be3067f commit 29c5e1f

17 files changed: +364, -354 lines
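
The change itself is mechanical: call sites that previously imported async helpers from the top-level vllm.utils package now import them from the new vllm.utils.async_utils submodule (and, for make_async, from vllm.utils.func). Every hunk below follows the same pattern:

    # Before this commit:
    from vllm.utils import merge_async_iterators

    # After this commit:
    from vllm.utils.async_utils import merge_async_iterators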

tests/lora/test_add_lora.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 from vllm.inputs import TextPrompt
 from vllm.lora.request import LoRARequest
 from vllm.sampling_params import SamplingParams
-from vllm.utils import merge_async_iterators
+from vllm.utils.async_utils import merge_async_iterators
 
 MODEL_PATH = "zai-org/chatglm3-6b"
 LORA_RANK = 64

tests/utils_/test_async_utils.py

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+import asyncio
+from collections.abc import AsyncIterator
+
+import pytest
+
+from vllm.utils.async_utils import merge_async_iterators
+
+
+async def _mock_async_iterator(idx: int):
+    try:
+        while True:
+            yield f"item from iterator {idx}"
+            await asyncio.sleep(0.1)
+    except asyncio.CancelledError:
+        print(f"iterator {idx} cancelled")
+
+
+@pytest.mark.asyncio
+async def test_merge_async_iterators():
+    iterators = [_mock_async_iterator(i) for i in range(3)]
+    merged_iterator = merge_async_iterators(*iterators)
+
+    async def stream_output(generator: AsyncIterator[tuple[int, str]]):
+        async for idx, output in generator:
+            print(f"idx: {idx}, output: {output}")
+
+    task = asyncio.create_task(stream_output(merged_iterator))
+    await asyncio.sleep(0.5)
+    task.cancel()
+    with pytest.raises(asyncio.CancelledError):
+        await task
+
+    for iterator in iterators:
+        try:
+            await asyncio.wait_for(anext(iterator), 1)
+        except StopAsyncIteration:
+            # All iterators should be cancelled and print this message.
+            print("Iterator was cancelled normally")
+        except (Exception, asyncio.CancelledError) as e:
+            raise AssertionError() from e
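
The implementation of merge_async_iterators is not part of this diff. As a rough illustration of the semantics the test above exercises — interleaved (index, item) pairs, with cancellation propagating into the suspended source generators — a minimal sketch might look like the following. The queue-and-drain design and all names here are assumptions, not the actual vllm.utils.async_utils code:

    import asyncio
    from collections.abc import AsyncIterator
    from typing import TypeVar

    T = TypeVar("T")

    _FINISHED = object()  # sentinel marking an exhausted source iterator


    async def merge_iterators_sketch(
        *iterators: AsyncIterator[T],
    ) -> AsyncIterator[tuple[int, T]]:
        """Yield (index, item) pairs from several async iterators as items arrive."""
        queue: asyncio.Queue = asyncio.Queue()

        async def drain(idx: int, it: AsyncIterator[T]) -> None:
            # Forward every item, tagged with the iterator's index.
            async for item in it:
                await queue.put((idx, item))
            await queue.put(_FINISHED)

        tasks = [asyncio.create_task(drain(i, it)) for i, it in enumerate(iterators)]
        remaining = len(tasks)
        try:
            while remaining:
                entry = await queue.get()
                if entry is _FINISHED:
                    remaining -= 1
                else:
                    yield entry
        finally:
            # On cancellation, cancel the drain tasks so CancelledError reaches
            # the suspended source generators (the behavior the test checks).
            for task in tasks:
                task.cancel()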

tests/utils_/test_utils.py

Lines changed: 0 additions & 36 deletions
@@ -2,14 +2,12 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 # ruff: noqa
 
-import asyncio
 import hashlib
 import json
 import os
 import pickle
 import socket
 import tempfile
-from collections.abc import AsyncIterator
 from pathlib import Path
 from unittest.mock import patch
 
@@ -37,7 +35,6 @@
     make_zmq_path,
     make_zmq_socket,
     memory_profiling,
-    merge_async_iterators,
    sha256,
     split_host_port,
     split_zmq_path,
@@ -48,39 +45,6 @@
 from ..utils import create_new_process_for_each_test
 
 
-@pytest.mark.asyncio
-async def test_merge_async_iterators():
-    async def mock_async_iterator(idx: int):
-        try:
-            while True:
-                yield f"item from iterator {idx}"
-                await asyncio.sleep(0.1)
-        except asyncio.CancelledError:
-            print(f"iterator {idx} cancelled")
-
-    iterators = [mock_async_iterator(i) for i in range(3)]
-    merged_iterator = merge_async_iterators(*iterators)
-
-    async def stream_output(generator: AsyncIterator[tuple[int, str]]):
-        async for idx, output in generator:
-            print(f"idx: {idx}, output: {output}")
-
-    task = asyncio.create_task(stream_output(merged_iterator))
-    await asyncio.sleep(0.5)
-    task.cancel()
-    with pytest.raises(asyncio.CancelledError):
-        await task
-
-    for iterator in iterators:
-        try:
-            await asyncio.wait_for(anext(iterator), 1)
-        except StopAsyncIteration:
-            # All iterators should be cancelled and print this message.
-            print("Iterator was cancelled normally")
-        except (Exception, asyncio.CancelledError) as e:
-            raise AssertionError() from e
-
-
 def test_get_open_port(monkeypatch: pytest.MonkeyPatch):
     with monkeypatch.context() as m:
         m.setenv("VLLM_PORT", "5678")

vllm/benchmarks/throughput.py

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@
 from vllm.lora.request import LoRARequest
 from vllm.outputs import RequestOutput
 from vllm.sampling_params import BeamSearchParams
-from vllm.utils import merge_async_iterators
+from vllm.utils.async_utils import merge_async_iterators
 
 
 def run_vllm(

vllm/entrypoints/openai/serving_completion.py

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@
 from vllm.outputs import RequestOutput
 from vllm.sampling_params import BeamSearchParams, SamplingParams
 from vllm.transformers_utils.tokenizer import AnyTokenizer
-from vllm.utils import as_list, merge_async_iterators
+from vllm.utils import as_list
+from vllm.utils.async_utils import merge_async_iterators
 
 logger = init_logger(__name__)
 

vllm/entrypoints/openai/serving_embedding.py

Lines changed: 1 addition & 2 deletions
@@ -40,6 +40,7 @@
 )
 from vllm.pooling_params import PoolingParams
 from vllm.utils import chunk_list
+from vllm.utils.async_utils import merge_async_iterators
 
 logger = init_logger(__name__)
 
@@ -387,8 +388,6 @@ async def _prepare_generators(
         )
         generators.append(generator)
 
-        from vllm.utils import merge_async_iterators
-
         ctx.result_generator = merge_async_iterators(*generators)
 
         return None

vllm/entrypoints/openai/serving_engine.py

Lines changed: 3 additions & 4 deletions
@@ -90,14 +90,13 @@
     log_tracing_disabled_warning,
 )
 from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
-from vllm.utils import (
+from vllm.utils import is_list_of, random_uuid
+from vllm.utils.async_utils import (
     AsyncMicrobatchTokenizer,
     collect_from_async_generator,
-    is_list_of,
+    make_async,
     merge_async_iterators,
-    random_uuid,
 )
-from vllm.utils.func import make_async
 from vllm.v1.engine import EngineCoreRequest
 
 logger = init_logger(__name__)
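
serving_engine.py pulls four helpers from the new module at once. Of these, collect_from_async_generator has the simplest contract — exhaust an async generator into a list — which can be sketched in a few lines. This is an illustration of the idea under that assumed contract, not necessarily vLLM's exact code:

    from collections.abc import AsyncGenerator
    from typing import TypeVar

    T = TypeVar("T")


    async def collect_from_async_generator_sketch(
        iterator: AsyncGenerator[T, None],
    ) -> list[T]:
        """Collect all items from an async generator into a list."""
        return [item async for item in iterator]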

vllm/entrypoints/openai/serving_pooling.py

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@
 from vllm.logger import init_logger
 from vllm.outputs import PoolingOutput, PoolingRequestOutput
 from vllm.tasks import SupportedTask
-from vllm.utils import merge_async_iterators
+from vllm.utils.async_utils import merge_async_iterators
 
 logger = init_logger(__name__)
 

vllm/entrypoints/openai/serving_score.py

Lines changed: 1 addition & 2 deletions
@@ -37,8 +37,7 @@
 from vllm.lora.request import LoRARequest
 from vllm.outputs import PoolingRequestOutput, ScoringRequestOutput
 from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
-from vllm.utils import merge_async_iterators
-from vllm.utils.func import make_async
+from vllm.utils.async_utils import make_async, merge_async_iterators
 
 logger = init_logger(__name__)
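
make_async also moves here from vllm.utils.func. The standard shape of such a helper is to push a blocking callable onto an executor so it can be awaited; the sketch below shows that general pattern only — vLLM's actual signature (e.g. whether it accepts an explicit executor) is not shown in this diff:

    import asyncio
    from collections.abc import Awaitable, Callable
    from functools import partial
    from typing import ParamSpec, TypeVar

    P = ParamSpec("P")
    T = TypeVar("T")


    def make_async_sketch(func: Callable[P, T]) -> Callable[P, Awaitable[T]]:
        """Wrap a blocking function so calls run in the default thread pool."""

        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            loop = asyncio.get_running_loop()
            # run_in_executor only forwards positional args, so bind
            # everything up front with functools.partial.
            return await loop.run_in_executor(None, partial(func, *args, **kwargs))

        return _wrapper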

vllm/entrypoints/renderer.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 from vllm.inputs.data import TokensPrompt as EngineTokensPrompt
 from vllm.inputs.parse import get_prompt_components, parse_raw_prompts
 from vllm.transformers_utils.tokenizer import AnyTokenizer
-from vllm.utils import AsyncMicrobatchTokenizer
+from vllm.utils.async_utils import AsyncMicrobatchTokenizer
 
 
 @dataclass(frozen=True)
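
AsyncMicrobatchTokenizer, used by the renderer, also lands in the new module. The general idea behind a class like this is to coalesce concurrent tokenize requests into one blocking batch call. The sketch below illustrates only that pattern; aside from the name, every detail (constructor, methods, batching policy, the tokenize_batch interface) is a hypothetical stand-in, not vLLM's implementation:

    import asyncio
    from collections.abc import Callable


    class MicrobatchTokenizerSketch:
        """Coalesce concurrent tokenize() calls into one executor round-trip.

        tokenize_batch is assumed to map a list of strings to a list of
        token-id lists (a hypothetical interface).
        """

        def __init__(
            self,
            tokenize_batch: Callable[[list[str]], list[list[int]]],
            max_batch_size: int = 32,
        ) -> None:
            self._tokenize_batch = tokenize_batch
            self._max_batch_size = max_batch_size
            self._queue: asyncio.Queue = asyncio.Queue()
            self._worker: asyncio.Task | None = None

        async def tokenize(self, text: str) -> list[int]:
            if self._worker is None:
                # Lazily start the background batching loop.
                self._worker = asyncio.create_task(self._run())
            future: asyncio.Future = asyncio.get_running_loop().create_future()
            await self._queue.put((text, future))
            return await future

        async def _run(self) -> None:
            loop = asyncio.get_running_loop()
            while True:
                # Block on the first request, then greedily drain whatever
                # arrived in the meantime to form a microbatch.
                batch = [await self._queue.get()]
                while not self._queue.empty() and len(batch) < self._max_batch_size:
                    batch.append(self._queue.get_nowait())
                texts = [text for text, _ in batch]
                # One blocking call for the whole microbatch.
                results = await loop.run_in_executor(None, self._tokenize_batch, texts)
                for (_, future), token_ids in zip(batch, results):
                    future.set_result(token_ids)

The payoff of this pattern is that N overlapping requests cost one tokenizer invocation instead of N, at the price of the small queuing latency the drain loop introduces.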
