Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use std::net::SocketAddr;
use std::time::Duration as StdDuration;

use dynamo_llm::kv_router::protocols::{ForwardPassMetrics, LoadMetrics};
use dynamo_llm::kv_router::scheduler::Endpoint;
use dynamo_llm::kv_router::scoring::Endpoint;
use dynamo_llm::kv_router::scoring::ProcessedEndpoints;

use dynamo_runtime::{
Expand Down
17 changes: 16 additions & 1 deletion lib/bindings/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ derive-getters = "0.5"
offset-allocator = "0.2"
regex = "1"
rayon = "1"
dashmap = "6"

# input/text
dialoguer = { version = "0.11", default-features = false, features = ["editor", "history"] }
Expand Down
38 changes: 15 additions & 23 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use dynamo_runtime::{
protocols::annotated::Annotated,
};
use futures::stream::{self, StreamExt};
use tokio::sync::Mutex;

pub mod approx;
pub mod indexer;
pub mod metrics_aggregator;
pub mod prefill_counter;
pub mod protocols;
pub mod publisher;
pub mod recorder;
Expand Down Expand Up @@ -48,9 +48,18 @@ use dynamo_runtime::traits::events::EventSubscriber;

// [gluo TODO] shouldn't need to be public
// this should be discovered from the component

// for metric scraping (pull-based)
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";

// for metric publishing (push-based)
pub const KV_EVENT_SUBJECT: &str = "kv_events";
pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate";
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";

// for inter-router comms
pub const PREFILL_SUBJECT: &str = "prefill_events";
pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";

/// A trait that users can implement to define custom selection logic
pub trait WorkerSelector {
Expand Down Expand Up @@ -135,10 +144,6 @@ pub struct KvRouter {
scheduler: KvScheduler,

block_size: u32,

// To ensure blocking reads / writes
// TODO: benchmark tradeoffs
find_best_match_mutex: Mutex<()>,
}

impl KvRouter {
Expand Down Expand Up @@ -175,13 +180,8 @@ impl KvRouter {
))
};

let scheduler = KvScheduler::start(
component.namespace().clone(),
block_size,
instances_rx,
selector,
)
.await?;
let scheduler =
KvScheduler::start(component.clone(), block_size, instances_rx, selector).await?;

// [gluo TODO] try subscribe_with_type::<RouterEvent>,
// error checking below will be different.
Expand Down Expand Up @@ -215,7 +215,6 @@ impl KvRouter {
indexer,
scheduler,
block_size,
find_best_match_mutex: Mutex::new(()), // Add this
})
}

Expand All @@ -227,10 +226,6 @@ impl KvRouter {
context_id: &str,
tokens: &[u32],
) -> anyhow::Result<(i64, u32)> {
// Acquire mutex to serialize access
// TODO: may as well make all the subroutines synchronous if benchmarking favors this
let _guard = self.find_best_match_mutex.lock().await;

let isl_tokens = tokens.len();

let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
Expand Down Expand Up @@ -263,17 +258,14 @@ impl KvRouter {
Ok((best_worker_id, overlap_amount))
}

/// Free all blocks associated with a request
pub async fn mark_prefill_completed(&self, request_id: &String) {
pub async fn mark_prefill_completed(&self, request_id: &str) {
self.scheduler.mark_prefill_completed(request_id).await
}

/// Free all blocks associated with a request
pub async fn free(&self, request_id: &String) {
pub async fn free(&self, request_id: &str) {
self.scheduler.free(request_id).await
}

/// Get the block size this router was configured with
pub fn block_size(&self) -> u32 {
self.block_size
}
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router/metrics_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Once;
pub use crate::kv_router::protocols::{ForwardPassMetrics, LoadMetrics, PredictiveLoadMetrics};
use crate::kv_router::KV_METRICS_ENDPOINT;

use crate::kv_router::scheduler::Endpoint;
use crate::kv_router::scoring::Endpoint;
use crate::kv_router::ProcessedEndpoints;
use dynamo_runtime::component::Component;
use dynamo_runtime::{service::EndpointInfo, utils::Duration, Result};
Expand Down
Loading
Loading