Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions components/frontend/src/dynamo/frontend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@
# - Auto-discovery: Watches etcd for engine/worker registration (via `register_llm`).
# - Pre-processor: Prompt templating and tokenization.
# - Router, defaulting to round-robin (TODO: Add flags to enable KV routing).
#
# Pass `--interactive` or `-i` for text chat instead of HTTP server.
#
# For static mode (no etcd auto-discovery):
# - python -m dynamo.frontend --model-name Qwen3-0.6B-Q8_0.gguf --model-path ~/llms/Qwen3-0.6B --static-endpoint dynamo.backend.generate
# Worker example:
# - cd lib/bindings/python/examples/hello_world
# - python server_sglang_static.py

import argparse
import asyncio
import os
import re

import uvloop

Expand All @@ -26,6 +36,36 @@
from dynamo.runtime import DistributedRuntime


def validate_static_endpoint(value):
    """Validate that static-endpoint is three identifier-like words joined by dots.

    Used as an argparse ``type=`` callable for ``--static-endpoint``.

    Args:
        value: Raw command-line string, e.g. ``dynamo.backend.generate``.

    Returns:
        The unchanged ``value`` when it has the form ``word.word.word``, where
        each word starts with a letter or underscore and continues with
        letters, digits or underscores.

    Raises:
        argparse.ArgumentTypeError: If ``value`` does not have that form.
    """
    # fullmatch (rather than match with a trailing `$`) so that a value ending
    # in a newline is rejected: `$` also matches just before a final "\n".
    if not re.fullmatch(
        r"[a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*",
        value,
    ):
        raise argparse.ArgumentTypeError(
            f"static-endpoint must be three words separated by dots, got: {value}"
        )
    return value


def validate_model_name(value):
    """Check that model-name is a string with at least one non-whitespace character.

    Used as an argparse ``type=`` callable for ``--model-name``.

    Args:
        value: Raw command-line value.

    Returns:
        ``value`` with leading and trailing whitespace removed.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a string, is empty, or
            contains only whitespace.
    """
    # Anything that is not a string is treated like an empty name.
    cleaned = value.strip() if isinstance(value, str) else ""
    if cleaned:
        return cleaned
    raise argparse.ArgumentTypeError(
        f"model-name must be a non-empty string, got: {value}"
    )


def validate_model_path(value):
    """Check that model-path names an existing directory.

    Used as an argparse ``type=`` callable for ``--model-path``.

    Args:
        value: Path string from the command line.

    Returns:
        The unchanged ``value`` when ``os.path.isdir`` reports it as a directory.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not an existing directory.
    """
    if os.path.isdir(value):
        return value
    raise argparse.ArgumentTypeError(
        f"model-path must be a valid directory on disk, got: {value}"
    )


def parse_args():
parser = argparse.ArgumentParser(
description="Dynamo Frontend: HTTP+Pre-processor+Router",
Expand Down Expand Up @@ -72,13 +112,35 @@ def parse_args():
help=" KV Router. Disable KV events.",
)
parser.set_defaults(use_kv_events=True)
parser.add_argument(
"--static-endpoint",
type=validate_static_endpoint,
help="Static endpoint in format: word.word.word (e.g., dynamo.backend.generate)",
)
parser.add_argument(
"--model-name",
type=validate_model_name,
help="Model name as a string (e.g., 'Llama-3.2-1B-Instruct')",
)
parser.add_argument(
"--model-path",
type=validate_model_path,
help="Path to model directory on disk (e.g., /tmp/model_cache/lama3.2_1B/)",
)

return parser.parse_args()
flags = parser.parse_args()

if flags.static_endpoint and (not flags.model_name or not flags.model_path):
parser.error("--static-endpoint requires both --model-name and --model-path")

return flags


async def async_main():
runtime = DistributedRuntime(asyncio.get_running_loop(), False)
flags = parse_args()
is_static = bool(flags.static_endpoint) # true if the string has a value

runtime = DistributedRuntime(asyncio.get_running_loop(), is_static)

if flags.router_mode == "kv":
router_mode = RouterMode.KV
Expand All @@ -100,8 +162,20 @@ async def async_main():
"router_config": RouterConfig(router_mode, kv_router_config),
}

# out=dyn
e = EntrypointArgs(EngineType.Dynamic, **kwargs)
if flags.static_endpoint:
kwargs["endpoint_id"] = flags.static_endpoint
if flags.model_name:
kwargs["model_name"] = flags.model_name
if flags.model_path:
kwargs["model_path"] = flags.model_path

if is_static:
# out=dyn://<static_endpoint>
engine_type = EngineType.Static
else:
# out=auto, most common
engine_type = EngineType.Dynamic
e = EntrypointArgs(engine_type, **kwargs)
engine = await make_engine(runtime, e)

