3 changes: 3 additions & 0 deletions vllm_ascend/attention/mla_v1.py
@@ -136,6 +136,7 @@ class AscendMLAMetadata:
# For logging.
num_input_tokens: int = 0 # Number of tokens including padding.

max_num_tokens_across_dp: int = 0
with_prefill_across_dp: bool = False

query_lens: Optional[list[int]] = None
@@ -364,6 +365,7 @@ def build(
common_attn_metadata: CommonAttentionMetadata,
common_prefix_len: Optional[int] = None,
graph_pad_size: int = -1,
max_num_tokens_across_dp: int = 0,
with_prefill_across_dp: bool = False,
) -> AscendMLAMetadata:
assert self._num_decodes + self._num_prefills == num_reqs
@@ -509,6 +511,7 @@ def build(
query_start_loc=query_start_loc,
block_tables=block_table,
seq_lens=seq_lens,
max_num_tokens_across_dp=max_num_tokens_across_dp,
with_prefill_across_dp=with_prefill_across_dp,
)

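Note: as a reading aid, here is a minimal, hypothetical sketch (simplified names, not the real AscendMLAMetadata or its builder, which carry many more fields) of how the new max_num_tokens_across_dp value is threaded from the build() call into the metadata object:

from dataclasses import dataclass


@dataclass
class MLAMetadataSketch:
    # Number of tokens including padding (mirrors num_input_tokens above).
    num_input_tokens: int = 0
    # New field added by this PR: the maximum token count across DP ranks.
    max_num_tokens_across_dp: int = 0
    with_prefill_across_dp: bool = False


def build_metadata_sketch(num_input_tokens: int,
                          max_num_tokens_across_dp: int = 0,
                          with_prefill_across_dp: bool = False) -> MLAMetadataSketch:
    # The builder simply forwards the DP-wide value; presumably downstream
    # code uses it to pad or size buffers consistently across DP ranks.
    return MLAMetadataSketch(
        num_input_tokens=num_input_tokens,
        max_num_tokens_across_dp=max_num_tokens_across_dp,
        with_prefill_across_dp=with_prefill_across_dp,
    )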
8 changes: 0 additions & 8 deletions vllm_ascend/envs.py
@@ -50,18 +50,10 @@
# value is None, which means the system default C compiler will be used.
"C_COMPILER":
lambda: os.getenv("C_COMPILER", None),
# Whether to enable MC2 for DeepSeek. If not set, the default value is False.
# MC2 is a fusion operator provided by Ascend to speed up computing and communication.
# Find more detail here: https://www.hiascend.com/document/detail/zh/canncommercial/81RC1/developmentguide/opdevg/ascendcbestP/atlas_ascendc_best_practices_10_0043.html
"VLLM_ENABLE_MC2":
lambda: bool(int(os.getenv("VLLM_ENABLE_MC2", '0'))),
# Whether to enable the topk optimization. It's disabled by default for experimental support
# We'll make it enabled by default in the future.
"VLLM_ASCEND_ENABLE_TOPK_OPTIMIZE":
lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_TOPK_OPTIMIZE", '0'))),
# Whether to use LCCL communication. If not set, the default value is False.
"USING_LCCL_COM":
lambda: bool(int(os.getenv("USING_LCCL_COM", '0'))),
# The version of the Ascend chip. If not set, the default value is
# ASCEND910B1. It's used for package building. Please make sure that the
# version is correct.
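The removed VLLM_ENABLE_MC2 entry used the same lazy-lambda pattern as the flags that remain in envs.py; a standalone sketch of that pattern (illustrative only, not the actual vllm_ascend/envs.py module):

import os

# Each entry maps an env-var name to a zero-argument lambda, so the value is
# read at access time rather than at import time. Boolean flags are encoded
# as "0"/"1" and parsed with bool(int(...)).
env_variables = {
    "VLLM_ASCEND_ENABLE_TOPK_OPTIMIZE":
    lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_TOPK_OPTIMIZE", '0'))),
    "USING_LCCL_COM":
    lambda: bool(int(os.getenv("USING_LCCL_COM", '0'))),
}

# Usage: call the lambda to get the current value of the flag.
use_lccl = env_variables["USING_LCCL_COM"]()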
45 changes: 3 additions & 42 deletions vllm_ascend/models/deepseek_dbo.py
@@ -51,9 +51,9 @@
from vllm.model_executor.layers.vocab_parallel_embedding import (
ParallelLMHead, VocabParallelEmbedding)
from vllm.model_executor.models.deepseek_v2 import \
DeepseekV2ForCausalLM # ruff: noqa: E501
DeepseekV2ForCausalLM # noqa: E501
from vllm.model_executor.models.deepseek_v2 import \
yarn_get_mscale # ruff: noqa: E501
yarn_get_mscale # noqa: E501
from vllm.model_executor.models.deepseek_v2 import (DeepseekV2Attention,
DeepseekV2DecoderLayer,
DeepseekV2MLAAttention)
@@ -79,7 +79,6 @@
from vllm_ascend.utils import dispose_tensor

VLLM_ASCEND_ENABLE_DBO: bool = envs_ascend.VLLM_ASCEND_ENABLE_DBO
VLLM_ENABLE_MC2: bool = envs_ascend.VLLM_ENABLE_MC2


class CustomDeepseekDBOMLP(CustomDeepseekV2MLP):
@@ -189,26 +188,8 @@ def forward(
if hasattr(attn_metadata, 'with_prefill_across_dp'):
is_prefill = is_prefill or attn_metadata.with_prefill_across_dp

num_tokens, hidden_size = hidden_states.shape

old_hidden_states = hidden_states.clone()

if self.tp_size > 1:
if envs_ascend.VLLM_ENABLE_MC2 and not is_prefill:
chunks = torch.chunk(hidden_states, self.tp_size, dim=0)
hidden_states = chunks[self.tp_rank]
elif not self.torchair_graph_enabled:
num_padding_tokens = (self.tp_size -
num_tokens % self.tp_size) % self.tp_size
# Pad hidden_states to make it divisible by tp_size to avoid cross-ring AllGatherV on 910B2C
if num_padding_tokens > 0:
hidden_states = nn.functional.pad(
hidden_states, (0, 0, 0, num_padding_tokens))
chunk_hidden_states = torch.tensor_split(hidden_states,
self.tp_size,
dim=0)
hidden_states = chunk_hidden_states[self.tp_rank]

# router_logits: (num_tokens, n_experts)
router_logits, _ = self.gate(hidden_states)

@@ -220,33 +201,13 @@
enable_force_load_balance=enable_force_load_balance,
) * self.routed_scaling_factor

if self.tp_size > 1:
Collaborator: Why are there only deletions in dbo? Have you verified the functionality of this change?

if self.torchair_graph_enabled:
if envs_ascend.VLLM_ENABLE_MC2 and not is_prefill:
final_hidden_states = torch.zeros(
[num_tokens, hidden_size],
dtype=self.params_dtype,
device="npu")
dist.all_gather_into_tensor(final_hidden_states,
hidden_states, self.tp_group)
hidden_states = final_hidden_states
else:
hidden_states = tensor_model_parallel_all_reduce(
hidden_states)
else:
dist.all_gather(list(chunk_hidden_states), hidden_states,
self.tp_group)
hidden_states = torch.cat(chunk_hidden_states, dim=0)
if num_padding_tokens > 0:
hidden_states = hidden_states[:-num_padding_tokens]

if self.n_shared_experts is not None:
shared_output = self.shared_experts(old_hidden_states)

if shared_output is not None:
hidden_states = hidden_states + shared_output

return hidden_states.view(num_tokens, hidden_size)
return hidden_states

# ----------------------------------------- TBO-related --------------------------------------------
def _forward_ms_op_shared_expert(
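With the tensor-parallel chunking and all-gather removed, the DBO MoE forward reduces to gate → fused experts → optional shared experts. Below is a rough sketch of the remaining control flow, under the assumption that cross-device communication now happens inside the fused-MoE layer; the function signature and the shared-expert bookkeeping are the sketch's own, not a claim about the final code.

def dbo_moe_forward_sketch(hidden_states, gate, experts, shared_experts,
                           routed_scaling_factor, top_k,
                           is_prefill, with_prefill_across_dp=False,
                           enable_force_load_balance=False):
    # DP-wide prefill flag (mirrors attn_metadata.with_prefill_across_dp).
    is_prefill = is_prefill or with_prefill_across_dp

    # Keep the pre-MoE activations for the shared experts
    # (hypothetical bookkeeping for this sketch).
    residual = hidden_states

    # router_logits: (num_tokens, n_experts)
    router_logits, _ = gate(hidden_states)

    # Any cross-device communication is assumed to happen inside the
    # fused-MoE layer now that the explicit chunk/all-gather path is gone.
    hidden_states = experts(
        hidden_states=hidden_states,
        router_logits=router_logits,
        is_prefill=is_prefill,
        top_k=top_k,
        enable_force_load_balance=enable_force_load_balance,
    ) * routed_scaling_factor

    if shared_experts is not None:
        shared_output = shared_experts(residual)
        if shared_output is not None:
            hidden_states = hidden_states + shared_output

    return hidden_states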
59 changes: 9 additions & 50 deletions vllm_ascend/models/deepseek_v2.py
@@ -28,7 +28,6 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import torch
import torch.distributed as dist
import torch_npu
import vllm.envs as envs
from torch import nn
@@ -37,7 +36,7 @@
from vllm.config import CacheConfig, ModelConfig, VllmConfig
from vllm.distributed import (get_pp_group,
get_tensor_model_parallel_world_size,
get_tp_group, tensor_model_parallel_all_reduce)
get_tp_group)
from vllm.distributed.parallel_state import get_dp_group
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.activation import SiluAndMul
@@ -54,9 +53,9 @@
from vllm.model_executor.layers.vocab_parallel_embedding import (
ParallelLMHead, VocabParallelEmbedding)
from vllm.model_executor.models.deepseek_v2 import \
DeepseekV2ForCausalLM # ruff: noqa: E501
DeepseekV2ForCausalLM # noqa: E501
from vllm.model_executor.models.deepseek_v2 import \
yarn_get_mscale # ruff: noqa: E501
yarn_get_mscale # noqa: E501
from vllm.model_executor.models.deepseek_v2 import (DeepseekV2Attention,
DeepseekV2DecoderLayer,
DeepseekV2MLAAttention)
@@ -65,7 +64,6 @@
maybe_prefix)
from vllm.sequence import IntermediateTensors

import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.parallel_state import get_ep_group
from vllm_ascend.ops.fused_moe import AscendFusedMoE
@@ -74,8 +72,6 @@
from vllm_ascend.utils import (dispose_tensor, npu_stream_switch,
npu_wait_tensor)

VLLM_ENABLE_MC2: bool = envs_ascend.VLLM_ENABLE_MC2


class CustomDeepseekV2SiluAndMul(SiluAndMul):

@@ -240,9 +236,8 @@ def __init__(

ascend_config = get_ascend_config()
self.torchair_graph_enabled = ascend_config.torchair_graph_config.enabled
# NOTE: multistream only effective when `VLLM_ENABLE_MC2` is on
self.enable_multistream_moe = \
ascend_config.torchair_graph_config.enable_multistream_moe and VLLM_ENABLE_MC2
ascend_config.torchair_graph_config.enable_multistream_moe

self.gate = ReplicatedLinear(config.hidden_size,
config.n_routed_experts,
@@ -312,22 +307,6 @@ def forward(
enable_force_load_balance = False
if hasattr(attn_metadata, 'with_prefill_across_dp'):
is_prefill = is_prefill or attn_metadata.with_prefill_across_dp
num_tokens, hidden_size = hidden_states.shape
old_hidden_states = hidden_states
use_separated_shared_experts = (self.shared_experts is not None
and not self.enable_multistream_moe)

if self.tp_size > 1:
if (VLLM_ENABLE_MC2
and not is_prefill) or not (self.torchair_graph_enabled or
self.ep_group.world_size == 1):
if num_tokens < self.tp_size:
hidden_states = nn.functional.pad(
hidden_states, (0, 0, 0, self.tp_size - num_tokens))
chunk_hidden_states = torch.tensor_split(hidden_states,
self.tp_size,
dim=0)
hidden_states = chunk_hidden_states[self.tp_rank]

# router_logits: (num_tokens, n_experts)
router_logits, _ = self.gate(hidden_states)
@@ -338,34 +317,14 @@
is_prefill=is_prefill,
top_k=CustomDeepseekV2MoE.top_k,
enable_force_load_balance=enable_force_load_balance,
shared_experts=(self.shared_experts
if not use_separated_shared_experts else None),
shared_experts=self.shared_experts,
)
Collaborator: Can you add comments here to illustrate the expected input and output shape of self.experts?

Collaborator (author): If you pass in shared_experts, self.experts returns a tuple (router_hidden_states, shared_hidden_states); otherwise it returns a tensor of router_hidden_states. (A short sketch of this contract follows this file's diff.)

Collaborator (author): I will add this note in the code in the next PR.

if not isinstance(experts_hidden_states, tuple):
hidden_states = experts_hidden_states * self.routed_scaling_factor
else:
hidden_states = (
experts_hidden_states[0] * self.routed_scaling_factor +
experts_hidden_states[1])

if self.tp_size > 1:
if (VLLM_ENABLE_MC2
and not is_prefill) or not (self.torchair_graph_enabled or
self.ep_group.world_size == 1):
dist.all_gather(list(chunk_hidden_states), hidden_states,
self.tp_group)
hidden_states = torch.cat(chunk_hidden_states, dim=0)
if num_tokens < self.tp_size:
hidden_states = hidden_states[:num_tokens]
else:
hidden_states = tensor_model_parallel_all_reduce(hidden_states)
hidden_states = (
experts_hidden_states[0] * self.routed_scaling_factor +
experts_hidden_states[1])

if use_separated_shared_experts:
hidden_states = hidden_states + self.shared_experts(
old_hidden_states)

return hidden_states.view(num_tokens, hidden_size)
return hidden_states


class CustomDeepseekV2MLAAttention(DeepseekV2MLAAttention):
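As explained in the review thread above, self.experts returns a tuple (router_hidden_states, shared_hidden_states) when shared_experts is passed, and a plain tensor otherwise. A hedged sketch of the caller-side handling (a simplified helper function, not the actual AscendFusedMoE API):

import torch


def combine_expert_outputs(experts_hidden_states,
                           routed_scaling_factor: float) -> torch.Tensor:
    # Tuple form: shared experts were fused into the MoE call.
    if isinstance(experts_hidden_states, tuple):
        router_hidden_states, shared_hidden_states = experts_hidden_states
        return (router_hidden_states * routed_scaling_factor +
                shared_hidden_states)
    # Tensor form: only the routed experts were computed.
    return experts_hidden_states * routed_scaling_factor

In the new forward, shared_experts is always passed to self.experts, so only the tuple branch is exercised; the isinstance check from the old code is therefore dropped.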