Skip to content

Commit 3dba905

Browse files
committed
delay streaming
1 parent 83d7b8d commit 3dba905

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

lib/bindings/python/rust/engine.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
// limitations under the License.
1515

1616
use super::context::{callable_accepts_kwarg, PyContext};
17+
use futures::stream::{self, StreamExt as FuturesStreamExt};
1718
use pyo3::prelude::*;
1819
use pyo3::types::{PyDict, PyModule};
1920
use pyo3::{PyAny, PyErr};
2021
use pyo3_async_runtimes::TaskLocals;
2122
use pythonize::{depythonize, pythonize};
2223
use std::sync::Arc;
2324
use tokio::sync::mpsc;
24-
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
25+
use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt};
2526

2627
pub use dynamo_runtime::{
2728
pipeline::{
@@ -208,14 +209,42 @@ where
208209
})
209210
.await??;
210211

211-
let stream = Box::pin(stream);
212+
let mut stream = Box::pin(stream);
212213

213214
// process the stream
214215
// any error thrown in the stream will be caught and complete the processing task
215216
// errors are captured by a task that is watching the processing task
216217
// the error will be emitted as an annotated error
217218
let request_id = id.clone();
218219

220+
let first_item = match FuturesStreamExt::next(&mut stream).await {
221+
Some(Ok(item)) => item,
222+
Some(Err(e)) => {
223+
// Any Python exception (including HttpError) is already wrapped in PyErr
224+
// The HttpAsyncEngine will inspect this PyErr later to see if it's an HttpError
225+
tracing::debug!(
226+
request_id,
227+
"Python exception occurred before finish of first iteration: {}",
228+
e
229+
);
230+
return Err(Error::new(e));
231+
}
232+
None => {
233+
tracing::warn!(
234+
request_id,
235+
"python async generator stream ended before processing started"
236+
);
237+
return Err(Error::new(std::io::Error::new(
238+
std::io::ErrorKind::UnexpectedEof,
239+
"python async generator stream ended before processing started",
240+
)));
241+
}
242+
};
243+
// Create a new stream that yields the first item followed by the rest of the original stream
244+
let stream =
245+
futures::StreamExt::chain(stream::once(futures::future::ok(first_item)), stream);
246+
let stream = FuturesStreamExt::boxed(stream);
247+
219248
tokio::spawn(async move {
220249
tracing::debug!(
221250
request_id,
@@ -225,7 +254,8 @@ where
225254
let mut stream = stream;
226255
let mut count = 0;
227256

228-
while let Some(item) = stream.next().await {
257+
// Fix the third error by explicitly using FuturesStreamExt::next
258+
while let Some(item) = FuturesStreamExt::next(&mut stream).await {
229259
count += 1;
230260
tracing::trace!(
231261
request_id,

0 commit comments

Comments
 (0)