Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
35a9c34
impl EventPublisher trait for NatsQueue
PeaBrane Aug 27, 2025
cfc1fb3
publish kv events over js
PeaBrane Aug 28, 2025
67f2de8
clippy
PeaBrane Aug 28, 2025
7ef9afd
in KvIndexer binding, subscribe to nats queue also (fixes kv bindings…
PeaBrane Aug 28, 2025
b7f5185
radix uploader
PeaBrane Aug 28, 2025
da355f6
refactor into background.rs
PeaBrane Aug 28, 2025
d6386f1
make kv_routers prefix a const
PeaBrane Aug 28, 2025
0820747
make nats queue test persistence
PeaBrane Aug 28, 2025
efa98f4
e2e test passes, no radix snapshot yet
PeaBrane Aug 29, 2025
87402cd
Merge remote-tracking branch 'origin/main' into rupei/router-warm-res…
PeaBrane Aug 29, 2025
bd4bf16
fmt
PeaBrane Aug 29, 2025
9328f3c
purge + snapshot
PeaBrane Aug 29, 2025
8c58452
default behavior reset event stream + reset radix state; make logging…
PeaBrane Aug 29, 2025
5cd5c53
rm RadixUploader (not used)
PeaBrane Aug 29, 2025
738788f
docs
PeaBrane Aug 29, 2025
827ca4d
clippy
PeaBrane Aug 29, 2025
d29ba83
rename background.rs to subscriber.rs
PeaBrane Aug 29, 2025
19f0e55
fix bindings
PeaBrane Aug 29, 2025
6f701f4
extend todo (realized it's actually fine)
PeaBrane Aug 29, 2025
2d8b184
snapshot before purge
PeaBrane Aug 29, 2025
801959d
fix tight loop
PeaBrane Aug 29, 2025
38e2dea
Merge branch 'main' into rupei/router-warm-restarts
PeaBrane Aug 29, 2025
a475988
update docs
PeaBrane Aug 29, 2025
4a1f6ad
address minor comments
PeaBrane Aug 29, 2025
1b7cf29
native etcd lock
PeaBrane Aug 29, 2025
776f64c
reword deprecation of dynamo-run note
PeaBrane Aug 29, 2025
a752ed5
make SnapshotResources a struct
PeaBrane Aug 29, 2025
56e8cbc
correctly shut down router binding
PeaBrane Aug 29, 2025
903588d
make 'queue' a const str
PeaBrane Aug 29, 2025
36ace6f
slug + router prefix
PeaBrane Aug 29, 2025
c1ab255
cli args update
PeaBrane Aug 29, 2025
61f9840
make stream last an hour
PeaBrane Aug 29, 2025
f536e28
handle automatic durable consume cleanup
PeaBrane Aug 29, 2025
8562035
use component.subject() instead
PeaBrane Aug 30, 2025
0e496b8
register component path instead of model name for router replica key
PeaBrane Aug 30, 2025
f542d8c
use default snapshot_threshold of 10000
PeaBrane Aug 30, 2025
a5528f5
clean tests
PeaBrane Aug 30, 2025
84e14b5
use exponential backoffs for kv bindings test
PeaBrane Aug 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions components/frontend/src/dynamo/frontend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ def parse_args():
default=False,
help="KV Router: Enable replica synchronization across multiple router instances. When true, routers will publish and subscribe to events to maintain consistent state.",
)
parser.add_argument(
"--router-snapshot-threshold",
type=int,
default=10000,
help="KV Router: Number of messages in stream before triggering a snapshot. Defaults to 10000.",
)
parser.add_argument(
"--router-persist-states",
action="store_false",
dest="router_reset_states",
default=True,
help="KV Router: Persist router state on startup. Keep existing state from stream and object store (default: reset states).",
)
parser.add_argument(
"--busy-threshold",
type=float,
Expand Down Expand Up @@ -212,6 +225,8 @@ async def async_main():
router_temperature=flags.router_temperature,
use_kv_events=flags.use_kv_events,
router_replica_sync=flags.router_replica_sync,
router_snapshot_threshold=flags.router_snapshot_threshold,
router_reset_states=flags.router_reset_states,
)
elif flags.router_mode == "random":
router_mode = RouterMode.Random
Expand Down
30 changes: 24 additions & 6 deletions docs/architecture/kv_cache_routing.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ When KV blocks are created or removed, the engine notifies the Dynamo router, wh

To evaluate the benefits of KV-aware routing, compare your workload's performance using `--router-mode random|round-robin` against KV-aware routing.

The KV-aware routing arguments:
The main KV-aware routing arguments:

- `--kv-overlap-score-weight`: Controls the importance of prefix cache overlaps in prefill cost calculations. Higher values improve Time To First Token (TTFT) at the cost of Inter-Token Latency (ITL). When set to 0, the router ignores prefix caches and uses pure load balancing. Defaults to 1.

- `--router-temperature`: Controls worker selection randomness through softmax sampling of router cost logits. A value of 0 (default) ensures deterministic selection of the lowest-cost worker, while higher values introduce more randomness.

- `--use-kv-events`/`--no-kv-events`: Determines how the router tracks cached blocks. When enabled (default), uses `KvIndexer` to monitor block creation and deletion events. When disabled, uses `ApproxKvIndexer`, which estimates cache hits based on a fixed time window (120s). Disable this if your backend doesn't support KV events.

- `--router-replica-sync`: Enables NATS-based state synchronization between router replicas. When enabled, routers share their KV cache distribution and active sequence information, ensuring optimal routing decisions across multiple router instances. This improves fault tolerance and routing accuracy in distributed deployments. Disabled by default.
- `--router-replica-sync`: Enables NATS-based synchronization of local routing decisions between router replicas. When enabled, routers share their active sequence information and local predictions of block usage, improving routing consistency across instances. Note that this does not sync the radix tree or cached KV block states themselves - those are synchronized through JetStream events. Disabled by default.

- `--router-reset-states`/`--router-persist-states`: Controls whether the router state is reset on startup. When `--router-reset-states` is used (default), the router clears both the JetStream event stream and NATs object store, starting with a fresh state. When `--router-persist-states` is used, the router retains existing state from previous runs, downloading any available snapshot from NATs object store and continuing to consume events from where it left off. This enables routers to maintain KV cache awareness across restarts. **Note**: State persistence is only available when `--use-kv-events` is enabled (default). When using `--no-kv-events` with `ApproxKvIndexer`, state persistence is not supported.

- `--router-snapshot-threshold`: Sets the number of messages in the JetStream before triggering a snapshot. When the message count exceeds this threshold, a router will attempt to purge acknowledged messages from the stream and create a snapshot of the current radix tree state in NATs object store. Defaults to 10000. This helps manage stream size and provides faster initialization for routers that restart.

## Architecture

Expand All @@ -50,17 +54,26 @@ KV Cache routing uses direct routing with a special worker selection algorithm.

For improved fault tolerance, you can launch two frontend + router replicas. Since the frontend and router are currently tied together, you'll need to use two different HTTP ports for each instance.

To enable state sharing between the router replicas (which provides more accurate routing decisions), use the `--router-replica-sync` flag when starting the frontend:
To enable state sharing between the router replicas (which provides more accurate routing decisions), use the `--router-replica-sync` flag when starting the frontend. Router replicas are currently tied to a component, and state syncing and sharing can only happen within the component group. Here's an example of running multiple router replicas:

```bash
# Router replica 1
python -m dynamo.frontend --router-mode kv --port 8000 --router-replica-sync

# Router replica 2
python -m dynamo.frontend --router-mode kv --port 8001 --router-replica-sync
# Router replica 2 (can be started later, note the extra --router-persist-states arg)
python -m dynamo.frontend --router-mode kv --port 8001 --router-replica-sync --router-persist-states
```

When `--router-replica-sync` is enabled, the router replicas will communicate with each other via NATS to maintain consistent state across instances. This allows both routers to have a complete view of the KV cache distribution and make optimal routing decisions, even when requests are distributed across multiple router instances.
After these two replicas are launched, they will share the same JetStream and snapshot state. The second replica can be started after the first has already been handling requests. As long as `--router-persist-states` is set, the new replica will sync its KV block indexer by consuming the JetStream events and/or downloading the latest snapshot, ensuring both replicas have the same view of cached blocks across workers. It's okay for one router to go down, or even both to go down - the state persistence ensures continuity (up to the message retention of an hour we set for the stream). When a third router starts (with `--router-persist-states`), the states will still persist:

```bash
# Router replica 3 (can be started even after replicas 1 and 2 have gone down)
python -m dynamo.frontend --router-mode kv --port 8002 --router-replica-sync --router-persist-states
```

> **Note:** If a router replica is launched without the `--router-persist-states` flag, the entire stream and radix snapshot will be purged. If you want to serve a separate router (targeting a different set of workers) independently without affecting the current state, consider using a new namespace/component (see [Distributed Runtime](distributed_runtime.md)) which will start a new stream and NATS object store path.

When `--router-replica-sync` is enabled, the router replicas will additionally share their local routing decisions and active sequence predictions via NATS. Active blocks information is communicated between routers in a fire-and-forget manner, but the routers will quickly become consistent as this information is tied to the request cycle. This helps maintain consistent load estimates across instances even when requests are distributed between routers.

## Understanding KV Cache
The leading Large Language Models (LLMs) today are auto-regressive and based off of the [transformer architecture](https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf). One key inference optimization technique is to cache the already computed keys and values and to reuse them for the future tokens. This is called the [KV Cache](https://developer.nvidia.com/blog/mastering-llm-techniques-inference-optimization/#key-value_caching).
Expand Down Expand Up @@ -182,3 +195,8 @@ In distributed deployments with multiple routers, each router maintains visibili

Each event carries a unique router ID to prevent self-event processing. This asynchronous communication system ensures optimal routing decisions by maintaining consistent KV cache state across all routers, even as they handle different request streams.

### Event Persistence and Recovery

KV cache events are persisted in NATS JetStream, allowing router replicas to maintain their global view of KV blocks across restarts. When a router starts with `--router-persist-states`, it downloads any available snapshot from NATs object store and continues consuming events from its last acknowledged position in the stream.

To manage stream growth, when the message count exceeds `--router-snapshot-threshold`, a router acquires an etcd-based distributed lock, purges acknowledged messages from the stream, and uploads the current radix tree state to NATs object store. This snapshot serves as a checkpoint for faster initialization of future router instances.
3 changes: 3 additions & 0 deletions launch/dynamo-run/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ impl Flags {
self.use_kv_events,
self.router_replica_sync,
self.max_num_batched_tokens,
// defaulting below args (no longer maintaining new flags for dynamo-run)
None,
None,
),
)
}
Expand Down
6 changes: 5 additions & 1 deletion lib/bindings/python/rust/llm/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,23 @@ impl KvRouterConfig {
#[pymethods]
impl KvRouterConfig {
#[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false))]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_snapshot_threshold=10000, router_reset_states=true))]
fn new(
overlap_score_weight: f64,
router_temperature: f64,
use_kv_events: bool,
router_replica_sync: bool,
router_snapshot_threshold: Option<u32>,
router_reset_states: bool,
) -> Self {
KvRouterConfig {
inner: RsKvRouterConfig {
overlap_score_weight,
router_temperature,
use_kv_events,
router_replica_sync,
router_snapshot_threshold,
router_reset_states,
..Default::default()
},
}
Expand Down
84 changes: 58 additions & 26 deletions lib/bindings/python/rust/llm/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU32;
use tokio_stream::StreamExt;

use super::*;
use crate::Component;
use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics;
Expand Down Expand Up @@ -405,39 +406,36 @@ pub(crate) struct KvIndexer {
#[pymethods]
impl KvIndexer {
#[new]
fn new(component: Component, kv_block_size: usize) -> PyResult<Self> {
#[pyo3(signature = (component, kv_block_size, consumer_uuid=None))]
fn new(
component: Component,
kv_block_size: usize,
consumer_uuid: Option<String>,
) -> PyResult<Self> {
let runtime = pyo3_async_runtimes::tokio::get_runtime();
runtime.block_on(async {
let cancellation_token = component.inner.drt().runtime().child_token();
let inner: Arc<llm_rs::kv_router::indexer::KvIndexer> =
llm_rs::kv_router::indexer::KvIndexer::new(
component.inner.drt().runtime().child_token(),
cancellation_token.clone(),
kv_block_size as u32,
)
.into();
// [gluo TODO] try subscribe_with_type::<RouterEvent>,
// error checking below will be different.
let mut kv_events_rx = component
.inner
.subscribe(llm_rs::kv_router::KV_EVENT_SUBJECT)
.await
.map_err(to_pyerr)?;
let kv_events_tx = inner.event_sender();

// [FIXME] this is the added functionality to the indexer to subscribe to kv events,
// should have been made to a trait and implemented here? i.e. AsyncEngine style
tokio::spawn(async move {
while let Some(event) = kv_events_rx.next().await {
let event: llm_rs::kv_router::indexer::RouterEvent =
serde_json::from_slice(&event.payload).unwrap();
tracing::debug!("received kv event: {:?}", event);
if let Err(e) = kv_events_tx.send(event).await {
tracing::trace!(
"failed to send kv event to indexer; shutting down: {:?}",
e
);
}
}
});
// Use the shared start_kv_router_background function for event consumption
// Pass None for snapshot_tx to skip snapshot handling in Python bindings
llm_rs::kv_router::subscriber::start_kv_router_background(
component.inner.clone(),
consumer_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
inner.event_sender(),
None,
cancellation_token,
None,
true,
)
.await
.map_err(to_pyerr)?;

Ok(Self { inner })
})
}
Expand Down Expand Up @@ -845,6 +843,7 @@ impl SpecDecodeStats {
#[pyclass]
pub(crate) struct KvPushRouter {
inner: Arc<llm_rs::kv_router::KvPushRouter>,
primary_token: tokio_util::sync::CancellationToken,
}

#[pymethods]
Expand Down Expand Up @@ -875,12 +874,25 @@ impl KvPushRouter {
// Get component from endpoint
let component = endpoint.inner.component();

// Create KvRouter
// Get the primary token from the component's primary lease
let primary_token = component
.drt()
.primary_lease()
.ok_or_else(|| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Failed to get primary lease: Cannot KV route static workers",
)
})?
.primary_token();

// Create KvRouter with a unique consumer UUID
let consumer_uuid = uuid::Uuid::new_v4().to_string();
let kv_router = llm_rs::kv_router::KvRouter::new(
component.clone(),
block_size as u32,
None, // default selector
Some(kv_router_config.inner()),
consumer_uuid,
)
.await
.map_err(to_pyerr)?;
Expand All @@ -891,6 +903,7 @@ impl KvPushRouter {

Ok(Self {
inner: Arc::new(kv_push_router),
primary_token,
})
})
}
Expand Down Expand Up @@ -996,6 +1009,25 @@ impl KvPushRouter {
})
})
}

