Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion components/frontend/src/dynamo/frontend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ def parse_args():
default=False,
help="KV Router: Enable replica synchronization across multiple router instances. When true, routers will publish and subscribe to events to maintain consistent state.",
)
parser.add_argument(
"--busy-threshold",
type=float,
default=None,
help="Threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache usage. If not set, busy detection is disabled.",
)
parser.add_argument(
"--static-endpoint",
type=validate_static_endpoint,
Expand Down Expand Up @@ -205,7 +211,9 @@ async def async_main():
kwargs = {
"http_port": flags.http_port,
"kv_cache_block_size": flags.kv_cache_block_size,
"router_config": RouterConfig(router_mode, kv_router_config),
"router_config": RouterConfig(
router_mode, kv_router_config, flags.busy_threshold
),
}

if flags.static_endpoint:
Expand Down
11 changes: 9 additions & 2 deletions lib/bindings/python/rust/llm/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,22 @@ impl KvRouterConfig {
// Router configuration as held by the Python bindings: a routing mode, the KV
// router options, and an optional busy threshold. The `From<RouterConfig>` impl
// in this file copies all three into `RsRouterConfig`.
// (Plain `//` comments are used so nothing is added to the item's doc attributes.)
pub struct RouterConfig {
// Routing mode; converted with `.into()` when building `RsRouterConfig`.
router_mode: RouterMode,
// KV router options; the constructor falls back to the default when `None` is passed.
kv_router_config: KvRouterConfig,
// Optional busy threshold, passed through unchanged to `RsRouterConfig`.
// NOTE(review): no range validation is visible here — confirm whether callers
// are expected to supply a value in 0.0-1.0.
busy_threshold: Option<f64>,
}

#[pymethods]
impl RouterConfig {
#[new]
#[pyo3(signature = (mode, config=None))]
pub fn new(mode: RouterMode, config: Option<KvRouterConfig>) -> Self {
#[pyo3(signature = (mode, config=None, busy_threshold=None))]
pub fn new(
mode: RouterMode,
config: Option<KvRouterConfig>,
busy_threshold: Option<f64>,
) -> Self {
Self {
router_mode: mode,
kv_router_config: config.unwrap_or_default(),
busy_threshold,
}
}
}
Expand All @@ -79,6 +85,7 @@ impl From<RouterConfig> for RsRouterConfig {
RsRouterConfig {
router_mode: rc.router_mode.into(),
kv_router_config: rc.kv_router_config.inner,
busy_threshold: rc.busy_threshold,
}
}
}
Expand Down
49 changes: 34 additions & 15 deletions lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct ModelWatcher {
notify_on_model: Notify,
model_update_tx: Option<Sender<ModelUpdate>>,
kv_router_config: Option<KvRouterConfig>,
busy_threshold: Option<f64>,
}

const ALL_MODEL_TYPES: &[ModelType] =
Expand All @@ -61,6 +62,7 @@ impl ModelWatcher {
model_manager: Arc<ModelManager>,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
busy_threshold: Option<f64>,
) -> ModelWatcher {
Self {
manager: model_manager,
Expand All @@ -69,6 +71,7 @@ impl ModelWatcher {
notify_on_model: Notify::new(),
model_update_tx: None,
kv_router_config,
busy_threshold,
}
}

Expand Down Expand Up @@ -316,29 +319,41 @@ impl ModelWatcher {
None
};

let chat_engine =
entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, &client, self.router_mode, kv_chooser.clone())
.await?;
let chat_engine = entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(
&card,
&client,
self.router_mode,
self.busy_threshold,
kv_chooser.clone(),
)
.await?;
self.manager
.add_chat_completions_model(&model_entry.name, chat_engine)?;

let completions_engine =
entrypoint::build_routed_pipeline::<
NvCreateCompletionRequest,
NvCreateCompletionResponse,
>(&card, &client, self.router_mode, kv_chooser)
.await?;
let completions_engine = entrypoint::build_routed_pipeline::<
NvCreateCompletionRequest,
NvCreateCompletionResponse,
>(
&card,
&client,
self.router_mode,
self.busy_threshold,
kv_chooser,
)
.await?;
self.manager
.add_completions_model(&model_entry.name, completions_engine)?;
}
ModelType::Chat => {
let push_router = PushRouter::<
NvCreateChatCompletionRequest,
Annotated<NvCreateChatCompletionStreamResponse>,
>::from_client(client, Default::default())
>::from_client_with_threshold(
client, Default::default(), self.busy_threshold
)
.await?;
let engine = Arc::new(push_router);
self.manager
Expand All @@ -348,7 +363,9 @@ impl ModelWatcher {
let push_router = PushRouter::<
NvCreateCompletionRequest,
Annotated<NvCreateCompletionResponse>,
>::from_client(client, Default::default())
>::from_client_with_threshold(
client, Default::default(), self.busy_threshold
)
.await?;
let engine = Arc::new(push_router);
self.manager
Expand All @@ -374,7 +391,9 @@ impl ModelWatcher {
let router = PushRouter::<
PreprocessedEmbeddingRequest,
Annotated<EmbeddingsEngineOutput>,
>::from_client(client, self.router_mode)
>::from_client_with_threshold(
client, self.router_mode, self.busy_threshold
)
.await?;

// Note: Embeddings don't need KV routing complexity
Expand Down
7 changes: 7 additions & 0 deletions lib/llm/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@ use crate::{
/// Router settings: the routing mode, the KV router options, and an optional
/// busy threshold.
pub struct RouterConfig {
/// Routing mode.
pub router_mode: RouterMode,
/// KV router options.
pub kv_router_config: KvRouterConfig,
/// Optional busy threshold. `RouterConfig::new` leaves this as `None`;
/// `with_busy_threshold` sets it.
// NOTE(review): presumably a 0.0-1.0 KV cache usage fraction with busy detection
// disabled when `None` — confirm against the router implementation.
pub busy_threshold: Option<f64>,
}

impl RouterConfig {
    /// Builds a configuration from a routing mode and KV router options.
    ///
    /// The busy threshold starts out as `None`; chain
    /// [`RouterConfig::with_busy_threshold`] to set one.
    pub fn new(router_mode: RouterMode, kv_router_config: KvRouterConfig) -> Self {
        Self {
            busy_threshold: None,
            router_mode,
            kv_router_config,
        }
    }

    /// Consumes the configuration and returns it with `busy_threshold` replaced
    /// by `threshold`. Passing `None` clears any previously set value.
    pub fn with_busy_threshold(self, threshold: Option<f64>) -> Self {
        Self {
            busy_threshold: threshold,
            ..self
        }
    }
}

#[derive(Clone)]
Expand Down
16 changes: 10 additions & 6 deletions lib/llm/src/entrypoint/input/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub async fn prepare_engine(
model_manager.clone(),
dynamo_runtime::pipeline::RouterMode::RoundRobin,
None,
None,
));
let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
Expand Down Expand Up @@ -133,7 +134,7 @@ pub async fn prepare_engine(
let chat_engine = entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(card, &client, router_mode, kv_chooser.clone())
>(card, &client, router_mode, None, kv_chooser.clone())
.await?;

let service_name = local_model.service_name().to_string();
Expand Down Expand Up @@ -216,6 +217,7 @@ pub async fn build_routed_pipeline<Req, Resp>(
card: &ModelDeploymentCard,
client: &Client,
router_mode: RouterMode,
busy_threshold: Option<f64>,
chooser: Option<Arc<KvRouter>>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
Expand All @@ -232,11 +234,13 @@ where
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
let router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
client.clone(),
router_mode,
)
.await?;
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
client.clone(),
router_mode,
busy_threshold,
)
.await?;
let service_backend = match router_mode {
RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
ServiceBackend::from_engine(Arc::new(router))
Expand Down
15 changes: 12 additions & 3 deletions lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
MODEL_ROOT_PATH,
router_config.router_mode,
Some(router_config.kv_router_config),
router_config.busy_threshold,
Arc::new(http_service.clone()),
)
.await?;
Expand Down Expand Up @@ -109,14 +110,14 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
let chat_engine = entrypoint::build_routed_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(card, &client, router_mode, kv_chooser.clone())
>(card, &client, router_mode, None, kv_chooser.clone())
.await?;
manager.add_chat_completions_model(local_model.display_name(), chat_engine)?;

let completions_engine = entrypoint::build_routed_pipeline::<
NvCreateCompletionRequest,
NvCreateCompletionResponse,
>(card, &client, router_mode, kv_chooser)
>(card, &client, router_mode, None, kv_chooser)
.await?;
manager.add_completions_model(local_model.display_name(), completions_engine)?;

Expand Down Expand Up @@ -188,16 +189,24 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul

/// Spawns a task that watches for new models in etcd at network_prefix,
/// and registers them with the ModelManager so that the HTTP service can use them.
#[allow(clippy::too_many_arguments)]
async fn run_watcher(
runtime: DistributedRuntime,
model_manager: Arc<ModelManager>,
etcd_client: etcd::Client,
network_prefix: &str,
router_mode: RouterMode,
kv_router_config: Option<KvRouterConfig>,
busy_threshold: Option<f64>,
http_service: Arc<HttpService>,
) -> anyhow::Result<()> {
let mut watch_obj = ModelWatcher::new(runtime, model_manager, router_mode, kv_router_config);
let mut watch_obj = ModelWatcher::new(
runtime,
model_manager,
router_mode,
kv_router_config,
busy_threshold,
);
tracing::info!("Watching for remote model at {network_prefix}");
let models_watcher = etcd_client.kv_get_and_watch_prefix(network_prefix).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
Expand Down
34 changes: 34 additions & 0 deletions lib/llm/src/http/service/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ impl ErrorMessage {
/// If successful, it will return the [`HttpError`] as an [`ErrorMessage::internal_server_error`]
/// with the details of the error.
pub fn from_anyhow(err: anyhow::Error, alt_msg: &str) -> ErrorResponse {
// First check for PipelineError::ServiceOverloaded
if let Some(pipeline_err) =
err.downcast_ref::<dynamo_runtime::pipeline::error::PipelineError>()
{
if matches!(
pipeline_err,
dynamo_runtime::pipeline::error::PipelineError::ServiceOverloaded(_)
) {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorMessage {
error: pipeline_err.to_string(),
}),
);
}
}

// Then check for HttpError
match err.downcast::<HttpError>() {
Ok(http_error) => ErrorMessage::from_http_error(http_error),
Err(err) => ErrorMessage::internal_server_error(&format!("{alt_msg}: {err}")),
Expand Down Expand Up @@ -1150,6 +1168,22 @@ mod tests {
);
}

#[test]
fn test_service_overloaded_error_response_from_anyhow() {
    use dynamo_runtime::pipeline::error::PipelineError;

    // A `ServiceOverloaded` pipeline error wrapped in `anyhow` must map to
    // HTTP 503 and carry the pipeline error's own message.
    let overloaded =
        PipelineError::ServiceOverloaded("All workers are busy, please retry later".to_string());
    let wrapped = anyhow::Error::from(overloaded);

    let (status, response) = ErrorMessage::from_anyhow(wrapped, BACKUP_ERROR_MESSAGE);

    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
    assert_eq!(
        response.error,
        "Service temporarily unavailable: All workers are busy, please retry later"
    );
}

#[test]
fn test_validate_input_is_text_only_accepts_text() {
let request = make_base_request();
Expand Down
19 changes: 15 additions & 4 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ pub mod scoring;
pub mod sequence;

use crate::{
discovery::{ModelEntry, MODEL_ROOT_PATH},
kv_router::{
approx::ApproxKvIndexer,
indexer::{
compute_block_hash_for_seq, compute_seq_hash_for_block, KvIndexer, KvIndexerInterface,
KvRouterError, OverlapScores, RouterEvent,
},
metrics_aggregator::watch_model_runtime_configs,
protocols::{LocalBlockHash, RouterRequest, RouterResponse, WorkerSelectionResult},
scheduler::{KvScheduler, KvSchedulerError, SchedulingRequest},
scoring::ProcessedEndpoints,
Expand Down Expand Up @@ -177,14 +177,25 @@ impl KvRouter {
}
};

// Create runtime config watcher
// Create runtime config watcher using the generic etcd watcher
// TODO: Migrate to discovery_client() once it exposes kv_get_and_watch_prefix functionality
let etcd_client = component
.drt()
.etcd_client()
.expect("Cannot KV route without etcd client");
let runtime_configs_rx =
watch_model_runtime_configs(etcd_client, cancellation_token.clone()).await?;

use dynamo_runtime::utils::typed_prefix_watcher::{
key_extractors, watch_prefix_with_extraction,
};
let runtime_configs_watcher = watch_prefix_with_extraction(
etcd_client,
MODEL_ROOT_PATH,
key_extractors::lease_id,
|model_entry: ModelEntry| model_entry.runtime_config,
cancellation_token.clone(),
)
.await?;
let runtime_configs_rx = runtime_configs_watcher.receiver();

let indexer = if kv_router_config.use_kv_events {
Indexer::KvIndexer(KvIndexer::new(cancellation_token.clone(), block_size))
Expand Down
Loading
Loading