Skip to content

Commit a5528f5

Browse files
committed
clean tests
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent f542d8c commit a5528f5

File tree

1 file changed

+92
-115
lines changed

1 file changed

+92
-115
lines changed

tests/router/test_router_e2e_with_mockers.py

Lines changed: 92 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -46,36 +46,55 @@ def generate_random_suffix() -> str:
4646
}
4747

4848

49-
class MockerProcess(ManagedProcess):
50-
"""Manages a single mocker engine instance"""
49+
class MockerProcess:
50+
"""Manages multiple mocker engine instances with the same namespace"""
5151

52-
def __init__(self, request, mocker_args_file: str):
53-
# Generate a unique endpoint with random namespace suffix
52+
def __init__(self, request, mocker_args_file: str, num_mockers: int = 1):
53+
# Generate a unique namespace suffix shared by all mockers
5454
namespace_suffix = generate_random_suffix()
5555
self.namespace = f"test-namespace-{namespace_suffix}"
5656
self.endpoint = f"dyn://{self.namespace}.mocker.generate"
57+
self.num_mockers = num_mockers
58+
self.mocker_processes = []
59+
60+
# Create multiple mocker processes with the same namespace
61+
for i in range(num_mockers):
62+
command = [
63+
"python",
64+
"-m",
65+
"dynamo.mocker",
66+
"--model-path",
67+
MODEL_NAME,
68+
"--extra-engine-args",
69+
mocker_args_file,
70+
"--endpoint",
71+
self.endpoint,
72+
]
73+
74+
process = ManagedProcess(
75+
command=command,
76+
timeout=60,
77+
display_output=True,
78+
health_check_ports=[],
79+
health_check_urls=[],
80+
log_dir=request.node.name,
81+
terminate_existing=False,
82+
)
83+
self.mocker_processes.append(process)
84+
logger.info(f"Created mocker instance {i} with endpoint: {self.endpoint}")
5785

58-
command = [
59-
"python",
60-
"-m",
61-
"dynamo.mocker",
62-
"--model-path",
63-
MODEL_NAME,
64-
"--extra-engine-args",
65-
mocker_args_file,
66-
"--endpoint",
67-
self.endpoint,
68-
]
86+
def __enter__(self):
87+
"""Start all mocker processes"""
88+
for i, process in enumerate(self.mocker_processes):
89+
logger.info(f"Starting mocker instance {i}")
90+
process.__enter__()
91+
return self
6992

70-
super().__init__(
71-
command=command,
72-
timeout=60,
73-
display_output=True,
74-
health_check_ports=[],
75-
health_check_urls=[],
76-
log_dir=request.node.name,
77-
terminate_existing=False,
78-
)
93+
def __exit__(self, exc_type, exc_val, exc_tb):
94+
"""Stop all mocker processes"""
95+
for i, process in enumerate(self.mocker_processes):
96+
logger.info(f"Stopping mocker instance {i}")
97+
process.__exit__(exc_type, exc_val, exc_tb)
7998

8099

81100
class KVRouterProcess(ManagedProcess):
@@ -279,9 +298,6 @@ def test_mocker_kv_router(request, runtime_services):
279298
with open(mocker_args_file, "w") as f:
280299
json.dump(mocker_args, f)
281300

282-
# Start mocker instances
283-
mocker_processes = []
284-
285301
try:
286302
# Start KV router (frontend)
287303
frontend_port = PORT
@@ -290,15 +306,11 @@ def test_mocker_kv_router(request, runtime_services):
290306
kv_router = KVRouterProcess(request, frontend_port)
291307
kv_router.__enter__()
292308

293-
for i in range(NUM_MOCKERS):
294-
logger.info(f"Starting mocker instance {i}")
295-
mocker = MockerProcess(request, mocker_args_file)
296-
logger.info(f"Mocker {i} using endpoint: {mocker.endpoint}")
297-
mocker_processes.append(mocker)
298-
299-
# Start all mockers
300-
for mocker in mocker_processes:
301-
mocker.__enter__()
309+
# Start mocker instances
310+
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
311+
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS)
312+
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
313+
mockers.__enter__()
302314

