
Commit e82e946

[main][refactor] Refactoring forward_context and model_runner_v1
Signed-off-by: zzzzwwjj <1183291235@qq.com>
1 parent ff97740 commit e82e946

23 files changed: +878 -333 lines changed

examples/data_parallel.py

Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Usage:
Single node:
    python examples/offline_inference/data_parallel.py \
        --model="ibm-research/PowerMoE-3b" \
        --dp-size=2 \
        --tp-size=2

Multi-node:
    Node 0 (assume the node has ip of 10.99.48.128):
        python examples/offline_inference/data_parallel.py \
            --model="ibm-research/PowerMoE-3b" \
            --dp-size=2 \
            --tp-size=2 \
            --node-size=2 \
            --node-rank=0 \
            --master-addr=10.99.48.128 \
            --master-port=13345
    Node 1:
        python examples/offline_inference/data_parallel.py \
            --model="ibm-research/PowerMoE-3b" \
            --dp-size=2 \
            --tp-size=2 \
            --node-size=2 \
            --node-rank=1 \
            --master-addr=10.99.48.128 \
            --master-port=13345
"""

import os
from time import sleep

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port


def parse_args():
    import argparse

    parser = argparse.ArgumentParser(description="Data Parallel Inference")
    parser.add_argument(
        "--model",
        type=str,
        default="ibm-research/PowerMoE-3b",
        help="Model name or path",
    )
    parser.add_argument("--dp-size",
                        type=int,
                        default=2,
                        help="Data parallel size")
    parser.add_argument("--tp-size",
                        type=int,
                        default=2,
                        help="Tensor parallel size")
    parser.add_argument("--node-size",
                        type=int,
                        default=1,
                        help="Total number of nodes")
    parser.add_argument("--node-rank",
                        type=int,
                        default=0,
                        help="Rank of the current node")
    parser.add_argument("--master-addr",
                        type=str,
                        default="",
                        help="Master node IP address")
    parser.add_argument("--master-port",
                        type=int,
                        default=0,
                        help="Master node port")
    parser.add_argument("--enforce-eager",
                        action="store_true",
                        help="Enforce eager mode execution.")
    parser.add_argument("--trust-remote-code",
                        action="store_true",
                        help="Trust remote code.")
    return parser.parse_args()


def main(
    model,
    dp_size,
    local_dp_rank,
    global_dp_rank,
    dp_master_ip,
    dp_master_port,
    GPUs_per_dp_rank,
    enforce_eager,
    trust_remote_code,
):
    os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
    os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
    os.environ["VLLM_DP_SIZE"] = str(dp_size)
    os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
    os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)

    # CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
    # engine processes.

    # Sample prompts.
    prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ] * 100

    # with DP, each rank should process different prompts.
    # usually all the DP ranks process a full dataset,
    # and each rank processes a different part of the dataset.
    floor = len(prompts) // dp_size
    remainder = len(prompts) % dp_size

    # Distribute prompts into even groups.
    def start(rank):
        return rank * floor + min(rank, remainder)

    prompts = prompts[start(global_dp_rank):start(global_dp_rank + 1)]
    if len(prompts) == 0:
        # if any rank has no prompts to process,
        # we need to set a placeholder prompt
        prompts = ["Placeholder"]
    print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")

    # Create a sampling params object.
    # since we are doing data parallel, every rank can have different
    # sampling params. here we set different max_tokens for different
    # ranks for demonstration.
    sampling_params = SamplingParams(
        temperature=0.0,
        max_tokens=32,
    )

    # Create an LLM.
    llm = LLM(
        model=model,
        tensor_parallel_size=GPUs_per_dp_rank,
        enforce_eager=enforce_eager,
        trust_remote_code=trust_remote_code,
        distributed_executor_backend="mp",
        max_model_len=2048,
        max_num_batched_tokens=2048,
        max_num_seqs=16,
        enable_prefix_caching=False,
        enable_expert_parallel=True,
        gpu_memory_utilization=0.9,
        additional_config={
            "ascend_scheduler_config": {
                "enabled": True
            },
            "torchair_graph_config": {
                "enabled": False,
                "enable_multistream_shared_expert": False
            },
        },
    )

    outputs = llm.generate(prompts, sampling_params)
    # Print the outputs.
    for i, output in enumerate(outputs):
        if i >= 5:
            # print only 5 outputs
            break
        prompt = output.prompt
        generated_text = output.outputs[0].text
        print(f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
              f"Generated text: {generated_text!r}")

    # Give engines time to pause their processing loops before exiting.
    sleep(1)


if __name__ == "__main__":
    args = parse_args()

    dp_size = args.dp_size
    tp_size = args.tp_size
    node_size = args.node_size
    node_rank = args.node_rank

    if node_size == 1:
        dp_master_ip = "127.0.0.1"
        dp_master_port = get_open_port()
    else:
        dp_master_ip = args.master_addr
        dp_master_port = args.master_port

    assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
    dp_per_node = dp_size // node_size

    from multiprocessing import Process

    procs = []
    for local_dp_rank, global_dp_rank in enumerate(
            range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)):
        proc = Process(
            target=main,
            args=(
                args.model,
                dp_size,
                local_dp_rank,
                global_dp_rank,
                dp_master_ip,
                dp_master_port,
                tp_size,
                args.enforce_eager,
                args.trust_remote_code,
            ),
        )
        proc.start()
        procs.append(proc)
    exit_code = 0
    for proc in procs:
        proc.join(timeout=3000)
        if proc.exitcode is None:
            print(
                f"Killing process {proc.pid} that didn't stop within 5 minutes."
            )
            proc.kill()
            exit_code = 1
        elif proc.exitcode:
            exit_code = proc.exitcode

    exit(exit_code)
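
The even-split helper in main() is the core of this example: start(rank) hands each DP rank a contiguous slice of the prompt list, with the first len(prompts) % dp_size ranks taking one extra prompt. A minimal standalone sketch with illustrative sizes (10 prompts, 4 ranks; not part of the commit):

# Standalone sketch of the prompt partitioning used in main() above.
# With 10 prompts and dp_size=4, floor=2 and remainder=2, so the ranks get
# slices of sizes 3, 3, 2, 2 and every prompt is assigned to exactly one rank.
prompts = [f"prompt-{i}" for i in range(10)]
dp_size = 4
floor, remainder = len(prompts) // dp_size, len(prompts) % dp_size


def start(rank):
    return rank * floor + min(rank, remainder)


for rank in range(dp_size):
    print(rank, prompts[start(rank):start(rank + 1)])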

examples/offline_dualbatch_overlap_npu.py

Lines changed: 1 addition & 1 deletion
@@ -21,14 +21,14 @@ def main():
         tensor_parallel_size=2,
         max_model_len=4096,
         trust_remote_code=True,
+        enable_expert_parallel=True,
         additional_config={
             "torchair_graph_config": {
                 "enabled": False
             },
             "ascend_scheduler_config": {
                 "enabled": True
             },
-            "expert_tensor_parallel_size": 1
         })

     # Generate texts from the prompts. The output is a list of RequestOutput

examples/run_dp_offline.sh

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
rm -rf ./.torchair_cache/
rm -rf ./dynamo_*
rm -rf /root/ascend/log/debug/plog/*

ifname="ifname"
local_ip="local ip"
master_addr="master ip"
model_path="path to model ckpt"

export HCCL_IF_IP=${local_ip}
export GLOO_SOCKET_IFNAME=${ifname}
export TP_SOCKET_IFNAME=${ifname}
export HCCL_SOCKET_IFNAME=${ifname}

export VLLM_USE_V1=1
export ASCEND_LAUNCH_BLOCKING=0
# export VLLM_VERSION=0.9.0

python data_parallel.py \
  --model=${model_path} \
  --dp-size=4 \
  --tp-size=4 \
  --enforce-eager \
  --trust-remote-code \
  --node-size=1 \
  --node-rank=0 \
  --master-addr=${master_addr} \
  --master-port=13345

examples/run_dp_server.sh

Lines changed: 20 additions & 17 deletions
@@ -1,3 +1,7 @@
+rm -rf ./.torchair_cache/
+rm -rf ./dynamo_*
+rm -rf /root/ascend/log/debug/plog/*
+
 export HCCL_IF_IP=2.0.0.0
 export GLOO_SOCKET_IFNAME="enp189s0f0"
 export TP_SOCKET_IFNAME="enp189s0f0"
@@ -6,25 +10,24 @@ export HCCL_SOCKET_IFNAME="enp189s0f0"
 export OMP_PROC_BIND=false
 export OMP_NUM_THREADS=100

-export VLLM_USE_V1=0
-
-export ASCEND_RT_VISIBLE_DEVICES=0,1
-export VLLM_DP_SIZE=2
-export VLLM_DP_RANK=0
-export VLLM_DP_MASTER_IP="2.0.0.0"
-export VLLM_DP_MASTER_PORT=40001
-export VLLM_DP_PROXY_IP="2.0.0.0"
-export VLLM_DP_PROXY_PORT=30002
-export VLLM_DP_MONITOR_PORT=30003
-export VLLM_HTTP_PORT=20001
+export VLLM_USE_V1=1
+export ASCEND_LAUNCH_BLOCKING=0

 vllm serve /data/weights/Qwen2.5-0.5B-Instruct \
   --host 0.0.0.0 \
-  --port 20001 \
-  --tensor-parallel-size 1 \
-  --seed 1024 \
+  --port 20002 \
   --served-model-name Qwen \
-  --max-model-len 2000 \
-  --max-num-batched-tokens 2000 \
+  --data-parallel-size 4 \
+  --data-parallel-size-local 4 \
+  --data-parallel-address 2.0.0.0 \
+  --data-parallel-rpc-port 13389 \
+  --tensor-parallel-size 4 \
+  --enable-expert-parallel \
+  --no-enable-prefix-caching \
+  --max-num-seqs 16 \
+  --max-model-len 4096 \
+  --max-num-batched-tokens 4096 \
+  --gpu-memory-utilization 0.9 \
   --trust-remote-code \
-  --gpu-memory-utilization 0.9 \
+  --enforce-eager \
+  --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":false, "enable_multistream_moe":false, "use_cached_graph":false}}'
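
The serve command above exposes the usual OpenAI-compatible HTTP API. A minimal smoke-test request in Python, assuming the server is reachable at 127.0.0.1 (substitute the node address, e.g. 2.0.0.0 from the script), that port 20002 and the served model name "Qwen" match the flags above, and that the requests package is installed; this snippet is illustrative and not part of the commit:

# Send one completion request to the DP server started by run_dp_server.sh.
import requests

resp = requests.post(
    "http://127.0.0.1:20002/v1/completions",  # standard vllm serve completions route
    json={
        "model": "Qwen",                       # matches --served-model-name
        "prompt": "The capital of France is",
        "max_tokens": 32,
        "temperature": 0.0,
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["text"])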

tests/ut/models/test_deepseek_v2.py

Lines changed: 10 additions & 2 deletions
@@ -114,7 +114,15 @@ def mock_distributed():
             return_value=Mock(is_first_rank=False, is_last_rank=False)), \
          patch("vllm_ascend.ops.fused_moe.get_current_vllm_config", return_value=mock_vllm_config), \
          patch.dict("vllm.distributed.parallel_state.__dict__", _TP=tp_group, _EP=ep_group, _DP=dp_group,
-                    _PP=pp_group):
+                    _PP=pp_group), \
+         patch.dict("vllm_ascend.distributed.parallel_state.__dict__", _MC2=ep_group):
+        yield
+
+
+@pytest.fixture
+def mock_forward_context():
+    forward_context = Mock(in_profile_run=False, with_prefill=False)
+    with patch("vllm_ascend.models.deepseek_v2.get_forward_context", return_value=forward_context):
         yield


@@ -205,7 +213,7 @@ def test_custom_deepseek_v2_mlp(mock_distributed, base_config):
                               quant_config=None)


-def test_custom_deepseek_v2_moe(mock_distributed, base_config):
+def test_custom_deepseek_v2_moe(mock_distributed, base_config, mock_forward_context):
     base_config.n_shared_experts = 1
     moe = CustomDeepseekV2MoE(config=base_config,
                               quant_config=None,
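
The new mock_forward_context fixture follows the usual patch-an-accessor pattern: the MoE code under test calls get_forward_context() and only reads a couple of attributes, so a Mock(in_profile_run=False, with_prefill=False) is enough to stand in for a plain decode step. A self-contained sketch of the same pattern, using a placeholder _Ctx holder instead of the real vllm_ascend.models.deepseek_v2 module (names below are illustrative, not part of the vllm-ascend test suite):

from unittest.mock import Mock, patch

import pytest


class _Ctx:
    """Placeholder for the module that exposes get_forward_context()."""

    @staticmethod
    def get_forward_context():
        raise RuntimeError("real forward context is unavailable in unit tests")


@pytest.fixture
def mock_forward_context():
    # Return a context that looks like a plain decode step: no prefill,
    # not inside a profiling run.
    forward_context = Mock(in_profile_run=False, with_prefill=False)
    with patch.object(_Ctx, "get_forward_context", return_value=forward_context):
        yield forward_context


def test_sees_decode_context(mock_forward_context):
    ctx = _Ctx.get_forward_context()
    assert ctx.with_prefill is False and ctx.in_profile_run is False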
