14 changes: 10 additions & 4 deletions lib/llm/src/kv_router.rs
@@ -339,10 +339,16 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
             InstanceSource::Dynamic(_) => {
                 // Extract context ID for request tracking
                 let context_id = request.context().id().to_string();
-                let (instance_id, overlap_amount) = self
-                    .chooser
-                    .find_best_match(&context_id, &request.token_ids)
-                    .await?;
+                let (instance_id, overlap_amount) = if let Some(id) = request.backend_instance_id {
+                    // If instance_id is set, use it
+                    (id, 0)
+                } else {
+                    // Otherwise, find the best match
+                    self.chooser
+                        .find_best_match(&context_id, &request.token_ids)
+                        .await?
+                };
+
                 let query_instance_id = request.has_annotation("query_instance_id");
                 // Extract context information before moving the request
                 let stream_context = request.context().clone();
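With this change, a request that arrives with `backend_instance_id` already set bypasses KV-aware matching entirely, and the router reports an overlap of 0 since no prefix match was computed. A minimal client-side sketch of pinning a request, assuming `NvExt` derives `Default` (the struct-update syntax below is illustrative and not part of this diff):

```rust
// Pin the request to backend instance 3. The router will skip
// find_best_match and report overlap_amount = 0 for pinned requests.
let ext = NvExt {
    backend_instance_id: Some(3),
    ..Default::default()
};
```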
1 change: 1 addition & 0 deletions lib/llm/src/migration.rs
@@ -188,6 +188,7 @@ mod tests {
             mdc_sum: None,
             annotations: vec![],
             estimated_prefix_hit_num_blocks: None,
+            backend_instance_id: None,
         }
     }

1 change: 1 addition & 0 deletions lib/llm/src/mocker/engine.rs
@@ -647,6 +647,7 @@ mod integration_tests {
             mdc_sum: None,
             annotations: vec![format!("dp_rank:{dp_rank}")],
             estimated_prefix_hit_num_blocks: None,
+            backend_instance_id: None,
         };
 
         let requests = vec![
4 changes: 4 additions & 0 deletions lib/llm/src/preprocessor.rs
@@ -254,6 +254,10 @@ impl OpenAIPreprocessor {
         builder.annotations(request.annotations().unwrap_or_default());
         builder.mdc_sum(Some(self.mdcsum.clone()));
         builder.estimated_prefix_hit_num_blocks(None);
+        // Extract backend_instance_id from nvext if present
+        if let Some(nvext) = request.nvext() {
+            builder.backend_instance_id(nvext.backend_instance_id);
+        }
 
         Ok((builder.build()?, annotations))
     }
4 changes: 4 additions & 0 deletions lib/llm/src/protocols/common/preprocessor.rs
@@ -50,6 +50,10 @@ pub struct PreprocessedRequest {
     /// Estimated number of prefix hit tokens (only used in kv aware routing)
     #[builder(default)]
     pub estimated_prefix_hit_num_blocks: Option<u32>,
+
+    /// Targeted backend instance ID for the request
+    #[builder(default)]
+    pub backend_instance_id: Option<i64>,
 }
 
 impl PreprocessedRequest {
6 changes: 6 additions & 0 deletions lib/llm/src/protocols/openai/nvext.rs
@@ -62,6 +62,12 @@ pub struct NvExt {
     #[builder(default, setter(strip_option))]
     pub annotations: Option<Vec<String>>,
 
+    /// Targeted backend instance ID for the request.
+    /// If set, the request will be routed to the backend instance with the given ID.
+    /// If not set, the request will be routed to the best matching instance.
+    #[builder(default, setter(strip_option))]
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub backend_instance_id: Option<i64>,
     /// Guided Decoding Options
     /// If specified, the output will be a JSON object. Can be a string, an object, or null.
     #[serde(default, skip_serializing_if = "Option::is_none")]
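Because of the `serde` attributes, the new field is optional on the wire and omitted entirely when unset, so existing clients that never send it are unaffected. A hedged sketch of the round-trip, assuming `NvExt` derives `Deserialize` and that `serde_json` is in scope (neither is shown in this diff):

```rust
// Parse a request extension that pins the backend instance; all other
// NvExt fields fall back to their defaults via #[serde(default)].
let ext: NvExt = serde_json::from_str(r#"{"backend_instance_id": 3}"#).unwrap();
assert_eq!(ext.backend_instance_id, Some(3));
```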