303315
# Use async to send requests concurrently for better performance
304316
asyncio.run(
@@ -314,20 +326,18 @@ def test_mocker_kv_router(request, runtime_services):
314326
logger.info(f"Successfully completed {NUM_REQUESTS} requests")
315327

316328
# Check etcd registration - expect 1 KV router
317-
# Use the first mocker's endpoint since all mockers share the same component path
329+
# Use the mockers' endpoint since all mockers share the same component path
318330
asyncio.run(
319-
check_registration_in_etcd(
320-
expected_count=1, endpoint=mocker_processes[0].endpoint
321-
)
331+
check_registration_in_etcd(expected_count=1, endpoint=mockers.endpoint)
322332
)
323333

324334
finally:
325335
# Clean up
326336
if "kv_router" in locals():
327337
kv_router.__exit__(None, None, None)
328338

329-
for mocker in mocker_processes:
330-
mocker.__exit__(None, None, None)
339+
if "mockers" in locals():
340+
mockers.__exit__(None, None, None)
331341

332342
if os.path.exists(mocker_args_file):
333343
os.unlink(mocker_args_file)
@@ -350,8 +360,6 @@ def test_mocker_two_kv_router(request, runtime_services):
350360
with open(mocker_args_file, "w") as f:
351361
json.dump(mocker_args, f)
352362

353-
# Start mocker instances
354-
mocker_processes = []
355363
kv_routers = []
356364

357365
try:
@@ -364,15 +372,11 @@ def test_mocker_two_kv_router(request, runtime_services):
364372
kv_router.__enter__()
365373
kv_routers.append(kv_router)
366374

367-
for i in range(NUM_MOCKERS):
368-
logger.info(f"Starting mocker instance {i}")
369-
mocker = MockerProcess(request, mocker_args_file)
370-
logger.info(f"Mocker {i} using endpoint: {mocker.endpoint}")
371-
mocker_processes.append(mocker)
372-
373-
# Start all mockers
374-
for mocker in mocker_processes:
375-
mocker.__enter__()
375+
# Start mocker instances
376+
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
377+
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS)
378+
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
379+
mockers.__enter__()
376380

377381
# Build URLs for both routers
378382
router_urls = [
@@ -393,11 +397,9 @@ def test_mocker_two_kv_router(request, runtime_services):
393397
)
394398

395399
# Check etcd registration - expect 2 KV routers
396-
# Use the first mocker's endpoint since all mockers share the same component path
400+
# Use the mockers' endpoint since all mockers share the same component path
397401
asyncio.run(
398-
check_registration_in_etcd(
399-
expected_count=2, endpoint=mocker_processes[0].endpoint
400-
)
402+
check_registration_in_etcd(expected_count=2, endpoint=mockers.endpoint)
401403
)
402404

403405
finally:
@@ -406,8 +408,8 @@ def test_mocker_two_kv_router(request, runtime_services):
406408
kv_router.__exit__(None, None, None)
407409

408410
# Clean up mockers
409-
for mocker in mocker_processes:
410-
mocker.__exit__(None, None, None)
411+
if "mockers" in locals():
412+
mockers.__exit__(None, None, None)
411413

412414
if os.path.exists(mocker_args_file):
413415
os.unlink(mocker_args_file)
@@ -475,9 +477,9 @@ def test_mocker_kv_router_overload_503(request, runtime_services):
475477

476478
# Start single mocker instance with limited resources
477479
logger.info("Starting single mocker instance with limited resources")
478-
mocker = MockerProcess(request, mocker_args_file)
479-
logger.info(f"Mocker using endpoint: {mocker.endpoint}")
480-
mocker.__enter__()
480+
mockers = MockerProcess(request, mocker_args_file, num_mockers=1)
481+
logger.info(f"Mocker using endpoint: {mockers.endpoint}")
482+
mockers.__enter__()
481483

482484
url = f"http://localhost:{frontend_port}/v1/chat/completions"
483485

@@ -579,8 +581,8 @@ async def send_long_request(req_id, payload):
579581
if "kv_router" in locals():
580582
kv_router.__exit__(None, None, None)
581583

582-
if "mocker" in locals():
583-
mocker.__exit__(None, None, None)
584+
if "mockers" in locals():
585+
mockers.__exit__(None, None, None)
584586

585587
if os.path.exists(mocker_args_file):
586588
os.unlink(mocker_args_file)
@@ -604,28 +606,19 @@ def test_kv_push_router_bindings(request, runtime_services):
604606
with open(mocker_args_file, "w") as f:
605607
json.dump(mocker_args, f)
606608

607-
# Start mocker instances
608-
mocker_processes = []
609-
610609
try:
611-
# Start mockers
612-
for i in range(NUM_MOCKERS):
613-
logger.info(f"Starting mocker instance {i}")
614-
mocker = MockerProcess(request, mocker_args_file)
615-
logger.info(f"Mocker {i} using endpoint: {mocker.endpoint}")
616-
mocker_processes.append(mocker)
617-
618-
# Start all mockers
619-
for mocker in mocker_processes:
620-
mocker.__enter__()
610+
# Start mocker instances
611+
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
612+
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS)
613+
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
614+
mockers.__enter__()
621615

