feat: delay python stream until yield #2592
Conversation
👋 Hi michaelfeil! Thank you for contributing to ai-dynamo/dynamo.
Walkthrough

Adds explicit futures stream handling and prefetch logic in Python engine streaming; exposes EndpointType and a new Python-binding method to enable/disable endpoints; introduces a synchronous HttpService method used by the async wrapper for endpoint toggling.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant PyGen as Python Generator
    participant RustEng as Rust Engine
    participant Stream as Futures Stream
    Note over RustEng: Prepare stream
    RustEng->>PyGen: create stream
    RustEng->>Stream: Box pin (mutable)
    RustEng->>Stream: next() (prefetch)
    alt First item Ok
        RustEng->>Stream: chain(first_item + remaining)
    else First item Err
        RustEng-->>RustEng: log debug
        RustEng-->>RustEng: return Err(Error::new(e))
    else No item (EOF)
        RustEng-->>RustEng: log warn
        RustEng-->>RustEng: return Err(UnexpectedEof)
    end
    loop Process items
        RustEng->>Stream: next()
        alt Item Ok
            RustEng-->>RustEng: handle item
        else Item Err
            RustEng-->>RustEng: handle error
        end
    end
```
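The prefetch step is the core of the change. As a rough, self-contained sketch of the pattern (illustrative names, not the actual engine.rs code):

```rust
use std::io;

use futures::stream::{self, Stream, StreamExt};

// Sketch of the prefetch pattern: await the first element before building
// the outbound stream, so a failure in the Python generator's first step
// surfaces as a returned error instead of after a 200 has been committed.
async fn delay_until_first_yield<T: Send + 'static>(
    s: impl Stream<Item = Result<T, io::Error>> + Send + 'static,
) -> io::Result<impl Stream<Item = Result<T, io::Error>> + Send> {
    let mut s = Box::pin(s);
    match s.next().await {
        // First item arrived: chain it back in front of the remaining stream.
        Some(Ok(first)) => Ok(stream::once(async move { Ok(first) }).chain(s)),
        // The generator failed before its first yield: propagate the error.
        Some(Err(e)) => Err(e),
        // The generator finished without yielding anything.
        None => Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "stream ended before first item",
        )),
    }
}
```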
```mermaid
sequenceDiagram
    autonumber
    participant Py as Python
    participant PyHTTP as HttpService (Py)
    participant RustHTTP as HttpService (Rust)
    participant State as State Flags
    Py->>PyHTTP: enable_endpoint("chat"|"completion"|"embedding", enabled)
    PyHTTP->>RustHTTP: sync_enable_model_endpoint(EndpointType, enabled)
    RustHTTP->>State: set(endpoint_type, enabled)
    State-->>RustHTTP: updated
    RustHTTP-->>PyHTTP: Ok
    PyHTTP-->>Py: PyResult(())
```
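For the endpoint toggle, the state flags can be as simple as per-endpoint atomics. A hypothetical sketch of the set/get flow in the diagram (the real EndpointType and flag storage live in service_v2.rs and may differ):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-ins for the real endpoint enum and flag storage.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum EndpointType {
    Chat,
    Completion,
    Embedding,
}

struct EndpointFlags(HashMap<EndpointType, AtomicBool>);

impl EndpointFlags {
    // Synchronous by design: flipping an atomic needs no await, which is
    // why the async wrapper in the PR can simply delegate to a sync helper.
    fn set(&self, endpoint: EndpointType, enabled: bool) {
        if let Some(flag) = self.0.get(&endpoint) {
            flag.store(enabled, Ordering::Relaxed);
        }
    }

    fn get(&self, endpoint: EndpointType) -> bool {
        self.0
            .get(&endpoint)
            .map_or(false, |f| f.load(Ordering::Relaxed))
    }
}
```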
Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs
Actionable comments posted: 2
🧹 Nitpick comments (2)
lib/bindings/python/rust/engine.rs (1)
220-242: Prefetching the first item fixes the “HTTP 200 already sent” class of bugs; adjust tracing field usage.
- The await of the first item before constructing the outbound stream is the right approach to surface early Python exceptions (e.g., HttpError) before any response headers are committed. Nice.
- Minor: tracing fields are currently passed as a bare identifier (request_id). This records a unit field rather than the desired value. Prefer request_id = %request_id (Display) or request_id = ?request_id (Debug) to include the actual ID in structured logs.
Suggested change to the logging calls within this block:
```diff
-    tracing::debug!(
-        request_id,
-        "Python exception occurred before finish of first iteration: {}",
-        e
-    );
+    tracing::debug!(
+        request_id = %request_id,
+        "Python exception occurred before finish of first iteration: {e}"
+    );
 ...
-    tracing::warn!(
-        request_id,
-        "python async generator stream ended before processing started"
-    );
+    tracing::warn!(
+        request_id = %request_id,
+        "python async generator stream ended before processing started"
+    );
```

As a follow-up, consider adding a targeted test that asserts:
- raising HttpError before first yield results in a non-200 HTTP status, and
- raising after the first yield continues to stream annotated errors.
I can help scaffold this if desired.

lib/llm/src/http/service/service_v2.rs (1)
266-276: Synchronous helper is sound; consider simplifying the async wrapper and exposing previous state.
- enable_model_endpoint simply delegates; keeping it async preserves API compatibility, but if callers don’t rely on .await here, consider making it a plain fn to avoid misleading async semantics.
- Optional: return the previous enabled state from sync_enable_model_endpoint for observability/telemetry and easier idempotency checks.
Example (optional):
```diff
-    pub async fn enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) {
-        self.sync_enable_model_endpoint(endpoint_type, enable);
-    }
+    pub fn enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) {
+        self.sync_enable_model_endpoint(endpoint_type, enable);
+    }
```

and/or
```diff
-    pub fn sync_enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) {
+    pub fn sync_enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) -> bool {
+        let was = self.state.flags.get(&endpoint_type);
         self.state.flags.set(&endpoint_type, enable);
         tracing::info!(
             "{} endpoints {}",
             endpoint_type.as_str(),
             if enable { "enabled" } else { "disabled" }
         );
+        was
     }
```

If you want me to check for external call sites that await enable_model_endpoint, I can generate a quick repo scan script.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- lib/bindings/python/rust/engine.rs (3 hunks)
- lib/bindings/python/rust/http.rs (2 hunks)
- lib/llm/src/http/service/service_v2.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
lib/bindings/python/rust/engine.rs (2)
- lib/bindings/python/rust/lib.rs (8): PyErr (170-170), PyErr (342-342), PyErr (368-368), PyErr (871-871), id (924-926), next (859-891), new (268-293), new (902-906)
- lib/bindings/python/rust/context.rs (2): id (40-42), new (19-21)

lib/bindings/python/rust/http.rs (1)
- lib/bindings/python/rust/lib.rs (1): to_pyerr (124-129)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (lib/bindings/python)
🔇 Additional comments (4)
lib/bindings/python/rust/engine.rs (3)
17-17: Good call: disambiguating StreamExt imports avoids trait-method clashes.

Aliasing futures::stream::StreamExt and tokio_stream::StreamExt is the right move here and will prevent accidental method resolution to the wrong trait.
Also applies to: 25-25
212-212: Required to await first item: pinning as mutable is correct.

Pinning mutably to call next(&mut stream) is necessary and correct here.
257-259: Explicit use of FuturesStreamExt::next avoids trait ambiguity.

Given both TokioStreamExt and FuturesStreamExt are in scope, explicitly selecting FuturesStreamExt::next here is clear and correct.
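For illustration, a minimal standalone example of the ambiguity and the fully-qualified call that resolves it (assuming tokio and tokio-stream as dependencies):

```rust
use futures::stream::{self, StreamExt as FuturesStreamExt};
use tokio_stream::StreamExt as TokioStreamExt;

#[tokio::main]
async fn main() {
    let mut s = stream::iter(vec![1, 2, 3]);
    // With both StreamExt traits in scope, `s.next()` fails to compile as
    // ambiguous; naming the trait selects the futures implementation.
    let first = FuturesStreamExt::next(&mut s).await;
    assert_eq!(first, Some(1));
}
```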
lib/bindings/python/rust/http.rs (1)
22-22: Re-exporting EndpointType here is helpful for binding cohesion.

This keeps the Python bindings aligned with the server's endpoint enum without duplicating types.
```rust
        self.sync_enable_model_endpoint(endpoint_type, enable);
    }

    pub fn sync_enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) {
```
Could you drop the async from enable_model_endpoint instead?
It would span across the codebase. Realized this is in the wrong PR.
Can you take a look at the CodeRabbit reviews? They are usually very helpful.
What's your use case for that? Just curious.
If I understand correctly, the goal of this change is to preserve the HTTP status code until the first LLM response is received by the frontend, so any failure up to the first response can be reported as a non-2xx HTTP status. Currently, the HTTP status code is returned as soon as the stream to the LLM worker is established.
lib/bindings/python/rust/engine.rs (outdated)

```rust
            )));
        }
    };
    // Create a new stream that yields the first item followed by the rest of the original stream
```
If #2592 (comment) is correct, I think we should try to patch the HTTP limitation at the HTTP frontend, instead of changing the behavior globally at the engine binding.
I think we want to preserve the difference in timeout interval for establishing a stream to the LLM worker vs. time-to-first-token, for the future. For instance, we may want to migrate the request if the stream is not established within 50 ms, while it may take up to 1000 ms before receiving the first token.
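A sketch of how the two intervals could stay distinct (hypothetical helper; durations taken from the example above, using tokio timeouts):

```rust
use std::time::Duration;

use anyhow::Result;
use futures::stream::{Stream, StreamExt};
use tokio::time::timeout;

// Hypothetical two-phase budget: a short window for establishing the
// stream (after which the request could be migrated to another worker)
// and a longer window for the time-to-first-token.
async fn establish_then_first_token<T, S>(
    establish: impl std::future::Future<Output = Result<S>>,
) -> Result<(Option<T>, S)>
where
    S: Stream<Item = T> + Unpin,
{
    // Phase 1: stream establishment, e.g. 50 ms before migrating.
    let mut stream = timeout(Duration::from_millis(50), establish).await??;
    // Phase 2: first token, allowed to take much longer, e.g. 1000 ms.
    let first = timeout(Duration::from_millis(1000), stream.next()).await?;
    Ok((first, stream))
}
```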
This assumes that you can always make a request work. Often, you will hit additional pydantic constraints inside the trt-llm worker. It could be that the max_tokens setting is too low to fit the JSON schema for structured output, etc. Some requests are not migratable here. At the same time, the function that sets up a stream ("call1") in the engine is synchronous and requires holding the GIL by design. If the setup function exposed to Python were async (which it could have been), we could potentially perform this kind of validation during setup.
I agree, this could be moved into HTTPEngine. The delay can be optional, as it is most important for pre-processing (token length, schema compilation, request pre-processing). X-grammar is an edge case.
@grahamking Roughly the implementation for our processor in Python. Apologies for only posting the code internally, @itay https://basetenlabs.slack.com/archives/C04BUDD86FR/p1755798071657499?thread_ts=1755752531.714119&cid=C04BUDD86FR
Closing because we want to continue in #2974 or a PR by @blarson-b10.
Overview:
Details:
Problem:
Use case:
Catching HTTPExceptions during the stream. Several issues can happen during setup (x-grammar can't compile the grammar, the tokenized prompt is too long, use of non-combinable features such as tool calling + structured output, disaggregated server setup failure). Open to other ways to implement this; we found this to be the best approach. I have implemented this feature twice before (in 0.1 and 0.4), and it's probably best not to start the stream. You will likely need to be able to raise an error out of the async generator, at least for the first iteration.
Fix:
Wait for the first item to complete before setting up the stream.
Open to discussion on how to better raise the HTTPError.
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
New Features
Bug Fixes