1 change: 0 additions & 1 deletion docs/source/user_guide/additional_config.md
@@ -28,7 +28,6 @@ The following table lists the additional configuration options available in vLLM
|-------------------------------| ---- |------|-----------------------------------------------------------------------------------------------|
| `torchair_graph_config` | dict | `{}` | The config options for torchair graph mode |
| `ascend_scheduler_config` | dict | `{}` | The config options for ascend scheduler |
| `expert_tensor_parallel_size` | str | `0` | Expert tensor parallel size the model to use. |
Collaborator: Don't we maintain ETP anymore?

Collaborator: Given the absence of relevant scenarios, employing EP or full TP is sufficient for now. We may subsequently advocate implementing expert tensor parallelism in vLLM to support scenarios where the number of nodes exceeds the number of experts.

However, we do have customer scenarios that require such configurations. While DeepSeek models might not need this, there are use cases involving large-scale MoE (Mixture of Experts) models that require splitting across both Tensor Parallelism (TP) and Expert Parallelism (EP), or sometimes just TP alone. This is exactly the case with the current Jieyue Xingchen models.

| `refresh` | bool | `false` | Whether to refresh the global ascend config content. This value is usually used in RLHF cases. |
| `expert_map_path` | str | None | When using expert load balancing for the MOE model, an expert map path needs to be passed in. |
| `chunked_prefill_for_mla` | bool | `False` | Whether to enable the fused operator-like chunked_prefill. |
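
The thread above matches what this PR does: `expert_tensor_parallel_size` is dropped from `additional_config`, and MoE sharding is now expressed through vLLM's `enable_expert_parallel` engine argument (see the `offline_dualbatch_overlap_npu.py` change below). A minimal offline sketch of the new style; the DeepSeek-V2-Lite-Chat checkpoint and the 2-card setup are illustrative assumptions, not prescribed by this PR:

```python
from vllm import LLM, SamplingParams

# Sketch only: model path and parallel sizes are placeholders.
llm = LLM(
    model="deepseek-ai/DeepSeek-V2-Lite-Chat",
    tensor_parallel_size=2,
    enable_expert_parallel=True,  # replaces the removed expert_tensor_parallel_size option
    trust_remote_code=True,
    max_model_len=4096,
    additional_config={
        "torchair_graph_config": {"enabled": False},
        "ascend_scheduler_config": {"enabled": True},
    },
)

outputs = llm.generate(["The future of AI is"],
                       SamplingParams(temperature=0.0, max_tokens=32))
print(outputs[0].outputs[0].text)
```
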
4 changes: 4 additions & 0 deletions examples/disaggregate_prefill_v1/README.md
@@ -47,6 +47,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
--data-parallel-address 172.19.32.175 \
--data-parallel-rpc-port 13356 \
--tensor-parallel-size 8 \
--enable-expert-parallel \
--no-enable-prefix-caching \
--seed 1024 \
--served-model-name deepseek \
@@ -88,6 +89,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
--data-parallel-address 172.19.32.175 \
--data-parallel-rpc-port 13356 \
--tensor-parallel-size 8 \
--enable-expert-parallel \
--no-enable-prefix-caching \
--seed 1024 \
--served-model-name deepseek \
@@ -128,6 +130,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
--data-parallel-address 172.19.123.51 \
--data-parallel-rpc-port 13356 \
--tensor-parallel-size 8 \
--enable-expert-parallel \
--no-enable-prefix-caching \
--seed 1024 \
--served-model-name deepseek \
@@ -169,6 +172,7 @@ vllm serve /data01/deepseek_r1_w8a8_zhw \
--data-parallel-address 172.19.123.51 \
--data-parallel-rpc-port 13356 \
--tensor-parallel-size 8 \
--enable-expert-parallel \
--no-enable-prefix-caching \
--seed 1024 \
--served-model-name deepseek \
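
After both the prefill and decode instances (and whatever proxy fronts them) are up, the deployment can be smoke-tested through vLLM's OpenAI-compatible API. A sketch, assuming the entry point listens on `localhost:8000`; the host and port are assumptions, not taken from this README:

```python
import requests

# Hypothetical endpoint; point this at the proxy/server of your deployment.
resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "deepseek",  # matches --served-model-name above
        "prompt": "The capital of France is",
        "max_tokens": 32,
        "temperature": 0,
    },
    timeout=60,
)
print(resp.json()["choices"][0]["text"])
```
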
255 changes: 198 additions & 57 deletions examples/dp_offline/data_parallel.py
@@ -1,85 +1,226 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
# Adapted from vllm-project/vllm/examples/offline_inference/data_parallel.py
# SPDX-License-Identifier: Apache-2.0
# usage:
# python examples/offline_inference_data_parallel.py
# we need to have a launcher to create multiple data parallel
# ranks. And each rank will create a vLLM instance to process its own prompts.
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Usage:
Single node:
python examples/offline_inference/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2

Multi-node:
Node 0 (assume the node has ip of 10.99.48.128):
python examples/offline_inference/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=0 \
--master-addr=10.99.48.128 \
--master-port=13345
Node 1:
python examples/offline_inference/data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=1 \
--master-addr=10.99.48.128 \
--master-port=13345
"""

import gc
import os
from time import sleep

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port


def parse_args():
import argparse

parser = argparse.ArgumentParser(description="Data Parallel Inference")
parser.add_argument(
"--model",
type=str,
default="ibm-research/PowerMoE-3b",
help="Model name or path",
)
parser.add_argument("--dp-size",
type=int,
default=2,
help="Data parallel size")
parser.add_argument("--tp-size",
type=int,
default=2,
help="Tensor parallel size")
parser.add_argument("--node-size",
type=int,
default=1,
help="Total number of nodes")
parser.add_argument("--node-rank",
type=int,
default=0,
help="Rank of the current node")
parser.add_argument("--master-addr",
type=str,
default="",
help="Master node IP address")
parser.add_argument("--master-port",
type=int,
default=0,
help="Master node port")
parser.add_argument("--enforce-eager",
action="store_true",
help="Enforce eager mode execution.")
parser.add_argument("--trust-remote-code",
action="store_true",
help="Trust remote code.")
return parser.parse_args()

def main():
dp_rank = int(os.environ['RANK'])
local_rank = int(os.environ['LOCAL_RANK'])
dp_size = int(os.environ['WORLD_SIZE'])
master_addr = os.environ['MASTER_ADDR']
master_port = os.environ['MASTER_PORT']
tp_size = 1
etp_size = 1

os.environ["VLLM_DP_RANK"] = str(dp_rank)
def main(
model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
GPUs_per_dp_rank,
enforce_eager,
trust_remote_code,
):
os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
os.environ["VLLM_DP_SIZE"] = str(dp_size)
os.environ["VLLM_DP_MASTER_IP"] = master_addr
os.environ["VLLM_DP_MASTER_PORT"] = master_port
os.environ["ASCEND_RT_VISIBLE_DEVICES"] = ",".join(
str(i)
for i in range(local_rank * tp_size, (local_rank + 1) * tp_size))
os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)

import torch
from vllm import LLM, SamplingParams
from vllm.distributed.parallel_state import (
destroy_distributed_environment, destroy_model_parallel)
# CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
# engine processes.

# Sample prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
] * 4
] * 100

promts_per_rank = len(prompts) // dp_size
start = dp_rank * promts_per_rank
end = start + promts_per_rank
prompts = prompts[start:end]
# with DP, each rank should process different prompts.
# usually all the DP ranks process a full dataset,
# and each rank processes a different part of the dataset.
floor = len(prompts) // dp_size
remainder = len(prompts) % dp_size

# Distribute prompts into even groups.
def start(rank):
return rank * floor + min(rank, remainder)

prompts = prompts[start(global_dp_rank):start(global_dp_rank + 1)]
if len(prompts) == 0:
# if any rank has no prompts to process,
# we need to set a placeholder prompt
prompts = ["Placeholder"]
print(f"DP rank {dp_rank} needs to process {len(prompts)} prompts")
num_seqs = len(prompts)
print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")

# Create a sampling params object.
# since we are doing data parallel, every rank can have different
# sampling params. here we set different max_tokens for different
# ranks for demonstration.
sampling_params = SamplingParams(
temperature=0.0,
max_tokens=32,
)

sampling_params = SamplingParams(temperature=0.8,
top_p=0.95,
max_tokens=4,
min_tokens=4)
# Create an LLM.
llm = LLM(model="deepseek-ai/DeepSeek-V2-Lite-Chat",
tensor_parallel_size=tp_size,
trust_remote_code=True,
max_model_len=4096,
max_num_seqs=num_seqs,
additional_config={
'expert_tensor_parallel_size': etp_size,
'torchair_graph_config': {
'enabled': False,
},
})
llm = LLM(
model=model,
tensor_parallel_size=GPUs_per_dp_rank,
enforce_eager=enforce_eager,
trust_remote_code=trust_remote_code,
distributed_executor_backend="mp",
max_model_len=2048,
max_num_batched_tokens=2048,
max_num_seqs=16,
enable_prefix_caching=False,
enable_expert_parallel=True,
gpu_memory_utilization=0.9,
additional_config={
"ascend_scheduler_config": {
"enabled": True
},
"torchair_graph_config": {
"enabled": False,
"enable_multistream_shared_expert": False
},
},
)

outputs = llm.generate(prompts, sampling_params)
for output in outputs:
# Print the outputs.
for i, output in enumerate(outputs):
if i >= 5:
# print only 5 outputs
break
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"DP rank {dp_rank}, Prompt: {prompt!r}, "
print(f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
f"Generated text: {generated_text!r}")

del llm
destroy_model_parallel()
destroy_distributed_environment()
gc.collect()
torch.npu.empty_cache()
# Give engines time to pause their processing loops before exiting.
sleep(1)


if __name__ == "__main__":
main()
args = parse_args()

dp_size = args.dp_size
tp_size = args.tp_size
node_size = args.node_size
node_rank = args.node_rank

if node_size == 1:
dp_master_ip = "127.0.0.1"
dp_master_port = get_open_port()
else:
dp_master_ip = args.master_addr
dp_master_port = args.master_port

assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
dp_per_node = dp_size // node_size

from multiprocessing import Process

procs = []
for local_dp_rank, global_dp_rank in enumerate(
range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)):
proc = Process(
target=main,
args=(
args.model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
tp_size,
args.enforce_eager,
args.trust_remote_code,
),
)
proc.start()
procs.append(proc)
exit_code = 0
for proc in procs:
proc.join(timeout=3000)
if proc.exitcode is None:
print(
f"Killing process {proc.pid} that didn't stop within 5 minutes."
)
proc.kill()
exit_code = 1
elif proc.exitcode:
exit_code = proc.exitcode

exit(exit_code)
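
One detail worth calling out in the rewritten `data_parallel.py` is the `floor`/`remainder` split used to hand each DP rank a near-equal slice of the prompt list. A tiny standalone sketch (not part of the PR) of how that split behaves with 10 prompts over 4 ranks:

```python
# Reproduces the start-index formula from data_parallel.py:
# start(rank) = rank * floor + min(rank, remainder)
def start(rank, total, dp_size):
    floor, remainder = divmod(total, dp_size)
    return rank * floor + min(rank, remainder)

total, dp_size = 10, 4
for rank in range(dp_size):
    lo, hi = start(rank, total, dp_size), start(rank + 1, total, dp_size)
    print(f"rank {rank}: prompts[{lo}:{hi}]  ({hi - lo} prompts)")
# rank 0: prompts[0:3]  (3 prompts)
# rank 1: prompts[3:6]  (3 prompts)
# rank 2: prompts[6:8]  (2 prompts)
# rank 3: prompts[8:10]  (2 prompts)
```

Lower ranks absorb the remainder, so no two ranks differ by more than one prompt; the placeholder-prompt branch in the script only matters when there are more ranks than prompts.
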
35 changes: 22 additions & 13 deletions examples/dp_offline/run_dp.sh
@@ -1,19 +1,28 @@
rm -rf ./.torchair_cache/
rm -rf ./dynamo_*
rm -rf /root/ascend/log/debug/plog/*

ifname="ifname"
local_ip="local ip"
master_addr="master ip"
model_path="path to model ckpt"

export HCCL_IF_IP=${local_ip}
export GLOO_SOCKET_IFNAME=${ifname}
export TP_SOCKET_IFNAME=${ifname}
export HCCL_SOCKET_IFNAME=${ifname}

# dp_size = node_size * dp_per_node
node_size=1
node_rank=0
dp_per_node=4
master_addr=127.0.0.1
master_port=12345

rm -rf ./.torchair_cache/
rm -rf ./dynamo_*
rm -rf /root/ascend/log/debug/plog/*
export VLLM_USE_V1=1
export ASCEND_LAUNCH_BLOCKING=0
# export VLLM_VERSION=0.9.0

torchrun --nproc_per_node ${dp_per_node} --nnodes ${node_size} \
--node_rank ${node_rank} --master_addr ${master_addr} --master_port ${master_port} \
data_parallel.py
python data_parallel.py \
--model=${model_path} \
--dp-size=4 \
--tp-size=4 \
--enforce-eager \
--trust-remote-code \
--node-size=1 \
--node-rank=0 \
--master-addr=${master_addr} \
--master-port=13345
2 changes: 1 addition & 1 deletion examples/offline_dualbatch_overlap_npu.py
@@ -20,14 +20,14 @@ def main():
tensor_parallel_size=2,
max_model_len=4096,
trust_remote_code=True,
enable_expert_parallel=True,
additional_config={
"torchair_graph_config": {
"enabled": False
},
"ascend_scheduler_config": {
"enabled": True
},
"expert_tensor_parallel_size": 1
})

# Generate texts from the prompts. The output is a list of RequestOutput