Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions launch/dynamo-run/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ pub struct Flags {
#[arg(long)]
pub request_template: Option<PathBuf>,

/// How many times a request can be migrated to another worker if the HTTP server lost
/// connection to the current worker.
#[arg(long, value_parser = clap::value_parser!(u32).range(0..1024))]
pub migration_limit: Option<u32>,

/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
Expand All @@ -180,6 +185,9 @@ impl Flags {
if self.kv_cache_block_size.is_some() {
anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
}
if self.migration_limit.is_some() {
anyhow::bail!("'--migration-limit' flag should only be used on the worker node, not on the ingress");
}
}
Output::EchoFull => {}
Output::EchoCore => {
Expand Down
3 changes: 2 additions & 1 deletion launch/dynamo-run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ pub async fn run(
.context_length(flags.context_length)
.http_port(Some(flags.http_port))
.router_config(Some(flags.router_config()))
.request_template(flags.request_template.clone());
.request_template(flags.request_template.clone())
.migration_limit(flags.migration_limit);

// If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
// If not, then the endpoint isn't exposed so we let LocalModel invent one.
Expand Down
2 changes: 2 additions & 0 deletions launch/dynamo-run/src/subprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub async fn start(
card.kv_cache_block_size.to_string(),
"--context-length".to_string(),
card.context_length.to_string(),
"--migration-limit".to_string(),
card.migration_limit.to_string(),
];
// TRTLLM only
// The worker node will only publish events and metrics if the router mode is KV
Expand Down
16 changes: 15 additions & 1 deletion launch/dynamo-run/src/subprocess/sglang_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Config:
nnodes: int
node_rank: int
dist_init_addr: str
migration_limit: int
extra_engine_args: str


Expand Down Expand Up @@ -202,7 +203,13 @@ async def init(runtime: DistributedRuntime, config: Config):
model_type = (
ModelType.Backend if not engine_args.is_embedding else ModelType.Embedding
)
await register_llm(model_type, endpoint, config.model_path, config.model_name)
await register_llm(
model_type,
endpoint,
config.model_path,
config.model_name,
migration_limit=config.migration_limit,
)

# the server will gracefully shutdown (i.e., keep opened TCP streams finishes)
# after the lease is revoked
Expand Down Expand Up @@ -268,6 +275,12 @@ def cmd_line_args():
default="",
help="Host address (e.g., `192.168.0.2:25000`) of the node with rank 0",
)
parser.add_argument(
"--migration-limit",
type=int,
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
)
parser.add_argument(
"--extra-engine-args",
type=str,
Expand Down Expand Up @@ -304,6 +317,7 @@ def cmd_line_args():
config.nnodes = args.nnodes
config.node_rank = args.node_rank
config.dist_init_addr = args.dist_init_addr
config.migration_limit = args.migration_limit
config.extra_engine_args = args.extra_engine_args
return config

Expand Down
10 changes: 10 additions & 0 deletions launch/dynamo-run/src/subprocess/trtllm_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class Config:
model_name: Optional[str] = None
tensor_parallel_size: int
kv_block_size: int
migration_limit: int
extra_engine_args: str
publish_events_and_metrics: bool
disaggregation_mode: str
Expand All @@ -136,6 +137,7 @@ def __str__(self) -> str:
f"model_name={self.model_name}, "
f"tensor_parallel_size={self.tensor_parallel_size}, "
f"kv_block_size={self.kv_block_size}, "
f"migration_limit={self.migration_limit}, "
f"extra_engine_args={self.extra_engine_args}, "
f"publish_events_and_metrics={self.publish_events_and_metrics}, "
f"disaggregation_mode={self.disaggregation_mode}, "
Expand Down Expand Up @@ -404,6 +406,7 @@ async def init(runtime: DistributedRuntime, config: Config):
config.model_path,
config.model_name,
kv_cache_block_size=config.kv_block_size,
migration_limit=config.migration_limit,
)

# publisher will be set later if publishing is enabled.
Expand Down Expand Up @@ -476,6 +479,12 @@ def cmd_line_args():
default=None,
help="This argument is not used by TRTLLM. Please provide max_input_len, max_seq_len and max_output_len in yaml file and point --extra-engine-args to the yaml file.",
)
parser.add_argument(
"--migration-limit",
type=int,
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
)
parser.add_argument(
"--extra-engine-args",
type=str,
Expand Down Expand Up @@ -557,6 +566,7 @@ def cmd_line_args():
config.endpoint = parsed_endpoint_name
config.tensor_parallel_size = args.tensor_parallel_size
config.kv_block_size = args.kv_block_size
config.migration_limit = args.migration_limit
config.extra_engine_args = args.extra_engine_args
config.publish_events_and_metrics = args.publish_events_and_metrics
config.disaggregation_mode = disaggregation_mode
Expand Down
9 changes: 9 additions & 0 deletions launch/dynamo-run/src/subprocess/vllm_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Config:
tensor_parallel_size: int
kv_block_size: int
context_length: int
migration_limit: int
extra_engine_args: str


Expand Down Expand Up @@ -233,6 +234,7 @@ async def init(runtime: DistributedRuntime, config: Config):
"max_model_len", None
), # if None, takes length from tokenizer
kv_cache_block_size=arg_map["block_size"],
migration_limit=config.migration_limit,
)
handler = RequestHandler(component, engine_client, default_sampling_params)
handler.setup_kv_metrics()
Expand Down Expand Up @@ -276,6 +278,12 @@ def cmd_line_args():
default=None,
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
)
parser.add_argument(
"--migration-limit",
type=int,
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
)
parser.add_argument(
"--extra-engine-args",
type=str,
Expand Down Expand Up @@ -308,6 +316,7 @@ def cmd_line_args():
config.tensor_parallel_size = args.tensor_parallel_size
config.kv_block_size = args.kv_block_size
config.context_length = args.context_length
config.migration_limit = args.migration_limit
config.extra_engine_args = args.extra_engine_args

return config
Expand Down
9 changes: 9 additions & 0 deletions launch/dynamo-run/src/subprocess/vllm_v1_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Config:
tensor_parallel_size: int
kv_block_size: int
context_length: int
migration_limit: int
extra_engine_args: str


Expand Down Expand Up @@ -218,6 +219,7 @@ async def init(runtime: DistributedRuntime, config: Config):
config.model_path,
config.model_name,
kv_cache_block_size=config.kv_block_size,
migration_limit=config.migration_limit,
)

