Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

16 changes: 0 additions & 16 deletions tests/v1/kv_connector/kv_load_exception_handling/test.sh

This file was deleted.

16 changes: 1 addition & 15 deletions vllm/distributed/kv_transfer/kv_connector/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,13 @@ def update_finished_set(req_ids: Optional[set[str]],
finished_set.add(req_id)
del remaining_count_dict[req_id]

def update_finished_load_dict(worker_finished_loading_dict: dict[str,
                                                                 int],
                              finished_loading_dict: dict[str, int]):
    """Merge one worker's loaded-token counts into the aggregate dict.

    When several workers report a count for the same request ID, the
    smallest count wins — the aggregate is only as complete as the
    slowest worker. A None/empty worker dict is a no-op.
    """
    source = worker_finished_loading_dict or {}
    for req_id, loaded in source.items():
        previous = finished_loading_dict.get(req_id)
        if previous is None:
            finished_loading_dict[req_id] = loaded
        else:
            finished_loading_dict[req_id] = min(previous, loaded)

finished_sending = set[str]()
finished_recving = set[str]()
finished_loading_dict: dict[str, int] = {}
for output in outputs:
update_finished_set(output.finished_sending,
self._send_remaining_count, finished_sending)
update_finished_set(output.finished_recving,
self._recv_remaining_count, finished_recving)
update_finished_load_dict(output.finished_loading_dict,
finished_loading_dict)

# select output of the worker specified by output_rank
output = outputs[output_rank]
Expand All @@ -171,7 +157,7 @@ def update_finished_load_dict(worker_finished_loading_dict: dict[str,
# send/recv
output.finished_sending = finished_sending if finished_sending else None
output.finished_recving = finished_recving if finished_recving else None
output.finished_loading_dict = finished_loading_dict or None

return output

def async_aggregate(self,
Expand Down
20 changes: 0 additions & 20 deletions vllm/distributed/kv_transfer/kv_connector/v1/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@

get_finished() - called with ids of finished requests, returns
ids of requests that have completed async sending/recving.
get_finished_loading() - called with scheduler outputs, returns
a dictionary that the keys are request IDs and the values are
the actual number of tokens loaded from the remote KV cache
"""

import enum
Expand Down Expand Up @@ -222,23 +219,6 @@ def get_finished(
"""
return None, None

def get_finished_loading(
        self, scheduler_output: "SchedulerOutput") -> dict[str, int]:
    """Report per-request token counts loaded from the remote KV cache.

    Used by the scheduler process (via the Executors) to track which
    requests have completed their asynchronous KV-cache load and how
    many tokens actually arrived for each.

    Returns:
        A mapping from request ID to the number of tokens successfully
        loaded for that request. This base implementation tracks
        nothing and always returns an empty dict; subclasses that
        support async loading are expected to override it.
    """
    return {}

# ==============================
# Scheduler-side methods
# ==============================
Expand Down
2 changes: 0 additions & 2 deletions vllm/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,6 @@ class IntermediateTensors:
# [req_ids]
finished_sending: Optional[set[str]] = None
finished_recving: Optional[set[str]] = None
#req_id -> num_actual_load_tokens
finished_loading_dict: Optional[dict[str, int]] = None

def __init__(self, tensors):
# manually define this function, so that
Expand Down
30 changes: 0 additions & 30 deletions vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ def __init__(

# KV Connector: requests in process of async KV loading or recving
self.finished_recving_kv_req_ids: set[str] = set()
# The keys are request IDs, and the values are corresponding token
# count that have been successfully loaded from the remote KV store
self.finished_loading_dict: dict[str, int] = {}

# Encoder-related.
# Calculate encoder cache size if applicable
Expand Down Expand Up @@ -1097,27 +1094,6 @@ def _connector_finished(
(block_ids, ) = self.kv_cache_manager.get_block_ids(request.request_id)
return self.connector.request_finished(request, block_ids)

def _update_actual_load_token_num_from_remote_kv(self,
                                                 request: Request) -> bool:
    """Finalize a request whose remote-KV load count has been reported.

    Pops the request's entry from ``self.finished_loading_dict`` and,
    on a successful load, records the loaded tokens as computed so the
    request can be scheduled. Always returns True (the request is done
    waiting, whether the load succeeded or failed).
    """
    # The entry must exist — callers are expected to check membership
    # in finished_loading_dict before calling (pop raises otherwise).
    num_actual_load_tokens = self.finished_loading_dict.pop(
        request.request_id)
    num_computed_tokens = num_actual_load_tokens
    assert self.connector is not None
    # A non-positive count means the load failed. NOTE(review): the
    # failure is only reported when the connector implements
    # add_failure_request; otherwise execution falls through and the
    # request is cached with <= 0 computed tokens — confirm intended.
    if num_actual_load_tokens <= 0 and hasattr(self.connector,
                                               "add_failure_request"):
        self.connector.add_failure_request(request)
        return True

    # If every token was loaded, back off by one so at least one token
    # remains to be computed — presumably so the scheduler still runs a
    # forward pass for this request; verify against scheduling logic.
    if num_actual_load_tokens == request.num_tokens:
        num_computed_tokens -= 1

    self.kv_cache_manager.cache_blocks(request, num_computed_tokens)

    # Update the request state for scheduling.
    request.num_computed_tokens = num_computed_tokens
    return True

def _update_waiting_for_remote_kv(self, request: Request) -> bool:
"""
KV Connector: check if the request_id is finished_recving.
Expand All @@ -1131,9 +1107,6 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool:
WAITING_FOR_REMOTE_KV.
"""
assert self.connector is not None
if request.request_id in self.finished_loading_dict:
return self._update_actual_load_token_num_from_remote_kv(request)

if request.request_id not in self.finished_recving_kv_req_ids:
return False

Expand Down Expand Up @@ -1172,6 +1145,3 @@ def _update_from_kv_xfer_finished(self,
for req_id in (model_runner_output.finished_sending or ()):
logger.debug("Finished sending KV transfer for request %s", req_id)
self._free_blocks(self.requests[req_id])
if model_runner_output.finished_loading_dict:
self.finished_loading_dict.update(
model_runner_output.finished_loading_dict)
3 changes: 0 additions & 3 deletions vllm/v1/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ class ModelRunnerOutput:
# [req_ids]
finished_sending: Optional[set[str]] = None
finished_recving: Optional[set[str]] = None
# req_id -> actual_load_token from connector
finished_loading_dict: Optional[dict[str, int]] = None

# req_id -> num_nans_in_logits
num_nans_in_logits: Optional[dict[str, int]] = None
Expand All @@ -123,5 +121,4 @@ class ModelRunnerOutput:
pooler_output=[],
finished_sending=None,
finished_recving=None,
finished_loading_dict=None,
num_nans_in_logits=None)
10 changes: 2 additions & 8 deletions vllm/v1/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,6 @@ def _pool(
num_scheduled_tokens_np: np.ndarray,
finished_sending: Optional[set[str]],
finished_recving: Optional[set[str]],
finished_loading_dict: Optional[dict[str, int]],
) -> ModelRunnerOutput:
assert self.input_batch.num_reqs ==\
len(self.input_batch.pooling_params), \
Expand Down Expand Up @@ -1412,7 +1411,6 @@ def _pool(
pooler_output=pooler_output,
finished_sending=finished_sending,
finished_recving=finished_recving,
finished_loading_dict=finished_loading_dict,
)

@torch.inference_mode()
Expand Down Expand Up @@ -1532,7 +1530,6 @@ def execute_model(
self.maybe_wait_for_kv_save()
finished_sending, finished_recving = (
self.get_finished_kv_transfers(scheduler_output))
finished_loading_dict = self.get_finished_loading(scheduler_output)

if self.use_aux_hidden_state_outputs:
hidden_states, aux_hidden_states = model_output
Expand All @@ -1550,11 +1547,9 @@ def execute_model(
if not get_pp_group().is_last_rank:
# For mid-pipeline stages, return the hidden states.
if not broadcast_pp_output:
if (finished_sending or finished_recving
or finished_loading_dict):
if finished_sending or finished_recving:
hidden_states.finished_sending = finished_sending
hidden_states.finished_recving = finished_recving
hidden_states.finished_loading_dict = finished_loading_dict
return hidden_states
assert isinstance(hidden_states, IntermediateTensors)
get_pp_group().send_tensor_dict(hidden_states.tensors,
Expand All @@ -1564,7 +1559,7 @@ def execute_model(
if self.input_batch.pooling_params:
return self._pool(hidden_states, num_scheduled_tokens,
num_scheduled_tokens_np, finished_sending,
finished_recving, finished_loading_dict)
finished_recving)

sample_hidden_states = hidden_states[logits_indices]
logits = self.model.compute_logits(sample_hidden_states, None)
Expand Down Expand Up @@ -1716,7 +1711,6 @@ def execute_model(
pooler_output=[],
finished_sending=finished_sending,
finished_recving=finished_recving,
finished_loading_dict=finished_loading_dict,
num_nans_in_logits=num_nans_in_logits,
)

Expand Down
4 changes: 1 addition & 3 deletions vllm/v1/worker/gpu_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,10 @@ def execute_model(
# In case of PP with kv transfer, we need to pass through the
# finished_sending and finished_recving buffers.
new_output = EMPTY_MODEL_RUNNER_OUTPUT
if (output.finished_sending or output.finished_recving
or output.finished_loading_dict):
if output.finished_sending or output.finished_recving:
new_output = copy.copy(new_output)
new_output.finished_sending = output.finished_sending
new_output.finished_recving = output.finished_recving
new_output.finished_loading_dict = output.finished_loading_dict
output = new_output

assert isinstance(output, ModelRunnerOutput)
Expand Down
13 changes: 1 addition & 12 deletions vllm/v1/worker/kv_connector_model_runner_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,18 @@ def get_finished_kv_transfers(
scheduler_output.finished_req_ids)
return None, None

@staticmethod
def get_finished_loading(
        scheduler_output: "SchedulerOutput", ) -> dict[str, int]:
    """Ask the KV-transfer group for per-request loaded-token counts.

    Returns an empty dict when no KV-transfer group is configured;
    otherwise delegates to the group's get_finished_loading().
    """
    if not has_kv_transfer_group():
        return {}
    return get_kv_transfer_group().get_finished_loading(scheduler_output)

def kv_connector_no_forward(self, scheduler_output: "SchedulerOutput",
vllm_config: VllmConfig) -> ModelRunnerOutput:
# KV send/recv even if no work to do.
with set_forward_context(None, vllm_config):
self.maybe_setup_kv_connector(scheduler_output)
finished_sending, finished_recving = (
self.get_finished_kv_transfers(scheduler_output))
finished_loading_dict = self.get_finished_loading(scheduler_output)

if (not finished_sending and not finished_recving
and not finished_loading_dict):
if not finished_sending and not finished_recving:
return EMPTY_MODEL_RUNNER_OUTPUT

output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT)
output.finished_sending = finished_sending
output.finished_recving = finished_recving
output.finished_loading_dict = finished_loading_dict
return output