/// Dump all events from the KV router's indexer as a JSON string
fn dump_events<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let inner = self.inner.clone();

pyo3_async_runtimes::tokio::future_into_py(py, async move {
let events = inner.dump_events().await.map_err(to_pyerr)?;
// Serialize to JSON string
let json_str = serde_json::to_string(&events).map_err(to_pyerr)?;
Ok(json_str)
})
}
}

impl Drop for KvPushRouter {
fn drop(&mut self) {
// Cancel the primary token to shut down background tasks
self.primary_token.cancel();
}
}

// Python async generator wrapper for the stream
Expand Down
44 changes: 33 additions & 11 deletions lib/bindings/python/tests/test_kv_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,38 @@ async def test_event_handler(distributed_runtime):

event_publisher.store_event(test_token, lora_id)
# wait for the event to be processed as it is sent asynchronously
await asyncio.sleep(1)
scores = await indexer.find_matches_for_request(test_token, lora_id)
assert scores.scores
assert worker_id in scores.scores
assert scores.scores[worker_id] == 1
# Retry loop for CI environments where processing may take longer
for retry in range(10): # Try up to 10 times
await asyncio.sleep(0.5) # Wait 500ms between retries
scores = await indexer.find_matches_for_request(test_token, lora_id)
if (
scores.scores
and worker_id in scores.scores
and scores.scores[worker_id] == 1
):
break
if retry == 9: # Last iteration
# Provide detailed error message for debugging
assert scores.scores, f"No scores found after {(retry+1)*0.5}s"
assert (
worker_id in scores.scores
), f"Worker {worker_id} not in scores after {(retry+1)*0.5}s"
assert (
scores.scores[worker_id] == 1
), f"Expected score 1, got {scores.scores.get(worker_id)} after {(retry+1)*0.5}s"