arg_map = {
Expand Down Expand Up @@ -333,6 +335,12 @@ def cmd_line_args():
default=None,
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
)
parser.add_argument(
"--migration-limit",
type=int,
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
)
parser.add_argument(
"--extra-engine-args",
type=str,
Expand Down Expand Up @@ -365,6 +373,7 @@ def cmd_line_args():
config.tensor_parallel_size = args.tensor_parallel_size
config.kv_block_size = args.kv_block_size
config.context_length = args.context_length
config.migration_limit = args.migration_limit
config.extra_engine_args = args.extra_engine_args

return config
Expand Down
6 changes: 4 additions & 2 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32)
}

#[pyfunction]
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None))]
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0))]
#[allow(clippy::too_many_arguments)]
fn register_llm<'p>(
py: Python<'p>,
Expand All @@ -142,6 +142,7 @@ fn register_llm<'p>(
context_length: Option<u32>,
kv_cache_block_size: Option<u32>,
router_mode: Option<RouterMode>,
migration_limit: u32,
) -> PyResult<Bound<'p, PyAny>> {
let model_type_obj = match model_type {
ModelType::Chat => llm_rs::model_type::ModelType::Chat,
Expand All @@ -162,7 +163,8 @@ fn register_llm<'p>(
.model_name(model_name)
.context_length(context_length)
.kv_cache_block_size(kv_cache_block_size)
.router_config(Some(router_config));
.router_config(Some(router_config))
.migration_limit(Some(migration_limit));
// Download from HF, load the ModelDeploymentCard
let mut local_model = builder.build().await.map_err(to_pyerr)?;
// Advertise ourself on etcd so ingress can find us
Expand Down
9 changes: 9 additions & 0 deletions lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use dynamo_runtime::{
use crate::{
backend::Backend,
kv_router::{KvPushRouter, KvRouterConfig},
migration::Migration,
model_type::ModelType,
preprocessor::{OpenAIPreprocessor, PreprocessedEmbeddingRequest, PreprocessedRequest},
protocols::common::llm_backend::{EmbeddingsEngineOutput, LLMEngineOutput},
Expand Down Expand Up @@ -197,12 +198,14 @@ impl ModelWatcher {
// function. Needs checking carefully, possibly we need to store it in state.
let _cache_dir = Some(card.move_from_nats(self.drt.nats_client()).await?);

// Chat Completions
let frontend = SegmentSource::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
client.clone(),
Expand Down Expand Up @@ -231,19 +234,23 @@ impl ModelWatcher {
let chat_engine = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(migration.forward_edge())?
.link(service_backend)?
.link(migration.backward_edge())?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
self.manager
.add_chat_completions_model(&model_entry.name, chat_engine)?;

// Completions
let frontend = SegmentSource::<
SingleIn<NvCreateCompletionRequest>,
ManyOut<Annotated<NvCreateCompletionResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
client,
Expand Down Expand Up @@ -272,7 +279,9 @@ impl ModelWatcher {
let completions_engine = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(migration.forward_edge())?
.link(service_backend)?
.link(migration.backward_edge())?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod hub;
// pub mod key_value_store;
pub mod kv_router;
pub mod local_model;
pub mod migration;
pub mod mocker;
pub mod model_card;
pub mod model_type;
Expand Down
17 changes: 14 additions & 3 deletions lib/llm/src/local_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct LocalModelBuilder {
router_config: Option<RouterConfig>,
kv_cache_block_size: u32,
http_port: u16,
migration_limit: u32,
}

impl Default for LocalModelBuilder {
Expand All @@ -60,6 +61,7 @@ impl Default for LocalModelBuilder {
context_length: Default::default(),
template_file: Default::default(),
router_config: Default::default(),
migration_limit: Default::default(),
}
}
}
Expand Down Expand Up @@ -112,6 +114,11 @@ impl LocalModelBuilder {
self
}

pub fn migration_limit(&mut self, migration_limit: Option<u32>) -> &mut Self {
self.migration_limit = migration_limit.unwrap_or(0);
self
}

/// Make an LLM ready for use:
/// - Download it from Hugging Face (and NGC in future) if necessary
/// - Resolve the path
Expand All @@ -137,10 +144,12 @@ impl LocalModelBuilder {

// echo_full engine doesn't need a path. It's an edge case, move it out of the way.
if self.model_path.is_none() {
let mut card = ModelDeploymentCard::with_name_only(
self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
);
card.migration_limit = self.migration_limit;
return Ok(LocalModel {
card: ModelDeploymentCard::with_name_only(
self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
),
card,
full_path: PathBuf::new(),
endpoint_id,
template,
Expand Down Expand Up @@ -194,6 +203,8 @@ impl LocalModelBuilder {
card.context_length = context_length;
}

card.migration_limit = self.migration_limit;

Ok(LocalModel {
card,
full_path,
Expand Down
Loading
Loading