Skip to content

Commit 7c9c52f

Browse files
committed
Add --migration-limit flag
1 parent c4101d5 commit 7c9c52f

File tree

12 files changed

+94
-53
lines changed

12 files changed

+94
-53
lines changed

launch/dynamo-run/src/flags.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ pub struct Flags {
161161
#[arg(long)]
162162
pub request_template: Option<PathBuf>,
163163

164+
/// How many times a request can be migrated to another worker if the HTTP server lost
165+
/// connection to the current worker.
166+
#[arg(long, value_parser = clap::value_parser!(u32).range(0..1024))]
167+
pub migration_limit: Option<u32>,
168+
164169
/// Everything after a `--`.
165170
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
166171
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
@@ -179,6 +184,9 @@ impl Flags {
179184
if self.kv_cache_block_size.is_some() {
180185
anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
181186
}
187+
if self.migration_limit.is_some() {
188+
anyhow::bail!("'--migration-limit' flag should only be used on the worker node, not on the ingress");
189+
}
182190
}
183191
Output::EchoFull => {}
184192
Output::EchoCore => {

launch/dynamo-run/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ pub async fn run(
4444
.context_length(flags.context_length)
4545
.http_port(Some(flags.http_port))
4646
.router_config(flags.router_config())
47-
.request_template(flags.request_template.clone());
47+
.request_template(flags.request_template.clone())
48+
.migration_limit(flags.migration_limit);
4849

4950
// If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
5051
// If not, then the endpoint isn't exposed so we let LocalModel invent one.

launch/dynamo-run/src/subprocess.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub async fn start(
4848
card.kv_cache_block_size.to_string(),
4949
"--context-length".to_string(),
5050
card.context_length.to_string(),
51+
"--migration-limit".to_string(),
52+
card.migration_limit.to_string(),
5153
];
5254
// TRTLLM only
5355
// The worker node will only publish events and metrics if the router mode is KV

launch/dynamo-run/src/subprocess/sglang_inc.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class Config:
4242
nnodes: int
4343
node_rank: int
4444
dist_init_addr: str
45+
migration_limit: int
4546
extra_engine_args: str
4647

4748

@@ -202,7 +203,13 @@ async def init(runtime: DistributedRuntime, config: Config):
202203
model_type = (
203204
ModelType.Backend if not engine_args.is_embedding else ModelType.Embedding
204205
)
205-
await register_llm(model_type, endpoint, config.model_path, config.model_name)
206+
await register_llm(
207+
model_type,
208+
endpoint,
209+
config.model_path,
210+
config.model_name,
211+
migration_limit=config.migration_limit,
212+
)
206213

207214
# the server will gracefully shut down (i.e., let already-open TCP streams finish)
208215
# after the lease is revoked
@@ -268,6 +275,12 @@ def cmd_line_args():
268275
default="",
269276
help="Host address (e.g., `192.168.0.2:25000`) of the node with rank 0",
270277
)
278+
parser.add_argument(
279+
"--migration-limit",
280+
type=int,
281+
default=0,
282+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
283+
)
271284
parser.add_argument(
272285
"--extra-engine-args",
273286
type=str,
@@ -304,6 +317,7 @@ def cmd_line_args():
304317
config.nnodes = args.nnodes
305318
config.node_rank = args.node_rank
306319
config.dist_init_addr = args.dist_init_addr
320+
config.migration_limit = args.migration_limit
307321
config.extra_engine_args = args.extra_engine_args
308322
return config
309323

launch/dynamo-run/src/subprocess/trtllm_inc.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class Config:
122122
model_name: Optional[str] = None
123123
tensor_parallel_size: int
124124
kv_block_size: int
125+
migration_limit: int
125126
extra_engine_args: str
126127
publish_events_and_metrics: bool
127128
disaggregation_mode: str
@@ -136,6 +137,7 @@ def __str__(self) -> str:
136137
f"model_name={self.model_name}, "
137138
f"tensor_parallel_size={self.tensor_parallel_size}, "
138139
f"kv_block_size={self.kv_block_size}, "
140+
f"migration_limit={self.migration_limit}, "
139141
f"extra_engine_args={self.extra_engine_args}, "
140142
f"publish_events_and_metrics={self.publish_events_and_metrics}, "
141143
f"disaggregation_mode={self.disaggregation_mode}, "
@@ -404,6 +406,7 @@ async def init(runtime: DistributedRuntime, config: Config):
404406
config.model_path,
405407
config.model_name,
406408
kv_cache_block_size=config.kv_block_size,
409+
migration_limit=config.migration_limit,
407410
)
408411

409412
# publisher will be set later if publishing is enabled.
@@ -476,6 +479,12 @@ def cmd_line_args():
476479
default=None,
477480
help="This argument is not used by TRTLLM. Please provide max_input_len, max_seq_len and max_output_len in yaml file and point --extra-engine-args to the yaml file.",
478481
)
482+
parser.add_argument(
483+
"--migration-limit",
484+
type=int,
485+
default=0,
486+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
487+
)
479488
parser.add_argument(
480489
"--extra-engine-args",
481490
type=str,
@@ -557,6 +566,7 @@ def cmd_line_args():
557566
config.endpoint = parsed_endpoint_name
558567
config.tensor_parallel_size = args.tensor_parallel_size
559568
config.kv_block_size = args.kv_block_size
569+
config.migration_limit = args.migration_limit
560570
config.extra_engine_args = args.extra_engine_args
561571
config.publish_events_and_metrics = args.publish_events_and_metrics
562572
config.disaggregation_mode = disaggregation_mode

launch/dynamo-run/src/subprocess/vllm_inc.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class Config:
5656
tensor_parallel_size: int
5757
kv_block_size: int
5858
context_length: int
59+
migration_limit: int
5960
extra_engine_args: str
6061

6162

@@ -233,6 +234,7 @@ async def init(runtime: DistributedRuntime, config: Config):
233234
"max_model_len", None
234235
), # if None, takes length from tokenizer
235236
kv_cache_block_size=arg_map["block_size"],
237+
migration_limit=config.migration_limit,
236238
)
237239
handler = RequestHandler(component, engine_client, default_sampling_params)
238240
handler.setup_kv_metrics()
@@ -276,6 +278,12 @@ def cmd_line_args():
276278
default=None,
277279
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
278280
)
281+
parser.add_argument(
282+
"--migration-limit",
283+
type=int,
284+
default=0,
285+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
286+
)
279287
parser.add_argument(
280288
"--extra-engine-args",
281289
type=str,
@@ -308,6 +316,7 @@ def cmd_line_args():
308316
config.tensor_parallel_size = args.tensor_parallel_size
309317
config.kv_block_size = args.kv_block_size
310318
config.context_length = args.context_length
319+
config.migration_limit = args.migration_limit
311320
config.extra_engine_args = args.extra_engine_args
312321

313322
return config

launch/dynamo-run/src/subprocess/vllm_v1_inc.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class Config:
6565
tensor_parallel_size: int
6666
kv_block_size: int
6767
context_length: int
68+
migration_limit: int
6869
extra_engine_args: str
6970

7071

@@ -218,6 +219,7 @@ async def init(runtime: DistributedRuntime, config: Config):
218219
config.model_path,
219220
config.model_name,
220221
kv_cache_block_size=config.kv_block_size,
222+
migration_limit=config.migration_limit,
221223
)
222224