# remove event
event_publisher.remove_event()
await asyncio.sleep(1)
scores = await indexer.find_matches_for_request(test_token, lora_id)
assert not scores.scores
# Retry loop for event removal verification
for retry in range(10): # Try up to 10 times
await asyncio.sleep(0.5) # Wait 500ms between retries
scores = await indexer.find_matches_for_request(test_token, lora_id)
if not scores.scores:
break
if retry == 9: # Last iteration
assert (
not scores.scores
), f"Scores still present after {(retry+1)*0.5}s: {scores.scores}"


async def test_approx_kv_indexer(distributed_runtime):
Expand Down Expand Up @@ -235,12 +256,13 @@ async def test_metrics_aggregator(distributed_runtime):
asyncio.create_task(metrics_publisher_task(kv_listener, expected_metrics))

# needs time for publisher to spawn up
for i in range(10):
await asyncio.sleep(1)
# Using shorter intervals for faster detection in normal cases
for i in range(20): # Try up to 20 times (10 seconds total)
await asyncio.sleep(0.5) # Wait 500ms between retries
metrics = await metrics_aggregator.get_metrics()
if metrics.endpoints:
break
assert metrics.endpoints
assert metrics.endpoints, f"No metrics endpoints found after {(i+1)*0.5}s"
for endpoint in metrics.endpoints:
# [TODO] not really checking id for now, can't get it as create_endpoint()
# create and serve the endpoint internally
Expand Down
3 changes: 3 additions & 0 deletions lib/llm/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ pub use watcher::{ModelUpdate, ModelWatcher};

