Skip to content

Commit db93e20

Browse files
committed
Revert "[V1] Exception Handling when Loading KV Cache from Remote Store (#21534)"
This reverts commit 15a72ac. Signed-off-by: KuntaiDu <kuntai@uchicago.edu>
1 parent ec261b0 commit db93e20

File tree

10 files changed

+5
-229
lines changed

10 files changed

+5
-229
lines changed

tests/v1/kv_connector/kv_load_exception_handling/random_drop_connector.py

Lines changed: 0 additions & 120 deletions
This file was deleted.

tests/v1/kv_connector/kv_load_exception_handling/test.sh

Lines changed: 0 additions & 16 deletions
This file was deleted.

vllm/distributed/kv_transfer/kv_connector/utils.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,27 +139,13 @@ def update_finished_set(req_ids: Optional[set[str]],
139139
finished_set.add(req_id)
140140
del remaining_count_dict[req_id]
141141

142-
def update_finished_load_dict(worker_finished_loading_dict: dict[str,
143-
int],
144-
finished_loading_dict: dict[str, int]):
145-
for req_id, num_actual_load_tokens in (worker_finished_loading_dict
146-
or {}).items():
147-
if req_id in finished_loading_dict:
148-
finished_loading_dict[req_id] = min(
149-
finished_loading_dict[req_id], num_actual_load_tokens)
150-
else:
151-
finished_loading_dict[req_id] = num_actual_load_tokens
152-
153142
finished_sending = set[str]()
154143
finished_recving = set[str]()
155-
finished_loading_dict: dict[str, int] = {}
156144
for output in outputs:
157145
update_finished_set(output.finished_sending,
158146
self._send_remaining_count, finished_sending)
159147
update_finished_set(output.finished_recving,
160148
self._recv_remaining_count, finished_recving)
161-
update_finished_load_dict(output.finished_loading_dict,
162-
finished_loading_dict)
163149

164150
# select output of the worker specified by output_rank
165151
output = outputs[output_rank]
@@ -171,7 +157,7 @@ def update_finished_load_dict(worker_finished_loading_dict: dict[str,
171157
# send/recv
172158
output.finished_sending = finished_sending if finished_sending else None
173159
output.finished_recving = finished_recving if finished_recving else None
174-
output.finished_loading_dict = finished_loading_dict or None
160+
175161
return output
176162

177163
def async_aggregate(self,

vllm/distributed/kv_transfer/kv_connector/v1/base.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
2929
get_finished() - called with ids of finished requests, returns
3030
ids of requests that have completed async sending/recving.
31-
get_finished_loading() - called with scheduler outputs, returns
32-
a dictionary that the keys are request IDs and the values are
33-
the actual number of tokens loaded from the remote KV cache
3431
"""
3532

3633
import enum
@@ -222,23 +219,6 @@ def get_finished(
222219
"""
223220
return None, None
224221

225-
def get_finished_loading(
226-
self, scheduler_output: "SchedulerOutput") -> dict[str, int]:
227-
"""
228-
Retrieves the actual number of tokens loaded for requests that have
229-
completed the asynchronous loading process from the remote KV cache.
230-
231-
This function is used by the scheduler process (via the Executors)
232-
to track the progress of requests and determine which requests have
233-
successfully finished loading their KV cache data.
234-
235-
Returns:
236-
A dictionary where the keys are request IDs and the values are the
237-
corresponding number of tokens that have been successfully loaded
238-
for each request.
239-
"""
240-
return {}
241-
242222
# ==============================
243223
# Scheduler-side methods
244224
# ==============================

vllm/sequence.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,8 +1167,6 @@ class IntermediateTensors:
11671167
# [req_ids]
11681168
finished_sending: Optional[set[str]] = None
11691169
finished_recving: Optional[set[str]] = None
1170-
#req_id -> num_actual_load_tokens
1171-
finished_loading_dict: Optional[dict[str, int]] = None
11721170

11731171
def __init__(self, tensors):
11741172
# manually define this function, so that

vllm/v1/core/sched/scheduler.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ def __init__(
118118

119119
# KV Connector: requests in process of async KV loading or recving
120120
self.finished_recving_kv_req_ids: set[str] = set()
121-
# The keys are request IDs, and the values are corresponding token
122-
# count that have been successfully loaded from the remote KV store
123-
self.finished_loading_dict: dict[str, int] = {}
124121

125122
# Encoder-related.
126123
# Calculate encoder cache size if applicable
@@ -1097,27 +1094,6 @@ def _connector_finished(
10971094
(block_ids, ) = self.kv_cache_manager.get_block_ids(request.request_id)
10981095
return self.connector.request_finished(request, block_ids)
10991096

1100-
def _update_actual_load_token_num_from_remote_kv(self,
1101-
request: Request) -> bool:
1102-
1103-
num_actual_load_tokens = self.finished_loading_dict.pop(
1104-
request.request_id)
1105-
num_computed_tokens = num_actual_load_tokens
1106-
assert self.connector is not None
1107-
if num_actual_load_tokens <= 0 and hasattr(self.connector,
1108-
"add_failure_request"):
1109-
self.connector.add_failure_request(request)
1110-
return True
1111-
1112-
if num_actual_load_tokens == request.num_tokens:
1113-
num_computed_tokens -= 1
1114-
1115-
self.kv_cache_manager.cache_blocks(request, num_computed_tokens)
1116-
1117-
# Update the request state for scheduling.
1118-
request.num_computed_tokens = num_computed_tokens
1119-
return True
1120-
11211097
def _update_waiting_for_remote_kv(self, request: Request) -> bool:
11221098
"""
11231099
KV Connector: check if the request_id is finished_recving.
@@ -1131,9 +1107,6 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool:
11311107
WAITING_FOR_REMOTE_KV.
11321108
"""
11331109
assert self.connector is not None
1134-
if request.request_id in self.finished_loading_dict:
1135-
return self._update_actual_load_token_num_from_remote_kv(request)
1136-
11371110
if request.request_id not in self.finished_recving_kv_req_ids:
11381111
return False
11391112

@@ -1172,6 +1145,3 @@ def _update_from_kv_xfer_finished(self,
11721145
for req_id in (model_runner_output.finished_sending or ()):
11731146
logger.debug("Finished sending KV transfer for request %s", req_id)
11741147
self._free_blocks(self.requests[req_id])
1175-
if model_runner_output.finished_loading_dict:
1176-
self.finished_loading_dict.update(
1177-
model_runner_output.finished_loading_dict)

vllm/v1/outputs.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ class ModelRunnerOutput:
107107
# [req_ids]
108108
finished_sending: Optional[set[str]] = None
109109
finished_recving: Optional[set[str]] = None
110-
# req_id -> actual_load_token from connector
111-
finished_loading_dict: Optional[dict[str, int]] = None
112110

113111
# req_id -> num_nans_in_logits
114112
num_nans_in_logits: Optional[dict[str, int]] = None
@@ -123,5 +121,4 @@ class ModelRunnerOutput:
123121
pooler_output=[],
124122
finished_sending=None,
125123
finished_recving=None,
126-
finished_loading_dict=None,
127124
num_nans_in_logits=None)

vllm/v1/worker/gpu_model_runner.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,7 +1375,6 @@ def _pool(
13751375
num_scheduled_tokens_np: np.ndarray,
13761376
finished_sending: Optional[set[str]],
13771377
finished_recving: Optional[set[str]],
1378-
finished_loading_dict: Optional[dict[str, int]],
13791378
) -> ModelRunnerOutput:
13801379
assert self.input_batch.num_reqs ==\
13811380
len(self.input_batch.pooling_params), \
@@ -1412,7 +1411,6 @@ def _pool(
14121411
pooler_output=pooler_output,
14131412
finished_sending=finished_sending,
14141413
finished_recving=finished_recving,
1415-
finished_loading_dict=finished_loading_dict,
14161414
)
14171415

14181416
@torch.inference_mode()
@@ -1532,7 +1530,6 @@ def execute_model(
15321530
self.maybe_wait_for_kv_save()
15331531
finished_sending, finished_recving = (
15341532
self.get_finished_kv_transfers(scheduler_output))
1535-
finished_loading_dict = self.get_finished_loading(scheduler_output)
15361533

15371534
if self.use_aux_hidden_state_outputs:
15381535
hidden_states, aux_hidden_states = model_output
@@ -1550,11 +1547,9 @@ def execute_model(
15501547
if not get_pp_group().is_last_rank:
15511548
# For mid-pipeline stages, return the hidden states.
15521549
if not broadcast_pp_output:
1553-
if (finished_sending or finished_recving
1554-
or finished_loading_dict):
1550+
if finished_sending or finished_recving:
15551551
hidden_states.finished_sending = finished_sending
15561552
hidden_states.finished_recving = finished_recving
1557-
hidden_states.finished_loading_dict = finished_loading_dict
15581553
return hidden_states
15591554
assert isinstance(hidden_states, IntermediateTensors)
15601555
get_pp_group().send_tensor_dict(hidden_states.tensors,
@@ -1564,7 +1559,7 @@ def execute_model(
15641559
if self.input_batch.pooling_params:
15651560
return self._pool(hidden_states, num_scheduled_tokens,
15661561
num_scheduled_tokens_np, finished_sending,
1567-
finished_recving, finished_loading_dict)
1562+
finished_recving)
15681563

15691564
sample_hidden_states = hidden_states[logits_indices]
15701565
logits = self.model.compute_logits(sample_hidden_states, None)
@@ -1716,7 +1711,6 @@ def execute_model(
17161711
pooler_output=[],
17171712
finished_sending=finished_sending,
17181713
finished_recving=finished_recving,
1719-
finished_loading_dict=finished_loading_dict,
17201714
num_nans_in_logits=num_nans_in_logits,
17211715
)
17221716

vllm/v1/worker/gpu_worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,12 +359,10 @@ def execute_model(
359359
# In case of PP with kv transfer, we need to pass through the
360360
# finished_sending and finished_recving buffers.
361361
new_output = EMPTY_MODEL_RUNNER_OUTPUT
362-
if (output.finished_sending or output.finished_recving
363-
or output.finished_loading_dict):
362+
if output.finished_sending or output.finished_recving:
364363
new_output = copy.copy(new_output)
365364
new_output.finished_sending = output.finished_sending
366365
new_output.finished_recving = output.finished_recving
367-
new_output.finished_loading_dict = output.finished_loading_dict
368366
output = new_output
369367

370368
assert isinstance(output, ModelRunnerOutput)

vllm/v1/worker/kv_connector_model_runner_mixin.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,29 +53,18 @@ def get_finished_kv_transfers(
5353
scheduler_output.finished_req_ids)
5454
return None, None
5555

56-
@staticmethod
57-
def get_finished_loading(
58-
scheduler_output: "SchedulerOutput", ) -> dict[str, int]:
59-
if has_kv_transfer_group():
60-
return get_kv_transfer_group().get_finished_loading(
61-
scheduler_output)
62-
return {}
63-
6456
def kv_connector_no_forward(self, scheduler_output: "SchedulerOutput",
6557
vllm_config: VllmConfig) -> ModelRunnerOutput:
6658
# KV send/recv even if no work to do.
6759
with set_forward_context(None, vllm_config):
6860
self.maybe_setup_kv_connector(scheduler_output)
6961
finished_sending, finished_recving = (
7062
self.get_finished_kv_transfers(scheduler_output))
71-
finished_loading_dict = self.get_finished_loading(scheduler_output)
7263

73-
if (not finished_sending and not finished_recving
74-
and not finished_loading_dict):
64+
if not finished_sending and not finished_recving:
7565
return EMPTY_MODEL_RUNNER_OUTPUT
7666

7767
output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT)
7868
output.finished_sending = finished_sending
7969
output.finished_recving = finished_recving
80-
output.finished_loading_dict = finished_loading_dict
8170
return output

0 commit comments

Comments
 (0)