Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn app(runtime: Runtime) -> Result<()> {
// the cli when operating on an `http` component will validate the namespace.component is
// registered with HttpServiceComponentDefinition

let watch_obj = ModelWatcher::new(distributed.clone(), manager, RouterMode::Random);
let watch_obj = ModelWatcher::new(distributed.clone(), manager, RouterMode::Random, None);

if let Some(etcd_client) = distributed.etcd_client() {
let models_watcher: PrefixWatcher =
Expand Down
53 changes: 33 additions & 20 deletions docs/guides/dynamo_run.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
# Running Dynamo (`dynamo run`)

* [Quickstart with pip and vllm](#quickstart-with-pip-and-vllm)
* [Automatically download a model from Hugging Face](#use-model-from-hugging-face)
* [Run a model from local file](#run-a-model-from-local-file)
* [Distributed system](#distributed-system)
* [Network names](#network-names)
* [KV-aware routing](#kv-aware-routing)
* [Full usage details](#full-usage-details)
* [Setup](#setup)
* [mistral.rs](#mistralrs)
* [llama.cpp](#llamacpp)
* [Sglang](#sglang)
* [Vllm](#vllm)
* [TensorRT-LLM](#trtllm)
* [Echo Engines](#echo-engines)
* [Writing your own engine in Python](#writing-your-own-engine-in-python)
* [Batch mode](#batch-mode)
* [Defaults](#defaults)
* [Extra engine arguments](#extra-engine-arguments)

- [Running Dynamo (`dynamo run`)](#running-dynamo-dynamo-run)
- [Quickstart with pip and vllm](#quickstart-with-pip-and-vllm)
- [Use model from Hugging Face](#use-model-from-hugging-face)
- [Run a model from local file](#run-a-model-from-local-file)
- [Download model from Hugging Face](#download-model-from-hugging-face)
- [Run model from local file](#run-model-from-local-file)
- [Distributed System](#distributed-system)
- [Network names](#network-names)
- [KV-aware routing](#kv-aware-routing)
- [Full usage details](#full-usage-details)
- [Getting Started](#getting-started)
- [Setup](#setup)
- [Step 1: Install libraries](#step-1-install-libraries)
- [Step 2: Install Rust](#step-2-install-rust)
- [Step 3: Build](#step-3-build)
- [Defaults](#defaults)
- [Running Inference with Pre-built Engines](#running-inference-with-pre-built-engines)
- [mistralrs](#mistralrs)
- [llamacpp](#llamacpp)
- [sglang](#sglang)
- [vllm](#vllm)
- [trtllm](#trtllm)
- [Step 1: Build the environment](#step-1-build-the-environment)
- [Step 2: Run the environment](#step-2-run-the-environment)
- [Step 3: Execute `dynamo run` command](#step-3-execute-dynamo-run-command)
- [Echo Engines](#echo-engines)
- [echo\_core](#echo_core)
- [echo\_full](#echo_full)
- [Configuration](#configuration)
- [Batch mode](#batch-mode)
- [Extra engine arguments](#extra-engine-arguments)
- [Writing your own engine in Python](#writing-your-own-engine-in-python)

This guide explains the `dynamo run` command.

Expand All @@ -28,7 +41,7 @@ It supports these engines: mistralrs, llamacpp, sglang, vllm, and tensorrt-llm.

Usage:
```
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0]
```

Example: `dynamo run Qwen/Qwen3-0.6B`
Expand Down
37 changes: 37 additions & 0 deletions launch/dynamo-run/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::path::PathBuf;

use clap::ValueEnum;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_runtime::pipeline::RouterMode as RuntimeRouterMode;

/// Required options depend on the in and out choices
Expand Down Expand Up @@ -105,6 +106,21 @@ pub struct Flags {
#[arg(long, default_value = "round-robin")]
pub router_mode: RouterMode,

/// KV Router: Weight for overlap score in worker selection.
/// Higher values prioritize KV cache reuse. Default: 2.0
#[arg(long)]
pub kv_overlap_score_weight: Option<f64>,

/// KV Router: Weight for GPU cache usage in worker selection.
/// Higher values avoid workers with nearly full KV caches. Default: 1.0
#[arg(long)]
pub kv_gpu_cache_usage_weight: Option<f64>,

/// KV Router: Weight for waiting requests in worker selection.
/// Higher values avoid workers with queued requests. Default: 1.0
#[arg(long)]
pub kv_waiting_requests_weight: Option<f64>,

/// Max model context length. Reduce this if you don't have enough VRAM for the full model
/// context length (e.g. Llama 4).
/// Defaults to the model's max, which is usually model_max_length in tokenizer_config.json.
Expand Down Expand Up @@ -138,6 +154,15 @@ pub struct Flags {
}

impl Flags {
/// Build the KV-router configuration from the CLI weight overrides.
/// Each weight left unset on the command line falls back to the
/// corresponding `KvRouterConfig` default (handled by `KvRouterConfig::new`).
pub fn kv_router_config(&self) -> KvRouterConfig {
    let overlap = self.kv_overlap_score_weight;
    let gpu_usage = self.kv_gpu_cache_usage_weight;
    let waiting = self.kv_waiting_requests_weight;
    KvRouterConfig::new(overlap, gpu_usage, waiting)
}

/// Convert the flags back to a command line. Including only the non-null values, but
/// include the defaults. Includes the canonicalized model path and normalized model name.
///
Expand Down Expand Up @@ -175,6 +200,18 @@ impl Flags {
out.push("--extra-engine-args".to_string());
out.push(extra_engine_args.display().to_string());
}
if let Some(weight) = self.kv_overlap_score_weight {
out.push("--kv-overlap-score-weight".to_string());
out.push(weight.to_string());
}
if let Some(weight) = self.kv_gpu_cache_usage_weight {
out.push("--kv-gpu-cache-usage-weight".to_string());
out.push(weight.to_string());
}
if let Some(weight) = self.kv_waiting_requests_weight {
out.push("--kv-waiting-requests-weight".to_string());
out.push(weight.to_string());
}
out.extend(self.last.clone());
out
}
Expand Down
1 change: 1 addition & 0 deletions launch/dynamo-run/src/input/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub async fn prepare_engine(
distributed_runtime,
model_manager.clone(),
dynamo_runtime::pipeline::RouterMode::RoundRobin,
None,
));
let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
Expand Down
5 changes: 4 additions & 1 deletion launch/dynamo-run/src/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use crate::input::common;
use crate::{EngineConfig, Flags};
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::{
discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
engines::StreamingEngineAdapter,
Expand Down Expand Up @@ -46,6 +47,7 @@ pub async fn run(
etcd_client.clone(),
MODEL_ROOT_PATH,
flags.router_mode.into(),
Some(flags.kv_router_config()),
)
.await?;
}
Expand Down Expand Up @@ -102,8 +104,9 @@ async fn run_watcher(
etcd_client: etcd::Client,
network_prefix: &str,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<()> {
let watch_obj = ModelWatcher::new(runtime, model_manager, router_mode);
let watch_obj = ModelWatcher::new(runtime, model_manager, router_mode, kv_router_config);
tracing::info!("Watching for remote model at {network_prefix}");
let models_watcher = etcd_client.kv_get_and_watch_prefix(network_prefix).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
Expand Down
2 changes: 1 addition & 1 deletion launch/dynamo-run/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Example:
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#;

const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]";
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0]";

fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag
Expand Down
2 changes: 2 additions & 0 deletions launch/llmctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ async fn list_models(
distributed.clone(),
Arc::new(ModelManager::new()),
RouterMode::Random,
None,
);

let mut models = Vec::new();
Expand Down Expand Up @@ -313,6 +314,7 @@ async fn remove_model(
distributed.clone(),
Arc::new(ModelManager::new()),
RouterMode::Random,
None,
);
let Some(etcd_client) = distributed.etcd_client() else {
anyhow::bail!("llmctl is only useful with dynamic workers");
Expand Down
8 changes: 5 additions & 3 deletions lib/llm/src/discovery/model_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dynamo_runtime::component::Component;

use crate::discovery::ModelEntry;

use crate::kv_router::scheduler::DefaultWorkerSelector;
use crate::kv_router::{scheduler::DefaultWorkerSelector, KvRouterConfig};
use crate::{
kv_router::KvRouter,
types::openai::{
Expand Down Expand Up @@ -183,6 +183,7 @@ impl ModelManager {
model_name: &str,
component: &Component,
kv_cache_block_size: usize,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<Arc<KvRouter>> {
if let Some(kv_chooser) = self.get_kv_chooser(model_name) {
// Check if the existing router has a different block size
Expand All @@ -197,7 +198,7 @@ impl ModelManager {
}
return Ok(kv_chooser);
}
self.create_kv_chooser(model_name, component, kv_cache_block_size)
self.create_kv_chooser(model_name, component, kv_cache_block_size, kv_router_config)
.await
}

Expand All @@ -211,8 +212,9 @@ impl ModelManager {
model_name: &str,
component: &Component,
kv_cache_block_size: usize,
kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<Arc<KvRouter>> {
let selector = Box::new(DefaultWorkerSelector {});
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config));
let chooser = KvRouter::new(component.clone(), kv_cache_block_size, Some(selector)).await?;
let new_kv_chooser = Arc::new(chooser);
self.kv_choosers
Expand Down
19 changes: 16 additions & 3 deletions lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use dynamo_runtime::{

use crate::{
backend::Backend,
kv_router::KvPushRouter,
kv_router::{KvPushRouter, KvRouterConfig},
model_type::ModelType,
preprocessor::{BackendInput, OpenAIPreprocessor},
protocols::common::llm_backend::LLMEngineOutput,
Expand All @@ -36,19 +36,22 @@ pub struct ModelWatcher {
drt: DistributedRuntime,
router_mode: RouterMode,
notify_on_model: Notify,
kv_router_config: Option<KvRouterConfig>,
}

impl ModelWatcher {
pub fn new(
runtime: DistributedRuntime,
model_manager: Arc<ModelManager>,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
) -> ModelWatcher {
Self {
manager: model_manager,
drt: runtime,
router_mode,
notify_on_model: Notify::new(),
kv_router_config,
}
}

Expand Down Expand Up @@ -208,7 +211,12 @@ impl ModelWatcher {
RouterMode::KV => {
let chooser = self
.manager
.kv_chooser_for(&model_entry.name, &component, card.kv_cache_block_size)
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config.clone(),
)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
Expand Down Expand Up @@ -243,7 +251,12 @@ impl ModelWatcher {
RouterMode::KV => {
let chooser = self
.manager
.kv_chooser_for(&model_entry.name, &component, card.kv_cache_block_size)
.kv_chooser_for(
&model_entry.name,
&component,
card.kv_cache_block_size,
self.kv_router_config.clone(),
)
.await?;
let kv_push_router = KvPushRouter::new(router, chooser);
ServiceBackend::from_engine(Arc::new(kv_push_router))
Expand Down
45 changes: 45 additions & 0 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,51 @@ pub trait WorkerSelector {
) -> Result<WorkerSelectionResult, KvSchedulerError>;
}

/// KV Router configuration parameters.
///
/// All three weights feed the KV-aware worker-selection score; a larger
/// weight gives the corresponding signal more influence. This is a plain
/// value type (three `f64` fields), so it additionally derives `Copy` and
/// `PartialEq`: copies are free and configs compare easily in tests, while
/// existing `.clone()` call sites keep working unchanged.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KvRouterConfig {
    /// Weight for overlap score in worker selection.
    /// Higher values prioritize KV cache reuse. Default: 2.0
    pub overlap_score_weight: f64,

    /// Weight for GPU cache usage in worker selection.
    /// Higher values avoid workers with nearly full KV caches. Default: 1.0
    pub gpu_cache_usage_weight: f64,

    /// Weight for waiting requests in worker selection.
    /// Higher values avoid workers with queued requests. Default: 1.0
    pub waiting_requests_weight: f64,
}

impl Default for KvRouterConfig {
fn default() -> Self {
Self {
overlap_score_weight: 2.0,
gpu_cache_usage_weight: 1.0,
waiting_requests_weight: 1.0,
}
}
}

impl KvRouterConfig {
/// Create a new KvRouterConfig with optional weight values.
/// If a weight is None, the default value will be used.
pub fn new(
overlap_score_weight: Option<f64>,
gpu_cache_usage_weight: Option<f64>,
waiting_requests_weight: Option<f64>,
) -> Self {
let default = Self::default();
Self {
overlap_score_weight: overlap_score_weight.unwrap_or(default.overlap_score_weight),
gpu_cache_usage_weight: gpu_cache_usage_weight
.unwrap_or(default.gpu_cache_usage_weight),
waiting_requests_weight: waiting_requests_weight
.unwrap_or(default.waiting_requests_weight),
}
}
}

/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
pub struct KvRouter {
Expand Down
Loading
Loading