Skip to content

Commit 1f07dab

Browse files
authored
feat: Add migration to LLM requests (#1930)
1 parent 5f17918 commit 1f07dab

File tree

19 files changed

+812
-111
lines changed

19 files changed

+812
-111
lines changed

launch/dynamo-run/src/flags.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -162,6 +162,11 @@ pub struct Flags {
162162
#[arg(long)]
163163
pub request_template: Option<PathBuf>,
164164

165+
/// How many times a request can be migrated to another worker if the HTTP server lost
166+
/// connection to the current worker.
167+
#[arg(long, value_parser = clap::value_parser!(u32).range(0..1024))]
168+
pub migration_limit: Option<u32>,
169+
165170
/// Everything after a `--`.
166171
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
167172
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
@@ -180,6 +185,9 @@ impl Flags {
180185
if self.kv_cache_block_size.is_some() {
181186
anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
182187
}
188+
if self.migration_limit.is_some() {
189+
anyhow::bail!("'--migration-limit' flag should only be used on the worker node, not on the ingress");
190+
}
183191
}
184192
Output::EchoFull => {}
185193
Output::EchoCore => {

launch/dynamo-run/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,8 @@ pub async fn run(
4545
.context_length(flags.context_length)
4646
.http_port(Some(flags.http_port))
4747
.router_config(Some(flags.router_config()))
48-
.request_template(flags.request_template.clone());
48+
.request_template(flags.request_template.clone())
49+
.migration_limit(flags.migration_limit);
4950

5051
// If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
5152
// If not, then the endpoint isn't exposed so we let LocalModel invent one.

launch/dynamo-run/src/subprocess.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,8 @@ pub async fn start(
4848
card.kv_cache_block_size.to_string(),
4949
"--context-length".to_string(),
5050
card.context_length.to_string(),
51+
"--migration-limit".to_string(),
52+
card.migration_limit.to_string(),
5153
];
5254
// TRTLLM only
5355
// The worker node will only publish events and metrics if the router mode is KV

launch/dynamo-run/src/subprocess/sglang_inc.py

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ class Config:
4242
nnodes: int
4343
node_rank: int
4444
dist_init_addr: str
45+
migration_limit: int
4546
extra_engine_args: str
4647

4748

@@ -202,7 +203,13 @@ async def init(runtime: DistributedRuntime, config: Config):
202203
model_type = (
203204
ModelType.Backend if not engine_args.is_embedding else ModelType.Embedding
204205
)
205-
await register_llm(model_type, endpoint, config.model_path, config.model_name)
206+
await register_llm(
207+
model_type,
208+
endpoint,
209+
config.model_path,
210+
config.model_name,
211+
migration_limit=config.migration_limit,
212+
)
206213

207214
# the server will gracefully shutdown (i.e., keep opened TCP streams finishes)
208215
# after the lease is revoked
@@ -268,6 +275,12 @@ def cmd_line_args():
268275
default="",
269276
help="Host address (e.g., `192.168.0.2:25000`) of the node with rank 0",
270277
)
278+
parser.add_argument(
279+
"--migration-limit",
280+
type=int,
281+
default=0,
282+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
283+
)
271284
parser.add_argument(
272285
"--extra-engine-args",
273286
type=str,
@@ -304,6 +317,7 @@ def cmd_line_args():
304317
config.nnodes = args.nnodes
305318
config.node_rank = args.node_rank
306319
config.dist_init_addr = args.dist_init_addr
320+
config.migration_limit = args.migration_limit
307321
config.extra_engine_args = args.extra_engine_args
308322
return config
309323

launch/dynamo-run/src/subprocess/trtllm_inc.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,7 @@ class Config:
122122
model_name: Optional[str] = None
123123
tensor_parallel_size: int
124124
kv_block_size: int
125+
migration_limit: int
125126
extra_engine_args: str
126127
publish_events_and_metrics: bool
127128
disaggregation_mode: str
@@ -136,6 +137,7 @@ def __str__(self) -> str:
136137
f"model_name={self.model_name}, "
137138
f"tensor_parallel_size={self.tensor_parallel_size}, "
138139
f"kv_block_size={self.kv_block_size}, "
140+
f"migration_limit={self.migration_limit}, "
139141
f"extra_engine_args={self.extra_engine_args}, "
140142
f"publish_events_and_metrics={self.publish_events_and_metrics}, "
141143
f"disaggregation_mode={self.disaggregation_mode}, "
@@ -404,6 +406,7 @@ async def init(runtime: DistributedRuntime, config: Config):
404406
config.model_path,
405407
config.model_name,
406408
kv_cache_block_size=config.kv_block_size,
409+
migration_limit=config.migration_limit,
407410
)
408411

409412
# publisher will be set later if publishing is enabled.
@@ -476,6 +479,12 @@ def cmd_line_args():
476479
default=None,
477480
help="This argument is not used by TRTLLM. Please provide max_input_len, max_seq_len and max_output_len in yaml file and point --extra-engine-args to the yaml file.",
478481
)
482+
parser.add_argument(
483+
"--migration-limit",
484+
type=int,
485+
default=0,
486+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
487+
)
479488
parser.add_argument(
480489
"--extra-engine-args",
481490
type=str,
@@ -557,6 +566,7 @@ def cmd_line_args():
557566
config.endpoint = parsed_endpoint_name
558567
config.tensor_parallel_size = args.tensor_parallel_size
559568
config.kv_block_size = args.kv_block_size
569+
config.migration_limit = args.migration_limit
560570
config.extra_engine_args = args.extra_engine_args
561571
config.publish_events_and_metrics = args.publish_events_and_metrics
562572
config.disaggregation_mode = disaggregation_mode

launch/dynamo-run/src/subprocess/vllm_inc.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ class Config:
5656
tensor_parallel_size: int
5757
kv_block_size: int
5858
context_length: int
59+
migration_limit: int
5960
extra_engine_args: str
6061

6162

@@ -233,6 +234,7 @@ async def init(runtime: DistributedRuntime, config: Config):
233234
"max_model_len", None
234235
), # if None, takes length from tokenizer
235236
kv_cache_block_size=arg_map["block_size"],
237+
migration_limit=config.migration_limit,
236238
)
237239
handler = RequestHandler(component, engine_client, default_sampling_params)
238240
handler.setup_kv_metrics()
@@ -276,6 +278,12 @@ def cmd_line_args():
276278
default=None,
277279
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
278280
)
281+
parser.add_argument(
282+
"--migration-limit",
283+
type=int,
284+
default=0,
285+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
286+
)
279287
parser.add_argument(
280288
"--extra-engine-args",
281289
type=str,
@@ -308,6 +316,7 @@ def cmd_line_args():
308316
config.tensor_parallel_size = args.tensor_parallel_size
309317
config.kv_block_size = args.kv_block_size
310318
config.context_length = args.context_length
319+
config.migration_limit = args.migration_limit
311320
config.extra_engine_args = args.extra_engine_args
312321

313322
return config

launch/dynamo-run/src/subprocess/vllm_v1_inc.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@ class Config:
6565
tensor_parallel_size: int
6666
kv_block_size: int
6767
context_length: int
68+
migration_limit: int
6869
extra_engine_args: str
6970

7071

@@ -218,6 +219,7 @@ async def init(runtime: DistributedRuntime, config: Config):
218219
config.model_path,
219220
config.model_name,
220221
kv_cache_block_size=config.kv_block_size,
222+
migration_limit=config.migration_limit,
221223
)
222224

