Skip to content

Commit 8ea463f

Browse files
committed
propagate service overloaded error
1 parent fde25b2 commit 8ea463f

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

lib/llm/src/http/service/openai.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,24 @@ impl ErrorMessage {
108108
/// If successful, it will return the [`HttpError`] as an [`ErrorMessage::internal_server_error`]
109109
/// with the details of the error.
110110
pub fn from_anyhow(err: anyhow::Error, alt_msg: &str) -> ErrorResponse {
111+
// First check for PipelineError::ServiceOverloaded
112+
if let Some(pipeline_err) =
113+
err.downcast_ref::<dynamo_runtime::pipeline::error::PipelineError>()
114+
{
115+
if matches!(
116+
pipeline_err,
117+
dynamo_runtime::pipeline::error::PipelineError::ServiceOverloaded(_)
118+
) {
119+
return (
120+
StatusCode::SERVICE_UNAVAILABLE,
121+
Json(ErrorMessage {
122+
error: pipeline_err.to_string(),
123+
}),
124+
);
125+
}
126+
}
127+
128+
// Then check for HttpError
111129
match err.downcast::<HttpError>() {
112130
Ok(http_error) => ErrorMessage::from_http_error(http_error),
113131
Err(err) => ErrorMessage::internal_server_error(&format!("{alt_msg}: {err}")),
@@ -1150,6 +1168,22 @@ mod tests {
11501168
);
11511169
}
11521170

1171+
#[test]
1172+
fn test_service_overloaded_error_response_from_anyhow() {
1173+
use dynamo_runtime::pipeline::error::PipelineError;
1174+
1175+
let err: anyhow::Error = PipelineError::ServiceOverloaded(
1176+
"All workers are busy, please retry later".to_string(),
1177+
)
1178+
.into();
1179+
let (status, response) = ErrorMessage::from_anyhow(err, BACKUP_ERROR_MESSAGE);
1180+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
1181+
assert_eq!(
1182+
response.error,
1183+
"Service temporarily unavailable: All workers are busy, please retry later"
1184+
);
1185+
}
1186+
11531187
#[test]
11541188
fn test_validate_input_is_text_only_accepts_text() {
11551189
let request = make_base_request();

lib/runtime/src/pipeline/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ pub enum PipelineError {
131131

132132
#[error("NATS KV Err: {0} for bucket '{1}")]
133133
KeyValueError(String, String),
134+
135+
/// All instances are busy and cannot handle new requests
136+
#[error("Service temporarily unavailable: {0}")]
137+
ServiceOverloaded(String),
134138
}
135139

136140
#[derive(Debug, thiserror::Error)]

lib/runtime/src/pipeline/network/egress/push_router.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use crate::{
77
component::{Client, Endpoint, InstanceSource},
88
engine::{AsyncEngine, Data},
99
pipeline::{
10-
error::PipelineErrorExt, AddressedPushRouter, AddressedRequest, Error, ManyOut, SingleIn,
10+
error::{PipelineError, PipelineErrorExt},
11+
AddressedPushRouter, AddressedRequest, Error, ManyOut, SingleIn,
1112
},
1213
protocols::maybe_error::MaybeError,
1314
traits::DistributedRuntimeProvider,
@@ -210,7 +211,10 @@ where
210211
// Check if we actually have any instances at all
211212
let all_instances = self.client.instance_ids();
212213
if !all_instances.is_empty() {
213-
return Err(anyhow::anyhow!("All workers are busy, please retry later"));
214+
return Err(PipelineError::ServiceOverloaded(
215+
"All workers are busy, please retry later".to_string(),
216+
)
217+
.into());
214218
}
215219
}
216220

@@ -356,7 +360,7 @@ where
356360
let busy_instances: Vec<i64> = states
357361
.iter()
358362
.filter_map(|(&id, state)| {
359-
state.is_busy(BUSY_THRESHOLD).then(|| id)
363+
state.is_busy(BUSY_THRESHOLD).then_some(id)
360364
})
361365
.collect();
362366
drop(states);

0 commit comments

Comments
 (0)