
Commit cb5a657

fix: Load the tokenizer JSON once for chat and completions. (#2910)

Signed-off-by: Graham King <grahamk@nvidia.com>

Parent: 9ef1328

File tree: 19 files changed, +207 −218 lines
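
This commit makes one structural change that repeats across the files below: the model's tokenizer.json is parsed once per model (via the ModelDeploymentCard) and the resulting tokenizers::Tokenizer is handed to the chat and completions pipelines, instead of each builder re-reading and re-parsing the file. As an orientation sketch, the key signatures paraphrased from the hunks that follow (not verbatim code):

    // Before: each builder loaded the tokenizer itself, so it was async and fallible.
    //   pub async fn from_mdc(mdc: ModelDeploymentCard) -> Result<Arc<Backend>>
    //   pub async fn build_pipeline<Req, Resp>(card, engine) -> anyhow::Result<...>
    // After: the caller parses once and passes the tokenizer in.
    //   pub fn from_mdc(mdc: &ModelDeploymentCard) -> Arc<Backend>
    //   pub async fn build_pipeline<Req, Resp>(card, engine, hf_tokenizer: tokenizers::Tokenizer) -> anyhow::Result<...>
    let tokenizer_hf = card.tokenizer_hf()?; // one parse of the ~10 MiB JSON per model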

lib/bindings/python/rust/llm/backend.rs

Lines changed: 1 addition & 16 deletions
@@ -1,17 +1,5 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
 
 use super::*;
 use crate::llm::model_card::ModelDeploymentCard;
@@ -33,10 +21,7 @@ pub(crate) struct Backend {
 impl Backend {
     #[new]
     fn new(mdc: ModelDeploymentCard, endpoint: Endpoint) -> PyResult<Self> {
-        let runtime = pyo3_async_runtimes::tokio::get_runtime();
-        let backend = runtime
-            .block_on(llm_rs::backend::Backend::from_mdc(mdc.inner))
-            .map_err(to_pyerr)?;
+        let backend = llm_rs::backend::Backend::from_mdc(&mdc.inner);
         Ok(Self {
             inner: backend,
             endpoint,

lib/bindings/python/rust/llm/model_card.rs

Lines changed: 4 additions & 8 deletions
@@ -16,14 +16,10 @@ impl ModelDeploymentCard {}
 impl ModelDeploymentCard {
     // Previously called "from_local_path"
     #[staticmethod]
-    fn load(path: String, model_name: String, py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
-        pyo3_async_runtimes::tokio::future_into_py(py, async move {
-            let mut card = RsModelDeploymentCard::load(&path, None)
-                .await
-                .map_err(to_pyerr)?;
-            card.set_name(&model_name);
-            Ok(ModelDeploymentCard { inner: card })
-        })
+    fn load(path: String, model_name: String) -> PyResult<ModelDeploymentCard> {
+        let mut card = RsModelDeploymentCard::load(&path, None).map_err(to_pyerr)?;
+        card.set_name(&model_name);
+        Ok(ModelDeploymentCard { inner: card })
     }
 
     #[staticmethod]

lib/bindings/python/rust/llm/preprocessor.rs

Lines changed: 1 addition & 16 deletions
@@ -1,17 +1,5 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
 
 use super::*;
 use crate::llm::model_card::ModelDeploymentCard;
@@ -42,10 +30,7 @@ pub(crate) struct OAIChatPreprocessor {
 impl OAIChatPreprocessor {
     #[new]
     fn new(mdc: ModelDeploymentCard, current: Endpoint, next: Endpoint) -> PyResult<Self> {
-        let runtime = pyo3_async_runtimes::tokio::get_runtime();
-        let preprocessor = runtime
-            .block_on(OpenAIPreprocessor::new(mdc.inner.clone()))
-            .map_err(to_pyerr)?;
+        let preprocessor = OpenAIPreprocessor::new(mdc.inner.clone()).map_err(to_pyerr)?;
         Ok(Self {
             inner: preprocessor,
             current,
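
All three binding files above drop the same boilerplate: because the underlying builders are no longer async, a #[new] constructor does not need to fetch the Tokio runtime and block_on a future. A self-contained toy sketch of that pattern (the Widget type and build function are hypothetical, used only to show the shape; the real equivalents are Backend::from_mdc and OpenAIPreprocessor::new from the diffs above):

    use pyo3::prelude::*;

    #[pyclass]
    struct Widget {
        inner: u64,
    }

    #[pymethods]
    impl Widget {
        #[new]
        fn new(seed: u64) -> PyResult<Self> {
            // Before this commit the equivalent builders were async, so the
            // constructor had to call
            //   pyo3_async_runtimes::tokio::get_runtime().block_on(...)
            // With a synchronous builder the call is direct:
            let inner = build(seed);
            Ok(Self { inner })
        }
    }

    // Hypothetical stand-in for the now-synchronous builder.
    fn build(seed: u64) -> u64 {
        seed.wrapping_mul(2)
    }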

lib/llm/src/backend.rs

Lines changed: 13 additions & 16 deletions
@@ -17,11 +17,11 @@
 
 use std::{collections::HashSet, sync::Arc};
 
-use anyhow::{Error, Result};
+use anyhow::Result;
 use futures::stream::{self, StreamExt};
 use tracing as log;
 
-use crate::model_card::{ModelDeploymentCard, TokenizerKind};
+use crate::model_card::ModelDeploymentCard;
 use dynamo_runtime::{
     pipeline::{
         AsyncEngineContextProvider, ManyOut, Operator, ResponseStream, ServerStreamingEngine,
@@ -66,30 +66,27 @@ struct DecoderUnfoldState {
 }
 
 impl Backend {
-    pub async fn from_tokenizer(tokenizer: HfTokenizer) -> Result<Arc<Self>> {
+    pub fn from_tokenizer(tokenizer: HfTokenizer) -> Arc<Self> {
         let tokenizer = HuggingFaceTokenizer::from_tokenizer(tokenizer);
         let tokenizer = Tokenizer::from(Arc::new(tokenizer));
 
-        Ok(Arc::new(Self {
+        Arc::new(Self {
             tokenizer: Some(tokenizer),
             validate_engine_decode: false,
-        }))
+        })
     }
 
-    pub async fn from_mdc(mdc: ModelDeploymentCard) -> Result<Arc<Self>> {
-        let tokenizer = match &mdc.tokenizer {
-            Some(TokenizerKind::HfTokenizerJson(file)) => {
-                HfTokenizer::from_file(file).map_err(Error::msg)?
-            }
-            Some(TokenizerKind::GGUF(t)) => *t.clone(),
-            None => {
-                return Ok(Arc::new(Self {
+    pub fn from_mdc(mdc: &ModelDeploymentCard) -> Arc<Self> {
+        match mdc.tokenizer_hf() {
+            Ok(tokenizer) => Self::from_tokenizer(tokenizer),
+            Err(err) => {
+                tracing::warn!(%err, "tokenizer_hf error converting ModelDeploymentCard to HF tokenizer");
+                Arc::new(Self {
                     tokenizer: None,
                     validate_engine_decode: false,
-                }));
+                })
             }
-        };
-        Self::from_tokenizer(tokenizer).await
+        }
     }
 
     fn decoder(
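
Beyond the signature, the semantics of from_mdc change: it used to be async and fallible; it is now synchronous and infallible, logging a warning and returning a Backend without a tokenizer when tokenizer_hf() fails. A caller-side sketch, with imports abbreviated and `card` assumed to be a ModelDeploymentCard loaded elsewhere; illustrative only, not a snippet from the repo:

    fn build_backends(card: &ModelDeploymentCard) -> anyhow::Result<()> {
        // No .await and no ?: a bad tokenizer degrades to `tokenizer: None`
        // plus a tracing::warn!, instead of an Err the caller must handle.
        let from_card: Arc<Backend> = Backend::from_mdc(card);

        // When tokenizer.json has already been parsed (the point of this
        // commit), reuse the parsed tokenizer and skip the card entirely.
        let parsed = card.tokenizer_hf()?;
        let from_tokenizer: Arc<Backend> = Backend::from_tokenizer(parsed);

        let _ = (from_card, from_tokenizer);
        Ok(())
    }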

lib/llm/src/discovery/watcher.rs

Lines changed: 17 additions & 7 deletions
@@ -176,13 +176,13 @@ impl ModelWatcher {
                 }
                 WatchEvent::Delete(kv) => match self.handle_delete(&kv).await {
                     Ok(Some(model_name)) => {
-                        tracing::info!("removed model {}", model_name);
+                        tracing::info!(model_name, "removed model");
                     }
                     Ok(None) => {
                         // There are other instances running this model, nothing to do
                     }
                     Err(e) => {
-                        tracing::error!("error removing model: {}", e);
+                        tracing::error!(error = %e, "error removing model");
                     }
                 },
             }
@@ -271,7 +271,7 @@ impl ModelWatcher {
                 Some(card)
             }
             Err(err) => {
-                tracing::info!(%err, "load_mdc did not complete");
+                tracing::info!(error = %err, "load_mdc did not complete");
                 None
             }
         };
@@ -308,6 +308,9 @@ impl ModelWatcher {
                 None
             };
 
+            // This is expensive, we are loading ~10MiB JSON, so only do it once
+            let tokenizer_hf = card.tokenizer_hf()?;
+
             // Add chat engine only if the model supports chat
             if model_entry.model_type.supports_chat() {
                 let chat_engine = entrypoint::build_routed_pipeline::<
@@ -319,18 +322,23 @@
                     self.router_mode,
                     self.busy_threshold,
                     kv_chooser.clone(),
+                    tokenizer_hf.clone(),
                 )
                 .await?;
                 self.manager
                     .add_chat_completions_model(&model_entry.name, chat_engine)?;
+                tracing::info!("Chat completions is ready");
             }
 
             // Add completions engine only if the model supports completions
             if model_entry.model_type.supports_completions() {
                 let formatter = PromptFormatter::no_op();
                 let PromptFormatter::OAI(formatter) = formatter;
-                let preprocessor =
-                    OpenAIPreprocessor::new_with_formatter(card.clone(), formatter).await?;
+                let preprocessor = OpenAIPreprocessor::new_with_parts(
+                    card.clone(),
+                    formatter,
+                    tokenizer_hf.clone(),
+                )?;
                 let completions_engine = entrypoint::build_routed_pipeline_with_preprocessor::<
                     NvCreateCompletionRequest,
                     NvCreateCompletionResponse,
@@ -341,10 +349,12 @@
                     self.busy_threshold,
                     kv_chooser,
                     preprocessor,
+                    tokenizer_hf,
                 )
                 .await?;
                 self.manager
                     .add_completions_model(&model_entry.name, completions_engine)?;
+                tracing::info!("Completions is ready");
             }
         } else if model_entry.model_input == ModelInput::Text
             && model_entry.model_type.supports_chat()
@@ -391,8 +401,8 @@
                 ManyOut<Annotated<NvCreateEmbeddingResponse>>,
             >::new();
 
-            let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
-            let backend = Backend::from_mdc(card.clone()).await?.into_operator();
+            let preprocessor = OpenAIPreprocessor::new(card.clone())?.into_operator();
+            let backend = Backend::from_mdc(&card).into_operator();
 
             let router = PushRouter::<
                 PreprocessedEmbeddingRequest,
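
The new comment in the @@ -308 hunk is the heart of the fix: tokenizer.json for a large model is on the order of 10 MiB, and reading plus parsing it dominates pipeline construction. Hoisting card.tokenizer_hf()? out of the individual builders means one parse per model; the clones handed to the chat and completions pipelines are in-memory copies, still much cheaper than re-parsing the file. A standalone sketch of the same pattern using the tokenizers crate directly (the file path is illustrative):

    use tokenizers::Tokenizer;

    fn main() -> anyhow::Result<()> {
        // Parse the (potentially ~10 MiB) JSON exactly once...
        let tokenizer = Tokenizer::from_file("tokenizer.json").map_err(anyhow::Error::msg)?;

        // ...then hand in-memory clones to each consumer instead of letting
        // every consumer call from_file again.
        let for_chat = tokenizer.clone();
        let for_completions = tokenizer.clone();

        let _ = (for_chat, for_completions);
        Ok(())
    }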

lib/llm/src/entrypoint/input/batch.rs

Lines changed: 3 additions & 1 deletion
@@ -67,7 +67,9 @@ pub async fn run(
     let mut prepared_engine = common::prepare_engine(runtime, engine_config).await?;
 
     let pre_processor = if prepared_engine.has_tokenizer() {
-        Some(OpenAIPreprocessor::new(prepared_engine.card.take().unwrap()).await?)
+        Some(OpenAIPreprocessor::new(
+            prepared_engine.card.take().unwrap(),
+        )?)
     } else {
         None
     };

lib/llm/src/entrypoint/input/common.rs

Lines changed: 34 additions & 16 deletions
@@ -11,7 +11,7 @@ use crate::{
     kv_router::{KvPushRouter, KvRouter},
     migration::Migration,
     model_card::ModelDeploymentCard,
-    preprocessor::OpenAIPreprocessor,
+    preprocessor::{OpenAIPreprocessor, prompt::PromptFormatter},
     protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
     request_template::RequestTemplate,
     types::{
@@ -131,10 +131,18 @@ pub async fn prepare_engine(
         None
     };
 
+    let hf_tokenizer = card.tokenizer_hf()?;
     let chat_engine = entrypoint::build_routed_pipeline::<
         NvCreateChatCompletionRequest,
         NvCreateChatCompletionStreamResponse,
-    >(card, &client, router_mode, None, kv_chooser.clone())
+    >(
+        card,
+        &client,
+        router_mode,
+        None,
+        kv_chooser.clone(),
+        hf_tokenizer,
+    )
     .await?;
 
     let service_name = local_model.service_name().to_string();
@@ -167,7 +175,7 @@
         let pipeline = build_pipeline::<
             NvCreateChatCompletionRequest,
             NvCreateChatCompletionStreamResponse,
-        >(model.card(), inner_engine)
+        >(model.card(), inner_engine, model.card().tokenizer_hf()?)
         .await?;
 
         let service_name = model.service_name().to_string();
@@ -186,6 +194,7 @@ pub async fn prepare_engine(
 pub async fn build_pipeline<Req, Resp>(
     card: &ModelDeploymentCard,
     engine: ExecutionContext,
+    hf_tokenizer: tokenizers::Tokenizer,
 ) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
 where
     Req: Data,
@@ -198,10 +207,11 @@
     >,
 {
     let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
-    let preprocessor = OpenAIPreprocessor::new((*card).clone())
-        .await?
-        .into_operator();
-    let backend = Backend::from_mdc((*card).clone()).await?.into_operator();
+    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
+    let preprocessor =
+        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?
+            .into_operator();
+    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
     let engine = ServiceBackend::from_engine(engine);
 
     Ok(frontend
@@ -219,6 +229,7 @@ pub async fn build_routed_pipeline<Req, Resp>(
     router_mode: RouterMode,
     busy_threshold: Option<f64>,
     chooser: Option<Arc<KvRouter>>,
+    hf_tokenizer: tokenizers::Tokenizer,
 ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
 where
     Req: Data,
@@ -230,14 +241,17 @@
         Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
     >,
 {
-    let preprocessor = OpenAIPreprocessor::new(card.clone()).await?;
+    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
+    let preprocessor =
+        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?;
     build_routed_pipeline_with_preprocessor(
         card,
         client,
         router_mode,
        busy_threshold,
         chooser,
         preprocessor,
+        hf_tokenizer,
     )
     .await
 }
@@ -249,6 +263,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
     busy_threshold: Option<f64>,
     chooser: Option<Arc<KvRouter>>,
     preprocessor: Arc<OpenAIPreprocessor>,
+    hf_tokenizer: tokenizers::Tokenizer,
 ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
 where
     Req: Data,
@@ -262,8 +277,8 @@
 {
     let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
     let preprocessor_op = preprocessor.into_operator();
-    let backend = Backend::from_mdc(card.clone()).await?.into_operator();
-    let migration = Migration::from_mdc(card.clone()).await?.into_operator();
+    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
+    let migration = Migration::from_mdc(card).into_operator();
     let router =
         PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
             client.clone(),
@@ -312,14 +327,14 @@ mod tests {
     #[tokio::test]
     async fn test_build_chat_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
         // Create test model card
-        let card = ModelDeploymentCard::load(HF_PATH, None).await?;
+        let card = ModelDeploymentCard::load(HF_PATH, None)?;
         let engine = crate::engines::make_engine_core();
 
         // Build pipeline for chat completions
         let pipeline = build_pipeline::<
             NvCreateChatCompletionRequest,
             NvCreateChatCompletionStreamResponse,
-        >(&card, engine)
+        >(&card, engine, card.tokenizer_hf()?)
         .await?;
 
         // Verify pipeline was created
@@ -331,13 +346,16 @@
     #[tokio::test]
     async fn test_build_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
         // Create test model card
-        let card = ModelDeploymentCard::load(HF_PATH, None).await?;
+        let card = ModelDeploymentCard::load(HF_PATH, None)?;
        let engine = crate::engines::make_engine_core();
 
         // Build pipeline for completions
-        let pipeline =
-            build_pipeline::<NvCreateCompletionRequest, NvCreateCompletionResponse>(&card, engine)
-                .await?;
+        let pipeline = build_pipeline::<NvCreateCompletionRequest, NvCreateCompletionResponse>(
+            &card,
+            engine,
+            card.tokenizer_hf()?,
+        )
+        .await?;
 
         // Verify pipeline was created
         assert!(Arc::strong_count(&pipeline) >= 1);
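
With build_pipeline and build_routed_pipeline taking the parsed tokenizer as an explicit tokenizers::Tokenizer argument, the caller decides how many times the JSON is parsed. The updated tests pass card.tokenizer_hf()? per call; a caller that builds both pipelines can share one parse. A sketch combining the shapes of the two tests above (same imports and helpers assumed, plumbing elided):

    let card = ModelDeploymentCard::load(HF_PATH, None)?;
    let tokenizer = card.tokenizer_hf()?; // one parse serves both pipelines

    let chat = build_pipeline::<
        NvCreateChatCompletionRequest,
        NvCreateChatCompletionStreamResponse,
    >(&card, crate::engines::make_engine_core(), tokenizer.clone())
    .await?;

    let completions = build_pipeline::<NvCreateCompletionRequest, NvCreateCompletionResponse>(
        &card,
        crate::engines::make_engine_core(),
        tokenizer,
    )
    .await?;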

lib/llm/src/entrypoint/input/endpoint.rs

Lines changed: 1 addition & 3 deletions
@@ -73,9 +73,7 @@ pub async fn run(
         SingleIn<PreprocessedRequest>,
         ManyOut<Annotated<BackendOutput>>,
     >::new();
-    let backend = Backend::from_mdc(model.card().clone())
-        .await?
-        .into_operator();
+    let backend = Backend::from_mdc(model.card()).into_operator();
     let engine = ServiceBackend::from_engine(inner_engine);
     let pipeline = frontend
         .link(backend.forward_edge())?
