diff --git a/tests/e2e/singlecard/core/test_ascend_scheduler.py b/tests/e2e/singlecard/core/test_ascend_scheduler.py
index c382ebdf40..7d9c1b1ef5 100644
--- a/tests/e2e/singlecard/core/test_ascend_scheduler.py
+++ b/tests/e2e/singlecard/core/test_ascend_scheduler.py
@@ -98,11 +98,7 @@ def create_scheduler(
     )
     kv_cache_config = KVCacheConfig(
         num_blocks=num_blocks,  # A large number of blocks to hold all requests
-        **({
-            "tensors": {}
-        } if vllm_version_is("0.9.0") else {
-            "kv_cache_tensors": []
-        }),
+        kv_cache_tensors=[],
         kv_cache_groups=[
             KVCacheGroupSpec(['layer'],
                              FullAttentionSpec(block_size, 1, 1, torch.float32,
@@ -145,8 +141,8 @@ def create_requests(num_requests: int,
             multi_modal_hashes=None,
             eos_token_id=EOS_TOKEN_ID,
             **({
-                "arrival_time": 0.0
-            } if vllm_version_is("0.9.0") else {}),
+                "pooling_params": None
+            } if not vllm_version_is("0.9.1") else {}),
         )
         requests.append(request)
     return requests
@@ -262,7 +258,9 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
     scheduler.update_from_output(output, model_runner_output)
 
     # Schedule the next step. All three requests are running.
@@ -286,7 +284,10 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
+
     scheduler.update_from_output(output1, model_runner_output)
     output2 = scheduler.schedule()
     assert len(scheduler.running) == 3
@@ -337,7 +338,10 @@ def test_stop_via_update_from_output():
                             11]],  # First request hits EOS, second continues
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -385,7 +389,10 @@ def test_stop_via_update_from_output():
                            [13, 14]],  # First request hits stop token
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -432,7 +439,10 @@ def test_stop_via_update_from_output():
                            [13]],  # First request exceeds max_tokens
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -474,7 +484,10 @@ def test_stop_via_update_from_output():
         sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -524,7 +537,10 @@ def test_schedule_concurrent_batches(enable_prefix_caching: Optional[bool],
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
+
     scheduler.update_from_output(scheduler_output0, model_runner_output)
 
     # Schedule the next step.
@@ -541,7 +557,10 @@ def test_schedule_concurrent_batches(enable_prefix_caching: Optional[bool],
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
+
     scheduler.update_from_output(scheduler_output1, model_runner_output)
 
 
@@ -565,8 +584,6 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
     1. Speculated tokens get scheduled correctly
     2. Spec decoding stats properly count number of draft and accepted tokens
     """
-    if vllm_version_is("0.9.0"):
-        return
     num_spec_tokens = max(1, max(len(t) for t in spec_tokens))
     scheduler = create_scheduler(num_speculative_tokens=num_spec_tokens)
     requests = create_requests(num_requests=len(spec_tokens), num_tokens=1)
@@ -593,7 +610,10 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
         spec_token_ids=spec_tokens,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
+
     engine_core_outputs = scheduler.update_from_output(output,
                                                        model_runner_output)
 
@@ -632,7 +652,10 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
+
     engine_core_outputs = scheduler.update_from_output(output,
                                                        model_runner_output)
 
@@ -727,7 +750,9 @@ def make_output(scheduler: AscendScheduler):
         spec_token_ids=None,
         logprobs=None,
         prompt_logprobs_dict={},
-    )
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
 
 def assert_scheduler_empty(scheduler: AscendScheduler):
@@ -744,11 +769,10 @@ def assert_scheduler_empty(scheduler: AscendScheduler):
     assert len(scheduler.encoder_cache_manager.cached) == 0
 
     # KVCache Manager.
-    if not vllm_version_is("0.9.0"):
-        assert len(scheduler.kv_cache_manager.coordinator.
-                   single_type_managers[0].req_to_blocks) == 0
-        assert len(scheduler.kv_cache_manager.coordinator.
-                   single_type_managers[0].num_cached_block) == 0
+    assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+               req_to_blocks) == 0
+    assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+               num_cached_block) == 0
     assert len(scheduler.kv_cache_manager.req_to_block_hashes) == 0
     num_free_blocks = (
         scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks)
@@ -789,4 +813,4 @@ def test_memory_leak():
         scheduler.update_from_output(scheduler_output, model_runner_output)
 
     # Confirm no memory leak.
-    assert_scheduler_empty(scheduler)
\ No newline at end of file
+    assert_scheduler_empty(scheduler)
diff --git a/tests/e2e/singlecard/test_scheduler.py b/tests/e2e/singlecard/test_scheduler.py
index 8021f0306c..b3adf945bf 100644
--- a/tests/e2e/singlecard/test_scheduler.py
+++ b/tests/e2e/singlecard/test_scheduler.py
@@ -31,6 +31,7 @@ from vllm.v1.structured_output import StructuredOutputManager
 
 from vllm_ascend.core.scheduler import AscendScheduler
+from vllm_ascend.utils import vllm_version_is
 
 
 EOS_TOKEN_ID = 50256
 
@@ -130,6 +131,9 @@ def create_requests(num_requests: int,
             multi_modal_placeholders=mm_position,
             multi_modal_hashes=None,
             eos_token_id=EOS_TOKEN_ID,
+            **({
+                "pooling_params": None
+            } if not vllm_version_is("0.9.1") else {}),
         )
         requests.append(request)
     return requests
@@ -237,7 +241,10 @@ def test_stop_via_update_from_output():
                             11]],  # First request hits EOS, second continues
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -283,7 +290,10 @@ def test_stop_via_update_from_output():
                            [13, 14]],  # First request hits stop token
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -328,7 +338,10 @@ def test_stop_via_update_from_output():
                            [13]],  # First request exceeds max_tokens
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
@@ -369,7 +382,10 @@ def test_stop_via_update_from_output():
         sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
         spec_token_ids=None,
         logprobs=None,
-        prompt_logprobs_dict={})
+        prompt_logprobs_dict={},
+        **({
+            "pooler_output": []
+        } if not vllm_version_is("0.9.1") else {}))
 
     scheduler.update_from_output(scheduler_output, model_output)
 
diff --git a/tests/ut/ops/test_expert_load_balancer.py b/tests/ut/ops/test_expert_load_balancer.py
index 049f916fc9..3b7a69ddd4 100644
--- a/tests/ut/ops/test_expert_load_balancer.py
+++ b/tests/ut/ops/test_expert_load_balancer.py
@@ -3,9 +3,10 @@
 import vllm_ascend.patch.worker.patch_common.patch_utils  # type: ignore[import]  # isort: skip  # noqa
 
 import json
+import unittest
 from typing import List, TypedDict
+from unittest import mock
 
-import pytest
 import torch
 
 from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
@@ -46,101 +47,101 @@ class MockData(TypedDict):
 }
 
 
-@pytest.fixture
-def mock_expert_load_balancer(tmp_path):
-    json_file = tmp_path / "expert_map.json"
-    with open(json_file, 'w') as f:
-        json.dump(MOCK_DATA, f)
-
-    return ExpertLoadBalancer(str(json_file), global_expert_num=8)
-
-
-def test_init(mock_expert_load_balancer):
-    assert isinstance(mock_expert_load_balancer.expert_map_tensor,
-                      torch.Tensor)
-    assert mock_expert_load_balancer.layers_num == MOCK_DATA["moe_layer_count"]
-    assert mock_expert_load_balancer.ranks_num == MOCK_DATA["layer_list"][0][
-        "device_count"]
-
-
-def test_generate_index_dicts(mock_expert_load_balancer):
-    tensor_2d = torch.tensor([[7, 2, 0, 3, 5], [6, 1, 4, 7, 2]])
-    result = mock_expert_load_balancer.generate_index_dicts(tensor_2d)
-    expected_result = [{
-        7: 0,
-        2: 1,
-        0: 2,
-        3: 3,
-        5: 4
-    }, {
-        6: 5,
-        1: 6,
-        4: 7,
-        7: 8,
-        2: 9
-    }]
-    assert result == expected_result
-
-
-def test_generate_expert_placement_map(mock_expert_load_balancer):
-    expert_placement_map = mock_expert_load_balancer.generate_expert_placement_map(
-    )
-    assert expert_placement_map.shape == (mock_expert_load_balancer.layers_num,
-                                          mock_expert_load_balancer.ranks_num,
-                                          8)
-    assert torch.all(expert_placement_map >= -1)
-
-
-def test_generate_log2phy_expert_map(mock_expert_load_balancer):
-    layer_id = 0
-    log2phy_map = mock_expert_load_balancer.generate_log2phy_expert_map(
-        layer_id)
-    assert log2phy_map.shape == (mock_expert_load_balancer.ranks_num, 8)
-    assert torch.all(log2phy_map >= -1)
-
-
-def test_get_rank_placement_map(mock_expert_load_balancer, mocker):
-    mocker.patch("torch_npu.npu._lazy_init")
-    mocker.patch('torch.npu.current_device', return_value='cpu')
-    layer_id = 0
-    rank_id = 0
-    rank_local_expert_num, rank_expert_map = mock_expert_load_balancer.get_rank_placement_map(
-        layer_id, rank_id)
-    assert rank_local_expert_num == 5
-    expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
-                                   dtype=torch.int32).to(
-                                       rank_expert_map.device)
-    assert rank_expert_map.equal(expected_tensor)
-
-    rank_id = 1
-    rank_local_expert_num, rank_expert_map = mock_expert_load_balancer.get_rank_placement_map(
-        layer_id, rank_id)
-    expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
-                                   dtype=torch.int32).to(
-                                       rank_expert_map.device)
-    assert rank_expert_map.equal(expected_tensor)
-
-
-def test_get_rank_log2phy_map(mock_expert_load_balancer):
-    layer_id = 0
-    rank_id = 0
-    log2phy_map = mock_expert_load_balancer.get_rank_log2phy_map(
-        layer_id, rank_id)
-    expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
-                                   dtype=torch.int32).to(log2phy_map.device)
-    assert log2phy_map.equal(expected_tensor)
-
-    rank_id = 1
-    log2phy_map = mock_expert_load_balancer.get_rank_log2phy_map(
-        layer_id, rank_id)
-    expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
-                                   dtype=torch.int32).to(log2phy_map.device)
-    assert log2phy_map.equal(expected_tensor)
-
-
-def test_get_global_redundant_expert_num(mock_expert_load_balancer):
-    redundant_expert_num = mock_expert_load_balancer.get_global_redundant_expert_num(
-    )
-    expected_redundant_expert_num = len(MOCK_DATA["layer_list"][0]["device_list"][0]["device_expert"]) * \
-        MOCK_DATA["layer_list"][0]["device_count"] - 8
-    assert redundant_expert_num == expected_redundant_expert_num
+class TestExpertLoadBalancer(unittest.TestCase):
+
+    def setUp(self):
+        json_file = "expert_map.json"
+        with open(json_file, 'w') as f:
+            json.dump(MOCK_DATA, f)
+
+        self.expert_load_balancer = ExpertLoadBalancer(json_file,
+                                                       global_expert_num=8)
+
+    def test_init(self):
+
+        self.assertIsInstance(self.expert_load_balancer.expert_map_tensor,
+                              torch.Tensor)
+        self.assertEqual(self.expert_load_balancer.layers_num,
+                         MOCK_DATA["moe_layer_count"])
+        self.assertEqual(self.expert_load_balancer.ranks_num,
+                         MOCK_DATA["layer_list"][0]["device_count"])
+
+    def test_generate_index_dicts(self):
+        tensor_2d = torch.tensor([[7, 2, 0, 3, 5], [6, 1, 4, 7, 2]])
+        result = self.expert_load_balancer.generate_index_dicts(tensor_2d)
+        expected_result = [{
+            7: 0,
+            2: 1,
+            0: 2,
+            3: 3,
+            5: 4
+        }, {
+            6: 5,
+            1: 6,
+            4: 7,
+            7: 8,
+            2: 9
+        }]
+        self.assertEqual(result, expected_result)
+
+    def test_generate_expert_placement_map(self):
+        expert_placement_map = self.expert_load_balancer.generate_expert_placement_map(
+        )
+        self.assertEqual(expert_placement_map.shape,
+                         (self.expert_load_balancer.layers_num,
+                          self.expert_load_balancer.ranks_num, 8))
+        self.assertTrue(torch.all(expert_placement_map >= -1))
+
+    def test_generate_log2phy_expert_map(self):
+        layer_id = 0
+        log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map(
+            layer_id)
+        self.assertEqual(log2phy_map.shape,
+                         (self.expert_load_balancer.ranks_num, 8))
+        self.assertTrue(torch.all(log2phy_map >= -1))
+
+    @mock.patch("torch_npu.npu._lazy_init")
+    @mock.patch("torch.npu.current_device", return_value="cpu")
+    def test_get_rank_placement_map(self, mock_current_device, mock_lazy_init):
+        layer_id = 0
+        rank_id = 0
+        rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
+            layer_id, rank_id)
+        self.assertEqual(rank_local_expert_num, 5)
+        expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
+                                       dtype=torch.int32).to(
+                                           rank_expert_map.device)
+        self.assertTrue(rank_expert_map.equal(expected_tensor))
+
+        rank_id = 1
+        rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
+            layer_id, rank_id)
+        expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
+                                       dtype=torch.int32).to(
+                                           rank_expert_map.device)
+        self.assertTrue(rank_expert_map.equal(expected_tensor))
+
+    def test_get_rank_log2phy_map(self):
+        layer_id = 0
+        rank_id = 0
+        log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
+            layer_id, rank_id)
+        expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
+                                       dtype=torch.int32).to(
+                                           log2phy_map.device)
+        self.assertTrue(log2phy_map.equal(expected_tensor))
+
+        rank_id = 1
+        log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
+            layer_id, rank_id)
+        expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
+                                       dtype=torch.int32).to(
+                                           log2phy_map.device)
+        self.assertTrue(log2phy_map.equal(expected_tensor))
+
+    def test_get_global_redundant_expert_num(self):
+        redundant_expert_num = self.expert_load_balancer.get_global_redundant_expert_num(
+        )
+        expected_redundant_expert_num = len(MOCK_DATA["layer_list"][0]["device_list"][0]["device_expert"]) * \
+            MOCK_DATA["layer_list"][0]["device_count"] - 8
+        self.assertEqual(redundant_expert_num, expected_redundant_expert_num)
diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py
index 801819b039..89f30bc43c 100644
--- a/vllm_ascend/worker/model_runner_v1.py
+++ b/vllm_ascend/worker/model_runner_v1.py
@@ -74,7 +74,7 @@
 from vllm_ascend.attention.mla_v1 import CommonAttentionMetadata
 from vllm_ascend.platform import NPUPlatform
 from vllm_ascend.sample.rejection_sampler import AscendRejectionSampler
-from vllm_ascend.utils import ProfileExecuteDuration
+from vllm_ascend.utils import ProfileExecuteDuration, vllm_version_is
 from vllm_ascend.worker.mtp_proposer_v1 import MtpProposer
 
 if TYPE_CHECKING:
@@ -420,19 +420,33 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None:
                     generator.manual_seed(sampling_params.seed)
             else:
                 generator = None
-
-            self.requests[req_id] = CachedRequestState(
-                req_id=req_id,
-                prompt_token_ids=new_req_data.prompt_token_ids,
-                mm_inputs=new_req_data.mm_inputs,
-                mm_positions=new_req_data.mm_positions,
-                sampling_params=sampling_params,
-                generator=generator,
-                block_ids=new_req_data.block_ids,
-                num_computed_tokens=new_req_data.num_computed_tokens,
-                output_token_ids=[],
-                lora_request=new_req_data.lora_request,
-            )
+            if vllm_version_is("0.9.1"):
+                self.requests[req_id] = CachedRequestState(
+                    req_id=req_id,
+                    prompt_token_ids=new_req_data.prompt_token_ids,
+                    mm_inputs=new_req_data.mm_inputs,
+                    mm_positions=new_req_data.mm_positions,
+                    sampling_params=sampling_params,
+                    generator=generator,
+                    block_ids=new_req_data.block_ids,
+                    num_computed_tokens=new_req_data.num_computed_tokens,
+                    output_token_ids=[],
+                    lora_request=new_req_data.lora_request,
+                )
+            else:
+                self.requests[req_id] = CachedRequestState(
+                    req_id=req_id,
+                    prompt_token_ids=new_req_data.prompt_token_ids,
+                    mm_inputs=new_req_data.mm_inputs,
+                    mm_positions=new_req_data.mm_positions,
+                    sampling_params=sampling_params,
+                    pooling_params=None,
+                    generator=generator,
+                    block_ids=new_req_data.block_ids,
+                    num_computed_tokens=new_req_data.num_computed_tokens,
+                    output_token_ids=[],
+                    lora_request=new_req_data.lora_request,
+                )
 
             # Only relevant for models using M-RoPE (e.g, Qwen2-VL)
             if self.uses_mrope:
@@ -1305,15 +1319,25 @@ def execute_model(
                 hidden_states,
                 attn_metadata,
             )
-
-        model_runner_output = ModelRunnerOutput(
-            req_ids=self.input_batch.req_ids,
-            req_id_to_index=self.input_batch.req_id_to_index,
-            sampled_token_ids=valid_sampled_token_ids,
-            spec_token_ids=spec_token_ids,
-            logprobs=logprobs_lists,
-            prompt_logprobs_dict={},
-        )
+        if vllm_version_is("0.9.1"):
+            model_runner_output = ModelRunnerOutput(
+                req_ids=self.input_batch.req_ids,
+                req_id_to_index=self.input_batch.req_id_to_index,
+                sampled_token_ids=valid_sampled_token_ids,
+                spec_token_ids=spec_token_ids,
+                logprobs=logprobs_lists,
+                prompt_logprobs_dict={},
+            )
+        else:
+            model_runner_output = ModelRunnerOutput(
+                req_ids=self.input_batch.req_ids,
+                req_id_to_index=self.input_batch.req_id_to_index,
+                sampled_token_ids=valid_sampled_token_ids,
+                spec_token_ids=spec_token_ids,
+                logprobs=logprobs_lists,
+                prompt_logprobs_dict={},
+                pooler_output=[],
+            )
 
         durations = ProfileExecuteDuration().pop_captured_sync()
         if durations: