diff --git a/tests/ut/kv_connector/test_remote_decode_lifecycle.py b/tests/ut/kv_connector/test_remote_decode_lifecycle.py
index 2f241f1c32..0a337437d0 100644
--- a/tests/ut/kv_connector/test_remote_decode_lifecycle.py
+++ b/tests/ut/kv_connector/test_remote_decode_lifecycle.py
@@ -25,6 +25,7 @@
                                          create_model_runner_output,
                                          create_request, create_scheduler,
                                          create_vllm_config)
+from vllm_ascend.utils import vllm_version_is
 
 
 def test_basic_lifecycle():
@@ -102,7 +103,13 @@ def test_basic_lifecycle():
 
     # (3b): execute_model()
     model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
-    model_runner_output.finished_sending = [request_id]
+    if vllm_version_is("0.10.0"):
+        model_runner_output.finished_sending = [request_id]
+    else:
+        from vllm.v1.worker.kv_connector_model_runner_mixin import \
+            KVConnectorOutput  # type: ignore  # noqa
+        model_runner_output.kv_connector_output = KVConnectorOutput(
+            finished_sending=[request_id])
 
     # (3c): update_from_output()
     scheduler.update_from_output(scheduler_output, model_runner_output)
@@ -157,7 +164,13 @@ def test_prefix_cache_lifecycle():
     scheduler_output = scheduler.schedule()
     scheduler.schedule()
     model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
-    model_runner_output.finished_sending = [request_remote.request_id]
+    if vllm_version_is("0.10.0"):
+        model_runner_output.finished_sending = [request_remote.request_id]
+    else:
+        from vllm.v1.worker.kv_connector_model_runner_mixin import \
+            KVConnectorOutput  # noqa
+        model_runner_output.kv_connector_output = KVConnectorOutput(
+            finished_sending=[request_remote.request_id])
     scheduler.update_from_output(scheduler_output, model_runner_output)
     _ = scheduler.schedule()
     assert_scheduler_empty(scheduler)
diff --git a/tests/ut/kv_connector/test_remote_prefill_lifecycle.py b/tests/ut/kv_connector/test_remote_prefill_lifecycle.py
index 516d6c6fcf..cb070ad74d 100644
--- a/tests/ut/kv_connector/test_remote_prefill_lifecycle.py
+++ b/tests/ut/kv_connector/test_remote_prefill_lifecycle.py
@@ -19,7 +19,7 @@
 import copy
 
 from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT
-from vllm.v1.request import FinishReason, RequestStatus
+from vllm.v1.request import RequestStatus
 
 from tests.ut.kv_connector.utils import (assert_scheduler_empty,
                                          create_model_runner_output,
@@ -55,10 +55,7 @@ def test_basic_lifecycle():
 
     # Nothing running and empty scheduler output.
     assert len(scheduler.running) == 0
     assert len(scheduler_output.scheduled_new_reqs) == 0
-    if vllm_version_is("0.9.1"):
-        assert len(scheduler_output.scheduled_cached_reqs) == 0
-    else:
-        assert scheduler_output.scheduled_cached_reqs.num_reqs == 0
+    assert scheduler_output.scheduled_cached_reqs.num_reqs == 0
     assert len(scheduler_output.num_scheduled_tokens) == 0
     assert scheduler_output.total_num_scheduled_tokens == 0
@@ -94,7 +91,13 @@ def test_basic_lifecycle():
 
     # (2b): forward(): request finishes recv.
     model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
-    model_runner_output.finished_recving = [request_id]
+    if vllm_version_is("0.10.0"):
+        model_runner_output.finished_recving = [request_id]
+    else:
+        from vllm.v1.worker.kv_connector_model_runner_mixin import \
+            KVConnectorOutput  # type: ignore  # noqa
+        model_runner_output.kv_connector_output = KVConnectorOutput(
+            finished_recving=[request_id])
 
     # (2c): update_from_output():
     engine_core_outputs = scheduler.update_from_output(scheduler_output,
@@ -135,11 +138,6 @@ def test_basic_lifecycle():
                                                        model_runner_output)
     scheduler.schedule()
 
-    if vllm_version_is("0.9.1"):
-        outputs = engine_core_outputs[0].outputs
-        assert len(outputs) == 1
-        output = outputs[0]
-        assert output.finish_reason == FinishReason.STOP
     assert_scheduler_empty(scheduler)
 
 
@@ -213,7 +211,13 @@ def test_full_block_prompt():
     # # STEP (2): Recv.
     scheduler_output = scheduler.schedule()
     model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
-    model_runner_output.finished_recving = [request_id]
+    if vllm_version_is("0.10.0"):
+        model_runner_output.finished_recving = [request_id]
+    else:
+        from vllm.v1.worker.kv_connector_model_runner_mixin import \
+            KVConnectorOutput  # type: ignore  # noqa
+        model_runner_output.kv_connector_output = KVConnectorOutput(
+            finished_recving=[request_id])
     scheduler.update_from_output(scheduler_output, model_runner_output)
     assert len(scheduler.waiting) == 1
     assert (request_id in scheduler.finished_recving_kv_req_ids)
@@ -236,13 +240,7 @@ def test_full_block_prompt():
     # # Step (4): Hit EOS.
     scheduler_output = scheduler.schedule()
     model_runner_output = create_model_runner_output([request], use_eos=True)
-    engine_core_outputs = scheduler.update_from_output(scheduler_output,
-                                                       model_runner_output)
+    scheduler.update_from_output(scheduler_output, model_runner_output)
     scheduler.schedule()
 
-    if vllm_version_is("0.9.1"):
-        outputs = engine_core_outputs[0].outputs
-        assert len(outputs) == 1
-        output = outputs[0]
-        assert output.finish_reason == FinishReason.STOP
     assert_scheduler_empty(scheduler)
diff --git a/tests/ut/kv_connector/utils.py b/tests/ut/kv_connector/utils.py
index 450d62e036..2c540b30f0 100644
--- a/tests/ut/kv_connector/utils.py
+++ b/tests/ut/kv_connector/utils.py
@@ -186,6 +186,20 @@ def create_model_runner_output(
     sampled_token_ids = [[sampled_token] for _ in req_ids]
 
     # Make output data structure.
+    extra_args = {}
+    if not vllm_version_is("0.10.0"):
+        from vllm.v1.worker.kv_connector_model_runner_mixin import \
+            KVConnectorOutput  # type: ignore  # noqa
+        kv_connector_output = KVConnectorOutput(
+            finished_sending=finished_sending,
+            finished_recving=finished_recving)
+        extra_args = {"kv_connector_output": kv_connector_output}
+    else:
+        extra_args = {
+            "finished_sending": finished_sending,
+            "finished_recving": finished_recving,
+        }
+
     return ModelRunnerOutput(
         req_ids=req_ids,
         req_id_to_index=req_id_to_index,
@@ -193,9 +207,6 @@
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-        **({
-            "pooler_output": []
-        } if not vllm_version_is("0.9.1") else {}),
-        finished_sending=finished_sending,
-        finished_recving=finished_recving,
+        pooler_output=[],
+        **extra_args,
     )
diff --git a/vllm_ascend/models/qwen2_5_vl.py b/vllm_ascend/models/qwen2_5_vl.py
index d1a94d1dac..4629f760eb 100644
--- a/vllm_ascend/models/qwen2_5_vl.py
+++ b/vllm_ascend/models/qwen2_5_vl.py
@@ -18,7 +18,7 @@
 # limitations under the License.
 
 from functools import partial
-from typing import Callable, Iterable, Optional, Set, Tuple
+from typing import Callable, Iterable, Optional, Set, Tuple, Union
 
 import torch
 import torch.nn as nn
@@ -30,7 +30,8 @@
 from vllm.config import VllmConfig
 from vllm.distributed import parallel_state
 from vllm.distributed import utils as dist_utils
-from vllm.model_executor.layers.activation import _ACTIVATION_REGISTRY
+from vllm.model_executor.layers.activation import (_ACTIVATION_REGISTRY,
+                                                   get_act_and_mul_fn)
 from vllm.model_executor.layers.layernorm import RMSNorm
 from vllm.model_executor.layers.quantization import QuantizationConfig
 from vllm.model_executor.model_loader.weight_utils import default_weight_loader
@@ -42,6 +43,8 @@
 from vllm.model_executor.models.utils import maybe_prefix
 from vllm.multimodal import MULTIMODAL_REGISTRY
 
+from vllm_ascend.utils import vllm_version_is
+
 MIN_PAD_SIZE = 64  # min_size to pad weight
 MAX_PAD_SIZE = 128  # max_size to pad weight
 
@@ -197,12 +200,16 @@ def __init__(
             in_channels=vision_config.in_channels,
             hidden_size=self.hidden_size,
         )
+
+        act_fn = get_act_and_mul_fn(vision_config.hidden_act)
+        if vllm_version_is("0.10.0"):
+            act_fn = _ACTIVATION_REGISTRY[vision_config.hidden_act]
         self.blocks = nn.ModuleList([
             AscendQwen2_5_VisionBlock(
                 dim=self.hidden_size,
                 num_heads=self.num_heads,
                 mlp_hidden_dim=vision_config.intermediate_size,
-                act_fn=_ACTIVATION_REGISTRY[vision_config.hidden_act],
+                act_fn=act_fn,
                 norm_layer=norm_layer,
                 quant_config=quant_config,
                 prefix=f"{prefix}.blocks.{layer_idx}")
@@ -291,12 +298,17 @@ def pad_proj_weight(self, data):
 
     def load_weights(self, weights: Iterable[Tuple[str,
                                                    torch.Tensor]]) -> Set[str]:
-        stacked_params_mapping = [
+        stacked_params_mapping: list[tuple[str, str, Union[str, int]]] = [
             # (param_name, shard_name, shard_id)
             ("qkv_proj", "q_proj", "q"),
             ("qkv_proj", "k_proj", "k"),
             ("qkv_proj", "v_proj", "v"),
         ]
+        if not vllm_version_is("0.10.0"):
+            stacked_params_mapping.extend([
+                ("mlp.gate_up_proj.", "mlp.gate_proj.", 0),
+                ("mlp.gate_up_proj.", "mlp.up_proj.", 1),
+            ])
         params_dict = dict(self.named_parameters(remove_duplicate=False))
         loaded_params: Set[str] = set()
         for name, loaded_weight in weights:
diff --git a/vllm_ascend/models/qwen2_5_vl_without_padding.py b/vllm_ascend/models/qwen2_5_vl_without_padding.py
index 4629a02f0d..8877456a6d 100644
--- a/vllm_ascend/models/qwen2_5_vl_without_padding.py
+++ b/vllm_ascend/models/qwen2_5_vl_without_padding.py
@@ -30,7 +30,8 @@
 from vllm.config import VllmConfig
 from vllm.distributed import parallel_state
 from vllm.distributed import utils as dist_utils
-from vllm.model_executor.layers.activation import _ACTIVATION_REGISTRY
+from vllm.model_executor.layers.activation import (_ACTIVATION_REGISTRY,
+                                                   get_act_and_mul_fn)
 from vllm.model_executor.layers.layernorm import RMSNorm
 from vllm.model_executor.layers.quantization import QuantizationConfig
 from vllm.model_executor.models.qwen2_5_vl import (
@@ -42,6 +43,7 @@
 from vllm.multimodal import MULTIMODAL_REGISTRY
 
 from vllm_ascend.models.qwen2_5_vl import AscendQwen2_5_VisionRotaryEmbedding
+from vllm_ascend.utils import vllm_version_is
 
 
 class AscendQwen2_5_VisionAttention_Without_Padding(Qwen2_5_VisionAttention):
@@ -171,12 +173,16 @@ def __init__(
             in_channels=vision_config.in_channels,
             hidden_size=self.hidden_size,
         )
+
+        act_fn = get_act_and_mul_fn(vision_config.hidden_act)
+        if vllm_version_is("0.10.0"):
+            act_fn = _ACTIVATION_REGISTRY[vision_config.hidden_act]
         self.blocks = nn.ModuleList([
             AscendQwen2_5_VisionBlock_Without_Padding(
                 dim=self.hidden_size,
                 num_heads=self.num_heads,
                 mlp_hidden_dim=vision_config.intermediate_size,
-                act_fn=_ACTIVATION_REGISTRY[vision_config.hidden_act],
+                act_fn=act_fn,
                 norm_layer=norm_layer,
                 quant_config=quant_config,
                 prefix=f"{prefix}.blocks.{layer_idx}")
diff --git a/vllm_ascend/models/qwen3_moe.py b/vllm_ascend/models/qwen3_moe.py
index 0c5ad39c17..c133acc66a 100644
--- a/vllm_ascend/models/qwen3_moe.py
+++ b/vllm_ascend/models/qwen3_moe.py
@@ -100,6 +100,8 @@ def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
         cache_config = vllm_config.cache_config
         quant_config = vllm_config.quant_config
+        parallel_config = vllm_config.parallel_config
+        self.num_redundant_experts = parallel_config.num_redundant_experts
 
         self.padding_idx = config.pad_token_id
         self.vocab_size = config.vocab_size
         self.config = config
diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py
index 01f01e610d..c933fa71b5 100644
--- a/vllm_ascend/worker/model_runner_v1.py
+++ b/vllm_ascend/worker/model_runner_v1.py
@@ -94,6 +94,8 @@
 
 if not vllm_version_is("0.10.0"):
     from vllm.tasks import GenerationTask, SupportedTask
+    from vllm.v1.worker.kv_connector_model_runner_mixin import \
+        KVConnectorOutput
 
 if TYPE_CHECKING:
     import xgrammar as xgr  # type: ignore[import-untyped]
@@ -1472,8 +1474,9 @@ def _pool(
         hidden_states: torch.Tensor,
         num_scheduled_tokens: int,
         num_scheduled_tokens_np: np.ndarray,
-        finished_sending: Optional[set[str]],
-        finished_receiving: Optional[set[str]],
+        finished_sending: Optional[set[str]] = None,
+        finished_recving: Optional[set[str]] = None,
+        kv_connector_output: Optional["KVConnectorOutput"] = None,
     ) -> ModelRunnerOutput:
         assert self.input_batch.num_reqs ==\
             len(self.input_batch.pooling_params), \
@@ -1499,6 +1502,12 @@
                 pooler_output.append(raw_output.data.cpu())
             else:
                 pooler_output.append(None)
+        extra_args = ({
+            "finished_sending": finished_sending,
+            "finished_recving": finished_recving
+        } if vllm_version_is("0.10.0") else {
+            "kv_connector_output": kv_connector_output
+        })
 
         return ModelRunnerOutput(
             req_ids=self.input_batch.req_ids,
@@ -1508,8 +1517,8 @@
             logprobs=None,
             prompt_logprobs_dict={},
             pooler_output=pooler_output,
-            finished_sending=finished_sending,
-            finished_recving=finished_receiving)
+            **extra_args,
+        )
 
     @torch.inference_mode()
     def execute_model(
@@ -1533,7 +1542,13 @@
              num_scheduled_tokens_np, finished_sending,
              finished_recving) = (self._process_reqs(scheduler_output,
                                                      intermediate_tensors))
-
+        kv_connector_output = None
+        if not vllm_version_is("0.10.0"):
+            kv_connector_output = KVConnectorOutput(
+                finished_sending=finished_sending,
+                finished_recving=finished_recving)
+            finished_sending = None
+            finished_recving = None
         with ProfileExecuteDuration().capture_async("post process"):
             # Broadcast PP output for external_launcher (torchrun)
             # to make sure we are synced across pp ranks
@@ -1545,7 +1560,10 @@
             if not get_pp_group().is_last_rank:
                 # For mid-pipeline stages, return the hidden states.
                 if not broadcast_pp_output:
-                    if finished_sending or finished_recving:
+                    if kv_connector_output is not None:
+                        hidden_states.kv_connector_output = kv_connector_output
+                    else:
+                        # TODO: Remove this after we drop vllm v0.10.0
                         hidden_states.finished_sending = finished_sending
                         hidden_states.finished_recving = finished_recving
                     return hidden_states
@@ -1557,7 +1575,8 @@
         if self.input_batch.pooling_params:
             return self._pool(hidden_states, num_scheduled_tokens,
                               num_scheduled_tokens_np,
-                              finished_sending, finished_recving)
+                              finished_sending, finished_recving,
+                              kv_connector_output)
         sample_hidden_states = hidden_states[logits_indices]
         logits = self.model.compute_logits(sample_hidden_states, None)
         if broadcast_pp_output:
@@ -1691,17 +1710,23 @@
             if has_kv_transfer_group():
                 get_kv_transfer_group().clear_connector_metadata()
 
-            model_runner_output = ModelRunnerOutput(
-                req_ids=self.input_batch.req_ids,
-                req_id_to_index=self.input_batch.req_id_to_index,
-                sampled_token_ids=valid_sampled_token_ids,
-                spec_token_ids=spec_token_ids,
-                logprobs=logprobs_lists,
-                prompt_logprobs_dict=prompt_logprobs_dict,
-                pooler_output=[],
-                finished_sending=finished_sending,
-                finished_recving=finished_recving,
-            )
+            extra_args = ({
+                "finished_sending": finished_sending,
+                "finished_recving": finished_recving
+            } if vllm_version_is("0.10.0") else {
+                "kv_connector_output": kv_connector_output
+            })
+
+            model_runner_output = ModelRunnerOutput(
+                req_ids=self.input_batch.req_ids,
+                req_id_to_index=self.input_batch.req_id_to_index,
+                sampled_token_ids=valid_sampled_token_ids,
+                spec_token_ids=spec_token_ids,
+                logprobs=logprobs_lists,
+                prompt_logprobs_dict=prompt_logprobs_dict,
+                pooler_output=[],
+                **extra_args,
+            )
 
         durations = ProfileExecuteDuration().pop_captured_sync()
         if durations:
diff --git a/vllm_ascend/worker/worker_v1.py b/vllm_ascend/worker/worker_v1.py
index 4988ef4689..65d2f51477 100644
--- a/vllm_ascend/worker/worker_v1.py
+++ b/vllm_ascend/worker/worker_v1.py
@@ -209,12 +209,27 @@ def execute_model(
             if not has_kv_transfer_group():
                 return None
 
-            new_output = EMPTY_MODEL_RUNNER_OUTPUT
-            if output.finished_sending or output.finished_recving:
-                new_output = copy.copy(new_output)
-                new_output.finished_sending = output.finished_sending
-                new_output.finished_recving = output.finished_recving
-            output = new_output
+            is_legacy = vllm_version_is("0.10.0")
+
+            if is_legacy:
+                finished_sending = output.finished_sending
+                finished_recving = output.finished_recving
+            else:
+                kv_connector_output = output.kv_connector_output
+                finished_sending = kv_connector_output.finished_sending
+                finished_recving = kv_connector_output.finished_recving
+
+            if not finished_sending and not finished_recving:
+                return EMPTY_MODEL_RUNNER_OUTPUT
+
+            new_output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT)
+
+            if is_legacy:
+                new_output.finished_sending = finished_sending
+                new_output.finished_recving = finished_recving
+            else:
+                new_output.kv_connector_output = kv_connector_output
+            return new_output
 
         assert isinstance(output, ModelRunnerOutput)
         return output
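
The same version gate recurs in every file above: on vLLM 0.10.0 the `finished_sending`/`finished_recving` sets are flat fields on `ModelRunnerOutput`, while on later versions they travel inside a single `KVConnectorOutput` object. A minimal sketch of that pattern, for reference — the helper name `kv_transfer_kwargs` is illustrative only, not part of the diff; `KVConnectorOutput`, `ModelRunnerOutput`, and `vllm_version_is` are the real symbols used above:

```python
from typing import Optional

from vllm_ascend.utils import vllm_version_is


def kv_transfer_kwargs(finished_sending: Optional[set] = None,
                       finished_recving: Optional[set] = None) -> dict:
    """Hypothetical helper: kwargs to splat into ModelRunnerOutput(...)."""
    if vllm_version_is("0.10.0"):
        # Legacy layout: flat fields on ModelRunnerOutput.
        return {
            "finished_sending": finished_sending,
            "finished_recving": finished_recving,
        }
    # Newer layout: both sets wrapped in a single KVConnectorOutput.
    from vllm.v1.worker.kv_connector_model_runner_mixin import \
        KVConnectorOutput  # noqa
    return {
        "kv_connector_output":
        KVConnectorOutput(finished_sending=finished_sending,
                          finished_recving=finished_recving)
    }
```

Centralizing the branch in one place like this is what the `extra_args` dicts in `utils.py` and `model_runner_v1.py` do inline: the construction site splats `**extra_args` and stays otherwise version-agnostic.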