Merged
43 commits
949c914  MoE init (cakeng, Jan 27, 2025)
ba0549e  EP config integrations (cakeng, Jan 28, 2025)
7d7285d  Weight loading (cakeng, Jan 29, 2025)
991d126  Added EP info to moe_align_block_size (cakeng, Jan 29, 2025)
df154b6  Bugs (cakeng, Jan 29, 2025)
3d700c4  Working EP+TP Prototype (cakeng, Jan 29, 2025)
2d05161  Removed debugging print statements (cakeng, Jan 30, 2025)
60284b5  Fixes (cakeng, Jan 30, 2025)
9cdb728  Merge branch 'main' into moe (cakeng, Jan 30, 2025)
03b8afb  Fused MoE Kernel fixes, Errors on CudaGraph Capture (cakeng, Jan 30, 2025)
cdb252d  Merge branch 'vllm-project:main' into moe (cakeng, Jan 30, 2025)
f7dcd7b  Expert mapping fixes, num_experts does not need to be divisible by EP… (cakeng, Jan 31, 2025)
b3e00f5  Merge branch 'main' into moe (cakeng, Feb 4, 2025)
d8cb2b3  Merge branch 'main' into moe (cakeng, Feb 4, 2025)
837a0fb  Merge branch 'main' into moe (cakeng, Feb 7, 2025)
cefcef6  Integrating to DeepSeekV3 (cakeng, Feb 7, 2025)
e550752  EP correctness on DeepSeekV3 checked (cakeng, Feb 8, 2025)
ea5fbdc  Moved expert_parallel_size argument to expertimental_expert_parallel_… (cakeng, Feb 8, 2025)
8485a9a  Added environment variable VLLM_TEST_ENABLE_EP to turn on EP, default… (cakeng, Feb 11, 2025)
4c7ba48  Merge branch 'main' into moe (cakeng, Feb 11, 2025)
5cadaa0  Errors running test scripts (cakeng, Feb 13, 2025)
83a7190  Fixed FusedMoE apply base method (cakeng, Feb 13, 2025)
7102ae7  Added test_expert_parallel.py and modified test_moe.py to include EP … (cakeng, Feb 14, 2025)
1951aa7  Pre-commit (cakeng, Feb 14, 2025)
b9c8c18  Merge branch 'main' into moe (cakeng, Feb 14, 2025)
1254ba6  Clean debug print statements (cakeng, Feb 14, 2025)
c8a6c64  Fixed FusedMoE apply function signatures (cakeng, Feb 14, 2025)
b721ee7  Fixed remaining FusedMoE apply function signatures (cakeng, Feb 14, 2025)
2a9993b  Removed debugging print statement. (cakeng, Feb 14, 2025)
56f9828  Update parallel_state.py (cakeng, Feb 14, 2025)
5e53fbb  Update parallel_state.py (cakeng, Feb 14, 2025)
182423a  Reduce diffs (cakeng, Feb 15, 2025)
5ced495  Updated comments from Lucas and Tyler (cakeng, Feb 19, 2025)
9bf31be  Made TP rank setting more explicit during the FusedMoE weight loading (cakeng, Feb 19, 2025)
88ce3a9  Merge branch 'main' into moe (tlrmchlsmth, Feb 20, 2025)
99e2d98  test_moe.py fixes (cakeng, Feb 21, 2025)
ee3f981  Merge branch 'main' into moe (cakeng, Feb 21, 2025)
5df1e06  Merge branch 'main' into moe (cakeng, Feb 21, 2025)
c5a21c6  fix test_expert_parallel.py env_variable (cakeng, Feb 22, 2025)
accb533  fix test_expert_parallel.py test configurations (cakeng, Feb 22, 2025)
14864f7  Merge branch 'main' into moe (cakeng, Feb 23, 2025)
a0f2c21  Fix pre-commit (cakeng, Feb 23, 2025)
5554d35  Revert FusedMoE num_experts variable name change and add deepseekV2 t… (cakeng, Feb 24, 2025)
3 changes: 2 additions & 1 deletion benchmarks/kernels/benchmark_moe.py
@@ -468,7 +468,8 @@ def main(args: argparse.Namespace):
topk = config.num_experts_per_tok
intermediate_size = config.intermediate_size
shard_intermediate_size = 2 * intermediate_size // args.tp_size
elif config.architectures[0] == "DeepseekV3ForCausalLM":
elif (config.architectures[0] == "DeepseekV3ForCausalLM"
or config.architectures[0] == "DeepseekV2ForCausalLM"):
E = config.n_routed_experts
topk = config.num_experts_per_tok
intermediate_size = config.moe_intermediate_size
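For context, DeepSeek-V2 and DeepSeek-V3 expose their MoE dimensions under the same HuggingFace config fields, which is why the benchmark can now share one branch for both architectures. A minimal sketch of that lookup, assuming transformers is installed and using the DeepSeek-V2-Lite checkpoint purely for illustration:

from transformers import AutoConfig

# Any DeepseekV2/V3 checkpoint exposes the same MoE config fields.
config = AutoConfig.from_pretrained("deepseek-ai/DeepSeek-V2-Lite-Chat",
                                    trust_remote_code=True)
E = config.n_routed_experts              # number of routed experts
topk = config.num_experts_per_tok        # experts activated per token
intermediate_size = config.moe_intermediate_size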
227 changes: 227 additions & 0 deletions tests/distributed/test_expert_parallel.py
@@ -0,0 +1,227 @@
# SPDX-License-Identifier: Apache-2.0

from dataclasses import dataclass
from typing import List, Literal, NamedTuple, Optional

import pytest

from vllm.config import TaskOption
from vllm.logger import init_logger

from ..utils import compare_two_settings, fork_new_process_for_each_test

logger = init_logger("test_expert_parallel")


class ParallelSetup(NamedTuple):
tp_size: int
eager_mode: bool
chunked_prefill: bool


class EPTestOptions(NamedTuple):
trust_remote_code: bool
tokenizer_mode: Optional[str]
load_format: Optional[str] = None
hf_overrides: Optional[str] = None


@dataclass
class EPTestSettings:
parallel_setups: List[ParallelSetup]
distributed_backends: List[str]
task: TaskOption
test_options: EPTestOptions

@staticmethod
def detailed(
*,
tp_base: int = 2,
task: TaskOption = "auto",
trust_remote_code: bool = False,
tokenizer_mode: Optional[str] = None,
load_format: Optional[str] = None,
hf_overrides: Optional[str] = None,
):
return EPTestSettings(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
eager_mode=False,
chunked_prefill=False),
ParallelSetup(tp_size=tp_base,
eager_mode=False,
chunked_prefill=True),
ParallelSetup(tp_size=tp_base,
eager_mode=True,
chunked_prefill=False),
ParallelSetup(tp_size=2 * tp_base,
eager_mode=False,
chunked_prefill=True),
ParallelSetup(tp_size=2 * tp_base,
eager_mode=True,
chunked_prefill=False),
],
distributed_backends=["mp", "ray"],
task=task,
test_options=EPTestOptions(trust_remote_code=trust_remote_code,
tokenizer_mode=tokenizer_mode,
load_format=load_format,
hf_overrides=hf_overrides),
)

@staticmethod
def fast(
*,
tp_base: int = 2,
task: TaskOption = "auto",
trust_remote_code: bool = False,
tokenizer_mode: Optional[str] = None,
load_format: Optional[str] = None,
hf_overrides: Optional[str] = None,
):
return EPTestSettings(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
eager_mode=True,
chunked_prefill=False),
],
distributed_backends=["mp"],
task=task,
test_options=EPTestOptions(trust_remote_code=trust_remote_code,
tokenizer_mode=tokenizer_mode,
load_format=load_format,
hf_overrides=hf_overrides),
)

def iter_params(self, model_name: str):
opts = self.test_options

for parallel_setup in self.parallel_setups:
for distributed_backend in self.distributed_backends:
yield (model_name, parallel_setup, distributed_backend,
self.task, opts)


# NOTE: You can adjust tp_base locally to fit the model on your GPUs.
# The values here are only a rough indicator of model size.

# yapf: disable
TEST_MODELS = {
"deepseek-ai/DeepSeek-V2-Lite-Chat": EPTestSettings.fast(
trust_remote_code=True),
"mistralai/Mixtral-8x7B-Instruct-v0.1": EPTestSettings.fast(tp_base=4),
}


def _compare_tp(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: EPTestOptions,
num_gpus_available: int,
*,
method: Literal["generate"],
):
(
tp_size,
eager_mode,
chunked_prefill,
) = parallel_setup
(
trust_remote_code,
tokenizer_mode,
load_format,
hf_overrides,
) = test_options

if num_gpus_available < tp_size:
pytest.skip(f"Need at least {tp_size} GPUs")

common_args = [
# use half precision for speed and memory savings in CI environment
"--dtype",
"float16",
"--max-model-len",
"2048",
"--max-num-seqs",
"8",
"--load-format",
"auto",
]
if chunked_prefill:
common_args.append("--enable-chunked-prefill")
if eager_mode:
common_args.append("--enforce-eager")
if task != "auto":
common_args.extend(["--task", task])
if trust_remote_code:
common_args.append("--trust-remote-code")
if tokenizer_mode:
common_args.extend(["--tokenizer-mode", tokenizer_mode])
if load_format:
common_args.extend(["--load-format", load_format])
if hf_overrides:
common_args.extend(["--hf-overrides", hf_overrides])

ep_env = {
"VLLM_TEST_ENABLE_EP": "1",
}

ep_args = [
*common_args,
"--tensor-parallel-size",
str(tp_size),
"--distributed-executor-backend",
distributed_backend,
]

# compare without expert parallelism
tp_env = {
"VLLM_TEST_ENABLE_EP": "0",
}

tp_args = [
*common_args,
"--tensor-parallel-size",
str(tp_size),
"--distributed-executor-backend",
"mp",
]

try:
compare_two_settings(model_name,
ep_args,
tp_args,
ep_env,
tp_env,
method=method,
max_wait_seconds=360)
except Exception:
raise


@pytest.mark.parametrize(
("model_name", "parallel_setup", "distributed_backend", "task",
"test_options"),
[
params for model_name, settings in TEST_MODELS.items()
for params in settings.iter_params(model_name)
],
)
@fork_new_process_for_each_test
def test_ep(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: EPTestOptions,
num_gpus_available,
):
_compare_tp(model_name,
parallel_setup,
distributed_backend,
task,
test_options,
num_gpus_available,
method="generate")
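The new test drives expert parallelism purely through the VLLM_TEST_ENABLE_EP environment variable: it launches the same model once with EP enabled and once as a plain tensor-parallel baseline, then asserts the two settings produce matching outputs. A minimal sketch, assuming it is run from the repository root so the test module is importable, of how the fast() settings expand into pytest parameters:

from tests.distributed.test_expert_parallel import EPTestSettings

settings = EPTestSettings.fast(trust_remote_code=True)
for params in settings.iter_params("deepseek-ai/DeepSeek-V2-Lite-Chat"):
    model_name, parallel_setup, backend, task, opts = params
    # fast() yields a single eager-mode setup on the "mp" backend:
    # ParallelSetup(tp_size=2, eager_mode=True, chunked_prefill=False)
    print(model_name, parallel_setup, backend, task)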
9 changes: 2 additions & 7 deletions tests/kernels/test_awq_marlin.py
@@ -99,13 +99,8 @@ def test_fused_marlin_moe_awq(
num_bits=num_bits,
)

torch_output = torch_moe(
a,
w_ref1.transpose(1, 2),
w_ref2.transpose(1, 2),
score,
topk,
)
torch_output = torch_moe(a, w_ref1.transpose(1, 2), w_ref2.transpose(1, 2),
score, topk, None)

assert compute_max_diff(marlin_output, torch_output) < 4e-2

65 changes: 59 additions & 6 deletions tests/kernels/test_moe.py
@@ -26,6 +26,7 @@
from vllm.scalar_type import scalar_types

NUM_EXPERTS = [8, 64]
EP_SIZE = [1, 4]
TOP_KS = [2, 6]


@@ -34,24 +35,54 @@
@pytest.mark.parametrize("k", [128, 511, 1024])
@pytest.mark.parametrize("e", NUM_EXPERTS)
@pytest.mark.parametrize("topk", TOP_KS)
@pytest.mark.parametrize("ep_size", EP_SIZE)
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16])
def test_fused_moe(
m: int,
n: int,
k: int,
e: int,
topk: int,
ep_size: int,
dtype: torch.dtype,
):
a = torch.randn((m, k), device="cuda", dtype=dtype) / 10
w1 = torch.randn((e, 2 * n, k), device="cuda", dtype=dtype) / 10
w2 = torch.randn((e, k, n), device="cuda", dtype=dtype) / 10

score = torch.randn((m, e), device="cuda", dtype=dtype)
triton_output = fused_moe(a, w1, w2, score, topk, renormalize=False)
torch_output = torch_moe(a, w1, w2, score, topk)

if ep_size > 1:
local_e = e // ep_size
e_ids = torch.randint(0,
e, (local_e, ),
device="cuda",
dtype=torch.int32)
e_map = torch.full((e, ), -1, device="cuda", dtype=torch.int32)
e_map[e_ids] = torch.arange(local_e, device="cuda", dtype=torch.int32)
w1 = w1[e_ids]
w2 = w2[e_ids]
else:
e_map = None

triton_output = fused_moe(a,
w1,
w2,
score,
topk,
global_num_experts=e,
expert_map=e_map,
renormalize=False)
torch_output = torch_moe(a, w1, w2, score, topk, e_map)
torch.testing.assert_close(triton_output, torch_output, atol=2e-2, rtol=0)
iterative_output = iterative_moe(a, w1, w2, score, topk, renormalize=False)
iterative_output = iterative_moe(a,
w1,
w2,
score,
topk,
global_num_experts=e,
expert_map=e_map,
renormalize=False)
torch.testing.assert_close(iterative_output,
torch_output,
atol=2e-2,
@@ -63,13 +94,14 @@ def test_fused_moe(
@pytest.mark.parametrize("k", [128, 1024])
@pytest.mark.parametrize("e", NUM_EXPERTS)
@pytest.mark.parametrize("topk", TOP_KS)
@pytest.mark.parametrize("ep_size", EP_SIZE)
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("group_size", [64, 128])
@pytest.mark.parametrize("has_zp", [True, False])
@pytest.mark.parametrize("weight_bits", [4, 8])
def test_fused_moe_wn16(m: int, n: int, k: int, e: int, topk: int,
dtype: torch.dtype, group_size: int, has_zp: bool,
weight_bits: int):
ep_size: int, dtype: torch.dtype, group_size: int,
has_zp: bool, weight_bits: int):
print(m, n, k, e, topk, dtype, group_size, has_zp, weight_bits)
a = torch.randn((m, k), device="cuda", dtype=dtype) / 10
w1 = torch.randn((e, 2 * n, k), device="cuda", dtype=dtype) / 10
@@ -130,6 +162,25 @@ def test_fused_moe_wn16(m: int, n: int, k: int, e: int, topk: int,
if has_zp:
w_qzeros[expert_id] = qzeros

if ep_size > 1:
local_e = e // ep_size
e_ids = torch.randint(0,
e, (local_e, ),
device="cuda",
dtype=torch.int32)
e_map = torch.full((e, ), -1, device="cuda", dtype=torch.int32)
e_map[e_ids] = torch.arange(local_e, device="cuda", dtype=torch.int32)
w1_ref = w1_ref[e_ids]
w2_ref = w2_ref[e_ids]
w1_qweight = w1_qweight[e_ids]
w2_qweight = w2_qweight[e_ids]
w1_scales = w1_scales[e_ids]
w2_scales = w2_scales[e_ids]
w1_qzeros = w1_qzeros[e_ids]
w2_qzeros = w2_qzeros[e_ids]
else:
e_map = None

triton_output = fused_moe(a,
w1_qweight,
w2_qweight,
Expand All @@ -138,12 +189,14 @@ def test_fused_moe_wn16(m: int, n: int, k: int, e: int, topk: int,
renormalize=False,
use_int4_w4a16=weight_bits == 4,
use_int8_w8a16=weight_bits == 8,
global_num_experts=e,
expert_map=e_map,
w1_scale=w1_scales,
w2_scale=w2_scales,
w1_zp=w1_qzeros if has_zp else None,
w2_zp=w2_qzeros if has_zp else None,
block_shape=[0, group_size])
torch_output = torch_moe(a, w1_ref, w2_ref, score, topk)
torch_output = torch_moe(a, w1_ref, w2_ref, score, topk, e_map)
torch.testing.assert_close(triton_output, torch_output, atol=2e-2, rtol=0)


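Both updated kernel tests construct the expert map the same way: each simulated EP rank keeps local_e randomly chosen experts, and e_map translates a global expert id into that rank's local slot, with -1 marking experts owned elsewhere. A standalone sketch of the mapping, using hypothetical sizes and CPU tensors for brevity:

import torch

e, ep_size = 8, 4                      # 8 global experts split across 4 EP ranks
local_e = e // ep_size                 # 2 experts per rank
e_ids = torch.tensor([5, 2])           # global ids this rank happens to own
e_map = torch.full((e,), -1, dtype=torch.int32)
e_map[e_ids] = torch.arange(local_e, dtype=torch.int32)
print(e_map.tolist())                  # [-1, -1, 1, -1, -1, 0, -1, -1]
# Global expert 5 resolves to local slot 0 and global expert 2 to slot 1;
# every other expert maps to -1 and would be handled by a different rank.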
4 changes: 3 additions & 1 deletion tests/kernels/utils.py
@@ -1053,14 +1053,16 @@ def compute_max_diff(output, output_ref):
torch.abs(output_ref))


def torch_moe(a, w1, w2, score, topk):
def torch_moe(a, w1, w2, score, topk, expert_map):
B, D = a.shape
a = a.view(B, -1, D).repeat(1, topk, 1).reshape(-1, D)
out = torch.zeros(B * topk, w2.shape[1], dtype=a.dtype, device=a.device)
score = torch.softmax(score, dim=-1, dtype=torch.float32)
topk_weight, topk_ids = torch.topk(score, topk)
topk_weight = topk_weight.view(-1)
topk_ids = topk_ids.view(-1)
if expert_map is not None:
topk_ids = expert_map[topk_ids]
for i in range(w1.shape[0]):
mask = topk_ids == i
if mask.sum():
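In the torch_moe reference path the remapping happens before the per-expert loop, so tokens routed to a non-local expert carry an id of -1, never match any local expert index, and keep their zero-initialized output rows; the EP comparison therefore checks only the contribution of experts the rank owns. A minimal illustration of that masking, with hypothetical ids:

import torch

topk_ids = torch.tensor([0, -1, 1, -1])    # ids after applying expert_map
for i in range(2):                         # two local experts (w1.shape[0] == 2)
    mask = topk_ids == i
    print(i, mask.tolist())
# 0 [True, False, False, False]
# 1 [False, False, True, False]   -> the -1 entries are never selected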