Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/llm/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ impl RetryManager {
if let Some(response) = response_stream.next().await {
if let Some(err) = response.err() {
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
if format!("{:?}", err) == STREAM_ERR_MSG {
if err
.chain()
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
{
tracing::warn!("Stream disconnected... recreating stream...");
if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:?}", err);
tracing::warn!("Cannot recreate stream: {:#}", err);
} else {
continue;
}
Expand Down Expand Up @@ -462,6 +465,7 @@ mod tests {
/// Expected behavior: All 10 responses should be received successfully.
#[tokio::test]
async fn test_retry_manager_no_migration() {
dynamo_runtime::logging::init();
let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::Success, 10, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
Expand Down Expand Up @@ -493,6 +497,7 @@ mod tests {
/// Expected behavior: All 10 responses should be received successfully after retry.
#[tokio::test]
async fn test_retry_manager_new_request_migration() {
dynamo_runtime::logging::init();
let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::FailThenSuccess, 10, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
Expand Down Expand Up @@ -524,6 +529,8 @@ mod tests {
/// Expected behavior: 5 responses from first stream + 5 responses from retry stream = 10 total.
#[tokio::test]
async fn test_retry_manager_ongoing_request_migration() {
dynamo_runtime::logging::init();

let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFail { fail_after: 5 },
Expand Down Expand Up @@ -560,6 +567,7 @@ mod tests {
/// Expected behavior: Should receive an error after all retries are exhausted, with the original error.
#[tokio::test]
async fn test_retry_manager_new_request_migration_indefinite_failure() {
dynamo_runtime::logging::init();
let request = create_mock_request(0);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::AlwaysFail, 0, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
Expand All @@ -580,6 +588,7 @@ mod tests {
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
#[tokio::test]
async fn test_retry_manager_ongoing_request_migration_indefinite_failure() {
dynamo_runtime::logging::init();
let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFailAlways { fail_after: 3 },
Expand Down Expand Up @@ -627,6 +636,7 @@ mod tests {
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
#[tokio::test]
async fn test_retry_manager_ongoing_request_migration_indefinite_failure_stream_error() {
dynamo_runtime::logging::init();
let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFailAlwaysStreamError { fail_after: 3 },
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/protocols/common/llm_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ impl MaybeError for LLMEngineOutput {
LLMEngineOutput::error(format!("{:?}", err))
}

fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
fn err(&self) -> Option<anyhow::Error> {
if let Some(FinishReason::Error(err_msg)) = &self.finish_reason {
Some(anyhow::Error::msg(err_msg.clone()).into())
Some(anyhow::Error::msg(err_msg.clone()))
} else {
None
}
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/protocols/annotated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ where
Annotated::from_error(format!("{:?}", err))
}

fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
fn err(&self) -> Option<anyhow::Error> {
if self.is_error() {
if let Some(comment) = &self.comment {
if !comment.is_empty() {
return Some(anyhow::Error::msg(comment.join("; ")).into());
return Some(anyhow::Error::msg(comment.join("; ")));
}
}
Some(anyhow::Error::msg("unknown error").into())
Some(anyhow::Error::msg("unknown error"))
} else {
None
}
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/protocols/maybe_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub trait MaybeError {
fn from_err(err: Box<dyn Error + Send + Sync>) -> Self;

/// Construct into an error instance.
fn err(&self) -> Option<Box<dyn Error + Send + Sync>>;
fn err(&self) -> Option<anyhow::Error>;

/// Check if the current instance represents a success.
fn is_ok(&self) -> bool {
Expand All @@ -46,8 +46,8 @@ mod tests {
message: err.to_string(),
}
}
fn err(&self) -> Option<Box<dyn Error + Send + Sync>> {
Some(anyhow::Error::msg(self.message.clone()).into())
fn err(&self) -> Option<anyhow::Error> {
Some(anyhow::Error::msg(self.message.clone()))
}
}

Expand Down
Loading