2 changes: 1 addition & 1 deletion vllm/config/parallel.py
@@ -267,7 +267,7 @@ def _validate_parallel_config(self) -> Self:
             )
 
         if self.enable_eplb:
-            if not current_platform.is_cuda():
+            if not (current_platform.is_cuda() or current_platform.is_rocm()):
                 raise ValueError(
                     "Expert parallelism load balancing is only supported on "
                     "CUDA devices now."
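The guard above now accepts ROCm, but the ValueError it protects still reads "CUDA devices now." A minimal sketch of keeping the check and its message in sync; `_validate_eplb_platform` is a hypothetical helper name, and `current_platform` stands in for `vllm.platforms.current_platform`:

```python
# Hypothetical helper, not part of the diff; the message names both
# platforms that the check accepts.
def _validate_eplb_platform(enable_eplb: bool, current_platform) -> None:
    """Reject EPLB on platforms the rebalancing path does not support."""
    if enable_eplb and not (
        current_platform.is_cuda() or current_platform.is_rocm()
    ):
        raise ValueError(
            "Expert parallelism load balancing is only supported on "
            "CUDA and ROCm devices now."
        )
```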
1 change: 1 addition & 0 deletions vllm/distributed/eplb/eplb_state.py
@@ -429,6 +429,7 @@ def step(
         # rearrangement step and perform rearrangement to ensure all ranks are
         # performing collective communication.
         self.expert_rearrangement_step += 1
+        print("self.expert_step: ", self.expert_rearrangement_step)
         if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval:
             self.expert_rearrangement_step = 0
             self.rearrange(model)
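The added `print` looks like temporary debugging. A sketch of the same trace expressed through vLLM's logger (assuming `vllm.logger.init_logger`, which other modules in the package use) so it can be filtered by log level instead of always hitting stdout on every rank:

```python
# Sketch only: emit the rearrangement step counter at debug level.
from vllm.logger import init_logger

logger = init_logger(__name__)


def log_rearrangement_step(step: int, interval: int) -> None:
    """Trace the EPLB step counter without printing in production runs."""
    logger.debug("EPLB expert_rearrangement_step=%d (interval=%d)", step, interval)
```

Called right after `self.expert_rearrangement_step += 1`, this would preserve the information the print provides.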
33 changes: 33 additions & 0 deletions vllm/distributed/eplb/rebalance_execute.py
@@ -141,6 +141,7 @@ def shuffle_layer(
 
     p2p_ops: list[P2POp] = []
 
+
     # 2. Initiate sending of weights.
     experts_send_loc: dict[int, int] = {}
     for src in range(num_local_experts):
@@ -224,11 +225,42 @@ def shuffle_layer(
         for weight in expert_weights_buffer
     ]
 
+    # op_info = []
+    # for op in p2p_ops:
+    #     if op is None:
+    #         op_info.append(None)
+    #         continue
+
+    #     op_data = {
+    #         'isend': op.op.__name__ == "isend",
+    #         'peer': op.peer,
+    #         'tag': getattr(op, 'tag', 0),
+    #         'tensor_shape': op.tensor.shape,
+    #         'tensor_dtype': op.tensor.dtype,
+    #         'tensor_device': op.tensor.device,
+    #     }
+    #     op_info.append(op_data)
+    # torch.save({"op":op_info}, f"op_info_device_{ep_rank}.pt")
+
     # 4. Execute the P2P operations. The real communication happens here.
     if p2p_ops:
         reqs = batch_isend_irecv(p2p_ops)
         for req in reqs:
             req.wait()
+    # if p2p_ops:
+    #     reqs = []
+    #     for op in p2p_ops:
+    #         if op is None:
+    #             continue
+    #         if op.op.__name__ == "isend":
+    #             print("isend: ", op.peer)
+    #             req = torch.distributed.isend(op.tensor, op.peer, op.group, op.tag)
+    #         else:
+    #             print("irecv: ", op.peer)
+    #             req = torch.distributed.irecv(op.tensor, op.peer, op.group, op.tag)
+    #         reqs.append(req)
+    #     for req in reqs:
+    #         req.wait()
 
     # 5. Copy the weights from the buffer back to the original weights.
     for dst in range(num_local_experts):
@@ -325,6 +357,7 @@ def rearrange_expert_weights_inplace(
         # NOTE(bowen): We need this synchronize to run, but I don't know why.
         # If you figure out the reason, please let me know -- thank you!
         torch.cuda.synchronize()
+        print("layer: ", layer)
         shuffle_layer(
             num_local_physical_experts,
             ep_rank,
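The two commented-out blocks above serialize P2P op metadata with `torch.save` and replace `batch_isend_irecv` with per-op `isend`/`irecv` calls plus prints. If the metadata dump is still useful, a sketch of keeping it behind an environment flag rather than as dead code; `VLLM_EPLB_DEBUG_DUMP` is an invented variable name, and the fields mirror what the commented block collects:

```python
# Hypothetical debug hook, gated by an invented env var; records the same
# fields per torch.distributed.P2POp as the commented-out block above.
import os

import torch
from torch.distributed import P2POp


def maybe_dump_p2p_ops(p2p_ops: list[P2POp], ep_rank: int) -> None:
    """Write P2P op metadata to a per-rank file when debugging is enabled."""
    if not os.environ.get("VLLM_EPLB_DEBUG_DUMP"):
        return
    op_info = [
        {
            "isend": op.op.__name__ == "isend",
            "peer": op.peer,
            "tag": getattr(op, "tag", 0),
            "tensor_shape": tuple(op.tensor.shape),
            "tensor_dtype": str(op.tensor.dtype),
            "tensor_device": str(op.tensor.device),
        }
        for op in p2p_ops
    ]
    torch.save({"op": op_info}, f"op_info_device_{ep_rank}.pt")
```

The per-op `isend`/`irecv` fallback duplicates what `batch_isend_irecv` already does in one call, so it could sit behind the same flag if it is only needed for debugging.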
24 changes: 12 additions & 12 deletions vllm/model_executor/layers/fused_moe/layer.py
@@ -1271,17 +1271,17 @@
         if self.enable_eplb:
             from vllm.model_executor.layers.quantization.fp8 import Fp8MoEMethod
 
-            if not isinstance(quant_method, (Fp8MoEMethod, UnquantizedFusedMoEMethod)):
-                # TODO: Add support for additional quantization methods.
-                # The implementation for other quantization methods does not
-                # contain essential differences, but the current quant API
-                # design causes duplicated work when extending to new
-                # quantization methods, so I'm leaving it for now.
-                # If you plan to add support for more quantization methods,
-                # please refer to the implementation in `Fp8MoEMethod`.
-                raise NotImplementedError(
-                    "EPLB is only supported for FP8 quantization for now."
-                )
+            # if not isinstance(quant_method, (Fp8MoEMethod, UnquantizedFusedMoEMethod)):
  [CI annotation — GitHub Actions / pre-commit, Ruff (E501): vllm/model_executor/layers/fused_moe/layer.py:1274:89: Line too long (89 > 88)]
+            # # TODO: Add support for additional quantization methods.
+            # # The implementation for other quantization methods does not
+            # # contain essential differences, but the current quant API
+            # # design causes duplicated work when extending to new
+            # # quantization methods, so I'm leaving it for now.
+            # # If you plan to add support for more quantization methods,
+            # # please refer to the implementation in `Fp8MoEMethod`.
+            # raise NotImplementedError(
+            #     "EPLB is only supported for FP8 quantization for now."
+            # )
 
         moe_quant_params = {
             "num_experts": self.local_num_experts,
@@ -1890,7 +1890,7 @@
 
     def get_expert_weights(self) -> Iterable[torch.Tensor]:
         weights = list(self.named_parameters())
-        assert all(weight.is_contiguous() for _, weight in weights)
+        # assert all(weight.is_contiguous() for _, weight in weights)
 
         # Filter out the non-expert weights.
         # `e_score_correction_bias` is a bias for each logical expert,
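Commenting out the quantization-method check (and the contiguity assert in `get_expert_weights`) removes the guard for every backend at once. A narrower option, sketched under the assumption that only specific quantization methods gain EPLB support in this change, is to widen the allow-list instead; the helper name and the contents of `allowed` are not from the diff:

```python
# Sketch only: keep the guard but make the allow-list explicit. Which classes
# belong in `allowed` is an assumption; the diff itself only disables the check.
def check_eplb_quant_support(quant_method, allowed: tuple[type, ...]) -> None:
    """Raise for quantization methods whose apply() ignores the EPLB arguments."""
    if not isinstance(quant_method, allowed):
        raise NotImplementedError(
            f"EPLB is not supported for {type(quant_method).__name__} yet."
        )
```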
@@ -1037,10 +1037,16 @@ def apply(
         logical_to_physical_map: torch.Tensor | None = None,
         logical_replica_count: torch.Tensor | None = None,
     ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
+        # if enable_eplb:
+        #     raise NotImplementedError(
+        #         "EPLB not supported for `CompressedTensorsW8A8Fp8MoEMethod` yet."
+        #     )
+
         if enable_eplb:
-            raise NotImplementedError(
-                "EPLB not supported for `CompressedTensorsW8A8Fp8MoEMethod` yet."
-            )
+            assert expert_load_view is not None
+            assert logical_to_physical_map is not None
+            assert logical_replica_count is not None
+            assert isinstance(layer, FusedMoE)
 
         topk_weights, topk_ids, _ = FusedMoE.select_experts(
             hidden_states=x,
@@ -1056,6 +1062,11 @@ def apply(
             e_score_correction_bias=e_score_correction_bias,
             indices_type=self.topk_indices_dtype,
             num_fused_shared_experts=layer.num_fused_shared_experts,
+            enable_eplb=enable_eplb,
+            expert_map=expert_map,
+            expert_load_view=expert_load_view,
+            logical_to_physical_map=logical_to_physical_map,
+            logical_replica_count=logical_replica_count,
         )
 
         per_act_token = self.input_quant.strategy == QuantizationStrategy.TOKEN
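The new branch in `CompressedTensorsW8A8Fp8MoEMethod.apply` asserts that the EPLB tensors are present and then forwards them to `FusedMoE.select_experts`, mirroring the pattern the FP8 path uses. A minimal standalone sketch of that precondition check, with argument names taken from the signature above and a descriptive error in place of bare asserts:

```python
# Precondition sketch; the tensors are optional until EPLB is enabled.
import torch


def check_eplb_inputs(
    enable_eplb: bool,
    expert_load_view: torch.Tensor | None,
    logical_to_physical_map: torch.Tensor | None,
    logical_replica_count: torch.Tensor | None,
) -> None:
    """EPLB routing needs the load view and both logical-expert mappings."""
    if not enable_eplb:
        return
    missing = [
        name
        for name, value in (
            ("expert_load_view", expert_load_view),
            ("logical_to_physical_map", logical_to_physical_map),
            ("logical_replica_count", logical_replica_count),
        )
        if value is None
    ]
    if missing:
        raise ValueError(f"enable_eplb=True but missing: {', '.join(missing)}")
```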
8 changes: 8 additions & 0 deletions vllm/model_executor/layers/quantization/utils/w8a8_utils.py
@@ -503,6 +503,13 @@ def __init__(
             and current_platform.is_fp8_fnuz()
             and pad_output is not True
         )
+        # if pad_output:
+        #     print("aa")
+        self.pad_output = pad_output
+        self.aiter = envs.VLLM_ROCM_USE_AITER
+        self.rocm = envs.VLLM_ROCM_USE_AITER_LINEAR
+        self.pla = current_platform.is_rocm()
+        self.f8 = current_platform.is_fp8_fnuz()
 
         if self.use_aiter_and_is_supported:
             self.preferred_backend = "aiter"
@@ -515,6 +522,7 @@ def __init__(
             self.preferred_backend = "cutlass"
         else:
             self.preferred_backend = "torch"
+        self.bak = self.preferred_backend
 
         # Note: we pad the input because torch._scaled_mm is more performant
         # for matrices with batch dimension > 16.
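The new attributes (`self.pad_output`, `self.aiter`, `self.rocm`, `self.pla`, `self.f8`, `self.bak`) appear to capture the inputs and result of the preferred-backend selection for later inspection. A sketch of logging that decision once instead of storing extra state on the object; the helper name is invented, and the logger assumes `vllm.logger.init_logger`:

```python
# Hypothetical one-shot log of the fp8 linear backend decision.
from vllm.logger import init_logger

logger = init_logger(__name__)


def log_fp8_linear_backend_choice(
    preferred_backend: str,
    pad_output: bool | None,
    use_aiter: bool,
    use_aiter_linear: bool,
    is_rocm: bool,
    is_fp8_fnuz: bool,
) -> None:
    """Record every input that drove the preferred_backend selection."""
    logger.debug(
        "fp8 linear backend=%s pad_output=%s aiter=%s aiter_linear=%s "
        "rocm=%s fp8_fnuz=%s",
        preferred_backend,
        pad_output,
        use_aiter,
        use_aiter_linear,
        is_rocm,
        is_fp8_fnuz,
    )
```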