223225
arg_map = {
@@ -333,6 +335,12 @@ def cmd_line_args():
333335
default=None,
334336
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
335337
)
338+
parser.add_argument(
339+
"--migration-limit",
340+
type=int,
341+
default=0,
342+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
343+
)
336344
parser.add_argument(
337345
"--extra-engine-args",
338346
type=str,
@@ -365,6 +373,7 @@ def cmd_line_args():
365373
config.tensor_parallel_size = args.tensor_parallel_size
366374
config.kv_block_size = args.kv_block_size
367375
config.context_length = args.context_length
376+
config.migration_limit = args.migration_limit
368377
config.extra_engine_args = args.extra_engine_args
369378

370379
return config

lib/bindings/python/rust/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32)
129129
}
130130

131131
#[pyfunction]
132-
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None))]
132+
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0))]
133133
#[allow(clippy::too_many_arguments)]
134134
fn register_llm<'p>(
135135
py: Python<'p>,
@@ -140,6 +140,7 @@ fn register_llm<'p>(
140140
context_length: Option<u32>,
141141
kv_cache_block_size: Option<u32>,
142142
router_mode: Option<RouterMode>,
143+
migration_limit: u32,
143144
) -> PyResult<Bound<'p, PyAny>> {
144145
let model_type_obj = match model_type {
145146
ModelType::Chat => llm_rs::model_type::ModelType::Chat,
@@ -160,7 +161,8 @@ fn register_llm<'p>(
160161
.model_name(model_name)
161162
.context_length(context_length)
162163
.kv_cache_block_size(kv_cache_block_size)
163-
.router_config(router_config);
164+
.router_config(router_config)
165+
.migration_limit(Some(migration_limit));
164166
// Download from HF, load the ModelDeploymentCard
165167
let mut local_model = builder.build().await.map_err(to_pyerr)?;
166168
// Advertise ourself on etcd so ingress can find us

lib/llm/src/local_model.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub struct LocalModelBuilder {
4646
router_config: Option<RouterConfig>,
4747
kv_cache_block_size: u32,
4848
http_port: u16,
49+
migration_limit: u32,
4950
}
5051

5152
impl Default for LocalModelBuilder {
@@ -60,6 +61,7 @@ impl Default for LocalModelBuilder {
6061
context_length: Default::default(),
6162
template_file: Default::default(),
6263
router_config: Default::default(),
64+
migration_limit: Default::default(),
6365
}
6466
}
6567
}
@@ -112,6 +114,11 @@ impl LocalModelBuilder {
112114
self
113115
}
114116

117+
/// Sets how many times a request may be migrated to another worker.
///
/// A `None` argument is stored as `0`. Returns `&mut Self` so that builder
/// calls can be chained.
pub fn migration_limit(&mut self, migration_limit: Option<u32>) -> &mut Self {
118+
self.migration_limit = migration_limit.unwrap_or(0);
119+
self
120+
}
121+
115122
/// Make an LLM ready for use:
116123
/// - Download it from Hugging Face (and NGC in future) if necessary
117124
/// - Resolve the path
@@ -137,10 +144,12 @@ impl LocalModelBuilder {
137144

138145
// echo_full engine doesn't need a path. It's an edge case, move it out of the way.
139146
if self.model_path.is_none() {
147+
let mut card = ModelDeploymentCard::with_name_only(
148+
self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
149+
);
150+
card.migration_limit = self.migration_limit;
140151
return Ok(LocalModel {
141-
card: ModelDeploymentCard::with_name_only(
142-
self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
143-
),
152+
card,
144153
full_path: PathBuf::new(),
145154
endpoint_id,
146155
template,
@@ -194,6 +203,8 @@ impl LocalModelBuilder {
194203
card.context_length = context_length;
195204
}
196205

206+
card.migration_limit = self.migration_limit;
207+
197208
Ok(LocalModel {
198209
card,
199210
full_path,

0 commit comments

Comments
 (0)