/// The root etcd path for ModelEntry
pub const MODEL_ROOT_PATH: &str = "models";

/// The root etcd path for KV Router registrations
pub const KV_ROUTERS_ROOT_PATH: &str = "kv_routers";
12 changes: 7 additions & 5 deletions lib/llm/src/discovery/model_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use parking_lot::Mutex;

use dynamo_runtime::component::Component;
use dynamo_runtime::prelude::DistributedRuntimeProvider;
use dynamo_runtime::slug::Slug;

use crate::discovery::ModelEntry;
use crate::discovery::{KV_ROUTERS_ROOT_PATH, ModelEntry};
use crate::kv_router::{KvRouterConfig, scheduler::DefaultWorkerSelector};
use crate::{
kv_router::KvRouter,
Expand Down Expand Up @@ -218,10 +217,12 @@ impl ModelManager {
.drt()
.etcd_client()
.ok_or_else(|| anyhow::anyhow!("KV routing requires etcd (dynamic mode)"))?;
let router_uuid = uuid::Uuid::new_v4();
let router_key = format!(
"kv_routers/{}/{}",
Slug::from_string(model_name),
uuid::Uuid::new_v4()
"{}/{}/{}",
KV_ROUTERS_ROOT_PATH,
component.path(),
router_uuid
);
etcd_client
.kv_create(
Expand All @@ -237,6 +238,7 @@ impl ModelManager {
kv_cache_block_size,
Some(selector),
kv_router_config,
router_uuid.to_string(),
)
.await?;
let new_kv_chooser = Arc::new(chooser);
Expand Down
Loading
Loading