diff --git a/container/Dockerfile.vllm b/container/Dockerfile.vllm
index a4628a0d29..d93509a2b8 100644
--- a/container/Dockerfile.vllm
+++ b/container/Dockerfile.vllm
@@ -12,23 +12,22 @@ ARG RELEASE_BUILD
 ARG ENABLE_KVBM=false
 ARG RUNTIME_IMAGE="nvcr.io/nvidia/cuda"
 ARG RUNTIME_IMAGE_TAG="12.8.1-runtime-ubuntu24.04"
+ARG CUDA_VERSION="12.8"
 # Make sure to update the dependency version in pyproject.toml when updating this
-ARG VLLM_REF="1da94e673c257373280026f75ceb4effac80e892" # from v0.10.1.1
+ARG VLLM_REF="v0.10.2"
+# FlashInfer is only respected when building vLLM from source, i.e. when VLLM_REF does not start with 'v', or for arm64 builds
+ARG FLASHINF_REF="v0.3.0"
 ARG TORCH_BACKEND="cu128"
+# If left blank, we fall back to the vLLM defaults
+ARG DEEPGEMM_REF=""
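+# Editor's note: the install step below forwards these optional refs with bash's
+# ${VAR:+...} expansion, e.g. ${DEEPGEMM_REF:+--deepgemm-ref "$DEEPGEMM_REF"}
+# expands to nothing when the ARG is empty, letting install_vllm.sh use its own default.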
+
 # sccache configuration - inherit from base build
 ARG USE_SCCACHE
 ARG SCCACHE_BUCKET=""
 ARG SCCACHE_REGION=""
-# Match 0.10.1.1 vLLM release
-# https://github.com/vllm-project/vllm/releases/tag/v0.10.1.1
-# Pinned to commit before https://github.com/deepseek-ai/DeepGEMM/pull/112 for DeepGEMM which seems to break on H100:
-# "RuntimeError: Failed: CUDA runtime error csrc/jit/kernel_runtime.hpp:108 '98'"
-ARG DEEPGEMM_REF="f85ec64"
-ARG FLASHINF_REF="v0.2.11"
-
 # Define general architecture ARGs for supporting both x86 and aarch64 builds.
 # ARCH: Used for package suffixes (e.g., amd64, arm64)
 # ARCH_ALT: Used for Rust targets, manylinux suffix (e.g., x86_64, aarch64)
@@ -108,6 +107,7 @@ ARG VLLM_GIT_URL
 ARG DEEPGEMM_REF
 ARG FLASHINF_REF
 ARG TORCH_BACKEND
+ARG CUDA_VERSION
 ARG MAX_JOBS=16
 ENV MAX_JOBS=$MAX_JOBS
@@ -138,18 +138,15 @@ RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
     --mount=type=cache,target=/root/.cache/uv \
     --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
     --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \
-    # TODO - split vllm, DeepEP, DeepGeMM, PPLX installs
-    # Should be able to select how you want your build to go
     cp /tmp/deps/vllm/install_vllm.sh /tmp/install_vllm.sh && \
    chmod +x /tmp/install_vllm.sh && \
-    /tmp/install_vllm.sh --editable --vllm-ref $VLLM_REF --max-jobs $MAX_JOBS --arch $ARCH --installation-dir /opt --deepgemm-ref $DEEPGEMM_REF --flashinf-ref $FLASHINF_REF --torch-backend $TORCH_BACKEND && \
+    /tmp/install_vllm.sh --editable --vllm-ref $VLLM_REF --max-jobs $MAX_JOBS --arch $ARCH --installation-dir /opt ${DEEPGEMM_REF:+--deepgemm-ref "$DEEPGEMM_REF"} ${FLASHINF_REF:+--flashinf-ref "$FLASHINF_REF"} --torch-backend $TORCH_BACKEND --cuda-version $CUDA_VERSION && \
     /tmp/use-sccache.sh show-stats "vLLM";

 ENV LD_LIBRARY_PATH=\
     /opt/vllm/tools/ep_kernels/ep_kernels_workspace/nvshmem_install/lib:\
     $LD_LIBRARY_PATH
-
 ##################################################
 ########## Runtime Image ########################
 ##################################################
@@ -362,4 +359,4 @@ RUN uv pip install maturin[patchelf]
 ENV PYTHONPATH=${WORKSPACE_DIR}/components/metrics/src:${WORKSPACE_DIR}/components/frontend/src:${WORKSPACE_DIR}/components/planner/src:${WORKSPACE_DIR}/components/backends/mocker/src:${WORKSPACE_DIR}/components/backends/trtllm/src:${WORKSPACE_DIR}/components/backends/vllm/src:${WORKSPACE_DIR}/components/backends/sglang/src:${WORKSPACE_DIR}/components/backends/llama_cpp/src

 ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
-CMD []
+CMD []
\ No newline at end of file
diff --git a/container/deps/vllm/install_vllm.sh b/container/deps/vllm/install_vllm.sh
index 57556dca59..f61ccd9a0a 100755
--- a/container/deps/vllm/install_vllm.sh
+++ b/container/deps/vllm/install_vllm.sh
@@ -1,44 +1,35 @@
 #!/usr/bin/env bash
 # SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Install vllm and wideEP kernels from a specific git reference
+
+# This script installs vLLM and its dependencies.
+# If installing vLLM from a release tag, we use pip to manage the install;
+# otherwise, we check out the vLLM source code and build it from source.
+# The dependencies are installed in the following order:
+# 1. vLLM
+# 2. LMCache
+# 3. DeepGEMM
+# 4. EP kernels

 set -euo pipefail

-# Parse arguments
-EDITABLE=true
-# REMOVE nvshmem cherry-pick when moving to next version of vllm
-VLLM_REF="1da94e673c257373280026f75ceb4effac80e892" # from v0.10.1.1
-# When updating above VLLM_REF make sure precompiled wheel file URL is correct. Run this command:
-# aws s3 ls s3://vllm-wheels/${VLLM_REF}/ --region us-west-2 --no-sign-request
-VLLM_PRECOMPILED_WHEEL_LOCATION="https://vllm-wheels.s3.us-west-2.amazonaws.com/${VLLM_REF}/vllm-0.10.1.1-cp38-abi3-manylinux1_x86_64.whl"
-VLLM_GIT_URL="https://github.com/vllm-project/vllm.git"
+VLLM_REF="v0.10.2"
+
+# Basic configuration
+ARCH=$(uname -m)
 MAX_JOBS=16
 INSTALLATION_DIR=/tmp
-ARCH=$(uname -m)
-DEEPGEMM_REF="f85ec64"
-FLASHINF_REF="v0.2.11"
+
+# vLLM and dependency configuration
 TORCH_BACKEND="cu128"
+TORCH_CUDA_ARCH_LIST="9.0;10.0" # For EP kernels
+DEEPGEMM_REF=""
+CUDA_VERSION="12.8" # For DeepGEMM

-# Convert x86_64 to amd64 for consistency with Docker ARG
-if [ "$ARCH" = "x86_64" ]; then
-    ARCH="amd64"
-elif [ "$ARCH" = "aarch64" ]; then
-    ARCH="arm64"
-fi
+# These flags apply when installing vLLM from source
+EDITABLE=true
+VLLM_GIT_URL="https://github.com/vllm-project/vllm.git"
+FLASHINF_REF="v0.3.0"
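+
+# Example invocation (editor's illustration; every flag is optional):
+#   ./install_vllm.sh --no-editable --vllm-ref v0.10.2 --max-jobs 8 \
+#       --torch-backend cu128 --cuda-version 12.8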
echo " --cuda-version VERSION CUDA version to use (default: ${CUDA_VERSION})" exit 0 ;; *) @@ -103,105 +104,143 @@ while [[ $# -gt 0 ]]; do esac done +# Convert x86_64 to amd64 for consistency with Docker ARG +if [ "$ARCH" = "x86_64" ]; then + ARCH="amd64" +elif [ "$ARCH" = "aarch64" ]; then + ARCH="arm64" +fi + export MAX_JOBS=$MAX_JOBS export CUDA_HOME=/usr/local/cuda -echo "Installing vllm with the following configuration:" -echo " EDITABLE: $EDITABLE" -echo " VLLM_REF: $VLLM_REF" -echo " MAX_JOBS: $MAX_JOBS" -echo " ARCH: $ARCH" -echo " TORCH_BACKEND: $TORCH_BACKEND" - -# Install common dependencies +echo "=== Installing prerequisites ===" uv pip install pip cuda-python -if [ "$ARCH" = "amd64" ]; then - # LMCache installation currently fails on arm64 due to CUDA dependency issues: - # OSError: CUDA_HOME environment variable is not set. Please set it to your CUDA install root. - # TODO: Re-enable for arm64 after verifying lmcache compatibility and resolving the build issue. - uv pip install lmcache==0.3.3 -fi +echo "\n=== Configuration Summary ===" +echo " VLLM_REF=$VLLM_REF | EDITABLE=$EDITABLE | ARCH=$ARCH" +echo " MAX_JOBS=$MAX_JOBS | TORCH_BACKEND=$TORCH_BACKEND | CUDA_VERSION=$CUDA_VERSION" +echo " TORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST" +echo " DEEPGEMM_REF=$DEEPGEMM_REF | FLASHINF_REF=$FLASHINF_REF" +echo " INSTALLATION_DIR=$INSTALLATION_DIR | VLLM_GIT_URL=$VLLM_GIT_URL" -# Create vllm directory and clone -mkdir -p $INSTALLATION_DIR +echo "\n=== Cloning vLLM repository ===" +# We need to clone to install dependencies cd $INSTALLATION_DIR git clone $VLLM_GIT_URL vllm cd vllm git checkout $VLLM_REF -# nvshmem fix - cherry-pick commit pinning pplx version -# https://github.com/ai-dynamo/dynamo/actions/runs/17907241473/job/50910654042?pr=2969#step:8:280 -# remove when moving to next version of vllm -# Configure git user for cherry-pick operation -GIT_COMMITTER_NAME="Container Build" GIT_COMMITTER_EMAIL="container@buildkitsandbox.local" git cherry-pick 906e461ed6ddccd3cc7b68fa72048d2d3fcbd72c - -if [ "$ARCH" = "arm64" ]; then - echo "Installing vllm for ARM64 architecture" - - # Try to install specific PyTorch version first, fallback to latest nightly - echo "Attempting to install pinned PyTorch nightly versions..." - if ! uv pip install torch==2.7.1+cu128 torchaudio==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl; then - echo "Pinned versions failed" - exit 1 - # uv pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128 - fi - python use_existing_torch.py - uv pip install -r requirements/build.txt +# TODO remove in future vLLM release, re-instate ignore torch script +# https://github.com/vllm-project/vllm/pull/24729 +GIT_COMMITTER_NAME="Container Build" GIT_COMMITTER_EMAIL="container@buildkitsandbox.local" git cherry-pick 740f064 + + +echo "\n=== Installing vLLM & FlashInfer ===" + +if [[ $VLLM_REF =~ ^v ]] && [ "$ARCH" = "amd64" ]; then + # VLLM_REF starts with 'v' and amd64 - use pip install with version tag + echo "Installing vLLM $VLLM_REF from PyPI..." + + uv pip install vllm[flashinfer]==$VLLM_REF --torch-backend=$TORCH_BACKEND - if [ "$EDITABLE" = "true" ]; then - MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -e . -v - else - MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation . -v - fi else - echo "Installing vllm for AMD64 architecture" + # VLLM_REF does not start with 'v' or amd64 - use git checkout path + if [ "$ARCH" = "arm64" ]; then - echo "Attempting to install pinned OpenAI version..." 
+    uv pip install vllm[flashinfer]==$VLLM_REF --torch-backend=$TORCH_BACKEND
-    if [ "$EDITABLE" = "true" ]; then
-        MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -e . -v
-    else
-        MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation . -v
-    fi
 else
-    echo "Installing vllm for AMD64 architecture"
+    # VLLM_REF does not start with 'v', or we are not on amd64 - build from a git checkout
+    if [ "$ARCH" = "arm64" ]; then
-        echo "Attempting to install pinned OpenAI version..."
-        if ! uv pip install openai==1.99.9; then
-            echo "Pinned versions failed"
-            exit 1
-        fi

+        # torch 2.8.0 doesn't have an aarch64 wheel for cu128; vLLM compiles its aarch64
+        # wheel against torch 2.8.0 nightly builds, and nightly can be unstable, so we
+        # don't use it here. For now we use torch 2.7.1+cu128, which requires
+        # recompiling vLLM from source.
+
+        echo "Building vLLM from source for ARM64 architecture..."
-        export VLLM_PRECOMPILED_WHEEL_LOCATION="${VLLM_PRECOMPILED_WHEEL_LOCATION}"

+        # Try to install the specific PyTorch version first
+        echo "Attempting to install pinned PyTorch versions..."
+        if ! uv pip install torch==2.7.1+cu128 torchaudio==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl/cu128; then
+            echo "Pinned versions failed"
+            exit 1
+        fi
+
+        # Create constraints file to pin all PyTorch-related versions
+        echo "Creating constraints file to preserve PyTorch ecosystem versions..."
+        TORCH_VERSION=$(python -c "import torch; print(torch.__version__)")
+        TORCHAUDIO_VERSION=$(python -c "import torchaudio; print(torchaudio.__version__)")
+        TORCHVISION_VERSION=$(python -c "import torchvision; print(torchvision.__version__)")
+
+        rm -rf /tmp/torch_constraints.txt
+        echo "torch==$TORCH_VERSION" > /tmp/torch_constraints.txt
+        echo "torchaudio==$TORCHAUDIO_VERSION" >> /tmp/torch_constraints.txt
+        echo "torchvision==$TORCHVISION_VERSION" >> /tmp/torch_constraints.txt
+
+        echo "Pinned versions:"
+        echo "  - torch==$TORCH_VERSION"
+        echo "  - torchaudio==$TORCHAUDIO_VERSION"
+        echo "  - torchvision==$TORCHVISION_VERSION"
+
+        python use_existing_torch.py
+        uv pip install -c /tmp/torch_constraints.txt -r requirements/build.txt
+
+        if [ "$EDITABLE" = "true" ]; then
+            MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -c /tmp/torch_constraints.txt -e . -v
+        else
+            MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -c /tmp/torch_constraints.txt . -v
+        fi
+
+        echo -e "\n=== Installing FlashInfer from source ==="
+        cd $INSTALLATION_DIR
+        git clone https://github.com/flashinfer-ai/flashinfer.git --recursive
+        cd flashinfer
+        git checkout $FLASHINF_REF
+
+        # Install with constraints to prevent a PyTorch upgrade
+        uv pip install -v --no-build-isolation -c /tmp/torch_constraints.txt .
-        if [ "$EDITABLE" = "true" ]; then
-            uv pip install -e . --torch-backend=$TORCH_BACKEND
     else
-            uv pip install . --torch-backend=$TORCH_BACKEND
+        echo "Building vLLM from source for AMD64 architecture..."
+
+        # When updating VLLM_REF above, make sure the precompiled wheel URL is correct. Run:
+        # aws s3 ls s3://vllm-wheels/${VLLM_REF}/ --region us-west-2 --no-sign-request
+        export VLLM_PRECOMPILED_WHEEL_LOCATION="https://vllm-wheels.s3.us-west-2.amazonaws.com/${VLLM_REF}/vllm-0.10.2-cp38-abi3-manylinux1_x86_64.whl"
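+        # Editor's note: vLLM's build reads VLLM_PRECOMPILED_WHEEL_LOCATION and reuses
+        # the prebuilt kernels from that wheel instead of compiling them, which keeps
+        # this source install fast.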
+
+        if [ "$EDITABLE" = "true" ]; then
+            uv pip install -e . --torch-backend=$TORCH_BACKEND
+        else
+            uv pip install . --torch-backend=$TORCH_BACKEND
+        fi
+
+        echo -e "\n=== Installing FlashInfer from PyPI ==="
+        uv pip install flashinfer-python==$FLASHINF_REF
+    fi
 fi
-# Install ep_kernels and DeepGEMM
-echo "Installing ep_kernels and DeepGEMM"
-cd tools/ep_kernels
-TORCH_CUDA_ARCH_LIST="9.0;10.0" bash install_python_libraries.sh # These libraries aren't pinned.
-cd ep_kernels_workspace
-git clone https://github.com/deepseek-ai/DeepGEMM.git
-cd DeepGEMM
-git checkout $DEEPGEMM_REF # Pin Version
-
-sed -i 's|git@github.com:|https://github.com/|g' .gitmodules
-git submodule sync --recursive
-git submodule update --init --recursive
+echo "✓ vLLM installation completed"

-# command for 03d0be3
-python setup.py install
+echo -e "\n=== Installing LMCache ==="
+if [ "$ARCH" = "amd64" ]; then
+    # LMCache installation currently fails on arm64 due to CUDA dependency issues:
+    # OSError: CUDA_HOME environment variable is not set. Please set it to your CUDA install root.
+    # TODO: Re-enable for arm64 after verifying lmcache compatibility and resolving the build issue.
-# new install command for post 03d0be3
-# cat install.sh
-# ./install.sh
+    # Alec: lmcache was likely compiled with a different version of torch and needs
+    # to be installed from source for arm64
+    uv pip install lmcache==0.3.3
+    echo "✓ LMCache installed"
+else
+    echo "⚠ Skipping LMCache on ARM64 (compatibility issues)"
+fi

+echo -e "\n=== Installing DeepGEMM ==="
+cd $INSTALLATION_DIR/vllm/tools
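+# Editor's note: install_deepgemm.sh comes from the vLLM checkout above; --ref is
+# passed only when DEEPGEMM_REF is set, so the script's own pinned default wins otherwise.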
-# Install Flash Infer
-if [ "$ARCH" = "arm64" ]; then
-    uv pip install flashinfer-python
+if [ -n "$DEEPGEMM_REF" ]; then
+    bash install_deepgemm.sh --cuda-version "${CUDA_VERSION}" --ref "$DEEPGEMM_REF"
 else
-    cd $INSTALLATION_DIR
-    git clone https://github.com/flashinfer-ai/flashinfer.git --recursive
-    cd flashinfer
-    git checkout $FLASHINF_REF
-    uv pip install -v --no-build-isolation .
+    bash install_deepgemm.sh --cuda-version "${CUDA_VERSION}"
 fi
+echo "✓ DeepGEMM installation completed"
+
+echo -e "\n=== Installing EP Kernels (PPLX and DeepEP) ==="
+cd ep_kernels/
+TORCH_CUDA_ARCH_LIST="$TORCH_CUDA_ARCH_LIST" bash install_python_libraries.sh

-echo "vllm installation completed successfully"
+echo -e "\n✅ All installations completed successfully!"
\ No newline at end of file
diff --git a/examples/multimodal/utils/chat_processor.py b/examples/multimodal/utils/chat_processor.py
index 5327d6ea83..fe8d95dc81 100644
--- a/examples/multimodal/utils/chat_processor.py
+++ b/examples/multimodal/utils/chat_processor.py
@@ -162,7 +162,6 @@ async def preprocess(self, raw_request: ChatCompletionRequest) -> PreprocessResu
             documents=request.documents,
             chat_template_kwargs=request.chat_template_kwargs,
             tool_parser=self.openai_serving.tool_parser,
-            truncate_prompt_tokens=request.truncate_prompt_tokens,
             add_special_tokens=request.add_special_tokens,
         )

@@ -288,7 +287,6 @@ async def preprocess(self, raw_request: CompletionRequest) -> PreprocessResult:
             request,
             self.tokenizer,
             input_or_inputs=request.prompt,
-            truncate_prompt_tokens=request.truncate_prompt_tokens,
             add_special_tokens=request.add_special_tokens,
         )

diff --git a/examples/multimodal/utils/protocol.py b/examples/multimodal/utils/protocol.py
index f9bf8d20d6..a2caee1efc 100644
--- a/examples/multimodal/utils/protocol.py
+++ b/examples/multimodal/utils/protocol.py
@@ -22,6 +22,7 @@
 from pydantic_core import core_schema
 from typing_extensions import NotRequired
 from vllm.inputs.data import TokensPrompt
+from vllm.multimodal.inputs import MultiModalUUIDDict  # noqa: F401
 from vllm.outputs import CompletionOutput
 from vllm.sampling_params import SamplingParams
 from vllm.sequence import PromptLogprobs, RequestMetrics

diff --git a/lib/bindings/python/rust/llm/kv.rs b/lib/bindings/python/rust/llm/kv.rs
index 51957868c7..d83c45e017 100644
--- a/lib/bindings/python/rust/llm/kv.rs
+++ b/lib/bindings/python/rust/llm/kv.rs
@@ -272,6 +272,7 @@ impl KvEventPublisher {
         lora_id: u64,
         parent_hash: Option<i64>,
     ) -> PyResult<()> {
+        let block_hashes_u64: Vec<u64> = block_hashes.iter().map(|&h| h as u64).collect();
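+        // Editor's note: `as u64` is a bitwise (two's-complement) reinterpretation, so
+        // a negative i64 hash such as -1 becomes u64::MAX rather than being rejected.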
         let event = KvCacheEvent {
             event_id,
             data: KvCacheEventData::Stored(KvCacheStoreData {
@@ -280,7 +281,7 @@ impl KvEventPublisher {
                     self.kv_block_size as u32,
                     &token_ids,
                     &num_block_tokens,
-                    &block_hashes,
+                    &block_hashes_u64,
                     lora_id,
                     &self.warning_count,
                 ),
@@ -292,8 +293,8 @@ impl KvEventPublisher {
     fn publish_removed(&self, _py: Python, event_id: u64, block_hashes: Vec<i64>) -> PyResult<()> {
         let block_hashes: Vec<ExternalSequenceBlockHash> = block_hashes
-            .iter()
-            .map(|&h| ExternalSequenceBlockHash::from(h))
+            .into_iter()
+            .map(ExternalSequenceBlockHash::from)
             .collect();
         let event = KvCacheEvent {
             event_id,
diff --git a/lib/llm/src/kv_router/indexer.rs b/lib/llm/src/kv_router/indexer.rs
index 10ff3d3b3e..e780dbbd43 100644
--- a/lib/llm/src/kv_router/indexer.rs
+++ b/lib/llm/src/kv_router/indexer.rs
@@ -277,10 +277,16 @@ impl RadixTree {
         let mut scores = OverlapScores::new();
         let mut current = self.root.clone();
         let now = Instant::now();
-        for block_hash in sequence {
+
+        tracing::trace!(
+            "RadixTree::find_matches: looking for sequence={:?}",
+            sequence.iter().map(|h| h.0).collect::<Vec<_>>()
+        );
+
+        for (idx, block_hash) in sequence.iter().enumerate() {
             let next_block = {
                 let current_borrow = current.borrow();
-                current_borrow.children.get(&block_hash).cloned()
+                current_borrow.children.get(block_hash).cloned()
             };
             if let Some(block) = next_block {
                 scores.update_scores(&block.borrow().workers);
@@ -305,10 +311,17 @@ impl RadixTree {
                 current = block;
             } else {
+                tracing::trace!(
+                    "RadixTree::find_matches: block not found at index {} for hash {}",
+                    idx,
+                    block_hash.0
+                );
                 break;
             }
         }

+        tracing::trace!("RadixTree::find_matches: final scores={:?}", scores.scores);
+
         scores
     }

@@ -320,7 +333,7 @@
     pub fn apply_event(&mut self, event: RouterEvent) -> Result<(), KvCacheEventError> {
         let (worker_id, event) = (event.worker_id, event.event);
         let (id, op) = (event.event_id, event.data);
-        tracing::trace!(id, "Store operation: {:?}", op);
+        tracing::trace!(id, "RadixTree::apply_event: Store operation: {:?}", op);

         let worker_lookup = self.lookup.entry(worker_id).or_default();

diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs
index 03fb843ce8..98095392c3 100644
--- a/lib/llm/src/kv_router/publisher.rs
+++ b/lib/llm/src/kv_router/publisher.rs
@@ -28,6 +28,8 @@ use tokio_util::sync::CancellationToken;
 use rmp_serde as rmps;
 use serde::Deserialize;
 use serde::Serialize;
+use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
+use std::fmt;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::time::Duration;
 use zeromq::{Socket, SocketRecv, SubSocket};
@@ -363,26 +365,34 @@ fn convert_event(
             token_ids,
             block_size,
             lora_id,
+            ..
         } => {
             let num_block_tokens = vec![block_size as u64; block_hashes.len()];
+            let block_hashes_u64: Vec<u64> = block_hashes
+                .into_iter()
+                .map(BlockHashValue::into_u64)
+                .collect();
             KvCacheEvent {
                 event_id,
                 data: KvCacheEventData::Stored(KvCacheStoreData {
-                    parent_hash: parent_block_hash.map(ExternalSequenceBlockHash::from),
+                    parent_hash: parent_block_hash
+                        .map(BlockHashValue::into_u64)
+                        .map(ExternalSequenceBlockHash::from),
                     blocks: create_stored_blocks(
                         kv_block_size,
                         &token_ids,
                         &num_block_tokens,
-                        &block_hashes,
+                        &block_hashes_u64,
                         lora_id.unwrap_or(0),
                         warning_count,
                     ),
                 }),
             }
         }
-        RawKvEvent::BlockRemoved { block_hashes } => {
+        RawKvEvent::BlockRemoved { block_hashes, .. } => {
             let hashes = block_hashes
                 .into_iter()
+                .map(BlockHashValue::into_u64)
                 .map(ExternalSequenceBlockHash::from)
                 .collect();
             KvCacheEvent {
@@ -401,11 +411,18 @@ fn convert_event(
 pub fn create_stored_block_from_parts(
     kv_block_size: u32,
-    block_hash: i64,
+    block_hash: u64,
     token_ids: &[u32],
     _lora_id: u64,
 ) -> KvCacheStoredBlockData {
     let tokens_hash = compute_block_hash_for_seq(token_ids, kv_block_size)[0];
+    tracing::trace!(
+        "Creating stored block: external_block_hash={}, tokens_hash={}, token_ids={:?}, kv_block_size={}",
+        block_hash,
+        tokens_hash.0,
+        token_ids,
+        kv_block_size
+    );
     KvCacheStoredBlockData {
         block_hash: ExternalSequenceBlockHash::from(block_hash),
         tokens_hash,
@@ -416,7 +433,7 @@ pub fn create_stored_blocks(
     kv_block_size: u32,
     token_ids: &[u32],
     num_block_tokens: &[u64],
-    block_hashes: &[i64],
+    block_hashes: &[u64],
     lora_id: u64,
     warning_count: &Arc<AtomicU32>,
 ) -> Vec<KvCacheStoredBlockData> {
@@ -460,22 +477,206 @@
 struct KvEventBatch {
     data_parallel_rank: u32, // we are ignoring this for now
 }

-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
+#[serde(untagged)]
+enum BlockHashValue {
+    Signed(i64),
+    Unsigned(u64),
+}
+
+impl BlockHashValue {
+    fn into_u64(self) -> u64 {
+        match self {
+            BlockHashValue::Signed(v) => v as u64,
+            BlockHashValue::Unsigned(v) => v,
+        }
+    }
+}
+
+#[derive(Debug, Serialize)]
 #[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True`
 enum RawKvEvent {
     BlockStored {
-        block_hashes: Vec<i64>,
-        parent_block_hash: Option<i64>,
+        /// Block hashes may be emitted as either signed or unsigned 64-bit values.
+        /// We normalize them to `u64` while deserializing to support both producers.
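+        /// (Editor's note: `#[serde(untagged)]` tries `Signed(i64)` first and falls
+        /// back to `Unsigned(u64)`, so hashes above `i64::MAX` still deserialize.)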
+        block_hashes: Vec<BlockHashValue>,
+        parent_block_hash: Option<BlockHashValue>,
         token_ids: Vec<u32>,
         block_size: usize,
         lora_id: Option<u64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        medium: Option<String>,
     },
     BlockRemoved {
-        block_hashes: Vec<i64>,
+        block_hashes: Vec<BlockHashValue>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        medium: Option<String>,
     },
     AllBlocksCleared,
 }

+/// Our producers use msgspec with `tag=True` and `array_like=True`, which
+/// encodes each event as either a tagged map or a tagged tuple. To be tolerant of
+/// additional fields that may be appended in the future, we implement a custom
+/// deserializer that ignores unknown keys and any extra positional elements.
+///
+/// This keeps us compatible with older payloads while safely
+/// accepting newer ones that include extra metadata.
+impl<'de> Deserialize<'de> for RawKvEvent {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        deserializer.deserialize_any(RawKvEventVisitor)
+    }
+}
+
+struct RawKvEventVisitor;
+
+impl<'de> Visitor<'de> for RawKvEventVisitor {
+    type Value = RawKvEvent;
+
+    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        formatter.write_str("a kv event encoded as a tagged map or sequence")
+    }
+
+    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+    where
+        A: MapAccess<'de>,
+    {
+        let mut event_type: Option<String> = None;
+        let mut block_hashes: Option<Vec<BlockHashValue>> = None;
+        let mut parent_block_hash: Option<Option<BlockHashValue>> = None;
+        let mut token_ids: Option<Vec<u32>> = None;
+        let mut block_size: Option<usize> = None;
+        let mut lora_id: Option<Option<u64>> = None;
+        let mut medium: Option<Option<String>> = None;
+
+        while let Some(key) = map.next_key::<String>()? {
+            match key.as_str() {
+                "type" => {
+                    event_type = Some(map.next_value()?);
+                }
+                "block_hashes" => {
+                    block_hashes = Some(map.next_value()?);
+                }
+                "parent_block_hash" => {
+                    parent_block_hash = Some(map.next_value()?);
+                }
+                "token_ids" => {
+                    token_ids = Some(map.next_value()?);
+                }
+                "block_size" => {
+                    block_size = Some(map.next_value()?);
+                }
+                "lora_id" => {
+                    lora_id = Some(map.next_value()?);
+                }
+                "medium" => {
+                    medium = Some(map.next_value()?);
+                }
+                _ => {
+                    map.next_value::<IgnoredAny>()?;
+                }
+            }
+        }
+
+        match event_type.as_deref() {
+            Some("BlockStored") => {
+                let block_hashes =
+                    block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
+                let token_ids = token_ids.ok_or_else(|| de::Error::missing_field("token_ids"))?;
+                let block_size =
+                    block_size.ok_or_else(|| de::Error::missing_field("block_size"))?;
+                Ok(RawKvEvent::BlockStored {
+                    block_hashes,
+                    parent_block_hash: parent_block_hash.unwrap_or(None),
+                    token_ids,
+                    block_size,
+                    lora_id: lora_id.unwrap_or(None),
+                    medium: medium.unwrap_or(None),
+                })
+            }
+            Some("BlockRemoved") => {
+                let block_hashes =
+                    block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
+                Ok(RawKvEvent::BlockRemoved {
+                    block_hashes,
+                    medium: medium.unwrap_or(None),
+                })
+            }
+            Some("AllBlocksCleared") => Ok(RawKvEvent::AllBlocksCleared),
+            Some(other) => Err(de::Error::unknown_variant(
+                other,
+                &["BlockStored", "BlockRemoved", "AllBlocksCleared"],
+            )),
+            None => Err(de::Error::missing_field("type")),
+        }
+    }
+
+    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+    where
+        A: SeqAccess<'de>,
+    {
+        let tag: Option<String> = seq.next_element()?;
+        let Some(tag) = tag else {
+            return Err(de::Error::invalid_length(
+                0,
+                &"sequence must start with event tag",
+            ));
+        };
+
+        match tag.as_str() {
+            "BlockStored" => {
+                let block_hashes: Vec<BlockHashValue> = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
+                let parent_block_hash: Option<BlockHashValue> = seq.next_element()?.unwrap_or(None);
+                let token_ids: Vec<u32> = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(3, &"missing token_ids"))?;
+                let block_size: usize = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(4, &"missing block_size"))?;
+                let lora_id: Option<u64> = seq.next_element()?.unwrap_or(None);
+                let medium: Option<String> = seq.next_element()?.unwrap_or(None);
+
+                while seq.next_element::<IgnoredAny>()?.is_some() {}
+
+                Ok(RawKvEvent::BlockStored {
+                    block_hashes,
+                    parent_block_hash,
+                    token_ids,
+                    block_size,
+                    lora_id,
+                    medium,
+                })
+            }
+            "BlockRemoved" => {
+                let block_hashes: Vec<BlockHashValue> = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
+                let medium: Option<String> = seq.next_element()?.unwrap_or(None);
+
+                while seq.next_element::<IgnoredAny>()?.is_some() {}
+
+                Ok(RawKvEvent::BlockRemoved {
+                    block_hashes,
+                    medium,
+                })
+            }
+            "AllBlocksCleared" => {
+                while seq.next_element::<IgnoredAny>()?.is_some() {}
+                Ok(RawKvEvent::AllBlocksCleared)
+            }
+            other => Err(de::Error::unknown_variant(
+                other,
+                &["BlockStored", "BlockRemoved", "AllBlocksCleared"],
+            )),
+        }
+    }
+}
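+// Editor's illustration (assumes a self-describing format such as JSON; the real
+// producers send msgpack via rmp_serde): both encodings below parse to the same
+// `RawKvEvent::BlockRemoved`, exercising `visit_map` and `visit_seq` respectively:
+//   {"type": "BlockRemoved", "block_hashes": [1, 2]}
+//   ["BlockRemoved", [1, 2]]
+// Unknown keys and extra trailing elements are consumed as `IgnoredAny`.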
+
 // -------------------------------------------------------------------------
 // Metrics Publishers ------------------------------------------------------
 // -------------------------------------------------------------------------
@@ -745,7 +946,7 @@ mod test_event_processing {

         let stored = create_stored_block_from_parts(kv_block_size, blk_hash, &token_ids, 0);

-        assert_eq!(stored.block_hash.0, blk_hash as u64);
+        assert_eq!(stored.block_hash.0, blk_hash);
         let expected_hash = compute_block_hash_for_seq(&token_ids, 4)[0];
         assert_eq!(stored.tokens_hash, expected_hash);
     }
@@ -759,7 +960,7 @@ mod test_event_processing {
         // two blocks, each of size 4
         let token_ids = vec![1, 2, 3, 4, 5, 6, 7, 8];
         let num_block_tokens = vec![4_u64, 4_u64];
-        let block_hashes = vec![111_i64, 222_i64];
+        let block_hashes = vec![111_u64, 222_u64];

         let blocks = create_stored_blocks(
             kv_block_size,
@@ -781,7 +982,7 @@ mod test_event_processing {
         // second block is the wrong size
         let token_ids = vec![1, 2, 3, 4, 5, 6, 7];
         let num_block_tokens = vec![4_u64, 3_u64];
-        let block_hashes = vec![111_i64, 222_i64];
         let warning_count = Arc::new(AtomicU32::new(0));
+        let block_hashes = vec![111_u64, 222_u64];

         let blocks = create_stored_blocks(
@@ -805,11 +1006,12 @@ mod test_event_processing {
     fn test_convert_event_block_stored() {
         let kv_block_size = 4;
         let raw_evt = RawKvEvent::BlockStored {
-            block_hashes: vec![10, 11],
-            parent_block_hash: Some(99),
+            block_hashes: vec![BlockHashValue::Unsigned(10), BlockHashValue::Unsigned(11)],
+            parent_block_hash: Some(BlockHashValue::Unsigned(99)),
             token_ids: vec![1, 2, 3, 4, 5, 6, 7, 8],
             block_size: 4,
             lora_id: Some(0),
+            medium: None,
         };

         let out = convert_event(raw_evt, 42, kv_block_size, &Arc::new(AtomicU32::new(0)));
@@ -820,7 +1022,8 @@ mod test_event_processing {
     fn test_convert_event_block_removed() {
         let kv_block_size = 4;
         let raw_evt = RawKvEvent::BlockRemoved {
-            block_hashes: vec![123, 456],
+            block_hashes: vec![BlockHashValue::Unsigned(123), BlockHashValue::Signed(456)],
+            medium: None,
         };

         let out = convert_event(raw_evt, 7, kv_block_size, &Arc::new(AtomicU32::new(0)));
@@ -965,11 +1168,12 @@ mod tests_startup_helpers {
         let seq: u64 = 77;

         let events = vec![RawKvEvent::BlockStored {
-            block_hashes: vec![42],
+            block_hashes: vec![BlockHashValue::Unsigned(42)],
             parent_block_hash: None,
             token_ids: vec![0, 1, 2, 3],
             block_size: 4,
             lora_id: None,
+            medium: None,
         }];

         let batch = KvEventBatch {
diff --git a/pyproject.toml b/pyproject.toml
index 316608a51e..38241de79f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -54,7 +54,7 @@ trtllm =[
 vllm = [
     "uvloop",
     "nixl<=0.4.1",
-    "vllm[flashinfer]==0.10.1.1",
+    "vllm[flashinfer]==0.10.2",
 ]
 sglang = [