Skip to content

Commit 727080c

Browse files
committed
[WIP] Allow PyGeneratorExit to end stream without final flag
1 parent 4c90b1b commit 727080c

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

lib/bindings/python/rust/engine.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ enum ResponseProcessingError {
134134
#[error("python exception: {0}")]
135135
PythonException(String),
136136

137+
#[error("python generator exit: {0}")]
138+
PyGeneratorExit(String),
139+
137140
#[error("deserialize error: {0}")]
138141
DeserializeError(String),
139142

@@ -225,6 +228,9 @@ where
225228
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
226229
msg
227230
}
231+
ResponseProcessingError::PyGeneratorExit(_) => {
232+
"Stream ended before generation completed".to_string()
233+
}
228234
ResponseProcessingError::PythonException(e) => {
229235
let msg = format!("a python exception was caught while processing the async generator: {}", e);
230236
msg
@@ -276,8 +282,16 @@ where
276282
{
277283
let item = item.map_err(|e| {
278284
println!();
279-
Python::with_gil(|py| e.display(py));
280-
ResponseProcessingError::PythonException(e.to_string())
285+
let mut is_py_generator_exit = false;
286+
Python::with_gil(|py| {
287+
e.display(py);
288+
is_py_generator_exit = e.is_instance_of::<pyo3::exceptions::PyGeneratorExit>(py);
289+
});
290+
if is_py_generator_exit {
291+
ResponseProcessingError::PyGeneratorExit(e.to_string())
292+
} else {
293+
ResponseProcessingError::PythonException(e.to_string())
294+
}
281295
})?;
282296
let response = tokio::task::spawn_blocking(move || {
283297
Python::with_gil(|py| depythonize::<Resp>(&item.into_bound(py)))

lib/runtime/src/pipeline/network/ingress/push_handler.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
use super::*;
17+
use crate::protocols::maybe_error::MaybeError;
1718
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
1819
use serde::{Deserialize, Serialize};
1920
use std::sync::Arc;
@@ -105,7 +106,7 @@ impl WorkHandlerMetrics {
105106
impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>>
106107
where
107108
T: Data + for<'de> Deserialize<'de> + std::fmt::Debug,
108-
U: Data + Serialize + std::fmt::Debug,
109+
U: Data + Serialize + MaybeError + std::fmt::Debug,
109110
{
110111
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
111112
// Call the Ingress-specific add_metrics implementation
@@ -220,6 +221,14 @@ where
220221
let mut send_complete_final = true;
221222
while let Some(resp) = stream.next().await {
222223
tracing::trace!("Sending response: {:?}", resp);
224+
if let Some(err) = resp.err() {
225+
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
226+
if format!("{:?}", err) == STREAM_ERR_MSG {
227+
tracing::warn!(STREAM_ERR_MSG);
228+
send_complete_final = false;
229+
break;
230+
}
231+
}
223232
let resp_wrapper = NetworkStreamWrapper {
224233
data: Some(resp),
225234
complete_final: false,

0 commit comments

Comments
 (0)