223225
arg_map = {
@@ -333,6 +335,12 @@ def cmd_line_args():
333335
default=None,
334336
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
335337
)
338+
parser.add_argument(
339+
"--migration-limit",
340+
type=int,
341+
default=0,
342+
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
343+
)
336344
parser.add_argument(
337345
"--extra-engine-args",
338346
type=str,
@@ -365,6 +373,7 @@ def cmd_line_args():
365373
config.tensor_parallel_size = args.tensor_parallel_size
366374
config.kv_block_size = args.kv_block_size
367375
config.context_length = args.context_length
376+
config.migration_limit = args.migration_limit
368377
config.extra_engine_args = args.extra_engine_args
369378

370379
return config

lib/bindings/python/rust/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -131,7 +131,7 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32)
131131
}
132132

133133
#[pyfunction]
134-
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None))]
134+
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0))]
135135
#[allow(clippy::too_many_arguments)]
136136
fn register_llm<'p>(
137137
py: Python<'p>,
@@ -142,6 +142,7 @@ fn register_llm<'p>(
142142
context_length: Option<u32>,
143143
kv_cache_block_size: Option<u32>,
144144
router_mode: Option<RouterMode>,
145+
migration_limit: u32,
145146
) -> PyResult<Bound<'p, PyAny>> {
146147
let model_type_obj = match model_type {
147148
ModelType::Chat => llm_rs::model_type::ModelType::Chat,
@@ -162,7 +163,8 @@ fn register_llm<'p>(
162163
.model_name(model_name)
163164
.context_length(context_length)
164165
.kv_cache_block_size(kv_cache_block_size)
165-
.router_config(Some(router_config));
166+
.router_config(Some(router_config))
167+
.migration_limit(Some(migration_limit));
166168
// Download from HF, load the ModelDeploymentCard
167169
let mut local_model = builder.build().await.map_err(to_pyerr)?;
168170
// Advertise ourself on etcd so ingress can find us

lib/llm/src/discovery/watcher.rs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ use dynamo_runtime::{
1919
use crate::{
2020
backend::Backend,
2121
kv_router::{KvPushRouter, KvRouterConfig},
22+
migration::Migration,
2223
model_type::ModelType,
2324
preprocessor::{OpenAIPreprocessor, PreprocessedEmbeddingRequest, PreprocessedRequest},
2425
protocols::common::llm_backend::{EmbeddingsEngineOutput, LLMEngineOutput},
@@ -197,12 +198,14 @@ impl ModelWatcher {
197198
// function. Needs checking carefully, possibly we need to store it in state.
198199
let _cache_dir = Some(card.move_from_nats(self.drt.nats_client()).await?);
199200

201+
// Chat Completions
200202
let frontend = SegmentSource::<
201203
SingleIn<NvCreateChatCompletionRequest>,
202204
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
203205
>::new();
204206
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
205207
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
208+
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
206209
let router =
207210
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
208211
client.clone(),
@@ -231,19 +234,23 @@ impl ModelWatcher {
231234
let chat_engine = frontend
232235
.link(preprocessor.forward_edge())?
233236
.link(backend.forward_edge())?
237+
.link(migration.forward_edge())?
234238
.link(service_backend)?
239+
.link(migration.backward_edge())?
235240
.link(backend.backward_edge())?
236241
.link(preprocessor.backward_edge())?
237242
.link(frontend)?;
238243
self.manager
239244
.add_chat_completions_model(&model_entry.name, chat_engine)?;
240245

246+
// Completions
241247
let frontend = SegmentSource::<
242248
SingleIn<NvCreateCompletionRequest>,
243249
ManyOut<Annotated<NvCreateCompletionResponse>>,
244250
>::new();
245251
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
246252
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
253+
let migration = Migration::from_mdc(card.clone()).await?.into_operator();
247254
let router =
248255
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
249256
client,
@@ -272,7 +279,9 @@ impl ModelWatcher {
272279
let completions_engine = frontend
273280
.link(preprocessor.forward_edge())?
274281
.link(backend.forward_edge())?
282+
.link(migration.forward_edge())?
275283
.link(service_backend)?
284+
.link(migration.backward_edge())?
276285
.link(backend.backward_edge())?
277286
.link(preprocessor.backward_edge())?
278287
.link(frontend)?;

lib/llm/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ pub mod hub;
2222
// pub mod key_value_store;
2323
pub mod kv_router;
2424
pub mod local_model;
25+
pub mod migration;
2526
pub mod mocker;
2627
pub mod model_card;
2728
pub mod model_type;

0 commit comments

Comments (0)