
Commit c92d5bf

njhill and jinzhen-lin authored and committed
[Tests] Harden DP tests (vllm-project#21508)
Signed-off-by: Nick Hill <nhill@redhat.com>
Signed-off-by: Jinzhen Lin <linjinzhen@hotmail.com>
1 parent 10b5a35 commit c92d5bf

File tree

3 files changed: +82 −55 lines changed

tests/v1/test_external_lb_dp.py

Lines changed: 5 additions & 4 deletions
@@ -11,7 +11,7 @@
 import pytest_asyncio
 
 from tests.utils import RemoteOpenAIServer
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
 
@@ -70,10 +70,11 @@ def start_server(r: int, sargs: list[str]):
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
                         for i in range(r * TP_SIZE, (r + 1) * TP_SIZE))
                 })
             server.__enter__()
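This hunk replaces the hardcoded CUDA_VISIBLE_DEVICES key with the platform's own device-control variable, so the same test can run on non-NVIDIA backends. A minimal sketch of that pattern, assuming vLLM's current_platform object (the helper name device_env_for_rank is hypothetical, introduced here only for illustration):

    from vllm.platforms import current_platform

    def device_env_for_rank(rank: int, gpus_per_rank: int) -> dict[str, str]:
        """Pin one server rank to its contiguous slice of physical devices."""
        start = rank * gpus_per_rank
        # device_control_env_var is e.g. CUDA_VISIBLE_DEVICES on NVIDIA or
        # HIP_VISIBLE_DEVICES on ROCm; logical ids are remapped to physical ids.
        return {
            current_platform.device_control_env_var:
            ",".join(
                str(current_platform.device_id_to_physical_device_id(i))
                for i in range(start, start + gpus_per_rank))
        }

Going through current_platform rather than the Platform base class also matters because device_id_to_physical_device_id must consult the active platform's notion of visible devices.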

tests/v1/test_hybrid_lb_dp.py

Lines changed: 29 additions & 31 deletions
@@ -12,7 +12,7 @@
 
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
 
@@ -92,10 +92,12 @@ def start_server(node: int, sargs: list[str]):
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i)) for i in range(gpu_start, gpu_end))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
+                        for i in range(gpu_start, gpu_end))
                 })
             server.__enter__()
             print(f"Hybrid LB node {node} started successfully with "
@@ -180,7 +182,7 @@ async def make_request(client: openai.AsyncOpenAI):
         completion = await client.completions.create(
             model=model_name,
             prompt="Hello, my name is",
-            max_tokens=10,
+            max_tokens=5,
             temperature=1.0)
 
         assert completion.id is not None
@@ -212,27 +214,28 @@ async def make_request(client: openai.AsyncOpenAI):
     await asyncio.sleep(0.5)
 
     # Send requests to all nodes - each should balance within its local DP ranks
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)
 
     await asyncio.sleep(0.5)
 
     # Second burst of requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [make_request(client) for _ in range(num_requests_per_node)]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(completion is not None for completion in results)
 
     _, server_args = servers[0]
@@ -309,33 +312,28 @@ async def make_streaming_request(client: openai.AsyncOpenAI):
     await asyncio.sleep(0.5)
 
     # Send streaming requests to all nodes
-    num_requests_per_node = 25  # Total 50 requests across 2 nodes
+    num_requests = 200  # Total 200 requests across 2 nodes
     all_tasks = []
-
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."
 
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
     all_tasks = []
-    for i, client in enumerate(clients):
-        tasks = [
-            make_streaming_request(client)
-            for _ in range(num_requests_per_node)
-        ]
-        all_tasks.extend(tasks)
+    for i in range(num_requests):
+        client = clients[i % len(clients)]
+        all_tasks.append(asyncio.create_task(make_streaming_request(client)))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
-    assert len(results) == num_requests_per_node * len(clients)
+    assert len(results) == num_requests
     assert all(results), "Not all streaming requests completed successfully."
 
     _, server_args = servers[0]
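These hunks rewrite the per-client list comprehensions into a single round-robin loop that starts each request as a task and sleeps 10 ms between submissions, so requests reach the load balancer as a steady stream rather than one synchronized herd. A self-contained sketch of the pattern, assuming any awaitable submit(client) coroutine (staggered_burst and submit are hypothetical names, not part of the test files):

    import asyncio

    async def staggered_burst(clients, num_requests, submit):
        """Round-robin requests across clients, pacing submissions by 10 ms."""
        tasks = []
        for i in range(num_requests):
            client = clients[i % len(clients)]
            # create_task starts the request immediately; the short sleep
            # staggers arrivals instead of firing them all at once.
            tasks.append(asyncio.create_task(submit(client)))
            await asyncio.sleep(0.01)
        results = await asyncio.gather(*tasks)
        assert len(results) == num_requests
        return results

Starting tasks eagerly (rather than gathering bare coroutines) means earlier requests are already in flight while later ones are still being submitted, which is what lets the larger 200-request bursts meaningfully exercise balancing across DP ranks.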

tests/v1/test_internal_lb_dp.py

Lines changed: 48 additions & 20 deletions
@@ -11,7 +11,7 @@
 
 from tests.utils import RemoteOpenAIServer
 from tests.v1.test_utils import check_request_balancing
-from vllm.platforms import Platform
+from vllm.platforms import current_platform
 
 MODEL_NAME = "ibm-research/PowerMoE-3b"
 
@@ -96,10 +96,12 @@ def start_server(r: int, sargs: list[str]):
                 sargs,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(
-                            i)) for i in range(r, r + gpus_per_node))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
+                        for i in range(r, r + gpus_per_node))
                 })
             server.__enter__()
             if r == 0:
@@ -219,9 +221,11 @@ def start_engines_server():
                 engines_server_args,
                 auto_port=False,
                 env_dict={
-                    "CUDA_VISIBLE_DEVICES":
+                    current_platform.device_control_env_var:
                     ",".join(
-                        str(Platform.device_id_to_physical_device_id(i))
+                        str(
+                            current_platform.
+                            device_id_to_physical_device_id(i))
                         for i in range(self.dp_size * self.tp_size))
                 })
             server.__enter__()
@@ -330,7 +334,7 @@ async def make_request():
         completion = await client.completions.create(
             model=model_name,
             prompt="Hello, my name is",
-            max_tokens=10,
+            max_tokens=5,
             temperature=1.0)
 
         assert completion.id is not None
@@ -361,8 +365,11 @@ async def make_request():
     await asyncio.sleep(0.5)
 
     # Send multiple requests - internal LB should distribute across DP ranks
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -371,7 +378,10 @@ async def make_request():
     await asyncio.sleep(0.5)
 
     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -449,8 +459,11 @@ async def make_streaming_request():
 
     # Send multiple streaming requests - internal LB should distribute across
     # DP ranks
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -459,7 +472,10 @@ async def make_streaming_request():
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -492,7 +508,7 @@ async def make_request():
         completion = await api_only_client.completions.create(
             model=model_name,
             prompt="Hello, my name is",
-            max_tokens=10,
+            max_tokens=5,
             temperature=1.0)
 
         assert completion.id is not None
@@ -522,8 +538,11 @@ async def make_request():
 
     # Send multiple requests - should be distributed across engines on
     # headless server
-    num_requests = 50
-    all_tasks = [make_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -532,7 +551,10 @@ async def make_request():
     await asyncio.sleep(0.5)
 
     # Second burst of requests
-    all_tasks = [make_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -610,8 +632,11 @@ async def make_streaming_request():
     await asyncio.sleep(0.5)
 
     # Send multiple streaming requests - should be distributed across engines
-    num_requests = 50
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    num_requests = 200
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
@@ -620,7 +645,10 @@ async def make_streaming_request():
     await asyncio.sleep(0.5)
 
     # Second burst of streaming requests
-    all_tasks = [make_streaming_request() for _ in range(num_requests)]
+    all_tasks = []
+    for _ in range(num_requests):
+        all_tasks.append(asyncio.create_task(make_streaming_request()))
+        await asyncio.sleep(0.01)
 
     results = await asyncio.gather(*all_tasks)
     assert len(results) == num_requests
