HttpAsyncEngine awaits the first result from PythonAsyncEngine to check for errors #2974
Conversation
Signed-off-by: Brian Larson <brian.larson@baseten.co>
👋 Hi blarson-b10! Thank you for contributing to ai-dynamo/dynamo.
Walkthrough
Adds a new streaming generation path that returns a (data stream, context) pair, refactors engine APIs to delegate to this path, updates HTTP handling to consume and rechain the first item, and exposes a cancellation token API to Python via DistributedRuntime and a new CancellationToken type.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Caller
    participant PyAsyncEngine as PythonAsyncEngine
    participant PySrvEngine as PythonServerStreamingEngine
    participant PyGen as Python async generator
    participant RespStream as ResponseStream
    Note over PySrvEngine,PyGen: New streaming path
    Caller->>PyAsyncEngine: generate(request)
    PyAsyncEngine->>PySrvEngine: generate_in_parts(request)
    PySrvEngine->>PyGen: invoke(generator[, context=?])
    PyGen-->>PySrvEngine: (DataStream<Annotated<Resp>>, Context)
    PySrvEngine-->>PyAsyncEngine: stream + context
    PyAsyncEngine->>RespStream: ResponseStream::new(stream, context)
    RespStream-->>Caller: ResponseStream<Annotated<Resp>>
```

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Http as HTTP Handler
    participant Engine as PythonAsyncEngine
    participant Stream as DataStream
    participant Ctx as AsyncEngineContext
    Client->>Http: request
    Http->>Engine: generate_in_parts(request)
    Engine-->>Http: (Stream, Ctx)
    Note right of Http: request_id = Ctx.id()
    Http->>Stream: poll first item
    alt first item exists
        Http->>Http: chain(first_item + rest)
        Http-->>Client: ResponseStream::new(chained, Ctx)
    else no items
        Http->>Http: warn(request_id)
        Http-->>Client: Err(Io::UnexpectedEof)
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Pre-merge checks (1 passed, 2 warnings)
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/bindings/python/rust/engine.rs (1)
38-38: Typo in doc comment (“bingings” → “bindings”). This appears in generated docs; fix the spelling.

```diff
-/// Add bingings from this crate to the provided module
+/// Add bindings from this crate to the provided module
```
🧹 Nitpick comments (8)
lib/bindings/python/rust/http.rs (1)
197-201: Minor: simplify stream chaining and bring traits into scope. Readability nit: import StreamExt/stream and use method chaining.
Apply this diff:

```diff
-        // Create a new stream that yields the first item followed by the rest of the original stream
-        let once_stream = futures::stream::once(async { first_item });
-        let stream = futures::StreamExt::chain(once_stream, stream);
+        // Yield the first item, then the remainder of the original stream
+        use futures::{stream, StreamExt};
+        let stream = stream::once(async move { first_item }).chain(stream);
```

lib/bindings/python/rust/engine.rs (7)
231-235: Stale/misleading comment about not passing context. The code conditionally passes `context` (when `has_context`), yet the comment claims it’s “impossible.” Update to reflect current behavior.

```diff
-        // tell the python async generator to stop generating
-        // right now, this is impossible as we are not passing the context to the python async generator
-        // todo: add task-local context to the python async generator
+        // Request cancellation via the runtime context.
+        // If the Python generator ignores it, we still stop draining the stream here.
```
320-320: Remove stray `println!`. Avoids noisy stdout in libraries; logs are already emitted via `tracing`.

```diff
-    println!();
```
280-282: Avoid shadowing `context`; use a distinct name for the returned engine context. Improves readability and prevents accidental misuse.

```diff
-        let context = context.context();
-        Ok((Box::pin(stream), context))
+        let engine_ctx = context.context();
+        Ok((Box::pin(stream), engine_ctx))
```
160-161: Channel capacity should be configurable or at least named. The magic constant `128` can become a bottleneck or memory sink. Consider a named constant or config.

```diff
-        let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);
+        const PY_STREAM_CHANNEL_CAP: usize = 128;
+        let (tx, rx) = mpsc::channel::<Annotated<Resp>>(PY_STREAM_CHANNEL_CAP);
```
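For intuition about why this capacity matters, here is a hypothetical Python sketch of a bounded channel (an asyncio analogy, not the Rust `mpsc` API): the producer suspends whenever the queue is full, so the capacity trades memory for backpressure on the Python-generator side.

```python
import asyncio

CHANNEL_CAP = 2  # stands in for the 128 in mpsc::channel::<Annotated<Resp>>(128)

async def producer(q: asyncio.Queue) -> None:
    for i in range(4):
        await q.put(i)   # suspends whenever the queue already holds CHANNEL_CAP items
    await q.put(None)    # sentinel marking end-of-stream

async def consumer(q: asyncio.Queue) -> list:
    items = []
    while (item := await q.get()) is not None:
        items.append(item)
    return items

async def main() -> list:
    q = asyncio.Queue(maxsize=CHANNEL_CAP)
    task = asyncio.create_task(consumer(q))
    await producer(q)
    return await task

print(asyncio.run(main()))  # [0, 1, 2, 3]
```

With a tiny capacity the program still completes; what changes is only how often the producer parks, which is why a named constant (rather than a bare `128`) makes the trade-off auditable.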
206-212: Instrument the processing task with a span and structured fields. Attach `request_id` to a span for coherent logs; use structured fields instead of positional formatting.

```diff
+    use tracing::Instrument;
     tokio::spawn(async move {
-        tracing::debug!(
-            request_id,
+        tracing::debug!(
+            request_id = %request_id,
             "starting task to process python async generator stream"
         );
         ...
-        tracing::debug!(
-            request_id,
-            "finished processing python async generator stream"
-        );
+        tracing::debug!(request_id = %request_id, "finished processing python async generator stream");
-    });
+    }.instrument(tracing::info_span!("py_stream_proc", request_id = %request_id)));
```

Also applies to: 271-277
78-78: Expose a forwarder to avoid `self.0 .0` reach-throughs. Current usage in http.rs (`self.0 .0.generate_in_parts`) is brittle. Add a forwarding method on `PythonAsyncEngine`.

```diff
+impl PythonAsyncEngine {
+    pub async fn generate_in_parts<Req, Resp>(
+        &self,
+        request: SingleIn<Req>,
+    ) -> Result<(DataStream<Annotated<Resp>>, Arc<dyn AsyncEngineContext>), Error>
+    where
+        Req: Data + Serialize,
+        Resp: Data + for<'de> Deserialize<'de>,
+    {
+        self.0.generate_in_parts(request).await
+    }
+}
```
212-223: Minor logging polish. Use structured fields and avoid ordinal strings; this helps querying logs.

```diff
-            count += 1;
-            tracing::trace!(
-                request_id,
-                "processing the {}th item from python async generator",
-                count
-            );
+            count += 1;
+            tracing::trace!(request_id = %request_id, item_idx = count, "processing item from python async generator");
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- lib/bindings/python/rust/engine.rs (4 hunks)
- lib/bindings/python/rust/http.rs (1 hunks)
- lib/bindings/python/rust/lib.rs (1 hunks)
- lib/bindings/python/src/dynamo/_core.pyi (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
lib/bindings/python/rust/lib.rs (3)
- lib/bindings/python/src/dynamo/_core.pyi (2): child_token (56-60), CancellationToken (62-77)
- lib/runtime/src/distributed.rs (1): child_token (260-262)
- lib/runtime/src/transports/etcd.rs (1): child_token (77-79)

lib/bindings/python/src/dynamo/_core.pyi (3)
- lib/bindings/python/rust/lib.rs (3): child_token (380-384), cancel (540-542), cancelled (544-550)
- lib/runtime/src/distributed.rs (1): child_token (260-262)
- lib/runtime/src/transports/etcd.rs (1): child_token (77-79)

lib/bindings/python/rust/http.rs (2)
- lib/bindings/python/rust/lib.rs (3): next (962-994), new (320-345), new (1005-1009)
- lib/bindings/python/rust/engine.rs (2): new (93-100), new (123-139)

lib/bindings/python/rust/engine.rs (3)
- lib/llm/src/http/client.rs (7): context (316-318), context (352-354), new (73-81), new (244-251), new (302-304), new (394-398), new (453-457)
- lib/runtime/src/engine.rs (8): context (175-175), context (251-253), context (268-270), context (274-276), context (449-452), context (464-466), generate (217-217), generate (474-476)
- lib/bindings/python/rust/http.rs (4): generate (179-227), new (40-44), new (128-130), new (168-170)
🔇 Additional comments (4)
lib/bindings/python/rust/lib.rs (1)
380-385: Expose child_token: looks correct and aligned with pyi. Constructor returns a fresh runtime child token wrapped in the Python class. Matches the stub and CancellationToken methods below.
lib/bindings/python/rust/http.rs (1)
188-194: Consider surfacing EOF as an HTTP error for client clarity. UnexpectedEof maps to a generic 500. If the Python generator terminates before producing any output, returning an HttpError (e.g., 502 Bad Gateway) may be more actionable for HTTP clients.
If desired, swap the IO error with:

```diff
-                        return Err(Error::new(std::io::Error::new(
-                            std::io::ErrorKind::UnexpectedEof,
-                            error_msg,
-                        )));
+                        return Err(http_error::HttpError { code: 502, message: error_msg.to_string() })?;
```

lib/bindings/python/src/dynamo/_core.pyi (1)
56-78: New CancellationToken API: shape and docs look good. Matches the Rust binding (child_token, cancel, cancelled). No issues.
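To make the parent/child semantics concrete, here is a toy Python model (purely illustrative; the real binding wraps a Rust cancellation token, and this `Token` class is invented for the sketch): cancelling a parent propagates to its children, while cancelling a child leaves the parent untouched.

```python
import asyncio

class Token:
    """Toy model of cancellation-token semantics: cancelling a parent
    cancels all child tokens, but cancelling a child does not affect the parent."""

    def __init__(self, parent: "Token | None" = None):
        self._event = asyncio.Event()
        self._children: list["Token"] = []
        if parent is not None:
            parent._children.append(self)

    def child_token(self) -> "Token":
        return Token(parent=self)

    def cancel(self) -> None:
        self._event.set()
        for child in self._children:
            child.cancel()

    def is_cancelled(self) -> bool:
        return self._event.is_set()

    async def cancelled(self) -> None:
        # Completes once this token (or an ancestor) has been cancelled.
        await self._event.wait()

root = Token()
child = root.child_token()
root.cancel()
print(child.is_cancelled())  # True: parent cancellation propagates down
```

The one-way propagation is the point of `child_token`: a request handler can hand out child tokens freely without letting any single request cancel the whole runtime.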
```rust
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        let (stream, context) = self.generate_in_parts(request).await?;
        Ok(ResponseStream::new(Box::pin(stream), context))
    }
```
Fix double pin when wrapping DataStream in ResponseStream
generate_in_parts already returns a DataStream (pinned). Wrapping it again with Box::pin yields a Pin<Box<Pin<Box<...>>>> and will not compile.
```diff
-        let (stream, context) = self.generate_in_parts(request).await?;
-        Ok(ResponseStream::new(Box::pin(stream), context))
+        let (stream, context) = self.generate_in_parts(request).await?;
+        Ok(ResponseStream::new(stream, context))
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        let (stream, context) = self.generate_in_parts(request).await?;
        Ok(ResponseStream::new(stream, context))
    }
```
🤖 Prompt for AI Agents
In lib/bindings/python/rust/engine.rs around lines 307 to 310, the code
double-pins the stream returned by generate_in_parts by calling Box::pin(stream)
even though generate_in_parts already returns a pinned DataStream; remove the
extra Box::pin and pass the returned pinned stream directly to
ResponseStream::new (or adjust types so ResponseStream::new accepts the pinned
DataStream) to avoid the Pin<Box<Pin<...>>> compile error.
```rust
        match self.0 .0.generate_in_parts(request).await {
            Ok((mut stream, context)) => {
                let request_id = context.id().to_string();
                let first_item = match futures::StreamExt::next(&mut stream).await {
                    // TODO - item may still contain an Annotated error. How do we want to handle that?
                    // TODO - should we be returning an HttpError here?
                    Some(item) => item,
                    None => {
                        let error_msg = "python async generator stream ended before processing started";
                        tracing::warn!(request_id, error_msg);
                        return Err(Error::new(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            error_msg,
                        )));
                    }
                };

                // Create a new stream that yields the first item followed by the rest of the original stream
                let once_stream = futures::stream::once(async { first_item });
                let stream = futures::StreamExt::chain(once_stream, stream);

                Ok(ResponseStream::new(Box::pin(stream), context))
            }
```
🛠️ Refactor suggestion
Actually enforce “inspect first item for error” and fail early
The PR goal says we await the first item to check for errors. Right now we only guard against EOF; annotated errors pass through. Fail fast and surface an HTTP error.
Apply this diff (adjust message as desired):
```diff
         let first_item = match futures::StreamExt::next(&mut stream).await {
             // TODO - item may still contain an Annotated error. How do we want to handle that?
             // TODO - should we be returning an HttpError here?
             Some(item) => item,
             None => {
                 let error_msg = "python async generator stream ended before processing started";
-                tracing::warn!(request_id, error_msg);
+                tracing::warn!(%request_id, "{}", error_msg);
                 return Err(Error::new(std::io::Error::new(
                     std::io::ErrorKind::UnexpectedEof,
                     error_msg,
                 )));
             }
         };
+
+        // Fail fast if Python produced an annotated error as the first item.
+        if first_item.is_error() {
+            let msg = format!("first stream item was an error; request_id={}", request_id);
+            // Prefer explicit HTTP error so callers get an HTTP status instead of a 500 IO error.
+            return Err(http_error::HttpError { code: 500, message: msg })?;
+        }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
        match self.0 .0.generate_in_parts(request).await {
            Ok((mut stream, context)) => {
                let request_id = context.id().to_string();
                let first_item = match futures::StreamExt::next(&mut stream).await {
                    // TODO - item may still contain an Annotated error. How do we want to handle that?
                    // TODO - should we be returning an HttpError here?
                    Some(item) => item,
                    None => {
                        let error_msg = "python async generator stream ended before processing started";
                        tracing::warn!(%request_id, "{}", error_msg);
                        return Err(Error::new(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            error_msg,
                        )));
                    }
                };
                // Fail fast if Python produced an annotated error as the first item.
                if first_item.is_error() {
                    let msg = format!("first stream item was an error; request_id={}", request_id);
                    // Prefer explicit HTTP error so callers get an HTTP status instead of a 500 IO error.
                    return Err(http_error::HttpError { code: 500, message: msg })?;
                }
                // Create a new stream that yields the first item followed by the rest of the original stream
                let once_stream = futures::stream::once(async { first_item });
                let stream = futures::StreamExt::chain(once_stream, stream);
                Ok(ResponseStream::new(Box::pin(stream), context))
            }
```
🤖 Prompt for AI Agents
lib/bindings/python/rust/http.rs around lines 180-202: the code awaits the first
stream item only for EOF but does not fail early when that first item contains
an Annotated/error; update the logic after receiving first_item to inspect
whether it represents an error and, if so, convert it into an immediate
Err(HttpError) (or the project’s HTTP error type) with a clear message and
request_id, logging appropriately; if first_item is Ok/valid, wrap it and chain
the rest of the stream as before.
```rust
                        let error_msg = "python async generator stream ended before processing started";
                        tracing::warn!(request_id, error_msg);
                        return Err(Error::new(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            error_msg,
                        )));
                    }
```
Fix tracing::warn! invocation (won’t compile as written)
tracing requires a format string and/or key-value fields. Current call will not compile.
Apply this diff:
- tracing::warn!(request_id, error_msg);
+ tracing::warn!(%request_id, "{}", error_msg);🤖 Prompt for AI Agents
In lib/bindings/python/rust/http.rs around lines 188 to 194, the tracing::warn!
call is using bare values and won't compile; replace it with a tracing
invocation that provides a format string and key-value fields (e.g. include
request_id as a field and log the message with a "{}" format or as a keyed
value) so the call becomes a valid tracing macro with either
tracing::warn!(request_id = ?request_id, "{}", error_msg) or
tracing::warn!(request_id = ?request_id, error = %error_msg), then keep the
subsequent Err creation unchanged.
Background: in #2592 we originally submitted this PR, which required modifications to engine.rs. However, this wait for HTTPError is only useful for the HTTPEngine, so we refactored this PR to make it compatible with upstream dynamo.
```rust
        match self.0 .0.generate_in_parts(request).await {
            Ok((mut stream, context)) => {
                let request_id = context.id().to_string();
                let first_item = match futures::StreamExt::next(&mut stream).await {
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should extract the error-message check / check whether it is an HTTPError (identical to `if let Some(py_err) = e.downcast_ref`):
Overview:
HttpAsyncEngine awaits the first result from PythonAsyncEngine to check for errors.
TODO - I need some help to better summarize the problem this is trying to solve before sending this PR upstream.
Details:
HttpAsyncEngine::generate awaits the first item and checks for errors, and then creates a new stream from this item and the remaining items.
To enable this, PythonServerStreamingEngine is modified to have a helper method which returns the stream and context (the parts) separately before they are combined into ResponseStream.
Where should the reviewer start?
lib/bindings/python/rust/http.rs is a good starting point.
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
New Features
Improvements