Skip to content

Commit a483008

Browse files
ptovam and ilmarkov
authored and committed
[Metrics] [KVConnector] Add connector prefix cache hit rate stats (vllm-project#26245)
Signed-off-by: tovam <tovam@pliops.com>
1 parent c9b7549 commit a483008

File tree

5 files changed

+152
-11
lines changed

5 files changed

+152
-11
lines changed

tests/v1/core/test_scheduler.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,66 @@ def test_kv_connector_basic():
10141014
)
10151015

10161016

1017+
def test_external_prefix_cache_metrics():
    """
    Check that the scheduler accumulates connector prefix cache stats
    when the KV connector reports externally matched tokens.
    """
    # Scheduler with the KV connector enabled and local prefix caching off,
    # so every reported hit comes from the (mocked) connector.
    scheduler = create_scheduler(
        enable_prefix_caching=False,
        use_kv_connector=True,
    )

    # Pretend the connector finds a partial external match for each request.
    num_matched = 4
    mocked_lookup = Mock(name="method")
    mocked_lookup.return_value = (num_matched, False)
    scheduler.connector.get_num_new_matched_tokens = mocked_lookup

    # Enqueue a couple of small identical requests.
    num_requests, num_tokens, max_tokens = 2, 8, 2
    reqs = create_requests(
        num_requests=num_requests,
        num_tokens=num_tokens,
        max_tokens=max_tokens,
    )
    for r in reqs:
        scheduler.add_request(r)

    # Run one scheduling step and feed back a fabricated model output.
    sched_output = scheduler.schedule()
    runner_output = ModelRunnerOutput(
        req_ids=[r.request_id for r in reqs],
        req_id_to_index={r.request_id: idx for idx, r in enumerate(reqs)},
        sampled_token_ids=[[1000]] * num_requests,
        logprobs=None,
        prompt_logprobs_dict={},
        pooler_output=[],
    )
    ecos = scheduler.update_from_output(sched_output, runner_output)

    # The engine core outputs must carry scheduler stats with the
    # connector-side prefix cache counters filled in.
    assert ecos is not None and len(ecos) > 0
    assert ecos[0].scheduler_stats is not None

    external_stats = ecos[0].scheduler_stats.connector_prefix_cache_stats
    assert external_stats is not None

    # Every prompt token is a query; every mocked match is a hit.
    assert external_stats.queries == num_tokens * num_requests
    assert external_stats.hits == num_matched * num_requests
    assert external_stats.requests == num_requests
    assert external_stats.preempted_requests == 0
