
Commit 0dabfa4

[V1][Metrics] Add queue time histogram
The engine core process sends back a list of request IDs for newly scheduled requests with its iteration outputs, and the frontend computes queue_time relative to arrival_time. We take this approach, rather than having the scheduler compute the interval itself, so that we never compare timestamps taken in different processes, avoiding any possible future issues if those processes run on different kernels.

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
1 parent 8eb4731 commit 0dabfa4
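
For illustration, here is a minimal standalone sketch of the pattern this commit adopts: the engine core reports only the IDs of newly scheduled requests, and the frontend derives queue time entirely from timestamps taken on its own clock. The Frontend and RequestStats names below are hypothetical stand-ins, not the vLLM API.

import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class RequestStats:
    # Hypothetical stand-in for the frontend's per-request bookkeeping.
    arrival_time: float
    first_scheduled_time: Optional[float] = None

@dataclass
class Frontend:
    requests: Dict[str, RequestStats] = field(default_factory=dict)
    queue_times: List[float] = field(default_factory=list)

    def on_engine_output(self, new_req_ids: Optional[List[str]]) -> None:
        # Only request IDs cross the process boundary; both timestamps
        # compared below were taken by this process, so clocks in the
        # engine core and frontend never need to agree.
        if new_req_ids is None:
            return
        now = time.time()
        for req_id in new_req_ids:
            stats = self.requests.get(req_id)
            if stats is None:
                continue  # request was already aborted
            stats.first_scheduled_time = now
            self.queue_times.append(now - stats.arrival_time)

frontend = Frontend()
frontend.requests["req-0"] = RequestStats(arrival_time=time.time())
frontend.on_engine_output(new_req_ids=["req-0"])
print(frontend.queue_times)  # e.g. [2.1e-05]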

File tree

5 files changed: +58 -16


vllm/v1/core/scheduler.py

Lines changed: 13 additions & 2 deletions

@@ -477,7 +477,7 @@ def update_from_output(
         self.running = new_running
         return EngineCoreOutputs(
             outputs=outputs,
-            scheduler_stats=self.make_stats(),
+            scheduler_stats=self.make_stats(scheduler_output),
         )
 
     def _check_stop(self, request: Request) -> bool:
@@ -548,11 +548,22 @@ def has_unfinished_requests(self) -> bool:
     def reset_prefix_cache(self) -> bool:
         return self.kv_cache_manager.reset_prefix_cache()
 
-    def make_stats(self) -> SchedulerStats:
+    def make_stats(
+        self,
+        scheduler_output: Optional["SchedulerOutput"] = None
+    ) -> SchedulerStats:
+        if scheduler_output is not None and scheduler_output.scheduled_new_reqs:
+            new_req_ids = [
+                req_data.req_id
+                for req_data in scheduler_output.scheduled_new_reqs
+            ]
+        else:
+            new_req_ids = None
         return SchedulerStats(
             num_running_reqs=len(self.running),
             num_waiting_reqs=len(self.waiting),
             gpu_cache_usage=self.kv_cache_manager.usage,
+            new_req_ids=new_req_ids,
         )
vllm/v1/engine/async_llm.py

Lines changed: 6 additions & 2 deletions

@@ -246,6 +246,12 @@ async def _run_output_handler(self):
                 # 1) Pull EngineCoreOutputs from the EngineCore.
                 outputs = await self.engine_core.get_output_async()
 
+                # Record a timestamp for newly scheduled requests
+                iteration_stats = IterationStats(self.log_stats)
+                if outputs.scheduler_stats.new_req_ids is not None:
+                    self.output_processor.record_first_scheduled_time(
+                        outputs.scheduler_stats.new_req_ids, iteration_stats)
+
                 # Split outputs into chunks of at most
                 # VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
                 # event loop for too long.
@@ -257,14 +263,12 @@ async def _run_output_handler(self):
                         outputs.outputs,
                         cdiv(num_outputs, VLLM_V1_OUTPUT_PROC_CHUNK_SIZE))
 
-                iteration_stats = None
                 for i, outputs_slice in enumerate(slices):
                     # 2) Process EngineCoreOutputs.
                     processed_outputs = self.output_processor.process_outputs(
                         outputs_slice, iteration_stats)
                     # NOTE: RequestOutputs are pushed to their queues.
                     assert not processed_outputs.request_outputs
-                    iteration_stats = processed_outputs.iteration_stats
 
                     # Allow other asyncio tasks to run between chunks
                     if i + 1 < len(slices):
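
As a side note on the hunks above: IterationStats is now constructed once per engine-core poll and shared by every chunk slice, instead of being created lazily inside process_outputs(). A minimal sketch of that lifecycle, assuming simplified stand-ins (CHUNK_SIZE, handle_outputs, and the bare IterationStats class below are illustrative, not the real vLLM types):

import asyncio
from typing import List

CHUNK_SIZE = 4  # stand-in for VLLM_V1_OUTPUT_PROC_CHUNK_SIZE

class IterationStats:
    # Simplified stand-in; the real class tracks much more.
    def __init__(self) -> None:
        self.num_outputs = 0

async def handle_outputs(outputs: List[str]) -> IterationStats:
    # One stats object per poll, shared across all chunks.
    iteration_stats = IterationStats()
    chunks = [
        outputs[i:i + CHUNK_SIZE] for i in range(0, len(outputs), CHUNK_SIZE)
    ]
    for i, chunk in enumerate(chunks):
        iteration_stats.num_outputs += len(chunk)  # "process" the chunk
        # Allow other asyncio tasks to run between chunks.
        if i + 1 < len(chunks):
            await asyncio.sleep(0)
    return iteration_stats

stats = asyncio.run(handle_outputs([f"out-{i}" for i in range(10)]))
print(stats.num_outputs)  # 10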

vllm/v1/engine/output_processor.py

Lines changed: 18 additions & 11 deletions

@@ -18,7 +18,6 @@ class OutputProcessorOutput:
 
     request_outputs: List[RequestOutput]
     reqs_to_abort: List[str]
-    iteration_stats: IterationStats
 
 
 class RequestState:
@@ -105,6 +104,15 @@ def add_request(
             request=request,
             queue=queue)
 
+    def record_first_scheduled_time(self, new_req_ids: List[str],
+                                    iteration_stats: IterationStats) -> None:
+        for req_id in new_req_ids:
+            req_state = self.request_states.get(req_id)
+            if req_state is None:
+                # Ignore output for already-aborted request.
+                continue
+            iteration_stats.update_from_newly_scheduled(req_state.stats)
+
     def process_outputs(
         self,
         engine_core_outputs: List[EngineCoreOutput],
@@ -141,8 +149,6 @@ def process_outputs(
 
         request_outputs: List[RequestOutput] = []
         reqs_to_abort: List[str] = []
-        if not iteration_stats:
-            iteration_stats = IterationStats(self.log_stats)
         for engine_core_output in engine_core_outputs:
             req_id = engine_core_output.request_id
             req_state = self.request_states.get(req_id)
@@ -151,10 +157,11 @@ def process_outputs(
                 continue
 
             # 1) Compute stats for this iteration.
-            iteration_stats.update_from_output(engine_core_output,
-                                               req_state.is_prefilling,
-                                               req_state.prompt_len,
-                                               req_state.stats)
+            if iteration_stats is not None:
+                iteration_stats.update_from_output(engine_core_output,
+                                                   req_state.is_prefilling,
+                                                   req_state.prompt_len,
+                                                   req_state.stats)
             req_state.is_prefilling = False
 
             # 2) Detokenize the token ids into text.
@@ -184,14 +191,14 @@ def process_outputs(
                     reqs_to_abort.append(req_id)
 
                 # Track per-request stats
-                iteration_stats.update_from_finished_request(
-                    detokenizer_output.finish_reason, request_output,
-                    req_state.stats)
+                if iteration_stats is not None:
+                    iteration_stats.update_from_finished_request(
+                        detokenizer_output.finish_reason, request_output,
+                        req_state.stats)
 
         return OutputProcessorOutput(
             request_outputs=request_outputs,
             reqs_to_abort=reqs_to_abort,
-            iteration_stats=iteration_stats,
         )
 
     @staticmethod

vllm/v1/metrics/loggers.py

Lines changed: 9 additions & 0 deletions

@@ -172,6 +172,13 @@ def __init__(self, model_config: ModelConfig):
             documentation="Histogram of e2e request latency in seconds.",
             buckets=request_latency_buckets,
             labelnames=labelnames).labels(*labelvalues)
+        self.histogram_queue_time_request = \
+            prometheus_client.Histogram(
+                name="vllm:request_queue_time_seconds",
+                documentation=
+                "Histogram of time spent in WAITING phase for request.",
+                buckets=request_latency_buckets,
+                labelnames=labelnames).labels(*labelvalues)
 
     def log(self, scheduler_stats: SchedulerStats,
             iteration_stats: IterationStats):
@@ -198,6 +205,8 @@ def log(self, scheduler_stats: SchedulerStats,
             self.histogram_time_to_first_token.observe(ttft)
         for tpot in iteration_stats.time_per_output_tokens_iter:
             self.histogram_time_per_output_token.observe(tpot)
+        for queue_time in iteration_stats.queue_times_iter:
+            self.histogram_queue_time_request.observe(queue_time)
 
     @staticmethod
     def _unregister_vllm_metrics():
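
For reference, a self-contained sketch of how the new metric behaves under prometheus_client; the bucket boundaries and label name/value below are illustrative, since request_latency_buckets and the actual labels are defined elsewhere in loggers.py:

import prometheus_client

# Illustrative buckets; the real request_latency_buckets are not
# shown in this diff.
histogram_queue_time_request = prometheus_client.Histogram(
    name="vllm:request_queue_time_seconds",
    documentation="Histogram of time spent in WAITING phase for request.",
    buckets=[0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
    labelnames=["model_name"],
).labels("example-model")

histogram_queue_time_request.observe(0.42)  # one request waited 0.42s

# The scrape output now includes series such as
# vllm:request_queue_time_seconds_bucket{model_name="example-model",...}
print(prometheus_client.generate_latest().decode())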

vllm/v1/metrics/stats.py

Lines changed: 12 additions & 1 deletion

@@ -2,7 +2,7 @@
 
 import time
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, List
+from typing import TYPE_CHECKING, List, Optional
 
 if TYPE_CHECKING:
     from vllm.outputs import RequestOutput
@@ -19,13 +19,16 @@ class SchedulerStats:
     gpu_cache_usage: float = 0.0
     # gpu_prefix_cache_hit_rate: float = 0.0
 
+    new_req_ids: Optional[List[str]] = None
+
 
 @dataclass
 class RequestStateStats:
     """Stats that need to be tracked across delta updates."""
 
     num_generation_tokens: int = 0
     arrival_time: float = 0.0
+    first_scheduled_time: float = 0.0
     last_token_time: float = 0.0
 
 
@@ -49,6 +52,7 @@ def __init__(self, log_stats: bool):
         self.finished_requests: List[FinishedRequestStats] = []
         self.time_to_first_tokens_iter: List[float] = []
         self.time_per_output_tokens_iter: List[float] = []
+        self.queue_times_iter: List[float] = []
 
     def update_from_output(self, output: "EngineCoreOutput",
                            is_prefilling: bool, prompt_len: int,
@@ -76,6 +80,13 @@ def update_from_output(self, output: "EngineCoreOutput",
         request_state_stats.num_generation_tokens += num_new_generation_tokens
         request_state_stats.last_token_time = now
 
+    def update_from_newly_scheduled(self,
+                                    request_state_stats: RequestStateStats):
+        now = time.time()
+        request_state_stats.first_scheduled_time = now
+        queue_time = now - request_state_stats.arrival_time
+        self.queue_times_iter.append(queue_time)
+
     def update_from_finished_request(self, finish_reason: "FinishReason",
                                      request_output: "RequestOutput",
                                      request_state_stats: RequestStateStats):
