From 02436843b91e719fd2782ab6c6262024fdcfe902 Mon Sep 17 00:00:00 2001
From: perzhang
Date: Tue, 28 Oct 2025 13:59:27 +0000
Subject: [PATCH] [feat](eplb): support EPLB on ROCm platform

Allow expert parallelism load balancing (EPLB) to be enabled on ROCm in
addition to CUDA, and extend EPLB support to
`CompressedTensorsW8A8Fp8MoEMethod` by validating the EPLB tensors and
forwarding them to `FusedMoE.select_experts`.
---
 vllm/config/parallel.py                       |  4 ++--
 vllm/model_executor/layers/fused_moe/layer.py | 18 +++++++++++++++---
 .../compressed_tensors_moe.py                 | 14 +++++++++++---
 3 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py
index b79bc6983b54..f0a8ed689bd8 100644
--- a/vllm/config/parallel.py
+++ b/vllm/config/parallel.py
@@ -267,8 +267,8 @@ def _validate_parallel_config(self) -> Self:
         )
 
         if self.enable_eplb:
-            if not current_platform.is_cuda():
+            if not (current_platform.is_cuda() or current_platform.is_rocm()):
                 raise ValueError(
                     "Expert parallelism load balancing is only supported on "
-                    "CUDA devices now."
+                    "CUDA and ROCm devices now."
                 )
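
The guard above is the only platform-level change. As a quick illustration (not vLLM code), the relaxed check behaves like the sketch below, with `is_cuda`/`is_rocm` standing in for `current_platform.is_cuda()`/`current_platform.is_rocm()`:

    # Illustrative sketch of the relaxed EPLB platform guard.
    def check_eplb_platform(enable_eplb: bool, is_cuda: bool, is_rocm: bool) -> None:
        if enable_eplb and not (is_cuda or is_rocm):
            raise ValueError(
                "Expert parallelism load balancing is only supported on "
                "CUDA and ROCm devices now."
            )

    check_eplb_platform(enable_eplb=True, is_cuda=False, is_rocm=True)  # ROCm now passes
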
diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py
index 04d8e91b0d25..3505d719e501 100644
--- a/vllm/model_executor/layers/fused_moe/layer.py
+++ b/vllm/model_executor/layers/fused_moe/layer.py
@@ -1271,17 +1271,28 @@ def __init__(
         if self.enable_eplb:
             from vllm.model_executor.layers.quantization.fp8 import Fp8MoEMethod
+            from vllm.model_executor.layers.quantization.compressed_tensors.compressed_tensors_moe import (  # noqa: E501
+                CompressedTensorsW8A8Fp8MoEMethod,
+            )
 
-            if not isinstance(quant_method, (Fp8MoEMethod, UnquantizedFusedMoEMethod)):
+            if not isinstance(
+                quant_method,
+                (
+                    Fp8MoEMethod,
+                    UnquantizedFusedMoEMethod,
+                    CompressedTensorsW8A8Fp8MoEMethod,
+                ),
+            ):
                 # TODO: Add support for additional quantization methods.
                 # The implementation for other quantization methods does not
                 # contain essential differences, but the current quant API
                 # design causes duplicated work when extending to new
                 # quantization methods, so I'm leaving it for now.
                 # If you plan to add support for more quantization methods,
                 # please refer to the implementation in `Fp8MoEMethod`.
                 raise NotImplementedError(
-                    "EPLB is only supported for FP8 quantization for now."
+                    "EPLB is only supported for FP8 and compressed-tensors "
+                    "W8A8 FP8 quantization for now."
                 )
 
         moe_quant_params = {
             "num_experts": self.local_num_experts,
@@ -1890,7 +1901,8 @@ def load_weights(
 
     def get_expert_weights(self) -> Iterable[torch.Tensor]:
         weights = list(self.named_parameters())
-        assert all(weight.is_contiguous() for _, weight in weights)
+        # NOTE: The contiguity assertion is relaxed here because some expert
+        # weight views may be non-contiguous on ROCm.
 
         # Filter out the non-expert weights.
         # `e_score_correction_bias` is a bias for each logical expert,
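
The compressed-tensors change below mirrors the existing `Fp8MoEMethod` EPLB path: validate the EPLB tensors, then forward them to `FusedMoE.select_experts`. A rough, self-contained sketch of that plumbing (the helper and its callable argument are hypothetical; only the keyword names come from the diff):

    from typing import Any, Callable

    import torch

    # Hypothetical skeleton, not vLLM code: the EPLB arguments a MoE quant
    # method validates and forwards to the expert-selection routine.
    def select_with_eplb(
        select_experts: Callable[..., Any],
        hidden_states: torch.Tensor,
        router_logits: torch.Tensor,
        *,
        enable_eplb: bool = False,
        expert_map: torch.Tensor | None = None,
        expert_load_view: torch.Tensor | None = None,
        logical_to_physical_map: torch.Tensor | None = None,
        logical_replica_count: torch.Tensor | None = None,
    ) -> Any:
        if enable_eplb:
            # EPLB needs the load-tracking and replica-mapping tensors.
            assert expert_load_view is not None
            assert logical_to_physical_map is not None
            assert logical_replica_count is not None
        return select_experts(
            hidden_states=hidden_states,
            router_logits=router_logits,
            expert_map=expert_map,
            enable_eplb=enable_eplb,
            expert_load_view=expert_load_view,
            logical_to_physical_map=logical_to_physical_map,
            logical_replica_count=logical_replica_count,
        )
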
diff --git a/vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe.py b/vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe.py
index 3b82f8a98bbd..6b93055116b5 100644
--- a/vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe.py
+++ b/vllm/model_executor/layers/quantization/compressed_tensors/compressed_tensors_moe.py
@@ -1037,10 +1037,13 @@ def apply(
         logical_to_physical_map: torch.Tensor | None = None,
         logical_replica_count: torch.Tensor | None = None,
     ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
         if enable_eplb:
-            raise NotImplementedError(
-                "EPLB not supported for `CompressedTensorsW8A8Fp8MoEMethod` yet."
-            )
+            # EPLB needs the load-tracking and logical-to-physical mapping
+            # tensors supplied by the FusedMoE layer.
+            assert expert_load_view is not None
+            assert logical_to_physical_map is not None
+            assert logical_replica_count is not None
+            assert isinstance(layer, FusedMoE)
 
         topk_weights, topk_ids, _ = FusedMoE.select_experts(
             hidden_states=x,
@@ -1056,6 +1059,11 @@
             e_score_correction_bias=e_score_correction_bias,
             indices_type=self.topk_indices_dtype,
             num_fused_shared_experts=layer.num_fused_shared_experts,
+            enable_eplb=enable_eplb,
+            expert_map=expert_map,
+            expert_load_view=expert_load_view,
+            logical_to_physical_map=logical_to_physical_map,
+            logical_replica_count=logical_replica_count,
         )
 
         per_act_token = self.input_quant.strategy == QuantizationStrategy.TOKEN
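
With the pieces above in place, a smoke test on a ROCm node could look like the sketch below. The engine-argument names (`enable_expert_parallel`, `enable_eplb`) and the model checkpoint are assumptions to verify against the target vLLM branch, not something introduced by this patch:

    # Assumed smoke test; verify the keyword arguments exist on the target
    # branch and substitute any MoE checkpoint available locally.
    from vllm import LLM, SamplingParams

    llm = LLM(
        model="deepseek-ai/DeepSeek-V2-Lite",  # placeholder MoE model
        tensor_parallel_size=8,
        enable_expert_parallel=True,
        enable_eplb=True,
    )
    outputs = llm.generate(["Hello, my name is"], SamplingParams(max_tokens=16))
    print(outputs[0].outputs[0].text)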