
Commit cd65d15

[refactor] Refactoring forward_context and model_runner_v1 (#1422)
### What this PR does / why we need it?
A refactoring of `forward_context` and `model_runner_v1`: add some context that is needed during model inference into `forward_context`, and refactor the `dummy_run` logic to make it more reasonable. Details:
1. Fix an accuracy bug in the online + multi-DP + eager mode + all_gather mode case;
2. Fix a bug in the online + multi-DP + eager mode + mc2 mode case;
3. Fix a bug in the A2 + eager mode + mc2 mode case;
4. Enable a different token_num on different chips in mc2 mode;
5. Update the scripts in the `examples` dir.

### Does this PR introduce _any_ user-facing change?
This PR removes `expert_tensor_parallel_size` from `additional_config`; `enable_expert_parallel` is now used to control whether expert parallelism is enabled, which is consistent with vLLM.

### How was this patch tested?

---------

Signed-off-by: zzzzwwjj <1183291235@qq.com>
1 parent ce4bdab commit cd65d15

27 files changed: +570 −434 lines changed
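
As described in the user-facing change above, `expert_tensor_parallel_size` is dropped from `additional_config` and expert parallelism is now toggled with `enable_expert_parallel`. A minimal migration sketch, based on the option names that appear in the diffs below (hypothetical example, not code from this commit):

```python
from vllm import LLM

# Before this commit (no longer supported):
#   llm = LLM(model=..., additional_config={"expert_tensor_parallel_size": 1, ...})
#
# After this commit, expert parallelism is toggled the same way as in vLLM,
# while the remaining Ascend-specific options stay in additional_config.
llm = LLM(
    model="deepseek-ai/DeepSeek-V2-Lite-Chat",  # example MoE model from the old script
    tensor_parallel_size=2,
    enable_expert_parallel=True,  # replaces additional_config["expert_tensor_parallel_size"]
    additional_config={
        "torchair_graph_config": {"enabled": False},
        "ascend_scheduler_config": {"enabled": True},
    },
)
```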

docs/source/user_guide/additional_config.md

Lines changed: 0 additions & 1 deletion
@@ -28,7 +28,6 @@ The following table lists the additional configuration options available in vLLM
 |-------------------------------| ---- |------|-----------------------------------------------------------------------------------------------|
 | `torchair_graph_config` | dict | `{}` | The config options for torchair graph mode |
 | `ascend_scheduler_config` | dict | `{}` | The config options for ascend scheduler |
-| `expert_tensor_parallel_size` | str | `0` | Expert tensor parallel size the model to use. |
 | `refresh` | bool | `false` | Whether to refresh global ascend config content. This value is usually used by rlhf case. |
 | `expert_map_path` | str | None | When using expert load balancing for the MOE model, an expert map path needs to be passed in. |
 | `chunked_prefill_for_mla` | bool | `False` | Whether to enable the fused operator-like chunked_prefill. |

examples/disaggregate_prefill_v1/README.md

Lines changed: 4 additions & 0 deletions
@@ -47,6 +47,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
 --data-parallel-address 172.19.32.175 \
 --data-parallel-rpc-port 13356 \
 --tensor-parallel-size 8 \
+--enable-expert-parallel \
 --no-enable-prefix-caching \
 --seed 1024 \
 --served-model-name deepseek \
@@ -88,6 +89,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
 --data-parallel-address 172.19.32.175 \
 --data-parallel-rpc-port 13356 \
 --tensor-parallel-size 8 \
+--enable-expert-parallel \
 --no-enable-prefix-caching \
 --seed 1024 \
 --served-model-name deepseek \
@@ -128,6 +130,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
 --data-parallel-address 172.19.123.51 \
 --data-parallel-rpc-port 13356 \
 --tensor-parallel-size 8 \
+--enable-expert-parallel \
 --no-enable-prefix-caching \
 --seed 1024 \
 --served-model-name deepseek \
@@ -169,6 +172,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
 --data-parallel-address 172.19.123.51 \
 --data-parallel-rpc-port 13356 \
 --tensor-parallel-size 8 \
+--enable-expert-parallel \
 --no-enable-prefix-caching \
 --seed 1024 \
 --served-model-name deepseek \
examples/dp_offline/data_parallel.py

Lines changed: 198 additions & 57 deletions
@@ -1,85 +1,226 @@
-#
-# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
-# This file is a part of the vllm-ascend project.
-# Adapted from vllm-project/vllm/examples/offline_inference/data_parallel.py
 # SPDX-License-Identifier: Apache-2.0
-# usage:
-# python examples/offline_inference_data_parallel.py
-# we need to have a launcher to create multiple data parallel
-# ranks. And each rank will create a vLLM instance to process its own prompts.
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""
+Usage:
+Single node:
+    python examples/offline_inference/data_parallel.py \
+            --model="ibm-research/PowerMoE-3b" \
+            --dp-size=2 \
+            --tp-size=2
+
+Multi-node:
+    Node 0 (assume the node has ip of 10.99.48.128):
+        python examples/offline_inference/data_parallel.py \
+                --model="ibm-research/PowerMoE-3b" \
+                --dp-size=2 \
+                --tp-size=2 \
+                --node-size=2 \
+                --node-rank=0 \
+                --master-addr=10.99.48.128 \
+                --master-port=13345
+    Node 1:
+        python examples/offline_inference/data_parallel.py \
+                --model="ibm-research/PowerMoE-3b" \
+                --dp-size=2 \
+                --tp-size=2 \
+                --node-size=2 \
+                --node-rank=1 \
+                --master-addr=10.99.48.128 \
+                --master-port=13345
+"""
 
-import gc
 import os
+from time import sleep
+
+from vllm import LLM, SamplingParams
+from vllm.utils import get_open_port
+
+
+def parse_args():
+    import argparse
 
+    parser = argparse.ArgumentParser(description="Data Parallel Inference")
+    parser.add_argument(
+        "--model",
+        type=str,
+        default="ibm-research/PowerMoE-3b",
+        help="Model name or path",
+    )
+    parser.add_argument("--dp-size",
+                        type=int,
+                        default=2,
+                        help="Data parallel size")
+    parser.add_argument("--tp-size",
+                        type=int,
+                        default=2,
+                        help="Tensor parallel size")
+    parser.add_argument("--node-size",
+                        type=int,
+                        default=1,
+                        help="Total number of nodes")
+    parser.add_argument("--node-rank",
+                        type=int,
+                        default=0,
+                        help="Rank of the current node")
+    parser.add_argument("--master-addr",
+                        type=str,
+                        default="",
+                        help="Master node IP address")
+    parser.add_argument("--master-port",
+                        type=int,
+                        default=0,
+                        help="Master node port")
+    parser.add_argument("--enforce-eager",
+                        action="store_true",
+                        help="Enforce eager mode execution.")
+    parser.add_argument("--trust-remote-code",
+                        action="store_true",
+                        help="Trust remote code.")
+    return parser.parse_args()
 
-def main():
-    dp_rank = int(os.environ['RANK'])
-    local_rank = int(os.environ['LOCAL_RANK'])
-    dp_size = int(os.environ['WORLD_SIZE'])
-    master_addr = os.environ['MASTER_ADDR']
-    master_port = os.environ['MASTER_PORT']
-    tp_size = 1
-    etp_size = 1
 
-    os.environ["VLLM_DP_RANK"] = str(dp_rank)
+def main(
+    model,
+    dp_size,
+    local_dp_rank,
+    global_dp_rank,
+    dp_master_ip,
+    dp_master_port,
+    GPUs_per_dp_rank,
+    enforce_eager,
+    trust_remote_code,
+):
+    os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
+    os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
     os.environ["VLLM_DP_SIZE"] = str(dp_size)
-    os.environ["VLLM_DP_MASTER_IP"] = master_addr
-    os.environ["VLLM_DP_MASTER_PORT"] = master_port
-    os.environ["ASCEND_RT_VISIBLE_DEVICES"] = ",".join(
-        str(i)
-        for i in range(local_rank * tp_size, (local_rank + 1) * tp_size))
+    os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
+    os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)
 
-    import torch
-    from vllm import LLM, SamplingParams
-    from vllm.distributed.parallel_state import (
-        destroy_distributed_environment, destroy_model_parallel)
+    # CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
+    # engine processes.
 
+    # Sample prompts.
     prompts = [
         "Hello, my name is",
         "The president of the United States is",
         "The capital of France is",
         "The future of AI is",
-    ] * 4
+    ] * 100
 
-    promts_per_rank = len(prompts) // dp_size
-    start = dp_rank * promts_per_rank
-    end = start + promts_per_rank
-    prompts = prompts[start:end]
+    # with DP, each rank should process different prompts.
+    # usually all the DP ranks process a full dataset,
+    # and each rank processes a different part of the dataset.
+    floor = len(prompts) // dp_size
+    remainder = len(prompts) % dp_size
+
+    # Distribute prompts into even groups.
+    def start(rank):
+        return rank * floor + min(rank, remainder)
+
+    prompts = prompts[start(global_dp_rank):start(global_dp_rank + 1)]
     if len(prompts) == 0:
+        # if any rank has no prompts to process,
+        # we need to set a placeholder prompt
        prompts = ["Placeholder"]
-    print(f"DP rank {dp_rank} needs to process {len(prompts)} prompts")
-    num_seqs = len(prompts)
+    print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")
+
+    # Create a sampling params object.
+    # since we are doing data parallel, every rank can have different
+    # sampling params. here we set different max_tokens for different
+    # ranks for demonstration.
+    sampling_params = SamplingParams(
+        temperature=0.0,
+        max_tokens=32,
+    )
 
-    sampling_params = SamplingParams(temperature=0.8,
-                                     top_p=0.95,
-                                     max_tokens=4,
-                                     min_tokens=4)
     # Create an LLM.
-    llm = LLM(model="deepseek-ai/DeepSeek-V2-Lite-Chat",
-              tensor_parallel_size=tp_size,
-              trust_remote_code=True,
-              max_model_len=4096,
-              max_num_seqs=num_seqs,
-              additional_config={
-                  'expert_tensor_parallel_size': etp_size,
-                  'torchair_graph_config': {
-                      'enabled': False,
-                  },
-              })
+    llm = LLM(
+        model=model,
+        tensor_parallel_size=GPUs_per_dp_rank,
+        enforce_eager=enforce_eager,
+        trust_remote_code=trust_remote_code,
+        distributed_executor_backend="mp",
+        max_model_len=2048,
+        max_num_batched_tokens=2048,
+        max_num_seqs=16,
+        enable_prefix_caching=False,
+        enable_expert_parallel=True,
+        gpu_memory_utilization=0.9,
+        additional_config={
+            "ascend_scheduler_config": {
+                "enabled": True
+            },
+            "torchair_graph_config": {
+                "enabled": False,
+                "enable_multistream_shared_expert": False
+            },
+        },
+    )
 
     outputs = llm.generate(prompts, sampling_params)
-    for output in outputs:
+    # Print the outputs.
+    for i, output in enumerate(outputs):
+        if i >= 5:
+            # print only 5 outputs
+            break
         prompt = output.prompt
         generated_text = output.outputs[0].text
-        print(f"DP rank {dp_rank}, Prompt: {prompt!r}, "
+        print(f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
              f"Generated text: {generated_text!r}")
 
-    del llm
-    destroy_model_parallel()
-    destroy_distributed_environment()
-    gc.collect()
-    torch.npu.empty_cache()
+    # Give engines time to pause their processing loops before exiting.
+    sleep(1)
 
 
 if __name__ == "__main__":
-    main()
+    args = parse_args()
+
+    dp_size = args.dp_size
+    tp_size = args.tp_size
+    node_size = args.node_size
+    node_rank = args.node_rank
+
+    if node_size == 1:
+        dp_master_ip = "127.0.0.1"
+        dp_master_port = get_open_port()
+    else:
+        dp_master_ip = args.master_addr
+        dp_master_port = args.master_port
+
+    assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
+    dp_per_node = dp_size // node_size
+
+    from multiprocessing import Process
+
+    procs = []
+    for local_dp_rank, global_dp_rank in enumerate(
+            range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)):
+        proc = Process(
+            target=main,
+            args=(
+                args.model,
+                dp_size,
+                local_dp_rank,
+                global_dp_rank,
+                dp_master_ip,
+                dp_master_port,
+                tp_size,
+                args.enforce_eager,
+                args.trust_remote_code,
+            ),
+        )
+        proc.start()
+        procs.append(proc)
+    exit_code = 0
+    for proc in procs:
+        proc.join(timeout=3000)
+        if proc.exitcode is None:
+            print(
+                f"Killing process {proc.pid} that didn't stop within 5 minutes."
+            )
+            proc.kill()
+            exit_code = 1
+        elif proc.exitcode:
+            exit_code = proc.exitcode
+
+    exit(exit_code)
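
The floor/remainder split in the new `data_parallel.py` distributes prompts as evenly as possible across DP ranks, with the first `remainder` ranks taking one extra prompt. A standalone sketch of just that partitioning logic, with illustrative numbers (plain Python, no vLLM required):

```python
# Sketch of the prompt partitioning used in data_parallel.py above.
# With 10 prompts and dp_size=4, ranks receive 3, 3, 2 and 2 prompts.
prompts = [f"prompt-{i}" for i in range(10)]
dp_size = 4

floor = len(prompts) // dp_size
remainder = len(prompts) % dp_size

def start(rank: int) -> int:
    # The first `remainder` ranks each absorb one extra prompt.
    return rank * floor + min(rank, remainder)

for rank in range(dp_size):
    chunk = prompts[start(rank):start(rank + 1)]
    print(f"rank {rank}: {len(chunk)} prompts")
```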

examples/dp_offline/run_dp.sh

Lines changed: 22 additions & 13 deletions
@@ -1,19 +1,28 @@
+rm -rf ./.torchair_cache/
+rm -rf ./dynamo_*
+rm -rf /root/ascend/log/debug/plog/*
+
+ifname="ifname"
+local_ip="local ip"
+master_addr="master ip"
+model_path="path to model ckpt"
+
 export HCCL_IF_IP=${local_ip}
 export GLOO_SOCKET_IFNAME=${ifname}
 export TP_SOCKET_IFNAME=${ifname}
 export HCCL_SOCKET_IFNAME=${ifname}
 
-# dp_size = node_size * dp_per_node
-node_size=1
-node_rank=0
-dp_per_node=4
-master_addr=127.0.0.1
-master_port=12345
-
-rm -rf ./.torchair_cache/
-rm -rf ./dynamo_*
-rm -rf /root/ascend/log/debug/plog/*
+export VLLM_USE_V1=1
+export ASCEND_LAUNCH_BLOCKING=0
+# export VLLM_VERSION=0.9.0
 
-torchrun --nproc_per_node ${dp_per_node} --nnodes ${node_size} \
-    --node_rank ${node_rank} --master_addr ${master_addr} --master_port ${master_port} \
-    data_parallel.py
+python data_parallel.py \
+    --model=${model_path} \
+    --dp-size=4 \
+    --tp-size=4 \
+    --enforce-eager \
+    --trust-remote-code \
+    --node-size=1 \
+    --node-rank=0 \
+    --master-addr=${master_addr} \
+    --master-port=13345
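
Assuming one engine process per DP rank and `tp-size` devices per engine, as in `data_parallel.py`, the settings in this script imply the following per-node device budget (a rough sanity-check sketch, not part of the commit):

```python
# Device budget implied by run_dp.sh: dp-size=4, tp-size=4, node-size=1.
dp_size, tp_size, node_size = 4, 4, 1

assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
dp_per_node = dp_size // node_size          # 4 engines on this node
npus_per_node = dp_per_node * tp_size       # 16 NPUs needed on this node
print(f"{dp_per_node} DP ranks per node -> {npus_per_node} NPUs per node")
```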

examples/offline_dualbatch_overlap_npu.py

Lines changed: 1 addition & 1 deletion
@@ -20,14 +20,14 @@ def main():
               tensor_parallel_size=2,
               max_model_len=4096,
               trust_remote_code=True,
+              enable_expert_parallel=True,
               additional_config={
                   "torchair_graph_config": {
                       "enabled": False
                   },
                   "ascend_scheduler_config": {
                       "enabled": True
                   },
-                  "expert_tensor_parallel_size": 1
               })
 
     # Generate texts from the prompts. The output is a list of RequestOutput