try:
Expand Down
19 changes: 15 additions & 4 deletions docs/guides/dynamo_run.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ You will need [etcd](https://etcd.io/) and [nats](https://nats.io) with jetstrea
**Node 1:** OpenAI compliant HTTP server, optional pre-processing, worker discovery:

```
dynamo-run in=http out=dyn
dynamo-run in=http out=auto
```

**Node 2:** Vllm engine. Receives and returns requests over the network:
Expand Down Expand Up @@ -141,7 +141,18 @@ Example 4: Multiple components in a pipeline.

In the P/D disaggregated setup you would have `deepseek-distill-llama8b.prefill.generate` (possibly multiple instances of this) and `deepseek-distill-llama8b.decode.generate`.

For output it is always only `out=dyn`. This tells Dynamo to auto-discover the instances, group them by model, and load balance appropriately (depending on `--router-mode` flag). The old syntax of `dyn://...` is still accepted for backwards compatibility.
For output it is normally `out=auto`. This tells Dynamo to auto-discover the instances, group them by model, and load balance appropriately (depending on the `--router-mode` flag). The exception is static workers; see the "Static workers without etcd" section.

### Static workers without etcd

Normally, in the distributed system, the frontend uses etcd to discover workers. Alternatively, the frontend can be pointed at a single static endpoint, in which case etcd is not needed.

```
Node 1: dynamo-run in=http out=dyn://dynamo.backend.generate --model-name Qwen3-0.6B-Q8_0.gguf --model-path ~/llms/Qwen3-0.6B
Node 2: dynamo-run in=dyn://dynamo.backend.generate out=llamacpp ~/llms/Qwen3-0.6B-Q8_0.gguf --static-worker --context-length 4096
```

Note how `out=` points to a single endpoint, which must match the worker's `in=` endpoint. The model's name and config (needed for pre-processing) are usually discovered by the frontend via etcd. Without etcd, we must pass them in explicitly (`--model-name` and `--model-path`).

### KV-aware routing

Expand Down Expand Up @@ -194,7 +205,7 @@ dynamo-run in=dyn://dynamo.endpoint.generate out=vllm /data/llms/Qwen/Qwen3-4B
**Start the ingress node**

```
dynamo-run in=http out=dyn --router-mode kv
dynamo-run in=http out=auto --router-mode kv
```

The only difference from the distributed system above is `--router-mode kv`. The patched vllm announces when a KV block is created or removed. The Dynamo router finds the worker with the best match for those KV blocks and directs the traffic to that node.
Expand Down Expand Up @@ -569,7 +580,7 @@ And below are arguments that are mocker-specific:
```bash
echo '{"speedup_ratio": 10.0}' > mocker_args.json
dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 --extra-engine-args mocker_args.json
dynamo-run in=http out=dyn --router-mode kv
dynamo-run in=http out=auto --router-mode kv
```

### Extra engine arguments
Expand Down
38 changes: 36 additions & 2 deletions launch/dynamo-run/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::path::PathBuf;

use clap::ValueEnum;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::RouterConfig;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::local_model::LocalModel;
Expand Down Expand Up @@ -127,6 +128,12 @@ pub struct Flags {
#[arg(long, value_parser = clap::value_parser!(u32).range(0..1024))]
pub migration_limit: Option<u32>,

/// Make this a static worker.
/// Do not connect to or advertise self on etcd.
/// in=dyn://x.y.z only
#[arg(long, default_value = "false")]
pub static_worker: bool,

/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
Expand All @@ -136,9 +143,23 @@ pub struct Flags {
impl Flags {
/// For each Output variant, check if it would be able to run.
/// This takes validation out of the main engine creation path.
pub fn validate(&self, local_model: &LocalModel, out_opt: &Output) -> anyhow::Result<()> {
pub fn validate(
&self,
local_model: &LocalModel,
in_opt: &Input,
out_opt: &Output,
) -> anyhow::Result<()> {
match in_opt {
Input::Endpoint(_) => {}
_ => {
if self.static_worker {
anyhow::bail!("'--static-worker true' only applies to in=dyn://x.y.z");
}
}
}

match out_opt {
Output::Dynamic => {
Output::Auto => {
if self.context_length.is_some() {
anyhow::bail!("'--context-length' flag should only be used on the worker node, not on the ingress");
}
Expand All @@ -149,6 +170,19 @@ impl Flags {
anyhow::bail!("'--migration-limit' flag should only be used on the worker node, not on the ingress");
}
}
Output::Static(_) => {
if self.model_name.is_none()
|| self
.model_path_pos
.as_ref()
.or(self.model_path_flag.as_ref())
.is_none()
{
anyhow::bail!(
"out=dyn://<path> requires --model-name and --model-path, which are the name and path on disk of the model we expect to serve."
);
}
}
Output::EchoFull => {}
Output::EchoCore => {
if !local_model.card().has_tokenizer() {
Expand Down
23 changes: 20 additions & 3 deletions launch/dynamo-run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Context as _;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::distributed::DistributedConfig;
use dynamo_runtime::CancellationToken;
use dynamo_runtime::{DistributedRuntime, Runtime};

Expand Down Expand Up @@ -50,9 +51,13 @@ pub async fn run(
if let Input::Endpoint(path) = &in_opt {
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));

let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let dst_config = DistributedConfig::from_settings(flags.static_worker);
let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?;
rt = Either::Right(distributed_runtime);
};
if let Some(Output::Static(path)) = &out_opt {
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));
}

let local_model = builder.build().await?;

Expand All @@ -64,7 +69,7 @@ pub async fn run(
print_cuda(&out_opt);

// Now that we know the output we're targeting, check if we expect it to work
flags.validate(&local_model, &out_opt)?;
flags.validate(&local_model, &in_opt, &out_opt)?;

// Make an engine from the local_model, flags and output.
let engine_config = engine_for(
Expand Down Expand Up @@ -94,24 +99,35 @@ async fn engine_for(
rt: Either<Runtime, DistributedRuntime>,
) -> anyhow::Result<EngineConfig> {
match out_opt {
Output::Dynamic => Ok(EngineConfig::Dynamic(Box::new(local_model))),
Output::Auto => {
// Auto-discover backends
Ok(EngineConfig::Dynamic(Box::new(local_model)))
}
Output::Static(_) => {
// A single static backend, no etcd
Ok(EngineConfig::StaticRemote(Box::new(local_model)))
}
Output::EchoFull => Ok(EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
is_static: flags.static_worker,
}),
Output::EchoCore => Ok(EngineConfig::StaticCore {
engine: dynamo_llm::engines::make_engine_core(),
model: Box::new(local_model),
is_static: flags.static_worker,
}),
#[cfg(feature = "mistralrs")]
Output::MistralRs => Ok(EngineConfig::StaticFull {
engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
model: Box::new(local_model),
is_static: flags.static_worker,
}),
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => Ok(EngineConfig::StaticCore {
engine: dynamo_engine_llamacpp::make_engine(cancel_token, &local_model).await?,
model: Box::new(local_model),
is_static: flags.static_worker,
}),
Output::Mocker => {
let Either::Right(drt) = rt else {
Expand All @@ -127,6 +143,7 @@ async fn engine_for(
Ok(EngineConfig::StaticCore {
engine,
model: Box::new(local_model),
is_static: flags.static_worker,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions launch/dynamo-run/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Example:
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#;

const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0] [--migration-limit=0] [--verbosity (-v|-vv)]";
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|auto|dyn://<path> [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--static-worker] [--router-mode random|round-robin|kv] [--kv-overlap-score-weight=2.0] [--kv-gpu-cache-usage-weight=1.0] [--kv-waiting-requests-weight=1.0] [--migration-limit=0] [--verbosity (-v|-vv)]";

fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag
Expand Down Expand Up @@ -134,5 +134,5 @@ fn is_in_dynamic(in_opt: &Input) -> bool {
}

fn is_out_dynamic(out_opt: &Option<Output>) -> bool {
matches!(out_opt, Some(Output::Dynamic))
matches!(out_opt, Some(Output::Auto) | Some(Output::Static(_)))
}
22 changes: 13 additions & 9 deletions launch/dynamo-run/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ pub enum Output {
EchoCore,

/// Listen for models on nats/etcd, add/remove dynamically
Dynamic,
Auto,

/// Static remote: The dyn://namespace.component.endpoint name of a remote worker we expect to
/// exist. THIS DISABLES AUTO-DISCOVERY. Only this endpoint will be connected.
/// `--model-name` and `--model-path` must also be set.
///
/// A static remote setup avoids having to run etcd.
Static(String),

#[cfg(feature = "mistralrs")]
/// Run inference on a model in a GGUF file using mistralrs w/ candle
Expand Down Expand Up @@ -40,15 +47,11 @@ impl TryFrom<&str> for Output {
"echo_full" => Ok(Output::EchoFull),
"echo_core" => Ok(Output::EchoCore),

"dyn" => Ok(Output::Dynamic),
"dyn" | "auto" => Ok(Output::Auto),

// Deprecated, should only use `out=dyn`
endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
tracing::warn!(
"out=dyn://<path> is deprecated, the path is not used. Please use 'out=dyn'"
);
//let path = endpoint_path.strip_prefix(ENDPOINT_SCHEME).unwrap();
Ok(Output::Dynamic)
let path = endpoint_path.strip_prefix(ENDPOINT_SCHEME).unwrap();
Ok(Output::Static(path.to_string()))
}

e => Err(anyhow::anyhow!("Invalid out= option '{e}'")),
Expand All @@ -69,7 +72,8 @@ impl fmt::Display for Output {
Output::EchoFull => "echo_full",
Output::EchoCore => "echo_core",

Output::Dynamic => "dyn",
Output::Auto => "auto",
Output::Static(endpoint) => &format!("{ENDPOINT_SCHEME}{endpoint}"),
};
write!(f, "{s}")
}
Expand Down
Loading
Loading