
Commit 820e2a1

Code Rabbit feedback

1 parent bf3c408

File tree

9 files changed: +104 −139 lines changed

components/frontend/src/dynamo/frontend/main.py

Lines changed: 7 additions & 1 deletion

@@ -128,12 +128,18 @@ def parse_args():
         help="Path to model directory on disk (e.g., /tmp/model_cache/lama3.2_1B/)",
     )
-    return parser.parse_args()
+    flags = parser.parse_args()
+
+    if flags.static_endpoint and (not flags.model_name or not flags.model_path):
+        parser.error("--static-endpoint requires both --model-name and --model-path")
+
+    return flags
 
 
 async def async_main():
     flags = parse_args()
     is_static = bool(flags.static_endpoint)  # true if the string has a value
+
     runtime = DistributedRuntime(asyncio.get_running_loop(), is_static)
 
     if flags.router_mode == "kv":

launch/dynamo-run/src/flags.rs

Lines changed: 7 additions & 1 deletion

@@ -171,7 +171,13 @@ impl Flags {
             }
         }
         Output::Static(_) => {
-            if self.model_name.is_none() || self.model_name.is_none() {
+            if self.model_name.is_none()
+                || self
+                    .model_path_pos
+                    .as_ref()
+                    .or(self.model_path_flag.as_ref())
+                    .is_none()
+            {
                 anyhow::bail!(
                     "out=dyn://<path> requires --model-name and --model-path, which are the name and path on disk of the model we expect to serve."
                 );
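The previous condition was a copy-paste bug: it tested `self.model_name.is_none()` twice and never looked at the model path, so a missing `--model-path` slipped through (the Python frontend above now enforces the same pairing with `parser.error`). The fix accepts the path from either the positional argument or the flag form. A minimal standalone sketch of that `Option::or` fallback, not repository code:

fn main() {
    // The positional value takes precedence; fall back to the flag form.
    let path_pos: Option<String> = None;
    let path_flag: Option<String> = Some("/models/llama".to_string());

    // `as_ref()` borrows instead of moving; `or` keeps the first `Some`.
    let model_path = path_pos.as_ref().or(path_flag.as_ref());
    assert_eq!(model_path.map(String::as_str), Some("/models/llama"));

    // Only when both sources are `None` does the requires-check trip,
    // mirroring the `bail!` above.
    let missing: Option<String> = None;
    assert!(missing.as_ref().or(None).is_none());
}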

lib/bindings/python/examples/hello_world/server_sglang_static.py

Lines changed: 1 addition & 0 deletions

@@ -62,6 +62,7 @@ async def generate(self, request):
         if finish_reason:
             # Don't forward the stop token
             out = {"token_ids": [], "finish_reason": finish_reason["type"]}
+            next_total_toks = num_output_tokens_so_far
         else:
             next_total_toks = len(res["output_ids"])
             out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
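The new assignment matters because `next_total_toks` is carried forward as the count of tokens already sent; before the fix, the finish branch left it unset for that iteration. A minimal sketch of the delta-emission pattern this preserves, with assumed shapes rather than the example's real response objects:

// On each iteration, forward only the tokens beyond what was already sent,
// and freeze the counter on the finish branch instead of leaving it stale.
fn next_chunk(all_output: &[u32], sent_so_far: usize, finished: bool) -> (Vec<u32>, usize) {
    if finished {
        // Don't forward the stop token; the counter stays where it was.
        (Vec::new(), sent_so_far)
    } else {
        (all_output[sent_so_far..].to_vec(), all_output.len())
    }
}

fn main() {
    let output = vec![1, 2, 3, 4];
    let (chunk, sent) = next_chunk(&output, 2, false);
    assert_eq!(chunk, vec![3, 4]);
    assert_eq!(sent, 4);
    let (fin, sent) = next_chunk(&output, sent, true);
    assert!(fin.is_empty());
    assert_eq!(sent, 4);
}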

lib/llm/src/discovery/watcher.rs

Lines changed: 12 additions & 10 deletions

@@ -18,7 +18,7 @@ use dynamo_runtime::{
 
 use crate::{
     backend::Backend,
-    engines,
+    entrypoint,
     kv_router::KvRouterConfig,
     model_type::ModelType,
     preprocessor::{OpenAIPreprocessor, PreprocessedEmbeddingRequest},
@@ -216,19 +216,21 @@ impl ModelWatcher {
             None
         };
 
-        let chat_engine = engines::build_chat_completions(
-            &card,
-            &client,
-            self.router_mode,
-            kv_chooser.clone(),
-        )
-        .await?;
+        let chat_engine =
+            entrypoint::build_routed_pipeline::<
+                NvCreateChatCompletionRequest,
+                NvCreateChatCompletionStreamResponse,
+            >(&card, &client, self.router_mode, kv_chooser.clone())
+            .await?;
         self.manager
             .add_chat_completions_model(&model_entry.name, chat_engine)?;
 
         let completions_engine =
-            engines::build_completions(&card, &client, self.router_mode, kv_chooser)
-                .await?;
+            entrypoint::build_routed_pipeline::<
+                NvCreateCompletionRequest,
+                NvCreateCompletionResponse,
+            >(&card, &client, self.router_mode, kv_chooser)
+            .await?;
         self.manager
             .add_completions_model(&model_entry.name, completions_engine)?;
     }
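Both engines are now built by one generic function. The request and response types never appear in the value arguments, so inference cannot pin them, and the call sites name them explicitly with turbofish syntax. A minimal sketch of why, using a hypothetical function rather than repository code:

fn build_pipeline<Req: Default, Resp: Default>() -> (Req, Resp) {
    // Nothing in the arguments fixes Req or Resp, so callers must name them.
    (Req::default(), Resp::default())
}

fn main() {
    // Mirrors build_routed_pipeline::<NvCreateChatCompletionRequest, ...>(..)
    let (req, resp) = build_pipeline::<String, Vec<u8>>();
    assert!(req.is_empty() && resp.is_empty());
}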

lib/llm/src/engines.rs

Lines changed: 0 additions & 113 deletions

@@ -1,17 +1,5 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
 
 use std::env;
 use std::sync::Arc;
@@ -21,32 +9,17 @@ use std::time::Duration;
 use async_stream::stream;
 use async_trait::async_trait;
 
-use dynamo_runtime::component::Client;
 use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
-use dynamo_runtime::pipeline::Operator as _;
-use dynamo_runtime::pipeline::PushRouter;
-use dynamo_runtime::pipeline::RouterMode;
-use dynamo_runtime::pipeline::SegmentSource;
-use dynamo_runtime::pipeline::ServiceBackend;
-use dynamo_runtime::pipeline::Source as _;
 use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
 use dynamo_runtime::protocols::annotated::Annotated;
 
-use crate::backend::Backend;
 use crate::backend::ExecutionContext;
-use crate::kv_router::KvPushRouter;
-use crate::kv_router::KvRouter;
-use crate::migration::Migration;
-use crate::model_card::ModelDeploymentCard;
-use crate::preprocessor::OpenAIPreprocessor;
 use crate::preprocessor::PreprocessedRequest;
 use crate::protocols::common::llm_backend::LLMEngineOutput;
 use crate::protocols::openai::{
     chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
     completions::{prompt_to_string, NvCreateCompletionRequest, NvCreateCompletionResponse},
 };
-use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
-use crate::types::openai::completions::OpenAICompletionsStreamingEngine;
 use crate::types::openai::embeddings::NvCreateEmbeddingRequest;
 use crate::types::openai::embeddings::NvCreateEmbeddingResponse;
 
@@ -436,89 +409,3 @@ impl
         self.0.handle_chat(req).await
     }
 }
-
-pub async fn build_chat_completions(
-    card: &ModelDeploymentCard,
-    client: &Client,
-    router_mode: RouterMode,
-    chooser: Option<Arc<KvRouter>>,
-) -> anyhow::Result<OpenAIChatCompletionsStreamingEngine> {
-    let frontend = SegmentSource::<
-        SingleIn<NvCreateChatCompletionRequest>,
-        ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
-    >::new();
-    let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
-    let backend = Backend::from_mdc(card.clone()).await?.into_operator();
-    let migration = Migration::from_mdc(card.clone()).await?.into_operator();
-    let router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
-        client.clone(),
-        router_mode,
-    )
-    .await?;
-    let service_backend = match router_mode {
-        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
-            ServiceBackend::from_engine(Arc::new(router))
-        }
-        RouterMode::KV => {
-            let Some(chooser) = chooser else {
-                anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
-            };
-            let kv_push_router = KvPushRouter::new(router, chooser);
-            ServiceBackend::from_engine(Arc::new(kv_push_router))
-        }
-    };
-
-    let chat_engine = frontend
-        .link(preprocessor.forward_edge())?
-        .link(backend.forward_edge())?
-        .link(migration.forward_edge())?
-        .link(service_backend)?
-        .link(migration.backward_edge())?
-        .link(backend.backward_edge())?
-        .link(preprocessor.backward_edge())?
-        .link(frontend)?;
-    Ok(chat_engine)
-}
-
-pub async fn build_completions(
-    card: &ModelDeploymentCard,
-    client: &Client,
-    router_mode: RouterMode,
-    chooser: Option<Arc<KvRouter>>,
-) -> anyhow::Result<OpenAICompletionsStreamingEngine> {
-    let frontend = SegmentSource::<
-        SingleIn<NvCreateCompletionRequest>,
-        ManyOut<Annotated<NvCreateCompletionResponse>>,
-    >::new();
-    let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
-    let backend = Backend::from_mdc(card.clone()).await?.into_operator();
-    let migration = Migration::from_mdc(card.clone()).await?.into_operator();
-    let router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
-        client.clone(),
-        router_mode,
-    )
-    .await?;
-    let service_backend = match router_mode {
-        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
-            ServiceBackend::from_engine(Arc::new(router))
-        }
-        RouterMode::KV => {
-            let Some(chooser) = chooser else {
-                anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
-            };
-            let kv_push_router = KvPushRouter::new(router, chooser);
-            ServiceBackend::from_engine(Arc::new(kv_push_router))
-        }
-    };
-
-    let completions_engine = frontend
-        .link(preprocessor.forward_edge())?
-        .link(backend.forward_edge())?
-        .link(migration.forward_edge())?
-        .link(service_backend)?
-        .link(migration.backward_edge())?
-        .link(backend.backward_edge())?
-        .link(preprocessor.backward_edge())?
-        .link(frontend)?;
-    Ok(completions_engine)
-}
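The two deleted builders were line-for-line identical except for the four OpenAI request/response types, duplicating the frontend, preprocessor, backend, migration, router selection, and link chain. They are replaced by the single generic `build_routed_pipeline` added to `entrypoint/input/common.rs` below. A minimal sketch of the deduplication idea, with hypothetical names:

// Two near-identical builders that differ only in their engine type
// collapse into one generic entry point, as this commit does with
// build_chat_completions / build_completions.
trait Pipeline {
    fn build() -> Self;
}

struct ChatEngine;
struct CompletionsEngine;

impl Pipeline for ChatEngine {
    fn build() -> Self {
        ChatEngine
    }
}

impl Pipeline for CompletionsEngine {
    fn build() -> Self {
        CompletionsEngine
    }
}

// One generic function replaces the two former builders.
fn build_routed<P: Pipeline>() -> P {
    P::build()
}

fn main() {
    let _chat: ChatEngine = build_routed();
    let _completions = build_routed::<CompletionsEngine>();
}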

lib/llm/src/entrypoint.rs

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@
 //! - Connect it to an Input
 
 pub mod input;
+pub use input::build_routed_pipeline;
 
 use std::sync::Arc;
lib/llm/src/entrypoint/input.rs

Lines changed: 1 addition & 0 deletions

@@ -16,6 +16,7 @@ use std::{
 
 pub mod batch;
 mod common;
+pub use common::build_routed_pipeline;
 pub mod endpoint;
 pub mod http;
 pub mod text;

lib/llm/src/entrypoint/input/common.rs

Lines changed: 63 additions & 7 deletions

@@ -6,11 +6,13 @@ use std::pin::Pin;
 use crate::{
     backend::{Backend, ExecutionContext},
     discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
-    engines::{self, StreamingEngineAdapter},
-    entrypoint::EngineConfig,
+    engines::StreamingEngineAdapter,
+    entrypoint::{self, EngineConfig},
+    kv_router::{KvPushRouter, KvRouter},
+    migration::Migration,
     model_card::ModelDeploymentCard,
     preprocessor::OpenAIPreprocessor,
-    protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
+    protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
     request_template::RequestTemplate,
     types::{
         openai::chat_completions::{
@@ -21,10 +23,12 @@ use crate::{
     },
 };
 use dynamo_runtime::{
+    component::Client,
     distributed::DistributedConfig,
     engine::{AsyncEngineStream, Data},
     pipeline::{
-        Context, ManyOut, Operator, RouterMode, ServiceBackend, ServiceFrontend, SingleIn, Source,
+        Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
+        ServiceEngine, ServiceFrontend, SingleIn, Source,
     },
     DistributedRuntime, Runtime,
};
@@ -124,9 +128,11 @@ pub async fn prepare_engine(
         None
     };
 
-    let chat_engine =
-        engines::build_chat_completions(card, &client, router_mode, kv_chooser.clone())
-            .await?;
+    let chat_engine = entrypoint::build_routed_pipeline::<
+        NvCreateChatCompletionRequest,
+        NvCreateChatCompletionStreamResponse,
+    >(card, &client, router_mode, kv_chooser.clone())
+    .await?;
 
     let service_name = local_model.service_name().to_string();
     tracing::info!("Static connecting to {service_name}");
@@ -204,6 +210,56 @@ where
         .link(frontend)?)
 }
 
+pub async fn build_routed_pipeline<Req, Resp>(
+    card: &ModelDeploymentCard,
+    client: &Client,
+    router_mode: RouterMode,
+    chooser: Option<Arc<KvRouter>>,
+) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
+where
+    Req: Data,
+    Resp: Data,
+    OpenAIPreprocessor: Operator<
+        Context<Req>,
+        Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
+        Context<PreprocessedRequest>,
+        Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
+    >,
+{
+    let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
+    let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
+    let backend = Backend::from_mdc(card.clone()).await?.into_operator();
+    let migration = Migration::from_mdc(card.clone()).await?.into_operator();
+    let router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
+        client.clone(),
+        router_mode,
+    )
+    .await?;
+    let service_backend = match router_mode {
+        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
+            ServiceBackend::from_engine(Arc::new(router))
+        }
+        RouterMode::KV => {
+            let Some(chooser) = chooser else {
+                anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
+            };
+            let kv_push_router = KvPushRouter::new(router, chooser);
+            ServiceBackend::from_engine(Arc::new(kv_push_router))
+        }
+    };
+
+    let engine = frontend
+        .link(preprocessor.forward_edge())?
+        .link(backend.forward_edge())?
+        .link(migration.forward_edge())?
+        .link(service_backend)?
+        .link(migration.backward_edge())?
+        .link(backend.backward_edge())?
+        .link(preprocessor.backward_edge())?
+        .link(frontend)?;
+    Ok(engine)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
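The `where` clause is what makes one function serve both call sites safely: `OpenAIPreprocessor` must implement the pipeline's `Operator` interface for the chosen `Req`/`Resp` pair, so `build_routed_pipeline` only compiles for pairs the preprocessor actually supports. A self-contained sketch of that pattern, a concrete type bounded by a generic trait, using hypothetical names:

// A concrete type implements a generic trait for a limited set of type
// pairs, and a generic function bounds on it, like the OpenAIPreprocessor
// bound in build_routed_pipeline.
trait Operator<In, Out> {
    fn run(&self, input: In) -> Out;
}

struct Preprocessor;

// Preprocessor supports only these two request/response pairs.
impl Operator<String, usize> for Preprocessor {
    fn run(&self, input: String) -> usize {
        input.len()
    }
}

impl Operator<u32, u32> for Preprocessor {
    fn run(&self, input: u32) -> u32 {
        input * 2
    }
}

// Compiles only for (In, Out) pairs that Preprocessor implements.
fn pipeline<In, Out>(input: In) -> Out
where
    Preprocessor: Operator<In, Out>,
{
    Preprocessor.run(input)
}

fn main() {
    let n: usize = pipeline("hello".to_string()); // OK: (String, usize)
    let d: u32 = pipeline(21u32); // OK: (u32, u32)
    println!("{n} {d}");
    // let bad: u8 = pipeline(1i64); // would not compile: no such impl
}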

lib/llm/src/entrypoint/input/http.rs

Lines changed: 12 additions & 7 deletions

@@ -5,8 +5,8 @@ use std::sync::Arc;
 
 use crate::{
     discovery::{ModelManager, ModelWatcher, MODEL_ROOT_PATH},
-    engines::{self, StreamingEngineAdapter},
-    entrypoint::{input::common, EngineConfig},
+    engines::StreamingEngineAdapter,
+    entrypoint::{self, input::common, EngineConfig},
     http::service::service_v2,
     kv_router::KvRouterConfig,
     types::openai::{
@@ -78,13 +78,18 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
         None
     };
 
-    let chat_engine =
-        engines::build_chat_completions(card, &client, router_mode, kv_chooser.clone())
-            .await?;
+    let chat_engine = entrypoint::build_routed_pipeline::<
+        NvCreateChatCompletionRequest,
+        NvCreateChatCompletionStreamResponse,
+    >(card, &client, router_mode, kv_chooser.clone())
+    .await?;
     manager.add_chat_completions_model(local_model.display_name(), chat_engine)?;
 
-    let completions_engine =
-        engines::build_completions(card, &client, router_mode, kv_chooser).await?;
+    let completions_engine = entrypoint::build_routed_pipeline::<
+        NvCreateCompletionRequest,
+        NvCreateCompletionResponse,
+    >(card, &client, router_mode, kv_chooser)
+    .await?;
    manager.add_completions_model(local_model.display_name(), completions_engine)?;
 }
 EngineConfig::StaticFull { engine, model, .. } => {