From f4137ce6528a3fe0890baa1d65322cb15605e388 Mon Sep 17 00:00:00 2001 From: attafosu Date: Wed, 20 Aug 2025 21:24:59 +0300 Subject: [PATCH 1/6] Enable multimodal support and Qwen2.5-vl Signed-off-by: attafosu --- vllm_gaudi/v1/worker/hpu_model_runner.py | 488 ++++++++++++++++++++++- 1 file changed, 477 insertions(+), 11 deletions(-) diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index 34a5b0ad..25d7bf6b 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -14,6 +14,7 @@ import numpy as np import torch import torch.distributed +import torch.nn.functional as F import vllm_gaudi.extension.environment as environment from vllm_gaudi.extension.bucketing.common import HPUBucketingManager from vllm_gaudi.extension.profiler import (HabanaHighLevelProfiler, @@ -33,6 +34,13 @@ from vllm.model_executor.layers.vocab_parallel_embedding import ( VocabParallelEmbedding) from vllm.model_executor.model_loader import get_model, get_model_loader +from vllm.model_executor.models import supports_multimodal +from vllm.multimodal import MULTIMODAL_REGISTRY +from vllm.multimodal.inputs import (BatchedTensorInputs, MultiModalKwargs, + MultiModalKwargsItem, PlaceholderRange) +from vllm.multimodal.utils import group_mm_kwargs_by_modality +from vllm.model_executor.layers.rotary_embedding import MRotaryEmbedding +from vllm.multimodal.inputs import PlaceholderRange from vllm.sampling_params import SamplingType from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, LayerBlockType, cdiv, @@ -52,6 +60,8 @@ from vllm.model_executor.models.interfaces_base import ( is_pooling_model, is_text_generation_model) from vllm.tasks import GenerationTask, PoolingTask, SupportedTask +from vllm.v1.worker.utils import (gather_mm_placeholders, + sanity_check_mm_encoder_outputs, scatter_mm_placeholders) from vllm.v1.sample.logits_processor import build_logitsprocs if TYPE_CHECKING: @@ -411,7 +421,7 @@ def _update_metadata(self, attn_metadata, batch_size, seq_len, device, attn_metadata = self._set_block_mapping(attn_metadata, batch_size, device, dtype) return attn_metadata - + def forward(self, *args, **kwargs): # TODO(kzawora): something goes VERY WRONG when operating on # kwargs['attn_metadata'].slot_mapping, compared to untrimmed metadata @@ -429,12 +439,27 @@ def forward(self, *args, **kwargs): attn_meta = kwargs.pop('attn_metadata') if 'kv_caches' in kwargs: kwargs.pop('kv_caches') + + # If multimodal inputs, update kwargs + model_mm_kwargs = kwargs.pop('model_mm_kwargs', None) + if model_mm_kwargs is not None: + kwargs.update(model_mm_kwargs) + with set_forward_context(attn_meta, self.vllm_config): hidden_states = self.model(*args, **kwargs) if self._rotary_prepare_cos_sin is not None: self._reset_rotary_cos_sin() return hidden_states + def get_input_embeddings(self, input_ids, multimodal_embeddings=None): + return self.model.get_input_embeddings( + input_ids=input_ids, + multimodal_embeddings=multimodal_embeddings + ) + + def get_multimodal_embeddings(self, **batched_mm_inputs): + return self.model.get_multimodal_embeddings(**batched_mm_inputs) + def compute_logits(self, *args, **kwargs): return self.model.compute_logits(*args, **kwargs) @@ -609,12 +634,23 @@ def __init__( use_mla=self.model_config.use_mla, ) + # Mult-modal-related. 
+ self.mm_registry = MULTIMODAL_REGISTRY + self.uses_mrope = model_config.uses_mrope + self.supports_mm_inputs = self.mm_registry.supports_multimodal_inputs( + model_config) + self.is_multimodal_raw_input_supported = ( + model_config.is_multimodal_raw_input_supported) + # Lazy initialization # self.model: nn.Module # set after load_model self.kv_caches: list[torch.Tensor] = [] self.inc_initialized_successfully = False self._is_inc_finalized = False + # req_id -> (input_id -> encoder_output) + self.encoder_cache: dict[str, dict[int, torch.Tensor]] = {} + # Request states. self.requests: dict[str, CachedRequestState] = {} # Persistent batch. @@ -721,6 +757,8 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> bool: # Remove finished requests from the cached states. for req_id in scheduler_output.finished_req_ids: self.requests.pop(req_id, None) + self.encoder_cache.pop(req_id, None) + # Remove the finished requests from the persistent batch. # NOTE(woosuk): There could be an edge case where finished_req_ids and # scheduled_req_ids overlap. This happens when a request is aborted and @@ -733,6 +771,14 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> bool: if req_index is not None: removed_req_indices.append(req_index) + # Free the cached encoder outputs. + for req_id, input_id in scheduler_output.free_encoder_input_ids: + encoder_outputs = self.encoder_cache.get(req_id) + if encoder_outputs is not None: + encoder_outputs.pop(input_id, None) + if not encoder_outputs: + self.encoder_cache.pop(req_id, None) + # Remove the unscheduled requests from the persistent batch. # NOTE(woosuk): The unscheduled requests are either preempted requests # or running requests that are not scheduled in this step. We remove @@ -775,6 +821,45 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> bool: output_token_ids=[], lora_request=new_req_data.lora_request, ) + + # Only relevant for models using M-RoPE (e.g, Qwen2-VL) + if self.uses_mrope: + image_grid_thw = [] + video_grid_thw = [] + second_per_grid_ts = [] + audio_feature_lengths = [] + use_audio_in_video = False + for mm_item in self.requests[req_id].mm_kwargs: + mm_input = mm_item.get_data() + if mm_input.get("image_grid_thw") is not None: + image_grid_thw.append( + mm_input["image_grid_thw"].tolist()) + if mm_input.get("video_grid_thw") is not None: + video_grid_thw.append( + mm_input["video_grid_thw"].tolist()) + if mm_input.get("second_per_grid_ts") is not None: + second_per_grid_ts.append( + mm_input["second_per_grid_ts"]) + if mm_input.get("audio_feature_lengths") is not None: + audio_feature_lengths.append( + mm_input["audio_feature_lengths"]) + if mm_input.get("use_audio_in_video") is True: + use_audio_in_video = True + + hf_config = self.model_config.hf_config + + self.requests[req_id].mrope_positions, \ + self.requests[req_id].mrope_position_delta = \ + MRotaryEmbedding.get_input_positions_tensor( + self.requests[req_id].prompt_token_ids, + hf_config=hf_config, + image_grid_thw=image_grid_thw, + video_grid_thw=video_grid_thw, + second_per_grid_ts=second_per_grid_ts, + audio_feature_lengths=audio_feature_lengths, + use_audio_in_video=use_audio_in_video, + ) + req_ids_to_add.append(req_id) # Update the states of the running/resumed requests. 
is_last_rank = get_pp_group().is_last_rank @@ -872,6 +957,159 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> bool: self.input_batch.refresh_sampling_metadata() return batch_changed + def _extract_mm_kwargs( + self, + scheduler_output: "SchedulerOutput", + ) -> BatchedTensorInputs: + if self.is_multimodal_raw_input_supported: # noqa: SIM102 + if scheduler_output: + mm_kwargs = list[MultiModalKwargsItem]() + for req in scheduler_output.scheduled_new_reqs: + req_mm_kwargs = req.mm_kwargs + if not isinstance(req_mm_kwargs, list): + req_mm_kwargs = list(req_mm_kwargs) + mm_kwargs.extend(req_mm_kwargs) + + # Input all modalities at once + mm_kwargs_combined: BatchedTensorInputs = {} + for _, _, mm_kwargs_group in group_mm_kwargs_by_modality( + mm_kwargs, + device=self.device, + pin_memory=self.pin_memory, + ): + mm_kwargs_combined.update(mm_kwargs_group) + + return mm_kwargs_combined + + return {} + + # source: vllm/v1/worker/gpu_model_runner.py + def _execute_mm_encoder(self, + scheduler_output: "SchedulerOutput", + req_ids: list[str]): + scheduled_encoder_inputs = scheduler_output.scheduled_encoder_inputs + if not scheduled_encoder_inputs: + return + + # NOTE (attafosu): Utilize cached mm embeddings to speed up processing + # After PR(#22711) mm_hashes for inputs will map to their cached embeddings, which can be reused for reqs sharing same mm_hash # noqa E501 + + # Batch the multi-modal inputs. + mm_kwargs = list[MultiModalKwargsItem]() + req_ids_pos = list[tuple[str, int, PlaceholderRange]]() + for req_id in req_ids: + encoder_input_ids = scheduled_encoder_inputs[req_id] + req_state = self.requests[req_id] + + for mm_input_id in encoder_input_ids: + mm_kwargs.append(req_state.mm_kwargs[mm_input_id]) + req_ids_pos.append( + (req_id, mm_input_id, req_state.mm_positions[mm_input_id])) + + # Batch mm inputs as much as we can: if a request in the batch has + # multiple modalities or a different modality than the previous one, + # we process it separately to preserve item order. + + # TODO (attafosu): Follow-up on the resolution to this. + # The ordering of the encoder outputs needs to match the request ids + # after fetching the embeddings. + # For now, we'll restrict mm support to just a single prefill at a time - + # Or that requests in the batch should have distinct modalities, + + # FIXME(ywang96): This is a hacky way to deal with multiple modalities + # in the same batch while still being able to benefit from batching + # multimodal inputs. The proper solution should be reordering the + # encoder outputs. + encoder_outputs = [] + for _, num_items, mm_kwargs_group in group_mm_kwargs_by_modality( + mm_kwargs, + device=self.device, + pin_memory=self.pin_memory, + ): + # Run the encoder. + # `curr_group_outputs` is either of the following: + # 1. A tensor of shape (num_items, feature_size, hidden_size) + # in case feature_size is fixed across all multimodal items. + # 2. A list or tuple (length: num_items) of tensors, each of shape + # (feature_size, hidden_size) in case the feature size is dynamic + # depending on the input multimodal items. + curr_group_outputs = self.model.get_multimodal_embeddings( + **mm_kwargs_group) + + sanity_check_mm_encoder_outputs( + curr_group_outputs, + expected_num_items=num_items, + ) + + for output in curr_group_outputs: + encoder_outputs.append(output) + + # FIXME (attafosu) Reorder the encoder outputs to match the request ids. + # This will be necessary after mm prefill batching constraints are removed + + # Cache the encoder outputs. 
+ for (req_id, input_id, pos_info), output in zip( + req_ids_pos, + encoder_outputs, + ): + if req_id not in self.encoder_cache: + self.encoder_cache[req_id] = {} + + self.encoder_cache[req_id][input_id] = scatter_mm_placeholders( + output, + is_embed=pos_info.is_embed, + ) + + # modified from: vllm/v1/worker/gpu_model_runner.py + def _gather_mm_embeddings( + self, + scheduler_output: "SchedulerOutput", + req_ids : list[str], + shift_computed_tokens: int = 0, + ) -> list[torch.Tensor]: + mm_embeds: list[torch.Tensor] = [] + for req_id in req_ids: + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[ + req_id] + req_state = self.requests[req_id] + num_computed_tokens = \ + req_state.num_computed_tokens + shift_computed_tokens + mm_positions = req_state.mm_positions + for i, pos_info in enumerate(mm_positions): + start_pos = pos_info.offset + num_encoder_tokens = pos_info.length + + # The encoder output is needed if the two ranges overlap: + # [num_computed_tokens, + # num_computed_tokens + num_scheduled_tokens) and + # [start_pos, start_pos + num_encoder_tokens) + if start_pos >= num_computed_tokens + num_scheduled_tokens: + # The encoder output is not needed in this step. + break + if start_pos + num_encoder_tokens <= num_computed_tokens: + # The encoder output is already processed and stored + # in the decoder's KV cache. + continue + + start_idx = max(num_computed_tokens - start_pos, 0) + end_idx = min( + num_computed_tokens - start_pos + num_scheduled_tokens, + num_encoder_tokens) + assert start_idx < end_idx + assert req_id in self.encoder_cache + assert i in self.encoder_cache[req_id] + encoder_output = self.encoder_cache[req_id][i] + + if (is_embed := pos_info.is_embed) is not None: + is_embed = is_embed[start_idx:end_idx] + + mm_embeds_item = gather_mm_placeholders( + encoder_output[start_idx:end_idx], + is_embed=is_embed, + ) + mm_embeds.append(mm_embeds_item) + return mm_embeds + def get_model(self) -> torch.nn.Module: assert self.model is not None return self.model @@ -1013,6 +1251,40 @@ def _align_and_pad(self, data, bucketing, padding_gen): data = pad_list(data, target_bs, itertools.repeat(padding)) return data + def _align_and_pad_mrope_positions(self, req_ids : list[str], + context_lens : list[int], + query_lens : list[int], + bucketing : tuple[int, int], + padding_gen : int ) -> torch.Tensor: + bs = len(context_lens) + target_bs, target_len = bucketing + out_shape = (3, target_len) if target_bs == 1 \ + else (target_bs, target_len) + + mrope_position_tensor = torch.full( + out_shape, + padding_gen, + dtype=torch.int32, + device='hpu') + dst_start = 0 + dst_end = dst_start + for b_idx, req_id in enumerate(req_ids): + cl = context_lens[b_idx] + qsl = query_lens[b_idx] + input_mrope_position = \ + self.requests[req_id].mrope_positions[:,cl:cl+qsl] + dst_end = dst_start + qsl + mrope_position_tensor[:,dst_start:dst_end].copy_( + input_mrope_position, + non_blocking=True) + + # Update dst_start depending on if pos_ids of requests are meant to be adjacent # noqa 501 + if target_bs == 1: + dst_start = dst_end + else: + dst_start += target_len + return mrope_position_tensor + def _bucketize_merged_prompt(self, seq_lens, num_blocks): seq = sum(seq_lens) num_blocks = sum(num_blocks) @@ -1133,9 +1405,11 @@ def _form_prefill_batch(self, contents): list(range(cl, cl + ql)) for cl, ql in zip(context_lens, query_lens) ] + block_assignment = [[ divmod(pos, self.block_size) for pos in positions ] for positions in token_positions] + token_slots = [[ blocks[bi] * self.block_size + bo 
for bi, bo in assignment ] for blocks, assignment in zip(contents.blocks, block_assignment)] @@ -1151,15 +1425,33 @@ def _form_prefill_batch(self, contents): num_context_blocks = [len(b) for b in context_blocks] context_groups = [[i] * b for i, b in enumerate(num_context_blocks)] has_context = sum(context_lens) > 0 - target_bs, target_seq, target_blocks = self._get_prompt_bucketing_fn()( query_lens, num_context_blocks) + + # If the model uses M-RoPE, we need to fill + # and pad the M-RoPE positions for the scheduled prefill tokens + if self.uses_mrope: + mrope_token_positions = self._align_and_pad_mrope_positions( + contents.req_ids, + context_lens, + query_lens, + (target_bs, target_seq), + -1, + ) + + # NOTE: If model does not support multimodal inputs, we pad here. + # For models with multimodal support, we may want to get embeddings + # for the valid tokens before padding. + # This would require getting multimodal input embeddings here as well token_ids = self._align_and_pad(contents.token_ids, (target_bs, target_seq), itertools.repeat(-1)) - token_positions = self._align_and_pad(token_positions, - (target_bs, target_seq), - itertools.repeat(-1)) + if self.uses_mrope: + token_positions = mrope_token_positions + else: + token_positions = self._align_and_pad(token_positions, + (target_bs, target_seq), + itertools.repeat(-1)) token_slots = self._align_and_pad(token_slots, (target_bs, target_seq), itertools.repeat(-1)) token_groups = self._align_and_pad(token_groups, @@ -1204,7 +1496,8 @@ def _form_prefill_batch(self, contents): query_lens = _async_h2d_tensor(query_lens, torch.int32) token_ids = _async_h2d_tensor(token_ids, torch.int32) - token_positions = _async_h2d_tensor(token_positions, torch.int32) + if not self.uses_mrope: + token_positions = _async_h2d_tensor(token_positions, torch.int32) token_slots = _async_h2d_tensor(token_slots, torch.int64) logits_indices = _async_h2d_tensor(logits_indices, torch.int32) context_lens = _async_h2d_tensor(context_lens, torch.int32) @@ -1222,7 +1515,6 @@ def _form_prefill_batch(self, contents): block_list=context_blocks_t, attn_bias=attn_bias, block_size=self.block_size) - return PrefillInputData(request_ids=[req_ids], prompt_lens=[query_lens], token_ids=[token_ids], @@ -1287,6 +1579,36 @@ def _prepare_decode_inputs(self, num_decodes, padded_index = torch.zeros((padded_batch_size, 1), dtype=torch.int64) index = positions.to(torch.int64)[:num_decodes] padded_index[:num_decodes] = index + + input_mrope_positions: list[list[int]] = [[] for _ in range(3)] + if self.uses_mrope: + for idx, req_id in enumerate( + self.input_batch.req_ids[:num_decodes]): + seq_data = self.requests[req_id] + context_len = context_lens[idx] + position = context_len + if seq_data.mrope_position_delta is not None: + pos_for_mrope = MRotaryEmbedding \ + .get_next_input_positions( + seq_data.mrope_position_delta, + context_len=context_len, + seq_len=context_len + 1) + else: + pos_for_mrope = [[position]] * 3 + for idx in range(3): + input_mrope_positions[idx].extend(pos_for_mrope[idx]) + + input_mrope_positions = torch.tensor( + input_mrope_positions, + dtype=torch.int32, + device='cpu').to('hpu', non_blocking=True) + + # Pad the right side of input_mrope_positions by padded_batch_size + pad_size = padded_batch_size - input_mrope_positions.size(1) # noqa + if pad_size > 0: + input_mrope_positions = F.pad(input_mrope_positions, + (0, pad_size), + value=-1, mode='constant') # TOKEN_IDS. 
[batch, 1] token_ids = torch.zeros((padded_batch_size, 1), dtype=torch.int32) @@ -1335,7 +1657,8 @@ def _prepare_decode_inputs(self, num_decodes, # CPU<>HPU sync *should not* happen here. token_ids_device = _async_h2d_tensor_copy(token_ids, self.device) - positions_device = _async_h2d_tensor_copy(positions, self.device) + positions_device = input_mrope_positions if self.uses_mrope \ + else _async_h2d_tensor_copy(positions, self.device) logits_indices_device = _async_h2d_tensor_copy(logits_indices, self.device) block_list_device = _async_h2d_tensor_copy(block_list, self.device) @@ -1414,7 +1737,9 @@ def _execute_model_generic(self, attn_metadata, logits_indices, kv_caches, - warmup_mode=False): + warmup_mode=False, + inputs_embeds=None, + model_mm_kwargs=None): # FORWARD. batch_size = token_ids.size(0) @@ -1444,7 +1769,10 @@ def _execute_model_generic(self, input_ids=token_ids, positions=position_ids, attn_metadata=trimmed_attn_metadata, - kv_caches=kv_caches) + kv_caches=kv_caches, + inputs_embeds=inputs_embeds, + model_mm_kwargs=model_mm_kwargs + ) # NOTE(kzawora): returning hidden_states is required in prompt logprobs # scenarios, as they will do logit processing on their own non_flattened_hidden_states = hidden_states @@ -1708,6 +2036,29 @@ def execute_model( attn_metadata, logits_indices, logits_requests) in enumerate( zip(*shallow_tuple(prefill_data))): + + inputs_embeds=None + model_mm_kwargs = None + if self.supports_mm_inputs: + # Run the multimodal encoder if any. + with self.profiler.record_event('internal', 'prepare_input_encoders'): + self._execute_mm_encoder(scheduler_output, req_id) + + mm_embeds = self._gather_mm_embeddings(scheduler_output, req_id) + #TODO: Only get embeddings for valid token_ids. Ignore token_ids[] + # This may require moving multimodal input preps into _prepare_inputs, + # to avoid padding issues. 
+ inputs_embeds = self.model.get_input_embeddings( + input_ids=token_ids, + multimodal_embeddings=mm_embeds or None, + ) + + model_mm_kwargs = self._extract_mm_kwargs(scheduler_output) + model_mm_kwargs = MultiModalKwargs.as_kwargs( + model_mm_kwargs, + device=self.device, + ) + self.event_start = self.profiler.get_timestamp_us() self.profiler.start("internal", "prefill") # Align behavior of incomplete prompt with gpu_model_runner @@ -1722,7 +2073,10 @@ def execute_model( prefill_hidden_states_ts, logits_device = \ self._execute_model_generic( token_ids, position_ids, attn_metadata, logits_indices, - self.kv_caches, warmup_mode=warmup_mode) + self.kv_caches, + inputs_embeds=inputs_embeds, + model_mm_kwargs=model_mm_kwargs, + warmup_mode=warmup_mode) htorch.core.mark_step() # Skip separate sampling for structured output if structured_output: @@ -2045,6 +2399,118 @@ def log_graph_warmup_summary(self, buckets, is_prompt, total_mem): f'used_mem:{format_bytes(total_mem)}') logger.info(msg) + def warmup_scenario(self, + batch_size, + seq_or_block, + num_blocks, + is_prompt, + kv_caches, + is_pt_profiler_run=True) -> None: + """Dummy warmup run for memory usage and graph compilation.""" + + query_seq_len = seq_or_block if is_prompt else 1 + input_ids = torch.zeros((batch_size, query_seq_len), + dtype=torch.int32, + device='cpu') + # Position ids shape different for mrope + pos_ids_shape = (3, batch_size * query_seq_len) if self.uses_mrope \ + else (batch_size, query_seq_len) + position_ids = torch.zeros(pos_ids_shape, + dtype=torch.int32, + device='cpu') + slot_mapping = torch.zeros((batch_size, query_seq_len), + dtype=torch.int64, + device='cpu') + + input_ids_device = _async_h2d_tensor_copy(input_ids, self.device) + position_ids_device = _async_h2d_tensor_copy(position_ids, self.device) + slot_mapping_device = _async_h2d_tensor_copy(slot_mapping, self.device) + + use_graphs = self._use_graphs() + phase = "prompt" if is_prompt else "decode" + scenario_name = ("warmup_" + f"{phase}_" + f"bs{batch_size}_" + f"seq{query_seq_len}_" + f"ctx{num_blocks}_" + f"graphs{'T' if use_graphs else 'F'}") + input_ids = torch.zeros((batch_size, query_seq_len), + dtype=torch.int32, + device='cpu') + position_ids = torch.zeros(pos_ids_shape, + dtype=torch.int32, + device='cpu') + slot_mapping = torch.zeros((batch_size, query_seq_len), + dtype=torch.int64, + device='cpu') + + input_ids_device = _async_h2d_tensor_copy(input_ids, self.device) + position_ids_device = _async_h2d_tensor_copy(position_ids, self.device) + slot_mapping_device = _async_h2d_tensor_copy(slot_mapping, self.device) + self.profiler.start('internal', scenario_name) + + times = 3 if use_graphs or is_pt_profiler_run else 1 + for time_index in range(times): + if is_prompt: + seq_lens = torch.zeros((batch_size), + dtype=torch.int32, + device='cpu') + seq_lens.fill_(seq_or_block) + seq_lens_device = _async_h2d_tensor_copy(seq_lens, self.device) + block_list_device = None + if num_blocks: + prefix_block_tables = torch.ones( + (batch_size, num_blocks), + dtype=torch.int32, + device='cpu') * self._PAD_BLOCK_ID + block_list_device = _async_h2d_tensor_copy( + prefix_block_tables.flatten(), self.device) + attn_metadata = \ + HPUAttentionMetadataV1.make_prefill_metadata( + attn_bias=None, + seq_lens_tensor=seq_lens_device, + context_lens_tensor=seq_lens_device, + slot_mapping=slot_mapping_device, + block_list=block_list_device, + block_size=self.block_size) + else: + block_tables = [ + x.tolist() + for x in np.array_split(np.arange(num_blocks), batch_size) + 
] + block_list, block_groups, block_usage = \ + self.get_habana_paged_attn_buffers( + slot_mapping=slot_mapping, + block_tables=block_tables, + batch_size=batch_size) + block_list_device = _async_h2d_tensor_copy( + block_list, self.device) + block_usage_device = _async_h2d_tensor_copy( + block_usage, self.device) + block_groups_device = _async_h2d_tensor_copy( + block_groups, self.device) + attn_metadata = HPUAttentionMetadataV1.make_decode_metadata( + block_list=block_list_device, + block_usage=block_usage_device, + block_groups=block_groups_device, + num_decode_tokens=batch_size, + input_positions=None, + slot_mapping=slot_mapping_device, + block_size=self.block_size) + + logits_indices = torch.arange(0, batch_size, device='cpu') + logits_indices_device = _async_h2d_tensor_copy(logits_indices, + self.device) + # Dummy run. + htorch.core.mark_step() + _ = self._execute_model_generic(input_ids_device, position_ids_device, + attn_metadata, logits_indices_device, + kv_caches, True) + # TODO: do sampling on logits, warmup sampler and prefill joiner + htorch.core.mark_step() + self.profiler.end() + return None + def log_warmup(self, phase, i, max_i, batch_size, seq_len, num_blocks): free_mem = format_bytes( HabanaMemoryProfiler.current_free_device_memory()) From 6b1595a38b90c39f0f1ad1e43c37bbad30e20df9 Mon Sep 17 00:00:00 2001 From: attafosu Date: Wed, 20 Aug 2025 21:26:34 +0300 Subject: [PATCH 2/6] Add CI test for qwen2.5-vl Signed-off-by: attafosu --- tests/full_tests/ci_gsm8k_tests.sh | 12 ++ .../full_tests/model_cards/qwen2.5-vl-7b.yaml | 20 ++ .../language/generation/generation_mm.py | 204 ++++++++++++++++++ 3 files changed, 236 insertions(+) create mode 100644 tests/full_tests/model_cards/qwen2.5-vl-7b.yaml create mode 100644 tests/models/language/generation/generation_mm.py diff --git a/tests/full_tests/ci_gsm8k_tests.sh b/tests/full_tests/ci_gsm8k_tests.sh index 8b659302..dd03f309 100644 --- a/tests/full_tests/ci_gsm8k_tests.sh +++ b/tests/full_tests/ci_gsm8k_tests.sh @@ -132,3 +132,15 @@ if [ $? -ne 0 ]; then exit -1 fi echo "Test with QWEN3-30B-A3B passed" + +# multimodal-support with qwen2.5-vl +echo "Testing Qwen2.5-VL-7B" +echo "VLLM_SKIP_WARMUP=true VLLM_CONTIGUOUS_PA=False PT_HPU_LAZY_MODE=1 VLLM_USE_V1=1 \ +python -u vllm-gaudi/tests/models/language/generation/generation_mm.py --model-card-path vllm-gaudi/tests/full_tests/model_cards/qwen2.5-vl-7b.yaml" +VLLM_SKIP_WARMUP=true VLLM_CONTIGUOUS_PA=False PT_HPU_LAZY_MODE=1 VLLM_USE_V1=1 \ +python -u vllm-gaudi/tests/models/language/generation/generation_mm.py --model-card-path vllm-gaudi/tests/full_tests/model_cards/qwen2.5-vl-7b.yaml +if [ $? -ne 0 ]; then + echo "Error: Test failed for multimodal-support with qwen2.5-vl-7b" >&2 + exit -1 +fi +echo "Test with multimodal-support with qwen2.5-vl-7b passed" \ No newline at end of file diff --git a/tests/full_tests/model_cards/qwen2.5-vl-7b.yaml b/tests/full_tests/model_cards/qwen2.5-vl-7b.yaml new file mode 100644 index 00000000..5c5fc51e --- /dev/null +++ b/tests/full_tests/model_cards/qwen2.5-vl-7b.yaml @@ -0,0 +1,20 @@ +model_name: "Qwen/Qwen2.5-VL-7B-Instruct" +test_config: # List of test configurations. 
- modality is test for + - modality: image # modality (currently supports image and video) + extra_engine_args: # Optional extra arguments for the engine + mm_processor_kwargs: + min_pixels: 784 + max_pixels: 1003520 + fps: 1 + input_data_config: # Configuration for the input data + num_prompts: 4 # Number of samples to run + media_source: default # Source of the data to load + - modality: video + extra_engine_args: + mm_processor_kwargs: + min_pixels: 784 + max_pixels: 1003520 + fps: 1 + input_data_config: + num_prompts: 2 + media_source: default diff --git a/tests/models/language/generation/generation_mm.py b/tests/models/language/generation/generation_mm.py new file mode 100644 index 00000000..da9eafa1 --- /dev/null +++ b/tests/models/language/generation/generation_mm.py @@ -0,0 +1,204 @@ +from argparse import ArgumentParser +from vllm import LLM, EngineArgs, SamplingParams +from vllm.assets.image import ImageAsset +from vllm.assets.video import VideoAsset +from vllm.multimodal.image import convert_image_mode +from dataclasses import asdict +from typing import Union +from PIL import Image +from collections import UserDict +from dataclasses import dataclass +import yaml +from vllm_gaudi.extension.logger import logger as init_logger +logger = init_logger() + +@dataclass +class PROMPT_DATA: + _questions = {"image": [ + "What is the most prominent object in this image?", + "Describe the scene in the image.", + "What is the weather like in the image?", + "Write a short poem about this image." + ], + "video": [ + "Describe this video", + "Which movie would you associate this video with?" + ] + } + + _data = {"image": lambda source : + convert_image_mode( + ImageAsset("cherry_blossom").pil_image \ + if source == "default" else Image.open(source), "RGB" + ), + "video": lambda source : + VideoAsset(name="baby_reading" if source == "default" \ + else source, num_frames=16).np_ndarrays + } + + def __post_init__(self): + self._questions = self._questions + self._data = self._data + + def get_prompts(self, + modality: str = "image", + media_source: str = "default", + num_prompts: int = 1, + skip_vision_data=False + ) -> Union[dict, list[dict]]: + if modality == "image": + placeholder = "<|image_pad|>" + elif modality == "video": + placeholder = "<|video_pad|>" + else: + raise ValueError( + (f"Unsupported modality: {modality}." 
+ f" Supported modality: [image, video]" + ) + ) + questions = self._questions[modality] + prompts = [ + ( + "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" + f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>" + f"{question}<|im_end|>\n" + "<|im_start|>assistant\n" + ) + for question in questions + ] + + data = self._data[modality](media_source) + if num_prompts==1: + inputs = { + "prompt": prompts[0], + "multi_modal_data": {modality: data}, + } if not skip_vision_data else { + "prompt": questions[0], + } + else: + inputs = [ + { + "prompt": prompts[i % len(prompts)], + "multi_modal_data": {modality: data}, + } if not skip_vision_data \ + else { + "prompt": questions[i % len(questions)], + } + for i in range(num_prompts) + ] + + return inputs + +def run_model( + model_name: str, + inputs: Union[dict, list[dict]], + modality: str, + **extra_engine_args + ): + # Default mm_processor_kwargs + # mm_processor_kwargs={ + # "min_pixels": 28 * 28, + # "max_pixels": 1280 * 28 * 28, + # "fps": 1, + #} + passed_mm_processor_kwargs = extra_engine_args.get( + "mm_processor_kwargs", {} + ) + passed_mm_processor_kwargs.setdefault("min_pixels", 28 * 28) + passed_mm_processor_kwargs.setdefault("max_pixels", 1280 * 28 * 28) + passed_mm_processor_kwargs.setdefault("fps", 1) + extra_engine_args.update({ + "mm_processor_kwargs": passed_mm_processor_kwargs + }) + + extra_engine_args.setdefault("max_model_len", 32768) + extra_engine_args.setdefault("max_num_seqs", 5) + extra_engine_args.setdefault( + "limit_mm_per_prompt", + {modality: 1} + ) + + sampling_params = SamplingParams( + temperature=0.0, max_tokens=64, + ) + + engine_args = EngineArgs( + model=model_name, + **extra_engine_args + ) + + engine_args = asdict(engine_args) + llm = LLM(**engine_args) + + outputs = llm.generate( + inputs, + sampling_params=sampling_params, + use_tqdm=False, # Disable tqdm for CI tests + ) + return outputs + +def start_test(model_card_path: str): + with open(model_card_path) as f: + model_card = yaml.safe_load(f) + + model_name = model_card.get("model_name", "Qwen/Qwen2.5-VL-7B-Instruct") + test_config = model_card.get("test_config", []) + if not test_config: + logger.warning("No test configurations found.") + return + + for config in test_config: + try: + modality = config.get("modality", "image") + extra_engine_args = config.get("extra_engine_args", {}) + input_data_config = config.get("input_data_config", {}) + num_prompts = input_data_config.get("num_prompts", 1) + media_source = input_data_config.get("media_source", "default") + + logger.info(f"================================================\n" + f"Running test with configs:\n" + f"modality: {modality}\n" + f"input_data_config: {input_data_config}\n" + f"extra_engine_args: {extra_engine_args}\n" + f"================================================") + + data = PROMPT_DATA() + inputs = data.get_prompts( + modality=modality, + media_source=media_source, + num_prompts=num_prompts + ) + + logger.info(f"*** Questions for modality {modality}:" + f" {data._questions[modality]}" + ) + responses = run_model( + model_name, inputs, + modality, + **extra_engine_args + ) + for response in responses: + print(f"{response.outputs[0].text}") + print("=" * 80) + except Exception as e: + logger.error(f"Error during test with modality {modality}: {e}") + raise + +def main(): + parser = ArgumentParser() + parser.add_argument("--model-card-path", + required=True, + help="Path to .yaml file describing model parameters" + ) + args = parser.parse_args() + 
start_test(args.model_card_path) + +if __name__ == "__main__": + try: + main() + except Exception: + import os + import traceback + print("An error occurred during generation:") + traceback.print_exc() + os._exit(1) \ No newline at end of file From d8a23f582dcc85f49cc801550827410998ff8f86 Mon Sep 17 00:00:00 2001 From: Thomas Atta-Fosu Date: Wed, 20 Aug 2025 17:25:05 -0700 Subject: [PATCH 3/6] Style formatting (#2) * Style formatting Signed-off-by: attafosu * Extra mops Signed-off-by: attafosu * appease yapf and ruff Signed-off-by: attafosu --------- Signed-off-by: attafosu --- .../language/generation/generation_mm.py | 198 ++++++++---------- vllm_gaudi/v1/worker/hpu_model_runner.py | 135 ++++++------ 2 files changed, 155 insertions(+), 178 deletions(-) diff --git a/tests/models/language/generation/generation_mm.py b/tests/models/language/generation/generation_mm.py index da9eafa1..5721b678 100644 --- a/tests/models/language/generation/generation_mm.py +++ b/tests/models/language/generation/generation_mm.py @@ -6,34 +6,37 @@ from dataclasses import asdict from typing import Union from PIL import Image -from collections import UserDict from dataclasses import dataclass import yaml from vllm_gaudi.extension.logger import logger as init_logger + logger = init_logger() + @dataclass class PROMPT_DATA: - _questions = {"image": [ - "What is the most prominent object in this image?", - "Describe the scene in the image.", - "What is the weather like in the image?", - "Write a short poem about this image." - ], - "video": [ - "Describe this video", - "Which movie would you associate this video with?" - ] + _questions = { + "image": [ + "What is the most prominent object in this image?", + "Describe the scene in the image.", + "What is the weather like in the image?", + "Write a short poem about this image." + ], + "video": [ + "Describe this video", + "Which movie would you associate this video with?" + ] } - _data = {"image": lambda source : - convert_image_mode( - ImageAsset("cherry_blossom").pil_image \ - if source == "default" else Image.open(source), "RGB" - ), - "video": lambda source : - VideoAsset(name="baby_reading" if source == "default" \ - else source, num_frames=16).np_ndarrays + _data = { + "image": + lambda source: convert_image_mode( + ImageAsset("cherry_blossom").pil_image + if source == "default" else Image.open(source), "RGB"), + "video": + lambda source: VideoAsset(name="baby_reading" + if source == "default" else source, + num_frames=16).np_ndarrays } def __post_init__(self): @@ -41,92 +44,65 @@ def __post_init__(self): self._data = self._data def get_prompts(self, - modality: str = "image", - media_source: str = "default", - num_prompts: int = 1, - skip_vision_data=False - ) -> Union[dict, list[dict]]: + modality: str = "image", + media_source: str = "default", + num_prompts: int = 1, + skip_vision_data=False): if modality == "image": - placeholder = "<|image_pad|>" + pholder = "<|image_pad|>" elif modality == "video": - placeholder = "<|video_pad|>" + pholder = "<|video_pad|>" else: - raise ValueError( - (f"Unsupported modality: {modality}." - f" Supported modality: [image, video]" - ) - ) + raise ValueError(f"Unsupported modality: {modality}." 
+ " Supported modality: [image, video]") questions = self._questions[modality] prompts = [ - ( - "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" - f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>" - f"{question}<|im_end|>\n" - "<|im_start|>assistant\n" - ) - for question in questions + ("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" + f"<|im_start|>user\n<|vision_start|>{pholder}<|vision_end|>" + f"{question}<|im_end|>\n" + "<|im_start|>assistant\n") for question in questions ] data = self._data[modality](media_source) - if num_prompts==1: - inputs = { - "prompt": prompts[0], - "multi_modal_data": {modality: data}, - } if not skip_vision_data else { - "prompt": questions[0], - } - else: - inputs = [ - { - "prompt": prompts[i % len(prompts)], - "multi_modal_data": {modality: data}, - } if not skip_vision_data \ - else { - "prompt": questions[i % len(questions)], - } - for i in range(num_prompts) - ] + inputs = [{ + "prompt": prompts[i % len(prompts)], + "multi_modal_data": { + modality: data + }, + } if not skip_vision_data else { + "prompt": questions[i % len(questions)], + } for i in range(num_prompts)] return inputs -def run_model( - model_name: str, - inputs: Union[dict, list[dict]], - modality: str, - **extra_engine_args - ): + +def run_model(model_name: str, inputs: Union[dict, list[dict]], modality: str, + **extra_engine_args): # Default mm_processor_kwargs # mm_processor_kwargs={ # "min_pixels": 28 * 28, # "max_pixels": 1280 * 28 * 28, # "fps": 1, - #} - passed_mm_processor_kwargs = extra_engine_args.get( - "mm_processor_kwargs", {} - ) + # } + passed_mm_processor_kwargs = extra_engine_args.get("mm_processor_kwargs", + {}) passed_mm_processor_kwargs.setdefault("min_pixels", 28 * 28) passed_mm_processor_kwargs.setdefault("max_pixels", 1280 * 28 * 28) passed_mm_processor_kwargs.setdefault("fps", 1) - extra_engine_args.update({ - "mm_processor_kwargs": passed_mm_processor_kwargs - }) + extra_engine_args.update( + {"mm_processor_kwargs": passed_mm_processor_kwargs}) extra_engine_args.setdefault("max_model_len", 32768) extra_engine_args.setdefault("max_num_seqs", 5) - extra_engine_args.setdefault( - "limit_mm_per_prompt", - {modality: 1} - ) + extra_engine_args.setdefault("limit_mm_per_prompt", {modality: 1}) sampling_params = SamplingParams( - temperature=0.0, max_tokens=64, + temperature=0.0, + max_tokens=64, ) - engine_args = EngineArgs( - model=model_name, - **extra_engine_args - ) - + engine_args = EngineArgs(model=model_name, **extra_engine_args) + engine_args = asdict(engine_args) llm = LLM(**engine_args) @@ -137,17 +113,19 @@ def run_model( ) return outputs + def start_test(model_card_path: str): with open(model_card_path) as f: model_card = yaml.safe_load(f) - + model_name = model_card.get("model_name", "Qwen/Qwen2.5-VL-7B-Instruct") test_config = model_card.get("test_config", []) if not test_config: logger.warning("No test configurations found.") return - + for config in test_config: + modality = "image" # Ensure modality is always defined try: modality = config.get("modality", "image") extra_engine_args = config.get("extra_engine_args", {}) @@ -155,44 +133,46 @@ def start_test(model_card_path: str): num_prompts = input_data_config.get("num_prompts", 1) media_source = input_data_config.get("media_source", "default") - logger.info(f"================================================\n" - f"Running test with configs:\n" - f"modality: {modality}\n" - f"input_data_config: {input_data_config}\n" - f"extra_engine_args: 
{extra_engine_args}\n" - f"================================================") + logger.info( + "================================================\n" + "Running test with configs:\n" + "modality: %(modality)s\n" + "input_data_config: %(input_data_config)s\n" + "extra_engine_args: %(extra_engine_args)s\n" + "================================================", + dict(modality=modality, + input_data_config=input_data_config, + extra_engine_args=extra_engine_args)) data = PROMPT_DATA() - inputs = data.get_prompts( - modality=modality, - media_source=media_source, - num_prompts=num_prompts - ) - - logger.info(f"*** Questions for modality {modality}:" - f" {data._questions[modality]}" - ) - responses = run_model( - model_name, inputs, - modality, - **extra_engine_args - ) + inputs = data.get_prompts(modality=modality, + media_source=media_source, + num_prompts=num_prompts) + + logger.info( + "*** Questions for modality %(modality)s: %(questions)s", + dict(modality=modality, questions=data._questions[modality])) + responses = run_model(model_name, inputs, modality, + **extra_engine_args) for response in responses: print(f"{response.outputs[0].text}") print("=" * 80) except Exception as e: - logger.error(f"Error during test with modality {modality}: {e}") - raise - + logger.error("Error during test with modality %(modality)s: %(e)s", + dict(modality=modality, e=e)) + + raise + + def main(): - parser = ArgumentParser() + parser = ArgumentParser() parser.add_argument("--model-card-path", - required=True, - help="Path to .yaml file describing model parameters" - ) + required=True, + help="Path to .yaml file describing model parameters") args = parser.parse_args() start_test(args.model_card_path) + if __name__ == "__main__": try: main() @@ -201,4 +181,4 @@ def main(): import traceback print("An error occurred during generation:") traceback.print_exc() - os._exit(1) \ No newline at end of file + os._exit(1) diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index 25d7bf6b..0372add1 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -34,10 +34,9 @@ from vllm.model_executor.layers.vocab_parallel_embedding import ( VocabParallelEmbedding) from vllm.model_executor.model_loader import get_model, get_model_loader -from vllm.model_executor.models import supports_multimodal from vllm.multimodal import MULTIMODAL_REGISTRY from vllm.multimodal.inputs import (BatchedTensorInputs, MultiModalKwargs, - MultiModalKwargsItem, PlaceholderRange) + MultiModalKwargsItem) from vllm.multimodal.utils import group_mm_kwargs_by_modality from vllm.model_executor.layers.rotary_embedding import MRotaryEmbedding from vllm.multimodal.inputs import PlaceholderRange @@ -60,8 +59,9 @@ from vllm.model_executor.models.interfaces_base import ( is_pooling_model, is_text_generation_model) from vllm.tasks import GenerationTask, PoolingTask, SupportedTask -from vllm.v1.worker.utils import (gather_mm_placeholders, - sanity_check_mm_encoder_outputs, scatter_mm_placeholders) +from vllm.v1.worker.utils import (gather_mm_placeholders, + sanity_check_mm_encoder_outputs, + scatter_mm_placeholders) from vllm.v1.sample.logits_processor import build_logitsprocs if TYPE_CHECKING: @@ -421,7 +421,7 @@ def _update_metadata(self, attn_metadata, batch_size, seq_len, device, attn_metadata = self._set_block_mapping(attn_metadata, batch_size, device, dtype) return attn_metadata - + def forward(self, *args, **kwargs): # TODO(kzawora): something goes VERY WRONG when 
operating on # kwargs['attn_metadata'].slot_mapping, compared to untrimmed metadata @@ -439,7 +439,7 @@ def forward(self, *args, **kwargs): attn_meta = kwargs.pop('attn_metadata') if 'kv_caches' in kwargs: kwargs.pop('kv_caches') - + # If multimodal inputs, update kwargs model_mm_kwargs = kwargs.pop('model_mm_kwargs', None) if model_mm_kwargs is not None: @@ -453,9 +453,7 @@ def forward(self, *args, **kwargs): def get_input_embeddings(self, input_ids, multimodal_embeddings=None): return self.model.get_input_embeddings( - input_ids=input_ids, - multimodal_embeddings=multimodal_embeddings - ) + input_ids=input_ids, multimodal_embeddings=multimodal_embeddings) def get_multimodal_embeddings(self, **batched_mm_inputs): return self.model.get_multimodal_embeddings(**batched_mm_inputs) @@ -982,10 +980,9 @@ def _extract_mm_kwargs( return mm_kwargs_combined return {} - + # source: vllm/v1/worker/gpu_model_runner.py - def _execute_mm_encoder(self, - scheduler_output: "SchedulerOutput", + def _execute_mm_encoder(self, scheduler_output: "SchedulerOutput", req_ids: list[str]): scheduled_encoder_inputs = scheduler_output.scheduled_encoder_inputs if not scheduled_encoder_inputs: @@ -1013,7 +1010,7 @@ def _execute_mm_encoder(self, # TODO (attafosu): Follow-up on the resolution to this. # The ordering of the encoder outputs needs to match the request ids # after fetching the embeddings. - # For now, we'll restrict mm support to just a single prefill at a time - + # For now, we'll restrict mm support to just a single prefill at a time - # noqa E501 # Or that requests in the batch should have distinct modalities, # FIXME(ywang96): This is a hacky way to deal with multiple modalities @@ -1045,7 +1042,7 @@ def _execute_mm_encoder(self, encoder_outputs.append(output) # FIXME (attafosu) Reorder the encoder outputs to match the request ids. - # This will be necessary after mm prefill batching constraints are removed + # This will be necessary after mm prefill batching constraints are removed # noqa E501 # Cache the encoder outputs. 
for (req_id, input_id, pos_info), output in zip( @@ -1064,7 +1061,7 @@ def _execute_mm_encoder(self, def _gather_mm_embeddings( self, scheduler_output: "SchedulerOutput", - req_ids : list[str], + req_ids: list[str], shift_computed_tokens: int = 0, ) -> list[torch.Tensor]: mm_embeds: list[torch.Tensor] = [] @@ -1251,32 +1248,29 @@ def _align_and_pad(self, data, bucketing, padding_gen): data = pad_list(data, target_bs, itertools.repeat(padding)) return data - def _align_and_pad_mrope_positions(self, req_ids : list[str], - context_lens : list[int], - query_lens : list[int], - bucketing : tuple[int, int], - padding_gen : int ) -> torch.Tensor: - bs = len(context_lens) + def _align_and_pad_mrope_positions(self, req_ids: list[str], + context_lens: list[int], + query_lens: list[int], + bucketing: tuple[int, int], + padding_gen: int) -> torch.Tensor: target_bs, target_len = bucketing out_shape = (3, target_len) if target_bs == 1 \ - else (target_bs, target_len) - - mrope_position_tensor = torch.full( - out_shape, - padding_gen, - dtype=torch.int32, - device='hpu') + else (target_bs, target_len) + + mrope_position_tensor = torch.full(out_shape, + padding_gen, + dtype=torch.int32, + device='hpu') dst_start = 0 dst_end = dst_start for b_idx, req_id in enumerate(req_ids): cl = context_lens[b_idx] qsl = query_lens[b_idx] input_mrope_position = \ - self.requests[req_id].mrope_positions[:,cl:cl+qsl] + self.requests[req_id].mrope_positions[:, cl:cl+qsl] dst_end = dst_start + qsl - mrope_position_tensor[:,dst_start:dst_end].copy_( - input_mrope_position, - non_blocking=True) + mrope_position_tensor[:, dst_start:dst_end].copy_( + input_mrope_position, non_blocking=True) # Update dst_start depending on if pos_ids of requests are meant to be adjacent # noqa 501 if target_bs == 1: @@ -1427,21 +1421,21 @@ def _form_prefill_batch(self, contents): has_context = sum(context_lens) > 0 target_bs, target_seq, target_blocks = self._get_prompt_bucketing_fn()( query_lens, num_context_blocks) - + # If the model uses M-RoPE, we need to fill # and pad the M-RoPE positions for the scheduled prefill tokens if self.uses_mrope: mrope_token_positions = self._align_and_pad_mrope_positions( - contents.req_ids, - context_lens, - query_lens, - (target_bs, target_seq), - -1, + contents.req_ids, + context_lens, + query_lens, + (target_bs, target_seq), + -1, ) # NOTE: If model does not support multimodal inputs, we pad here. - # For models with multimodal support, we may want to get embeddings - # for the valid tokens before padding. + # For models with multimodal support, we may want to get embeddings + # for the valid tokens before padding. 
# This would require getting multimodal input embeddings here as well token_ids = self._align_and_pad(contents.token_ids, (target_bs, target_seq), @@ -1450,8 +1444,8 @@ def _form_prefill_batch(self, contents): token_positions = mrope_token_positions else: token_positions = self._align_and_pad(token_positions, - (target_bs, target_seq), - itertools.repeat(-1)) + (target_bs, target_seq), + itertools.repeat(-1)) token_slots = self._align_and_pad(token_slots, (target_bs, target_seq), itertools.repeat(-1)) token_groups = self._align_and_pad(token_groups, @@ -1579,9 +1573,9 @@ def _prepare_decode_inputs(self, num_decodes, padded_index = torch.zeros((padded_batch_size, 1), dtype=torch.int64) index = positions.to(torch.int64)[:num_decodes] padded_index[:num_decodes] = index - - input_mrope_positions: list[list[int]] = [[] for _ in range(3)] - if self.uses_mrope: + + input_mrope_positions_list: list[list[int]] = [[] for _ in range(3)] + if self.uses_mrope: for idx, req_id in enumerate( self.input_batch.req_ids[:num_decodes]): seq_data = self.requests[req_id] @@ -1596,19 +1590,21 @@ def _prepare_decode_inputs(self, num_decodes, else: pos_for_mrope = [[position]] * 3 for idx in range(3): - input_mrope_positions[idx].extend(pos_for_mrope[idx]) - - input_mrope_positions = torch.tensor( - input_mrope_positions, - dtype=torch.int32, - device='cpu').to('hpu', non_blocking=True) + input_mrope_positions_list[idx].extend(pos_for_mrope[idx]) + + input_mrope_positions = torch.tensor(input_mrope_positions_list, + dtype=torch.int32, + device='cpu').to( + 'hpu', non_blocking=True) # Pad the right side of input_mrope_positions by padded_batch_size - pad_size = padded_batch_size - input_mrope_positions.size(1) # noqa + pad_size = padded_batch_size - input_mrope_positions.size( + 1) # noqa if pad_size > 0: - input_mrope_positions = F.pad(input_mrope_positions, - (0, pad_size), - value=-1, mode='constant') + input_mrope_positions = F.pad(input_mrope_positions, + (0, pad_size), + value=-1, + mode='constant') # TOKEN_IDS. [batch, 1] token_ids = torch.zeros((padded_batch_size, 1), dtype=torch.int32) @@ -1658,7 +1654,7 @@ def _prepare_decode_inputs(self, num_decodes, # CPU<>HPU sync *should not* happen here. token_ids_device = _async_h2d_tensor_copy(token_ids, self.device) positions_device = input_mrope_positions if self.uses_mrope \ - else _async_h2d_tensor_copy(positions, self.device) + else _async_h2d_tensor_copy(positions, self.device) logits_indices_device = _async_h2d_tensor_copy(logits_indices, self.device) block_list_device = _async_h2d_tensor_copy(block_list, self.device) @@ -1771,8 +1767,7 @@ def _execute_model_generic(self, attn_metadata=trimmed_attn_metadata, kv_caches=kv_caches, inputs_embeds=inputs_embeds, - model_mm_kwargs=model_mm_kwargs - ) + model_mm_kwargs=model_mm_kwargs) # NOTE(kzawora): returning hidden_states is required in prompt logprobs # scenarios, as they will do logit processing on their own non_flattened_hidden_states = hidden_states @@ -2037,16 +2032,18 @@ def execute_model( logits_requests) in enumerate( zip(*shallow_tuple(prefill_data))): - inputs_embeds=None + inputs_embeds = None model_mm_kwargs = None if self.supports_mm_inputs: # Run the multimodal encoder if any. - with self.profiler.record_event('internal', 'prepare_input_encoders'): + with self.profiler.record_event('internal', + 'prepare_input_encoders'): self._execute_mm_encoder(scheduler_output, req_id) - mm_embeds = self._gather_mm_embeddings(scheduler_output, req_id) - #TODO: Only get embeddings for valid token_ids. 
Ignore token_ids[] - # This may require moving multimodal input preps into _prepare_inputs, + mm_embeds = self._gather_mm_embeddings( + scheduler_output, req_id) + # TODO: Only get embeddings for valid token_ids. Ignore token_ids[] # noqa E501 + # This may require moving multimodal input preps into _prepare_inputs, # noqa E501 # to avoid padding issues. inputs_embeds = self.model.get_input_embeddings( input_ids=token_ids, @@ -2055,9 +2052,9 @@ def execute_model( model_mm_kwargs = self._extract_mm_kwargs(scheduler_output) model_mm_kwargs = MultiModalKwargs.as_kwargs( - model_mm_kwargs, - device=self.device, - ) + model_mm_kwargs, + device=self.device, + ) self.event_start = self.profiler.get_timestamp_us() self.profiler.start("internal", "prefill") @@ -2073,8 +2070,8 @@ def execute_model( prefill_hidden_states_ts, logits_device = \ self._execute_model_generic( token_ids, position_ids, attn_metadata, logits_indices, - self.kv_caches, - inputs_embeds=inputs_embeds, + self.kv_caches, + inputs_embeds=inputs_embeds, model_mm_kwargs=model_mm_kwargs, warmup_mode=warmup_mode) htorch.core.mark_step() @@ -2510,7 +2507,7 @@ def warmup_scenario(self, htorch.core.mark_step() self.profiler.end() return None - + def log_warmup(self, phase, i, max_i, batch_size, seq_len, num_blocks): free_mem = format_bytes( HabanaMemoryProfiler.current_free_device_memory()) From 485eeb54f9538e21d2d8c859cf79b323b3d1f1aa Mon Sep 17 00:00:00 2001 From: attafosu Date: Fri, 22 Aug 2025 00:41:16 +0300 Subject: [PATCH 4/6] Remove redundant warmp_scenario Signed-off-by: attafosu --- vllm_gaudi/v1/worker/hpu_model_runner.py | 112 ----------------------- 1 file changed, 112 deletions(-) diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index 6cc1b83e..dbb1f2ff 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -2403,118 +2403,6 @@ def log_graph_warmup_summary(self, buckets, is_prompt, total_mem): f'used_mem:{format_bytes(total_mem)}') logger.info(msg) - def warmup_scenario(self, - batch_size, - seq_or_block, - num_blocks, - is_prompt, - kv_caches, - is_pt_profiler_run=True) -> None: - """Dummy warmup run for memory usage and graph compilation.""" - - query_seq_len = seq_or_block if is_prompt else 1 - input_ids = torch.zeros((batch_size, query_seq_len), - dtype=torch.int32, - device='cpu') - # Position ids shape different for mrope - pos_ids_shape = (3, batch_size * query_seq_len) if self.uses_mrope \ - else (batch_size, query_seq_len) - position_ids = torch.zeros(pos_ids_shape, - dtype=torch.int32, - device='cpu') - slot_mapping = torch.zeros((batch_size, query_seq_len), - dtype=torch.int64, - device='cpu') - - input_ids_device = _async_h2d_tensor_copy(input_ids, self.device) - position_ids_device = _async_h2d_tensor_copy(position_ids, self.device) - slot_mapping_device = _async_h2d_tensor_copy(slot_mapping, self.device) - - use_graphs = self._use_graphs() - phase = "prompt" if is_prompt else "decode" - scenario_name = ("warmup_" - f"{phase}_" - f"bs{batch_size}_" - f"seq{query_seq_len}_" - f"ctx{num_blocks}_" - f"graphs{'T' if use_graphs else 'F'}") - input_ids = torch.zeros((batch_size, query_seq_len), - dtype=torch.int32, - device='cpu') - position_ids = torch.zeros(pos_ids_shape, - dtype=torch.int32, - device='cpu') - slot_mapping = torch.zeros((batch_size, query_seq_len), - dtype=torch.int64, - device='cpu') - - input_ids_device = _async_h2d_tensor_copy(input_ids, self.device) - position_ids_device = 
_async_h2d_tensor_copy(position_ids, self.device) - slot_mapping_device = _async_h2d_tensor_copy(slot_mapping, self.device) - self.profiler.start('internal', scenario_name) - - times = 3 if use_graphs or is_pt_profiler_run else 1 - for time_index in range(times): - if is_prompt: - seq_lens = torch.zeros((batch_size), - dtype=torch.int32, - device='cpu') - seq_lens.fill_(seq_or_block) - seq_lens_device = _async_h2d_tensor_copy(seq_lens, self.device) - block_list_device = None - if num_blocks: - prefix_block_tables = torch.ones( - (batch_size, num_blocks), - dtype=torch.int32, - device='cpu') * self._PAD_BLOCK_ID - block_list_device = _async_h2d_tensor_copy( - prefix_block_tables.flatten(), self.device) - attn_metadata = \ - HPUAttentionMetadataV1.make_prefill_metadata( - attn_bias=None, - seq_lens_tensor=seq_lens_device, - context_lens_tensor=seq_lens_device, - slot_mapping=slot_mapping_device, - block_list=block_list_device, - block_size=self.block_size) - else: - block_tables = [ - x.tolist() - for x in np.array_split(np.arange(num_blocks), batch_size) - ] - block_list, block_groups, block_usage = \ - self.get_habana_paged_attn_buffers( - slot_mapping=slot_mapping, - block_tables=block_tables, - batch_size=batch_size) - block_list_device = _async_h2d_tensor_copy( - block_list, self.device) - block_usage_device = _async_h2d_tensor_copy( - block_usage, self.device) - block_groups_device = _async_h2d_tensor_copy( - block_groups, self.device) - attn_metadata = HPUAttentionMetadataV1.make_decode_metadata( - block_list=block_list_device, - block_usage=block_usage_device, - block_groups=block_groups_device, - num_decode_tokens=batch_size, - input_positions=None, - slot_mapping=slot_mapping_device, - block_size=self.block_size) - - logits_indices = torch.arange(0, batch_size, device='cpu') - logits_indices_device = _async_h2d_tensor_copy(logits_indices, - self.device) - # Dummy run. - htorch.core.mark_step() - _ = self._execute_model_generic(input_ids_device, position_ids_device, - attn_metadata, logits_indices_device, - kv_caches, True) - # TODO: do sampling on logits, warmup sampler and prefill joiner - htorch.core.mark_step() - self.profiler.end() - return None - def log_warmup(self, phase, i, max_i, batch_size, seq_len, num_blocks): free_mem = format_bytes( HabanaMemoryProfiler.current_free_device_memory()) From 5f439cb14af2c765900b6c529861ddfaeeb45d78 Mon Sep 17 00:00:00 2001 From: attafosu Date: Fri, 22 Aug 2025 01:56:07 +0300 Subject: [PATCH 5/6] Consolidate position_ids casts for mrope/non-mrope Signed-off-by: attafosu --- vllm_gaudi/v1/worker/hpu_model_runner.py | 43 +++++++++++------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index dbb1f2ff..019e63b6 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -203,6 +203,8 @@ def gather_list(input, indices, v): def _async_h2d_tensor(data, dtype, device='hpu'): + if isinstance(data, torch.Tensor): + return data.to(device=device, dtype=dtype, non_blocking=True) return torch.tensor(data, dtype=dtype, device='cpu').to(device, non_blocking=True) @@ -1429,10 +1431,17 @@ def _form_prefill_batch(self, contents): target_bs, target_seq, target_blocks = self._get_prompt_bucketing_fn()( query_lens, num_context_blocks) + # NOTE: If model does not support multimodal inputs, we pad here. + # For models with multimodal support, we may want to get embeddings + # for the valid tokens before padding. 
+ # This would require getting multimodal input embeddings here as well + token_ids = self._align_and_pad(contents.token_ids, + (target_bs, target_seq), + itertools.repeat(-1)) # If the model uses M-RoPE, we need to fill # and pad the M-RoPE positions for the scheduled prefill tokens if self.uses_mrope: - mrope_token_positions = self._align_and_pad_mrope_positions( + token_positions = self._align_and_pad_mrope_positions( contents.req_ids, context_lens, query_lens, @@ -1440,15 +1449,6 @@ def _form_prefill_batch(self, contents): -1, ) - # NOTE: If model does not support multimodal inputs, we pad here. - # For models with multimodal support, we may want to get embeddings - # for the valid tokens before padding. - # This would require getting multimodal input embeddings here as well - token_ids = self._align_and_pad(contents.token_ids, - (target_bs, target_seq), - itertools.repeat(-1)) - if self.uses_mrope: - token_positions = mrope_token_positions else: token_positions = self._align_and_pad(token_positions, (target_bs, target_seq), @@ -1497,8 +1497,7 @@ def _form_prefill_batch(self, contents): query_lens = _async_h2d_tensor(query_lens, torch.int32) token_ids = _async_h2d_tensor(token_ids, torch.int32) - if not self.uses_mrope: - token_positions = _async_h2d_tensor(token_positions, torch.int32) + token_positions = _async_h2d_tensor(token_positions, torch.int32) token_slots = _async_h2d_tensor(token_slots, torch.int64) logits_indices = _async_h2d_tensor(logits_indices, torch.int32) context_lens = _async_h2d_tensor(context_lens, torch.int32) @@ -1599,19 +1598,16 @@ def _prepare_decode_inputs(self, num_decodes, for idx in range(3): input_mrope_positions_list[idx].extend(pos_for_mrope[idx]) - input_mrope_positions = torch.tensor(input_mrope_positions_list, - dtype=torch.int32, - device='cpu').to( - 'hpu', non_blocking=True) + positions = torch.tensor(input_mrope_positions_list, + dtype=torch.int32, + device='cpu') # Pad the right side of input_mrope_positions by padded_batch_size - pad_size = padded_batch_size - input_mrope_positions.size( - 1) # noqa + pad_size = padded_batch_size - positions.size(1) if pad_size > 0: - input_mrope_positions = F.pad(input_mrope_positions, - (0, pad_size), - value=-1, - mode='constant') + positions = F.pad(positions, (0, pad_size), + value=-1, + mode='constant') # TOKEN_IDS. [batch, 1] token_ids = torch.zeros((padded_batch_size, 1), dtype=torch.int32) @@ -1660,8 +1656,7 @@ def _prepare_decode_inputs(self, num_decodes, # CPU<>HPU sync *should not* happen here. 
token_ids_device = _async_h2d_tensor_copy(token_ids, self.device) - positions_device = input_mrope_positions if self.uses_mrope \ - else _async_h2d_tensor_copy(positions, self.device) + positions_device = _async_h2d_tensor_copy(positions, self.device) logits_indices_device = _async_h2d_tensor_copy(logits_indices, self.device) block_list_device = _async_h2d_tensor_copy(block_list, self.device) From 2617898baf0e0eca3740839ca8b4e30216282322 Mon Sep 17 00:00:00 2001 From: attafosu Date: Fri, 22 Aug 2025 02:16:57 +0300 Subject: [PATCH 6/6] Clean up mrope tensors aux Signed-off-by: attafosu --- vllm_gaudi/v1/worker/hpu_model_runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index 019e63b6..66b6707f 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -203,8 +203,6 @@ def gather_list(input, indices, v): def _async_h2d_tensor(data, dtype, device='hpu'): - if isinstance(data, torch.Tensor): - return data.to(device=device, dtype=dtype, non_blocking=True) return torch.tensor(data, dtype=dtype, device='cpu').to(device, non_blocking=True) @@ -1269,7 +1267,7 @@ def _align_and_pad_mrope_positions(self, req_ids: list[str], mrope_position_tensor = torch.full(out_shape, padding_gen, dtype=torch.int32, - device='hpu') + device='cpu') dst_start = 0 dst_end = dst_start for b_idx, req_id in enumerate(req_ids):
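The final hunk above keeps the packed M-RoPE position buffer on the CPU so that, after patch 5, the shared _async_h2d_tensor path performs the host-to-device copy for both the M-RoPE and the regular position tensors. As a rough illustration of the layout that _align_and_pad_mrope_positions produces in the merged-prompt case (target_bs == 1), the standalone sketch below packs the scheduled slice of each request's (3, total_len) position tensor back to back into one padded (3, target_len) tensor. The helper name pack_mrope_positions_flat and the toy request data are invented for this example and are not part of the patch; only the packing layout mirrors the code above.

# Illustrative sketch only; assumes the merged-prompt (target_bs == 1) layout.
import torch


def pack_mrope_positions_flat(per_req_positions, context_lens, query_lens,
                              target_len, pad_value=-1):
    """Lay the scheduled M-RoPE position slices back to back.

    per_req_positions[i] has shape (3, total_len_i); only the columns
    [context_len, context_len + query_len) are scheduled this step.
    Unused columns of the output keep pad_value.
    """
    out = torch.full((3, target_len), pad_value, dtype=torch.int32)
    dst = 0
    for pos, cl, ql in zip(per_req_positions, context_lens, query_lens):
        # Copy the (3, ql) slice for this request into the next free columns.
        out[:, dst:dst + ql].copy_(pos[:, cl:cl + ql])
        dst += ql
    return out


if __name__ == "__main__":
    # Two toy requests with 8 and 5 prompt tokens and no cached context.
    reqs = [torch.arange(8, dtype=torch.int32).expand(3, -1),
            torch.arange(5, dtype=torch.int32).expand(3, -1)]
    packed = pack_mrope_positions_flat(reqs, context_lens=[0, 0],
                                       query_lens=[8, 5], target_len=16)
    print(packed)  # each of the 3 rows holds 0..7, then 0..4, then -1 padding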