From 1335d3f3d42011ca682514b868bfb154ce50766c Mon Sep 17 00:00:00 2001 From: Cody Yu Date: Wed, 17 Jul 2024 11:42:47 -0700 Subject: [PATCH 1/4] Mixtral PP --- vllm/config.py | 1 + vllm/model_executor/models/mixtral.py | 44 +++++++++++++++++++-------- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index de7bb3943a45f..a20e830955671 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -34,6 +34,7 @@ "MistralForCausalLM", "Phi3ForCausalLM", "GPT2LMHeadModel", + "MixtralForCausalLM", ] diff --git a/vllm/model_executor/models/mixtral.py b/vllm/model_executor/models/mixtral.py index e739df87cf96a..9a7be342d2a4e 100644 --- a/vllm/model_executor/models/mixtral.py +++ b/vllm/model_executor/models/mixtral.py @@ -29,7 +29,7 @@ from vllm.attention import Attention, AttentionMetadata from vllm.config import CacheConfig, LoRAConfig -from vllm.distributed import get_tensor_model_parallel_world_size +from vllm.distributed import get_pp_group, get_tensor_model_parallel_world_size from vllm.model_executor.layers.fused_moe import FusedMoE from vllm.model_executor.layers.layernorm import RMSNorm from vllm.model_executor.layers.linear import (QKVParallelLinear, @@ -48,6 +48,7 @@ from vllm.sequence import IntermediateTensors, SamplerOutput from .interfaces import SupportsLoRA +from .utils import is_pp_missing_parameter, make_layers class MixtralMoE(nn.Module): @@ -255,12 +256,11 @@ def __init__( config.hidden_size, org_num_embeddings=config.vocab_size, ) - self.layers = nn.ModuleList([ - MixtralDecoderLayer(config, - cache_config, - quant_config=quant_config) - for _ in range(config.num_hidden_layers) - ]) + + self.start_layer, self.end_layer, self.layers = make_layers( + config.num_hidden_layers, lambda: MixtralDecoderLayer( + config, cache_config, quant_config=quant_config)) + self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) def forward( @@ -269,14 +269,25 @@ def forward( positions: torch.Tensor, kv_caches: List[torch.Tensor], attn_metadata: AttentionMetadata, + intermediate_tensors: Optional[IntermediateTensors], ) -> torch.Tensor: - hidden_states = self.embed_tokens(input_ids) - residual = None - for i in range(len(self.layers)): + if get_pp_group().is_first_rank: + hidden_states = self.embed_tokens(input_ids) + residual = None + else: + assert intermediate_tensors is not None + hidden_states = intermediate_tensors["hidden_states"] + residual = intermediate_tensors["residual"] + for i in range(self.start_layer, self.end_layer): layer = self.layers[i] hidden_states, residual = layer(positions, hidden_states, - kv_caches[i], attn_metadata, - residual) + kv_caches[i - self.start_layer], + attn_metadata, residual) + if not get_pp_group().is_last_rank: + return IntermediateTensors({ + "hidden_states": hidden_states, + "residual": residual + }) hidden_states, _ = self.norm(hidden_states, residual) return hidden_states @@ -347,7 +358,7 @@ def forward( intermediate_tensors: Optional[IntermediateTensors] = None, ) -> torch.Tensor: hidden_states = self.model(input_ids, positions, kv_caches, - attn_metadata) + attn_metadata, intermediate_tensors) return hidden_states def compute_logits(self, hidden_states: torch.Tensor, @@ -392,6 +403,10 @@ def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]): # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue + # Skip layers on other devices. 
+ if is_pp_missing_parameter(name, self): + continue + param = params_dict[name] weight_loader = param.weight_loader weight_loader(param, loaded_weight, shard_id) @@ -414,6 +429,9 @@ def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]): # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue + # Skip layers on other devices. + if is_pp_missing_parameter(name, self): + continue # Remapping the name of FP8 kv-scale. name = maybe_remap_kv_scale_name(name, params_dict) if name is None: From a4c7f1665490141d1d08e57c4ad26289167719b4 Mon Sep 17 00:00:00 2001 From: Cody Yu Date: Wed, 17 Jul 2024 14:50:25 -0700 Subject: [PATCH 2/4] fix --- vllm/model_executor/models/mixtral.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/vllm/model_executor/models/mixtral.py b/vllm/model_executor/models/mixtral.py index 9a7be342d2a4e..28dbcb30bdf55 100644 --- a/vllm/model_executor/models/mixtral.py +++ b/vllm/model_executor/models/mixtral.py @@ -367,6 +367,20 @@ def compute_logits(self, hidden_states: torch.Tensor, sampling_metadata) return logits + def make_empty_intermediate_tensors( + self, batch_size: int, dtype: torch.dtype, + device: torch.device) -> IntermediateTensors: + return IntermediateTensors({ + "hidden_states": + torch.zeros((batch_size, self.config.hidden_size), + dtype=dtype, + device=device), + "residual": + torch.zeros((batch_size, self.config.hidden_size), + dtype=dtype, + device=device), + }) + def sample( self, logits: Optional[torch.Tensor], @@ -417,6 +431,9 @@ def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]): if weight_name not in name: continue name = name.replace(weight_name, param_name) + # Skip layers on other devices. + if is_pp_missing_parameter(name, self): + continue param = params_dict[name] weight_loader = param.weight_loader weight_loader(param, From edd20189f242a30c0e5aefe9d8e3bbc36a98ea56 Mon Sep 17 00:00:00 2001 From: Cody Yu Date: Wed, 17 Jul 2024 17:28:59 -0700 Subject: [PATCH 3/4] test --- tests/distributed/test_pipeline_parallel.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tests/distributed/test_pipeline_parallel.py b/tests/distributed/test_pipeline_parallel.py index 123a77e14ad74..84e3f609fb2a5 100644 --- a/tests/distributed/test_pipeline_parallel.py +++ b/tests/distributed/test_pipeline_parallel.py @@ -1,4 +1,5 @@ import pytest +from transformers import AutoTokenizer from ..utils import RemoteOpenAIServer @@ -12,6 +13,8 @@ (1, 4, 1, 0, "meta-llama/Meta-Llama-3-8B"), ]) def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): + tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) + pp_args = [ # use half precision for speed and memory savings in CI environment "--dtype", @@ -34,7 +37,9 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): "--dtype", "bfloat16", "--tensor-parallel-size", - str(max(TP_SIZE, 2)), # use at least TP_SIZE=2 to hold the model + # Use the same number or at most 8 GPUs to hold the model. + # In this test we assume the model can fit in 8 GPUs. 
+ str(min(TP_SIZE * PP_SIZE, 8)), "--distributed-executor-backend", "mp", ] @@ -45,8 +50,10 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): pp_args.append("--enforce-eager") tp_args.append("--enforce-eager") + prompt = "Hello, my name is" + token_ids = tokenizer(prompt)["input_ids"] results = [] - for args in [pp_args, tp_args]: + for args in (pp_args, tp_args): with RemoteOpenAIServer(MODEL_NAME, args) as server: client = server.get_client() @@ -62,7 +69,7 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): # test with text prompt completion = client.completions.create(model=MODEL_NAME, - prompt="Hello, my name is", + prompt=prompt, max_tokens=5, temperature=0.0) @@ -76,7 +83,7 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): # test using token IDs completion = client.completions.create( model=MODEL_NAME, - prompt=[0, 0, 0, 0, 0], + prompt=token_ids, max_tokens=5, temperature=0.0, ) @@ -91,7 +98,7 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): # test simple list batch = client.completions.create( model=MODEL_NAME, - prompt=["Hello, my name is", "Hello, my name is"], + prompt=[prompt, prompt], max_tokens=5, temperature=0.0, ) @@ -105,7 +112,7 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): # test streaming batch = client.completions.create( model=MODEL_NAME, - prompt=["Hello, my name is", "Hello, my name is"], + prompt=[prompt, prompt], max_tokens=5, temperature=0.0, stream=True, From 2afde67870075a681e24af22099455cb5208d358 Mon Sep 17 00:00:00 2001 From: Cody Yu Date: Wed, 17 Jul 2024 17:46:35 -0700 Subject: [PATCH 4/4] revert --- tests/distributed/test_pipeline_parallel.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/distributed/test_pipeline_parallel.py b/tests/distributed/test_pipeline_parallel.py index 84e3f609fb2a5..d7e640ce96995 100644 --- a/tests/distributed/test_pipeline_parallel.py +++ b/tests/distributed/test_pipeline_parallel.py @@ -37,9 +37,7 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, MODEL_NAME): "--dtype", "bfloat16", "--tensor-parallel-size", - # Use the same number or at most 8 GPUs to hold the model. - # In this test we assume the model can fit in 8 GPUs. - str(min(TP_SIZE * PP_SIZE, 8)), + str(max(TP_SIZE, 2)), # We only use 2 GPUs in the CI. "--distributed-executor-backend", "mp", ]
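
Note (reviewer sketch, not part of the patches above): the pipeline-parallel support added here rests on two pieces. make_layers instantiates only the decoder layers owned by the current pipeline rank and records self.start_layer / self.end_layer, which is why the forward pass indexes kv_caches[i - self.start_layer] and why non-final ranks return an IntermediateTensors carrying hidden_states and residual to the next stage. is_pp_missing_parameter then lets load_weights skip checkpoint entries for layers that live on other ranks. The standalone Python sketch below only illustrates that partition-and-skip idea; the helper names partition_layers and owns_parameter are hypothetical, the even-split-with-remainder policy is an assumption, and none of this is vLLM's actual make_layers / is_pp_missing_parameter implementation.

    from typing import Tuple


    def partition_layers(num_layers: int, pp_rank: int, pp_size: int) -> Tuple[int, int]:
        """Evenly split decoder layers across pipeline stages.

        Returns the half-open range [start, end) owned by pp_rank; when the
        split is uneven, the first num_layers % pp_size ranks take one extra layer.
        """
        base, extra = divmod(num_layers, pp_size)
        start = pp_rank * base + min(pp_rank, extra)
        end = start + base + (1 if pp_rank < extra else 0)
        return start, end


    def owns_parameter(name: str, start: int, end: int) -> bool:
        """True if a weight named like 'model.layers.<i>.<suffix>' falls inside
        [start, end). Weights without a layer index (embeddings, final norm,
        lm_head) are treated as owned so the caller decides where they live."""
        marker = "layers."
        pos = name.find(marker)
        if pos == -1:
            return True
        layer_id = int(name[pos + len(marker):].split(".", 1)[0])
        return start <= layer_id < end


    if __name__ == "__main__":
        # Mixtral-8x7B has 32 decoder layers; with pp_size=4 each rank owns 8.
        for rank in range(4):
            print(rank, partition_layers(32, rank, 4))
        start, end = partition_layers(32, 1, 4)  # rank 1 owns layers [8, 16)
        print(owns_parameter("model.layers.9.block_sparse_moe.gate.weight", start, end))  # True
        print(owns_parameter("model.layers.20.self_attn.qkv_proj.weight", start, end))    # False

Running the sketch prints the (start, end) layer range per rank for a 32-layer model at pp_size=4 and shows a rank-1 loader keeping a layer-9 weight while skipping a layer-20 weight, which mirrors the "Skip layers on other devices" checks added to load_weights in the patches above.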