37 changes: 19 additions & 18 deletions lib/engines/mistralrs/src/lib.rs
@@ -212,9 +212,9 @@ impl MistralRsEngine

// Perform warmup request
let (tx, mut rx) = channel(1);
let request_id = engine.mistralrs.next_request_id();
let mistralrs_request_id = engine.mistralrs.next_request_id();
let warmup_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(display_name.to_string()),
messages: RequestMessage::Chat {
messages: vec![IndexMap::from([
@@ -246,10 +246,10 @@ impl MistralRsEngine
{
match response.as_result() {
Ok(r) => {
tracing::debug!(request_id, "Warmup response: {r:?}");
tracing::debug!(mistralrs_request_id, "Warmup response: {r:?}");
}
Err(err) => {
tracing::error!(request_id, %err, "Failed converting response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting response to result.");
}
}
}
@@ -272,6 +272,7 @@ impl
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = request.transfer(());
let ctx = context.context();
let request_id = ctx.id().to_string();
let (tx, mut rx) = channel(10_000);

let mut messages = vec![];
@@ -338,9 +339,9 @@ impl
n_choices: 1,
dry_params: det.dry_params,
};
let request_id = self.mistralrs.next_request_id();
let mistralrs_request_id = self.mistralrs.next_request_id();
let mistralrs_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(self.display_name.clone()),
messages: RequestMessage::Chat {
messages,
@@ -369,14 +370,14 @@ impl
let response = match response.as_result() {
Ok(r) => r,
Err(err) => {
tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
break;
}
};
match response {
ResponseOk::Chunk(c) => {
let Some(from_assistant) = c.choices[0].delta.content.clone() else {
tracing::warn!(request_id, "No content from mistralrs. Abandoning request.");
tracing::warn!(mistralrs_request_id, "No content from mistralrs. Abandoning request.");
break;
};
let finish_reason = match &c.choices[0].finish_reason.as_deref() {
@@ -387,7 +388,7 @@ impl
Some(FinishReason::Length)
}
Some(s) => {
tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
Some(FinishReason::Stop)
}
None => None,
@@ -396,7 +397,7 @@ impl

#[allow(deprecated)]
let delta = NvCreateChatCompletionStreamResponse {
id: c.id,
id: format!("chatcmpl-{request_id}"),
choices: vec![dynamo_async_openai::types::ChatChoiceStream{
index: 0,
delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta{
@@ -427,11 +428,11 @@ impl
yield ann;

if finish_reason.is_some() {
//tracing::trace!(request_id, "Finish reason: {finish_reason:?}");
//tracing::trace!(mistralrs_request_id, "Finish reason: {finish_reason:?}");
break;
}
},
x => tracing::error!(request_id, "Unhandled. {x:?}"),
x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
}
}
};
@@ -485,7 +486,7 @@ impl
let (request, context) = request.transfer(());
let ctx = context.context();
let (tx, mut rx) = channel(10_000);
let response_generator = request.response_generator();
let response_generator = request.response_generator(ctx.id().to_string());

let messages = RequestMessage::Completion {
text: prompt_to_string(&request.inner.prompt),
@@ -539,9 +540,9 @@ impl
dry_params: det.dry_params,
};

let request_id = self.mistralrs.next_request_id();
let mistralrs_request_id = self.mistralrs.next_request_id();
let mistralrs_request = Request::Normal(Box::new(NormalRequest {
id: request_id,
id: mistralrs_request_id,
model_id: Some(self.display_name.clone()),
messages,
sampling_params,
@@ -567,7 +568,7 @@ impl
let response = match response.as_result() {
Ok(r) => r,
Err(err) => {
tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
break;
}
};
@@ -583,7 +584,7 @@ impl
Some(FinishReason::Length)
}
Some(s) => {
tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
Some(FinishReason::Stop)
}
None => None,
@@ -602,7 +603,7 @@ impl
break;
}
},
x => tracing::error!(request_id, "Unhandled. {x:?}"),
x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
}
}
};
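The change above keeps two distinct identifiers in the engine: `mistralrs_request_id`, the numeric id handed out by `mistralrs.next_request_id()` for the engine's own scheduling and logging, and `request_id`, the string id read from the Dynamo request context (`ctx.id()`), which now seeds the OpenAI-style stream id instead of echoing whatever id mistral.rs put on the chunk. A minimal sketch of that pattern, with hypothetical stand-in values rather than the crate's real request and context types:

// Sketch only: stand-in values, not the crate's actual types.
fn stream_id(context_id: &str) -> String {
    // every chunk of the stream reuses the id assigned by the HTTP layer
    format!("chatcmpl-{context_id}")
}

fn main() {
    let mistralrs_request_id: usize = 7;      // engine-internal counter from next_request_id()
    let request_id = "3f9d2b".to_string();    // ctx.id() from the request context
    println!("engine id {mistralrs_request_id}, response id {}", stream_id(&request_id));
}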
4 changes: 2 additions & 2 deletions lib/llm/src/engines.rs
@@ -183,8 +183,8 @@ impl
incoming_request: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let mut deltas = request.response_generator();
let ctx = context.context();
let mut deltas = request.response_generator(ctx.id().to_string());
let req = request.inner.messages.into_iter().next_back().unwrap();

let prompt = match req {
@@ -230,8 +230,8 @@ impl
incoming_request: SingleIn<NvCreateCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateCompletionResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let deltas = request.response_generator();
let ctx = context.context();
let deltas = request.response_generator(ctx.id().to_string());
let chars_string = prompt_to_string(&request.inner.prompt);
let output = stream! {
let mut id = 1;
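In the echo engines the context is now unpacked before the delta generator is built, because `response_generator` needs the context id at construction time. A trimmed-down sketch of that ordering, using hypothetical stand-in types rather than the crate's own:

struct Ctx { id: String }
struct Request;

impl Request {
    // stand-in for the request's response_generator; returns the derived stream id
    fn response_generator(&self, request_id: String) -> String {
        format!("chatcmpl-{request_id}")
    }
}

fn main() {
    let (request, ctx) = (Request, Ctx { id: "req-1".into() });
    // the id must be available before the generator is created
    let deltas = request.response_generator(ctx.id.clone());
    assert_eq!(deltas, "chatcmpl-req-1");
}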
13 changes: 5 additions & 8 deletions lib/llm/src/http/service/openai.rs
@@ -253,8 +253,7 @@ async fn completions(
// return a 503 if the service is not ready
check_ready(&state)?;

// todo - extract distributed tracing id and context id from headers
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = request.id().to_string();

// todo - decide on default
let streaming = request.inner.stream.unwrap_or(false);
@@ -354,13 +353,15 @@ async fn completions(
#[tracing::instrument(skip_all)]
async fn embeddings(
State(state): State<Arc<service_v2::State>>,
headers: HeaderMap,
Json(request): Json<NvCreateEmbeddingRequest>,
) -> Result<Response, ErrorResponse> {
// return a 503 if the service is not ready
check_ready(&state)?;

// todo - extract distributed tracing id and context id from headers
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();

// Embeddings are typically not streamed, so we default to non-streaming
let streaming = false;
@@ -381,10 +382,6 @@ async fn embeddings(
.metrics_clone()
.create_inflight_guard(model, Endpoint::Embeddings, streaming);

// setup context
// todo - inherit request_id from distributed trace details
let request = Context::with_id(request, request_id.clone());

// issue the generate call on the engine
let stream = engine
.generate(request)
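For embeddings the handler now takes the `HeaderMap`, derives a request id via `get_or_create_request_id`, and wraps the body in a `Context` carrying that id up front, so later code reads the id back from the context instead of minting a fresh UUID. A simplified sketch of the wrapping pattern, with hypothetical stand-in types (the real `Context` and helper live elsewhere in the crate and may differ):

struct Context<T> { id: String, inner: T }

impl<T> Context<T> {
    fn with_id(inner: T, id: String) -> Self { Self { id, inner } }
    fn id(&self) -> &str { &self.id }
}

fn main() {
    // assume this came from get_or_create_request_id(user, &headers)
    let request_id = "req-42".to_string();
    let request = Context::with_id("embedding payload", request_id);
    // downstream code (metrics guard, engine call) reuses the same id
    assert_eq!(request.id(), "req-42");
}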
4 changes: 2 additions & 2 deletions lib/llm/src/preprocessor.rs
@@ -494,7 +494,7 @@ impl
let (request, context) = request.into_parts();

// create a response generator
let response_generator = request.response_generator();
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = Box::new(response_generator);

// convert the chat completion request to a common completion request
@@ -548,7 +548,7 @@ impl
let (request, context) = request.into_parts();

// create a response generator
let response_generator = request.response_generator();
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = Box::new(response_generator);
// convert the chat completion request to a common completion request
let (common_request, annotations) = self.preprocess_request(&request)?;
16 changes: 10 additions & 6 deletions lib/llm/src/protocols/openai/chat_completions/delta.rs
@@ -1,28 +1,30 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
use crate::{
protocols::common::{self},
types::TokenIdType,
};
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
impl NvCreateChatCompletionRequest {
/// Creates a [`DeltaGenerator`] instance based on the chat completion request.
///
/// # Arguments
/// * `request_id` - The request ID to use for the chat completion response ID.
///
/// # Returns
/// * [`DeltaGenerator`] configured with model name and response options.
pub fn response_generator(&self) -> DeltaGenerator {
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
let options = DeltaGeneratorOptions {
enable_usage: true,
enable_logprobs: self.inner.logprobs.unwrap_or(false)
|| self.inner.top_logprobs.unwrap_or(0) > 0,
};

DeltaGenerator::new(self.inner.model.clone(), options)
DeltaGenerator::new(self.inner.model.clone(), options, request_id)
}
}

@@ -67,10 +69,11 @@ impl DeltaGenerator {
/// # Arguments
/// * `model` - The model name used for response generation.
/// * `options` - Configuration options for enabling usage and log probabilities.
/// * `request_id` - The request ID to use for the chat completion response.
///
/// # Returns
/// * A new instance of [`DeltaGenerator`].
pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
@@ -97,8 +100,9 @@ impl DeltaGenerator {
// Reasoning parser wrapper
let reasoning_parser = reasoning_parser_type.get_reasoning_parser();

let chatcmpl_id = format!("chatcmpl-{request_id}");
Self {
id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
id: chatcmpl_id,
object: "chat.completion.chunk".to_string(),
created: now,
model,
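Net effect of the constructor change: the chunk id is derived from the caller-supplied request id rather than a freshly generated UUID, so every chunk in a stream carries the id the HTTP layer assigned to the request. A minimal sketch with a trimmed-down struct (the real `DeltaGenerator` has many more fields):

struct DeltaGeneratorSketch { id: String, object: String }

impl DeltaGeneratorSketch {
    fn new(request_id: String) -> Self {
        Self {
            id: format!("chatcmpl-{request_id}"),
            object: "chat.completion.chunk".to_string(),
        }
    }
}

fn main() {
    let g = DeltaGeneratorSketch::new("req-42".to_string());
    assert_eq!(g.id, "chatcmpl-req-42");
}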
22 changes: 6 additions & 16 deletions lib/llm/src/protocols/openai/completions/delta.rs
@@ -1,31 +1,19 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
use crate::{protocols::common, types::TokenIdType};

impl NvCreateCompletionRequest {
// put this method on the request
// inspect the request to extract options
pub fn response_generator(&self) -> DeltaGenerator {
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
let options = DeltaGeneratorOptions {
enable_usage: true,
enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
};

DeltaGenerator::new(self.inner.model.clone(), options)
DeltaGenerator::new(self.inner.model.clone(), options, request_id)
}
}

@@ -47,7 +35,7 @@ pub struct DeltaGenerator {
}

impl DeltaGenerator {
pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
@@ -67,8 +55,10 @@ impl DeltaGenerator {
prompt_tokens_details: None,
};

let completion_id = format!("cmpl-{}", request_id);

Self {
id: format!("cmpl-{}", uuid::Uuid::new_v4()),
id: completion_id,
object: "text_completion".to_string(),
created: now,
model,
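The completions generator mirrors the chat one; only the prefix differs (`cmpl-` instead of `chatcmpl-`). A one-line sketch of the derived id:

fn completion_id(request_id: &str) -> String {
    format!("cmpl-{request_id}")
}

fn main() {
    assert_eq!(completion_id("req-42"), "cmpl-req-42");
}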
14 changes: 1 addition & 13 deletions lib/llm/tests/http-service.rs
@@ -1,17 +1,5 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Error;
use async_stream::stream;
@@ -95,7 +83,7 @@ impl
let max_tokens = request.inner.max_tokens.unwrap_or(0) as u64;

// let generator = NvCreateChatCompletionStreamResponse::generator(request.model.clone());
let mut generator = request.response_generator();
let mut generator = request.response_generator(ctx.id().to_string());

let stream = stream! {
tokio::time::sleep(std::time::Duration::from_millis(max_tokens)).await;