10171077
def test_kv_connector_unable_to_allocate():
10181078
"""
10191079
Test whether scheduler with KVConnector is able to handle

vllm/v1/core/kv_cache_manager.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,16 +208,11 @@ def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
208208

209209
if self.log_stats:
210210
assert self.prefix_cache_stats is not None
211-
if request.num_preemptions > 0:
212-
# Previously preempted request
213-
self.prefix_cache_stats.preempted_requests += 1
214-
self.prefix_cache_stats.preempted_queries += request.num_tokens
215-
self.prefix_cache_stats.preempted_hits += num_new_computed_tokens
216-
else:
217-
# New request
218-
self.prefix_cache_stats.requests += 1
219-
self.prefix_cache_stats.queries += request.num_tokens
220-
self.prefix_cache_stats.hits += num_new_computed_tokens
211+
self.prefix_cache_stats.record(
212+
num_tokens=request.num_tokens,
213+
num_hits=num_new_computed_tokens,
214+
preempted=request.num_preemptions > 0,
215+
)
221216

222217
return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
223218

vllm/v1/core/sched/scheduler.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from vllm.v1.core.sched.utils import check_stop, remove_all
2929
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs
3030
from vllm.v1.kv_cache_interface import KVCacheConfig
31-
from vllm.v1.metrics.stats import SchedulerStats
31+
from vllm.v1.metrics.stats import PrefixCacheStats, SchedulerStats
3232
from vllm.v1.outputs import DraftTokenIds, KVConnectorOutput, ModelRunnerOutput
3333
from vllm.v1.request import Request, RequestStatus
3434
from vllm.v1.spec_decode.metrics import SpecDecodingStats
@@ -84,6 +84,7 @@ def __init__(
8484
# will have a corresponding KVConnector with Role=WORKER.
8585
# KV Connector pushes/pull of remote KVs for P/D and offloading.
8686
self.connector = None
87+
self.connector_prefix_cache_stats: PrefixCacheStats | None = None
8788
if self.vllm_config.kv_transfer_config is not None:
8889
assert len(self.kv_cache_config.kv_cache_groups) == 1, (
8990
"Multiple KV cache groups are not currently supported "
@@ -95,6 +96,8 @@ def __init__(
9596
self.connector = KVConnectorFactory.create_connector(
9697
config=self.vllm_config, role=KVConnectorRole.SCHEDULER
9798
)
99+
if self.log_stats:
100+
self.connector_prefix_cache_stats = PrefixCacheStats()
98101

99102
self.kv_event_publisher = EventPublisherFactory.create(
100103
self.kv_events_config,
@@ -526,6 +529,9 @@ def schedule(self) -> SchedulerOutput:
526529
new_computed_blocks + new_blocks,
527530
num_external_computed_tokens,
528531
)
532+
self._update_connector_prefix_cache_stats(
533+
request, num_external_computed_tokens
534+
)
529535

530536
# Request was already popped from self.waiting
531537
# unless it was re-added above due to new_blocks being None.
@@ -1247,11 +1253,13 @@ def make_stats(
12471253
return None
12481254
prefix_cache_stats = self.kv_cache_manager.make_prefix_cache_stats()
12491255
assert prefix_cache_stats is not None
1256+
connector_prefix_cache_stats = self._make_connector_prefix_cache_stats()
12501257
return SchedulerStats(
12511258
num_running_reqs=len(self.running),
12521259
num_waiting_reqs=len(self.waiting),
12531260
kv_cache_usage=self.kv_cache_manager.usage,
12541261
prefix_cache_stats=prefix_cache_stats,
1262+
connector_prefix_cache_stats=connector_prefix_cache_stats,
12551263
spec_decoding_stats=spec_decoding_stats,
12561264
num_corrupted_reqs=sum(req.is_output_corrupted for req in self.running),
12571265
kv_connector_stats=kv_connector_stats.data if kv_connector_stats else None,
@@ -1282,6 +1290,25 @@ def shutdown(self) -> None:
12821290
# KV Connector Related Methods
12831291
########################################################################
12841292

1293+
def _update_connector_prefix_cache_stats(
    self, request: Request, num_external_tokens: int
) -> None:
    """Fold one request's external-KV lookup result into the connector stats.

    No-op when stats collection is disabled (no connector or log_stats off).
    """
    stats = self.connector_prefix_cache_stats
    if stats is None:
        return
    stats.record(
        num_tokens=request.num_tokens,
        num_hits=num_external_tokens,
        preempted=request.num_preemptions > 0,
    )
1305+
def _make_connector_prefix_cache_stats(self) -> PrefixCacheStats | None:
    """Return the stats accumulated since the last call and reset them.

    Returns None when connector stats collection is disabled.
    """
    snapshot = self.connector_prefix_cache_stats
    if snapshot is not None:
        # Start a fresh accumulator so successive stats windows are disjoint.
        self.connector_prefix_cache_stats = PrefixCacheStats()
    return snapshot
12851312
def get_kv_connector(self) -> KVConnectorBase_V1 | None:
12861313
return self.connector
12871314

vllm/v1/metrics/loggers.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
9393
# Caching metrics. This cannot be reset.
9494
# TODO: Make the interval configurable.
9595
self.prefix_caching_metrics = CachingMetrics()
96+
self.connector_prefix_caching_metrics = CachingMetrics()
9697
self.mm_caching_metrics = CachingMetrics()
9798

9899
self.spec_decoding_logging = SpecDecodingLogging()
@@ -140,6 +141,11 @@ def record(
140141
if scheduler_stats is not None:
141142
self.prefix_caching_metrics.observe(scheduler_stats.prefix_cache_stats)
142143

144+
if scheduler_stats.connector_prefix_cache_stats is not None:
145+
self.connector_prefix_caching_metrics.observe(
146+
scheduler_stats.connector_prefix_cache_stats
147+
)
148+
143149
if scheduler_stats.spec_decoding_stats is not None:
144150
self.spec_decoding_logging.observe(scheduler_stats.spec_decoding_stats)
145151
if kv_connector_stats := scheduler_stats.kv_connector_stats:
@@ -192,6 +198,9 @@ def log(self):
192198
self.last_scheduler_stats.kv_cache_usage * 100,
193199
self.prefix_caching_metrics.hit_rate * 100,
194200
]
201+
if not self.connector_prefix_caching_metrics.empty:
202+
log_parts.append("External prefix cache hit rate: %.1f%%")
203+
log_args.append(self.connector_prefix_caching_metrics.hit_rate * 100)
195204
if not self.mm_caching_metrics.empty:
196205
log_parts.append("MM cache hit rate: %.1f%%")
197206
log_args.append(self.mm_caching_metrics.hit_rate * 100)
@@ -457,6 +466,34 @@ def __init__(
457466
counter_prefix_cache_hits, engine_indexes, model_name
458467
)
459468

469+
#
470+
# External - KV connector prefix cache
471+
#
472+
473+
counter_connector_prefix_cache_queries = self._counter_cls(
474+
name="vllm:external_prefix_cache_queries",
475+
documentation=(
476+
"External prefix cache queries from KV connector "
477+
"cross-instance cache sharing, in terms of number of queried tokens."
478+
),
479+
labelnames=labelnames,
480+
)
481+
self.counter_connector_prefix_cache_queries = make_per_engine(
482+
counter_connector_prefix_cache_queries, engine_indexes, model_name
483+
)
484+
485+
counter_connector_prefix_cache_hits = self._counter_cls(
486+
name="vllm:external_prefix_cache_hits",
487+
documentation=(
488+
"External prefix cache hits from KV connector "
489+
"cross-instance cache sharing, in terms of number of cached tokens."
490+
),
491+
labelnames=labelnames,
492+
)
493+
self.counter_connector_prefix_cache_hits = make_per_engine(
494+
counter_connector_prefix_cache_hits, engine_indexes, model_name
495+
)
496+
460497
#
461498
# Multi-modal cache
462499
#
@@ -883,6 +920,14 @@ def record(
883920
scheduler_stats.prefix_cache_stats.hits
884921
)
885922

923+
if scheduler_stats.connector_prefix_cache_stats is not None:
924+
self.counter_connector_prefix_cache_queries[engine_idx].inc(
925+
scheduler_stats.connector_prefix_cache_stats.queries
926+
)
927+
self.counter_connector_prefix_cache_hits[engine_idx].inc(
928+
scheduler_stats.connector_prefix_cache_stats.hits
929+
)
930+
886931
if scheduler_stats.spec_decoding_stats is not None:
887932
self.spec_decoding_prom.observe(
888933
scheduler_stats.spec_decoding_stats, engine_idx

vllm/v1/metrics/stats.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@ class PrefixCacheStats(BaseCacheStats):
126126
preempted_hits: int = 0
127127
"""The `hits` number for preempted requests."""
128128

129+
def record(self, num_tokens: int, num_hits: int, preempted: bool) -> None:
    """Aggregate one request's query/hit counts into the stats.

    Previously preempted (re-scheduled) requests are tallied into the
    separate `preempted_*` counters so first-time and resumed lookups
    can be reported independently.
    """
    if preempted:
        self.preempted_requests += 1
        self.preempted_queries += num_tokens
        self.preempted_hits += num_hits
        return
    self.requests += 1
    self.queries += num_tokens
    self.hits += num_hits
129142

130143
@dataclass
131144
class MultiModalCacheStats(BaseCacheStats):
@@ -151,6 +164,7 @@ class SchedulerStats:
151164
kv_cache_usage: float = 0.0
152165

153166
prefix_cache_stats: PrefixCacheStats = field(default_factory=PrefixCacheStats)
167+
connector_prefix_cache_stats: PrefixCacheStats | None = None
154168

155169
spec_decoding_stats: SpecDecodingStats | None = None
156170
kv_connector_stats: dict[str, Any] | None = None

0 commit comments

Comments
 (0)