622616
# Wait for mockers to be ready by sending a dummy request with retry
623617
async def wait_for_mockers_ready():
624618
"""Send a dummy request to ensure mockers are ready"""
625619
runtime = get_runtime()
626-
# Use the namespace from the first mocker
627-
first_mocker_namespace = mocker_processes[0].namespace
628-
namespace = runtime.namespace(first_mocker_namespace)
620+
# Use the namespace from the mockers
621+
namespace = runtime.namespace(mockers.namespace)
629622
component = namespace.component("mocker")
630623
endpoint = component.endpoint("generate")
631624

@@ -687,9 +680,8 @@ async def wait_for_mockers_ready():
687680
async def test_kv_push_router():
688681
# Get runtime and create endpoint
689682
runtime = get_runtime()
690-
# Use the namespace from the first mocker
691-
first_mocker_namespace = mocker_processes[0].namespace
692-
namespace = runtime.namespace(first_mocker_namespace)
683+
# Use the namespace from the mockers
684+
namespace = runtime.namespace(mockers.namespace)
693685
component = namespace.component("mocker")
694686
endpoint = component.endpoint("generate")
695687

@@ -821,8 +813,8 @@ async def test_kv_push_router():
821813

822814
finally:
823815
# Clean up mockers
824-
for mocker in mocker_processes:
825-
mocker.__exit__(None, None, None)
816+
if "mockers" in locals():
817+
mockers.__exit__(None, None, None)
826818

827819
if os.path.exists(mocker_args_file):
828820
os.unlink(mocker_args_file)
@@ -845,28 +837,19 @@ def test_indexers_sync(request, runtime_services):
845837
with open(mocker_args_file, "w") as f:
846838
json.dump(mocker_args, f)
847839

848-
# Start mocker instances
849-
mocker_processes = []
850-
851840
try:
852-
# Start mockers first
853-
for i in range(NUM_MOCKERS):
854-
logger.info(f"Starting mocker instance {i}")
855-
mocker = MockerProcess(request, mocker_args_file)
856-
logger.info(f"Mocker {i} using endpoint: {mocker.endpoint}")
857-
mocker_processes.append(mocker)
858-
859-
# Start all mockers
860-
for mocker in mocker_processes:
861-
mocker.__enter__()
841+
# Start mocker instances
842+
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
843+
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS)
844+
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
845+
mockers.__enter__()
862846

863847
# Run the async test
864848
async def test_sync():
865849
# Get runtime and create endpoint
866850
runtime = get_runtime()
867-
# Use the namespace from the first mocker
868-
first_mocker_namespace = mocker_processes[0].namespace
869-
namespace = runtime.namespace(first_mocker_namespace)
851+
# Use the namespace from the mockers
852+
namespace = runtime.namespace(mockers.namespace)
870853
component = namespace.component("mocker")
871854
endpoint = component.endpoint("generate")
872855

@@ -1067,8 +1050,8 @@ def sort_key(event):
10671050

10681051
finally:
10691052
# Clean up mockers
1070-
for mocker in mocker_processes:
1071-
mocker.__exit__(None, None, None)
1053+
if "mockers" in locals():
1054+
mockers.__exit__(None, None, None)
10721055

10731056
if os.path.exists(mocker_args_file):
10741057
os.unlink(mocker_args_file)
@@ -1103,8 +1086,6 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
11031086
with open(mocker_args_file, "w") as f:
11041087
json.dump(mocker_args, f)
11051088

1106-
mocker_processes = []
1107-
11081089
try:
11091090
# Start KV router (frontend)
11101091
frontend_port = PORT + 30 # Use unique port to avoid conflicts
@@ -1113,14 +1094,10 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
11131094
kv_router.__enter__()
11141095

11151096
# Start multiple mocker engines to ensure worker selection logic
1116-
for i in range(NUM_MOCKERS):
1117-
logger.info(f"Starting mocker instance {i}")
1118-
mocker = MockerProcess(request, mocker_args_file)
1119-
logger.info(f"Mocker {i} using endpoint: {mocker.endpoint}")
1120-
mocker_processes.append(mocker)
1121-
1122-
for mocker in mocker_processes:
1123-
mocker.__enter__()
1097+
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
1098+
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS)
1099+
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
1100+
mockers.__enter__()
11241101

11251102
url = f"http://localhost:{frontend_port}/v1/chat/completions"
11261103

@@ -1263,7 +1240,7 @@ async def test_annotation_response():
12631240
finally:
12641241
if "kv_router" in locals():
12651242
kv_router.__exit__(None, None, None)
1266-
for mocker in mocker_processes:
1267-
mocker.__exit__(None, None, None)
1243+
if "mockers" in locals():
1244+
mockers.__exit__(None, None, None)
12681245
if os.path.exists(mocker_args_file):
12691246
os.unlink(mocker_args_file)

0 commit comments

Comments
 (0)