2 | 2 | # SPDX-FileCopyrightText: Copyright contributors to the vLLM project |
3 | 3 | import asyncio |
4 | 4 | import os |
5 | | -import re |
6 | 5 |
7 | 6 | import openai # use the official client for correctness check |
8 | 7 | import pytest |
9 | 8 | import pytest_asyncio |
10 | | -import requests |
11 | 9 |
12 | 10 | from tests.utils import RemoteOpenAIServer |
| 11 | +from tests.v1.test_utils import check_request_balancing |
13 | 12 |
14 | 13 | MODEL_NAME = "ibm-research/PowerMoE-3b" |
15 | 14 |
16 | 15 | DP_SIZE = os.getenv("DP_SIZE", "1") |
17 | 16 |
18 | 17 |
19 | | -def get_prometheus_metrics( |
20 | | - server: RemoteOpenAIServer) -> dict[str, dict[str, float]]: |
21 | | - """Fetch and parse Prometheus metrics from the /metrics endpoint. |
22 | | - |
23 | | - Returns: |
24 | | - Dict mapping metric names to their values grouped by labels. |
25 | | - For example: {"vllm:request_success": { |
26 | | - "engine=0": 5.0, "engine=1": 3.0} |
27 | | - } |
28 | | - """ |
29 | | - try: |
30 | | - response = requests.get(server.url_for("metrics"), timeout=10) |
31 | | - response.raise_for_status() |
32 | | - |
33 | | - metrics: dict[str, dict[str, float]] = {} |
34 | | - |
35 | | - # Regex patterns for Prometheus metrics |
36 | | - metric_with_labels = re.compile( |
37 | | - r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([\d\.\-\+e]+)$') |
38 | | - metric_simple = re.compile( |
39 | | - r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([\d\.\-\+e]+)$') |
40 | | - |
41 | | - for line in response.text.split('\n'): |
42 | | - line = line.strip() |
43 | | - # Skip comments and empty lines |
44 | | - if not line or line.startswith('#'): |
45 | | - continue |
46 | | - |
47 | | - # Try to match metric with labels first |
48 | | - match = metric_with_labels.match(line) |
49 | | - if match: |
50 | | - metric_name, labels_part, value_str = match.groups() |
51 | | - try: |
52 | | - value = float(value_str) |
53 | | - if metric_name not in metrics: |
54 | | - metrics[metric_name] = {} |
55 | | - metrics[metric_name][f'{{{labels_part}}}'] = value |
56 | | - except ValueError: |
57 | | - continue |
58 | | - else: |
59 | | - # Try simple metric without labels |
60 | | - match = metric_simple.match(line) |
61 | | - if match: |
62 | | - metric_name, value_str = match.groups() |
63 | | - try: |
64 | | - value = float(value_str) |
65 | | - if metric_name not in metrics: |
66 | | - metrics[metric_name] = {} |
67 | | - metrics[metric_name][''] = value |
68 | | - except ValueError: |
69 | | - continue |
70 | | - |
71 | | - return metrics |
72 | | - except Exception as e: |
73 | | - pytest.fail(f"Failed to fetch Prometheus metrics: {e}") |
74 | | - return {} |
75 | | - |
76 | | - |
77 | | -def get_engine_request_counts( |
78 | | - metrics: dict[str, dict[str, float]]) -> dict[str, float]: |
79 | | - """Extract request counts per engine from Prometheus metrics. |
80 | | - |
81 | | - Returns: |
82 | | - Dict mapping engine indices to request counts. |
83 | | - For example: {"0": 15.0, "1": 12.0} |
84 | | - """ |
85 | | - engine_counts = {} |
86 | | - |
87 | | - # Look for request success metrics with engine labels |
88 | | - success_metrics = metrics.get("vllm:request_success_total", {}) |
89 | | - engine_pattern = re.compile(r'engine="([^"]*)"') |
90 | | - |
91 | | - for labels, count in success_metrics.items(): |
92 | | - # Extract engine ID from labels using regex |
93 | | - match = engine_pattern.search(labels) |
94 | | - if match: |
95 | | - engine_id = match.group(1) |
96 | | - if engine_id not in engine_counts: |
97 | | - engine_counts[engine_id] = 0.0 |
98 | | - engine_counts[engine_id] += count |
99 | | - |
100 | | - return engine_counts |
101 | | - |
102 | | - |
103 | | -def check_request_balancing(server: RemoteOpenAIServer): |
104 | | - """Check request balancing via Prometheus metrics if DP_SIZE > 1. |
105 | | - |
106 | | - Args: |
107 | | - server: The RemoteOpenAIServer instance |
108 | | - """ |
109 | | - dp_size = int(DP_SIZE) |
110 | | - if dp_size <= 1: |
111 | | - return |
112 | | - |
113 | | - # Get metrics after all requests are completed |
114 | | - metrics = get_prometheus_metrics(server) |
115 | | - engine_counts = get_engine_request_counts(metrics) |
116 | | - |
117 | | - # Check that multiple engines received requests |
118 | | - engines_with_requests = [ |
119 | | - engine for engine, count in engine_counts.items() if count > 0 |
120 | | - ] |
121 | | - assert len(engines_with_requests) == dp_size, ( |
122 | | - f"Expected requests to be distributed across multiple engines," |
123 | | - f" but only engine(s) {engines_with_requests} received " |
124 | | - f"requests. Engine counts: {engine_counts}") |
125 | | - |
126 | | - # Verify that the load is reasonably balanced |
127 | | - # (no engine should handle all requests) |
128 | | - total_requests = sum(engine_counts.values()) |
129 | | - |
130 | | - for count in engine_counts.values(): |
131 | | - assert count > total_requests // (dp_size + 1), ( |
132 | | - f"requests are imbalanced: {engine_counts}") |
133 | | - |
134 | | - |
135 | 18 | @pytest.fixture(scope="module") |
136 | 19 | def default_server_args(): |
137 | 20 | return [ |
@@ -217,7 +100,7 @@ async def make_request(): |
217 | 100 | assert all(completion is not None for completion in results) |
218 | 101 |
219 | 102 | # Check request balancing via Prometheus metrics if DP_SIZE > 1 |
220 | | - check_request_balancing(server) |
| 103 | + check_request_balancing(server, int(DP_SIZE)) |
221 | 104 |
222 | 105 |
223 | 106 | @pytest.mark.asyncio |
@@ -295,4 +178,4 @@ async def make_streaming_request(): |
295 | 178 | assert all(results), "Not all streaming requests completed successfully." |
296 | 179 |
297 | 180 | # Check request balancing via Prometheus metrics if DP_SIZE > 1 |
298 | | - check_request_balancing(server) |
| 181 | + check_request_balancing(server, int(DP_SIZE)) |
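
The net effect of this diff is that the per-test Prometheus scraping and balancing helpers are dropped in favor of a shared `check_request_balancing(server, dp_size)` imported from `tests.v1.test_utils`, which now takes the data-parallel size explicitly instead of reading the `DP_SIZE` environment variable itself. The contents of `tests/v1/test_utils.py` are not part of this diff; the sketch below only reconstructs the expected behaviour from the removed local implementation and is an assumption about what the shared helper does, not its actual code.

```python
# Hedged sketch of the shared helper imported from tests.v1.test_utils,
# reconstructed from the removed local implementation above. The real
# helper may differ in details.
import re

import requests

from tests.utils import RemoteOpenAIServer


def check_request_balancing(server: RemoteOpenAIServer, dp_size: int) -> None:
    """Assert that completed requests were spread across all DP engines."""
    if dp_size <= 1:
        return  # nothing to balance with a single engine

    # Scrape the server's Prometheus endpoint.
    response = requests.get(server.url_for("metrics"), timeout=10)
    response.raise_for_status()

    # Sum vllm:request_success_total per engine="<id>" label, e.g.
    #   vllm:request_success_total{engine="0",...} 15.0
    engine_counts: dict[str, float] = {}
    metric_line = re.compile(
        r'^vllm:request_success_total\{([^}]*)\}\s+([\d\.\-\+eE]+)$')
    engine_label = re.compile(r'engine="([^"]*)"')
    for line in response.text.splitlines():
        match = metric_line.match(line.strip())
        if not match:
            continue
        labels, value = match.groups()
        label_match = engine_label.search(labels)
        if label_match:
            engine_id = label_match.group(1)
            engine_counts[engine_id] = (engine_counts.get(engine_id, 0.0) +
                                        float(value))

    # Every engine must have handled at least one request.
    engines_with_requests = [e for e, c in engine_counts.items() if c > 0]
    assert len(engines_with_requests) == dp_size, (
        f"Expected requests on all {dp_size} engines, got: {engine_counts}")

    # No engine may be starved relative to an even split.
    total_requests = sum(engine_counts.values())
    for count in engine_counts.values():
        assert count > total_requests // (dp_size + 1), (
            f"Requests are imbalanced: {engine_counts}")
```

The call sites in the diff pass `int(DP_SIZE)` because the module reads the value from the environment as a string.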