1 change: 1 addition & 0 deletions .github/workflows/vllm_ascend_test.yaml
@@ -373,6 +373,7 @@ jobs:
pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py \
--ignore=tests/e2e/multicard/test_offline_inference_distributed.py \
--ignore=tests/e2e/multicard/test_data_parallel.py
pytest -sv tests/e2e/eplb/test_eplb_e2e.py

- name: Run vllm-project/vllm-ascend test on V0 engine
if: ${{ github.event_name == 'schedule' }}
166 changes: 166 additions & 0 deletions tests/e2e/eplb/test_eplb_e2e.py
@@ -0,0 +1,166 @@
import json
import os
import random
import signal
import subprocess
import time

import psutil
import pytest
import requests


def kill_process_and_children(pid):
try:
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
print(f"Killing child process {child.pid}")
child.kill()
print(f"Killing parent process {pid}")
parent.kill()
except psutil.NoSuchProcess:
pass


def kill_all_vllm_related():
current_pid = os.getpid()

for proc in psutil.process_iter(['pid', 'cmdline']):
try:
if proc.pid == current_pid:
continue
            cmd = ' '.join(proc.info['cmdline'] or [])
if "vllm" in cmd or "proxy" in cmd or "engine_worker" in cmd:
kill_process_and_children(proc.pid)
except Exception:
continue


def build_expert_map(expert_map_path,
num_redundant_expert=0,
num_layer=2,
num_device=4,
num_original_expert=256,
random_seed=42):
expert_num_list = list(range(num_original_expert))
random.seed(random_seed)
if num_redundant_expert > 0:
expert_num_list = expert_num_list + random.choices(
expert_num_list, k=num_redundant_expert)
local_num_expert = len(expert_num_list) // num_device

expert_map = {
"moe_layer_count": num_layer,
"device_count": num_device,
"layer_list": []
}
for layer_id in range(num_layer):
random.shuffle(expert_num_list)
current_expert_distribution = [
expert_num_list[i * local_num_expert:(i + 1) * local_num_expert]
for i in range(num_device)
]
layer_info = {
"layer_id": layer_id,
"device_count": num_device,
"device_list": []
}
for device_id in range(num_device):
layer_info["device_list"].append({
"device_id":
device_id,
"device_expert":
current_expert_distribution[device_id]
})
expert_map["layer_list"].append(layer_info)
with open(expert_map_path, "w") as f:
json.dump(expert_map, f)


def is_port_in_use(port):
"""Check if a port is currently in use."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
return sock.connect_ex(("127.0.0.1", port)) == 0


def ensure_port_available(port, timeout=30):
"""Wait for a port to become available."""
start = time.time()
while time.time() - start < timeout:
if not is_port_in_use(port):
return True
print(f"Port {port} is still in use, waiting...")
time.sleep(2)
return False


def wait_for_port(port, timeout=30):
import socket
start = time.time()
while time.time() - start < timeout:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
if sock.connect_ex(("127.0.0.1", port)) == 0:
return True
time.sleep(1)
raise TimeoutError(f"Port {port} not ready after {timeout}s")


SCRIPT_PATH = os.path.abspath("./tests/e2e/run_eplb.sh")
PROXY_PORT = 10102
EXPERT_MAP_PATH = "./tests/e2e/eplb/expert_map.json"


@pytest.mark.parametrize("num_redundant_expert", [0, 4])
def test_eplb_with_redundant_expert(num_redundant_expert):
# Ensure port is available before starting the test
if is_port_in_use(PROXY_PORT):
print(
f"Port {PROXY_PORT} is still in use from previous test, waiting for it to become available..."
)
if not ensure_port_available(PROXY_PORT, timeout=300):
pytest.skip(
f"Port {PROXY_PORT} is still in use after waiting 60 seconds")

print("Launching bash script to run eplb setup...")
build_expert_map(EXPERT_MAP_PATH,
num_redundant_expert=num_redundant_expert)
proc = subprocess.Popen(["bash", SCRIPT_PATH, str(num_redundant_expert)])
try:
print("Waiting for proxy port to be available...")
wait_for_port(PROXY_PORT, timeout=600)

        # Send a completion request through the proxy
payload = {
"model": "Deepseek",
"prompt": "The future of AI is",
"max_tokens": 64,
"temperature": 0,
}
response = requests.post(
f"http://localhost:{PROXY_PORT}/v1/completions",
headers={"Content-Type": "application/json"},
json=payload,
timeout=10)
assert response.status_code == 200, f"HTTP failed: {response.status_code}"
result = response.json()
print("Response:", result)
assert "text" in result["choices"][0]
assert len(result["choices"][0]["text"].strip()) > 0

finally:
# clean up subprocesses
print("Cleaning up subprocess...")
proc.send_signal(signal.SIGINT)
try:
proc.wait(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
if os.path.exists(EXPERT_MAP_PATH):
os.remove(EXPERT_MAP_PATH)
kill_all_vllm_related()

# Wait for port to be fully released
print("Waiting for port to be fully released...")
time.sleep(3)
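
For reference, a minimal sketch of the JSON layout that build_expert_map above writes to expert_map_path. The sizes here are hypothetical toy values chosen for readability; the test itself uses 2 layers, 4 devices and 256 experts (plus any redundant copies).

# Toy illustration of the expert_map.json structure produced by build_expert_map
# (2 layers, 2 devices, 4 experts, no redundancy -- not the real test sizes).
toy_expert_map = {
    "moe_layer_count": 2,
    "device_count": 2,
    "layer_list": [
        {
            "layer_id": 0,
            "device_count": 2,
            "device_list": [
                {"device_id": 0, "device_expert": [2, 0]},
                {"device_id": 1, "device_expert": [3, 1]},
            ],
        },
        {
            "layer_id": 1,
            "device_count": 2,
            "device_list": [
                {"device_id": 0, "device_expert": [1, 3]},
                {"device_id": 1, "device_expert": [0, 2]},
            ],
        },
    ],
}
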
50 changes: 50 additions & 0 deletions tests/e2e/run_eplb.sh
@@ -0,0 +1,50 @@
#!/bin/bash

set -eo errexit

. $(dirname "$0")/common.sh

export VLLM_ENABLE_MC2=1
export VLLM_USE_V1=1
export TASK_QUEUE_ENABLE=1
# FIXME: unset HCCL_OP_EXPANSION_MODE to avoid the torchair bug
unset HCCL_OP_EXPANSION_MODE

MODEL_NAME="vllm-ascend/DeepSeek-V3-Pruning"
TP_SIZE=2
DP_SIZE=2
REGISTER_PORT=10102
ASCEND_VISIBLE_DEVICES=0,1,2,3
NUM_REDUNDANT_EXPERT=$1


function run_eplb_instance() {
local model_name=$1
local tp_size=$2
local dp_size=$3
local register_port=$4
local num_redundant_expert=$5

_info "====> Test model: $model_name"
_info "====> TP size: $tp_size"
_info "====> DP size: $dp_size"
_info "====> Register port: $register_port"
_info "====> Expert map path: ./tests/e2e/eplb/expert_map.json"
_info "====> Num redundant expert: $num_redundant_expert"

ASCEND_RT_VISIBLE_DEVICES=$ASCEND_VISIBLE_DEVICES vllm serve $model_name \
--host 0.0.0.0 \
--port $register_port \
--tensor-parallel-size $tp_size \
--data-parallel-size $dp_size \
--enable-expert-parallel \
--served-model-name Deepseek \
--max-model-len 8192 \
--max-num-seqs 24 \
--trust-remote-code \
--additional-config '{"torchair_graph_config": {"enabled": true, "graph_batch_sizes": [24]}, "ascend_scheduler_config": {"enabled": true}, "expert_map_path": "./tests/e2e/eplb/expert_map.json"}'
}


_info "====> Start staic_eplb test"
run_eplb_instance $MODEL_NAME $TP_SIZE $DP_SIZE $REGISTER_PORT $NUM_REDUNDANT_EXPERT
5 changes: 4 additions & 1 deletion vllm_ascend/ops/expert_load_balancer.py
@@ -85,7 +85,10 @@ def get_rank_placement_map(self, layer_id, rank_id):
        layer_expert_map = expert_placement_map[layer_id]
        rank_expert_map = layer_expert_map[rank_id].to(
            torch.npu.current_device())
-        rank_local_expert_num = torch.sum(torch.ne(rank_expert_map, -1)).item()
+
+        # The number of valid entries in rank_expert_map may not equal
+        # rank_local_expert_num when num_redundant_experts > 0.
+        rank_local_expert_num = len(self.expert_map_tensor[layer_id, rank_id])
        return rank_local_expert_num, rank_expert_map

def get_rank_log2phy_map(self, layer_id, rank_id):
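
To make the comment in get_rank_placement_map concrete, here is a hedged toy illustration (hypothetical sizes, not the real placement) of why the local slot count is taken from the placement list rather than from the global-id-to-local-slot map once redundant experts are allowed: a redundant copy of an expert assigned to the same rank collapses into a single entry of that map.

import torch

# Toy rank: 3 physical expert slots; global expert 5 appears twice because
# a redundant copy was assigned to the same rank.
device_expert = [5, 7, 5]
num_global_experts = 8

# Build the global-id -> local-slot map the way a per-rank expert_map does.
rank_expert_map = torch.full((num_global_experts, ), -1)
for local_slot, global_id in enumerate(device_expert):
    rank_expert_map[global_id] = local_slot  # second copy overwrites the first

print(torch.sum(rank_expert_map != -1).item())  # 2 -> undercounts the slots
print(len(device_expert))                       # 3 -> actual local expert slots
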
7 changes: 5 additions & 2 deletions vllm_ascend/ops/fused_moe.py
@@ -1212,8 +1212,11 @@

        assert self.quant_method is not None

-        local_num_experts = torch.sum(self.expert_map != -1) \
-            if self.expert_map is not None else num_experts
+        if self.log2phy is not None:
+            local_num_experts = self.local_num_experts
+        else:
+            local_num_experts = torch.sum(self.expert_map != -1) \
+                if self.expert_map is not None else num_experts

Codecov / codecov/patch: added lines 1215-1216 and 1218 are not covered by tests.
Comment on lines -1215 to +1219
Collaborator

Why not just use the self.local_num_experts value when log2phy is None? It's already set by determine_expert_map.

self.local_num_experts, self.expert_map = determine_expert_map(
self.ep_size,
get_ep_group().rank_in_group, self.global_num_experts)

Contributor Author

Yes, it should return the same value. The current implementation intentionally preserves the original logic.


moe_quant_params = {
"num_experts": local_num_experts,