diff --git a/lib/llm/src/kv_router.rs b/lib/llm/src/kv_router.rs
index 74e42ef4e2..34da7eef70 100644
--- a/lib/llm/src/kv_router.rs
+++ b/lib/llm/src/kv_router.rs
@@ -313,69 +313,81 @@ impl AsyncEngine, ManyOut {
         // Extract context ID for request tracking
         let context_id = request.context().id().to_string();
-
         let (instance_id, overlap_amount) = self
             .chooser
             .find_best_match(&context_id, &request.token_ids)
             .await?;
+        let query_instance_id = request.has_annotation("query_instance_id");
+        // Extract context information before moving the request
+        let stream_context = request.context().clone();
 
         // Update the request with the estimated prefix hit blocks
         let (mut backend_input, context) = request.into_parts();
         let isl = backend_input.token_ids.len();
         backend_input.estimated_prefix_hit_num_blocks = Some(overlap_amount);
         let updated_request = context.map(|_| backend_input);
 
+        // if request has the annotation "query_instance_id", for example
+        // curl -d '{... ,"nvext": { "annotations": ["query_instance_id"]}}'
+        // request will not be routed to worker immediately
+        if query_instance_id {
+            let instance_id_str = instance_id.to_string();
+            let response =
+                Annotated::from_annotation("worker_instance_id", &instance_id_str)?;
+            let stream = stream::iter(vec![response]);
+            Ok(ResponseStream::new(Box::pin(stream), stream_context))
+        } else {
+            // Get the response stream from the worker
+            let mut response_stream =
+                self.inner.direct(updated_request, instance_id).await?;
+
+            // Wrap the stream to track tokens
+            let stream_context = response_stream.context();
+            let chooser = self.chooser.clone();
+            let request_id = context_id.clone();
+            let block_size = chooser.block_size() as usize;
+
+            let wrapped_stream = Box::pin(async_stream::stream! {
+                let mut accumulated_tokens = Vec::new();
+                let mut total_output_length = 0usize;
+                let mut last_block_index = (isl.saturating_sub(1)) / block_size;
+                let mut first_push_done = false;
+
+                while let Some(item) = response_stream.next().await {
+                    // Track tokens if they exist in the response
+                    let Some(ref output) = item.data else {
+                        yield item;
+                        continue;
+                    };
+                    if output.token_ids.is_empty() {
+                        yield item;
+                        continue;
+                    }
-
-        // Get the response stream from the worker
-        let mut response_stream = self.inner.direct(updated_request, instance_id).await?;
-
-        // Wrap the stream to track tokens
-        let stream_context = response_stream.context();
-        let chooser = self.chooser.clone();
-        let request_id = context_id.clone();
-        let block_size = chooser.block_size() as usize;
-
-        let wrapped_stream = Box::pin(async_stream::stream! {
-            let mut accumulated_tokens = Vec::new();
-            let mut total_output_length = 0usize;
-            let mut last_block_index = (isl.saturating_sub(1)) / block_size;
-            let mut first_push_done = false;
+                    // Add tokens to accumulator
+                    accumulated_tokens.extend_from_slice(&output.token_ids);
+                    total_output_length += output.token_ids.len();
+
+                    // Always push for the first generated token (to mark prefill done)
+                    // or when we've moved to a new block
+                    let current_block_index = (isl + total_output_length).saturating_sub(1) / block_size;
+                    let should_push = (!first_push_done && total_output_length >= 1) ||
+                        (first_push_done && current_block_index > last_block_index);
+
+                    if should_push {
+                        chooser.push(&request_id, &accumulated_tokens).await;
+                        accumulated_tokens.clear();
+                        last_block_index = current_block_index;
+                        if !first_push_done {
+                            first_push_done = true;
+                        }
+                    }
-            while let Some(item) = response_stream.next().await {
-                // Track tokens if they exist in the response
-                let Some(ref output) = item.data else {
-                    yield item;
-                    continue;
-                };
-                if output.token_ids.is_empty() {
                     yield item;
-                    continue;
                 }
-                // Add tokens to accumulator
-                accumulated_tokens.extend_from_slice(&output.token_ids);
-                total_output_length += output.token_ids.len();
-
-                // Always push for the first generated token (to mark prefill done)
-                // or when we've moved to a new block
-                let current_block_index = (isl + total_output_length).saturating_sub(1) / block_size;
-                let should_push = (!first_push_done && total_output_length >= 1) ||
-                    (first_push_done && current_block_index > last_block_index);
-
-                if should_push {
-                    chooser.push(&request_id, &accumulated_tokens).await;
-                    accumulated_tokens.clear();
-                    last_block_index = current_block_index;
-                    if !first_push_done {
-                        first_push_done = true;
-                    }
-                }
-
-                yield item;
-            }
-
-            chooser.free(&request_id).await;
-        });
-
-        Ok(ResponseStream::new(wrapped_stream, stream_context))
+                chooser.free(&request_id).await;
+            });
+            Ok(ResponseStream::new(wrapped_stream, stream_context))
+        }
     }
 }
diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs
index d80f99246e..eec8367806 100644
--- a/lib/llm/src/preprocessor.rs
+++ b/lib/llm/src/preprocessor.rs
@@ -397,8 +397,8 @@ impl OpenAIPreprocessor {
             // Only set event if not already set to avoid overriding existing events (like errors)
            if response.event.is_none() {
                 response.event = metrics_annotated.event;
+                response.comment = metrics_annotated.comment;
             }
-            response.comment = metrics_annotated.comment;
         }
 
         tracing::trace!(
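The `query_instance_id` path above can be exercised with a request body like the one shown in the curl comment. A minimal sketch of building that body with `serde_json`; the model name and message content are placeholders, not values from the patch:

```rust
use serde_json::json;

fn main() {
    // Body equivalent to the curl example in the kv_router.rs comment:
    // the "query_instance_id" annotation asks the router to answer with the
    // selected worker's instance id instead of dispatching the request.
    let body = json!({
        "model": "example-model", // placeholder
        "messages": [{ "role": "user", "content": "hello" }], // placeholder
        "nvext": { "annotations": ["query_instance_id"] }
    });
    println!("{body}");
}
```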
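The wrapped stream pushes accumulated tokens to the chooser on the first generated token (marking prefill complete) and then once each time generation crosses into a new KV block. A self-contained restatement of that condition, using the same arithmetic as the diff; `should_push` as a free function is hypothetical, for illustration only:

```rust
/// Illustrative restatement of the push condition in the wrapped stream.
/// `isl` is the input sequence length; `block_size` is the KV block size.
fn should_push(
    isl: usize,
    total_output_length: usize,
    last_block_index: usize,
    first_push_done: bool,
    block_size: usize,
) -> bool {
    // Zero-based index of the KV block holding the last token so far.
    let current_block_index = (isl + total_output_length).saturating_sub(1) / block_size;
    // Push on the first generated token (prefill is done), then again
    // whenever generation spills into a new block.
    (!first_push_done && total_output_length >= 1)
        || (first_push_done && current_block_index > last_block_index)
}

fn main() {
    // isl = 100, block_size = 64: the prompt ends in block 1 ((100 - 1) / 64).
    assert!(should_push(100, 1, 1, false, 64)); // first token always pushes
    assert!(!should_push(100, 28, 1, true, 64)); // 127 / 64 == 1, same block
    assert!(should_push(100, 29, 1, true, 64)); // 128 / 64 == 2, new block
}
```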
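The preprocessor.rs hunk moves the comment assignment inside the `is_none` guard, so a metrics comment now travels together with the metrics event and is no longer attached to a response whose event is already set (such as an error). A minimal sketch of the resulting semantics, assuming simplified stand-in types for the real `Annotated` response:

```rust
// Simplified stand-ins for the real annotated response and metrics types.
struct Annotated {
    event: Option<String>,
    comment: Option<Vec<String>>,
}

// Post-patch behavior: event and comment are applied together, and only
// when the response does not already carry an event.
fn apply_metrics(response: &mut Annotated, metrics: Annotated) {
    if response.event.is_none() {
        response.event = metrics.event;
        response.comment = metrics.comment;
    }
}

fn main() {
    let mut error_response = Annotated {
        event: Some("error".to_string()),
        comment: None,
    };
    apply_metrics(
        &mut error_response,
        Annotated {
            event: Some("metrics".to_string()),
            comment: Some(vec!["ttft=12ms".to_string()]), // illustrative comment
        },
    );
    // The error event is preserved and no metrics comment is attached.
    assert_eq!(error_response.event.as_deref(), Some("error"));
    assert!(error_response.comment.is_none());
}
```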