feat: Failure Detection while Responses are returning #1671
ryanolson
left a comment
If we are trying to create a restartable request where we have already streamed some responses back to the user, then we should define a RestartableStatefulAsyncEngine.
Ultimately, to restart an autoregressive request at some future state, you need to accumulate the responses, update the request, then reissue it with the accumulated state.
However, this is particularly problematic with prompt templating: it requires that the prompt template can properly render partial assistant messages, and most cannot.
We have the capability to do this in our prompt rendering engine.
Notice how we conditionally return a bool for should_add_generation_prompt based on whether the last "Message" object is a User (True) or not a User (False).
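The conditional described above can be sketched as follows. This is a minimal illustration, not the rendering engine's actual API; `Role`, `Message`, and the free function `should_add_generation_prompt` are hypothetical names:

```rust
// Hypothetical sketch: only append a generation prompt when the last
// message is from the user. A trailing (partial) assistant message
// must be continued, not restarted with a fresh generation prompt.
#[derive(PartialEq)]
enum Role {
    User,
    Assistant,
}

struct Message {
    role: Role,
    content: String,
}

fn should_add_generation_prompt(messages: &[Message]) -> bool {
    matches!(messages.last(), Some(m) if m.role == Role::User)
}
```

With this shape, a conversation ending in a partial assistant message renders without a new generation prompt, so generation continues from the partial text.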
Essentially, if the prompt template doesn't support this, we can't reproduce the proper token sequence via the prompt template / tokenization path.
We could instead take the sequence of tokens and reissue them without needing to re-render the request.
Regardless, we still need to capture state and produce a new request for the restart.
Before doing this generally, let's do it for LLM requests, as that's the core nature of our framework. Then generalize to a trait that our objects need to implement to be "restartable".
I would suggest doing this for an engine that takes a common LLM request and common LLM response object.
We'll probably want to use a scan combinator to collect state.
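The scan-combinator idea can be sketched on std's `Iterator::scan`; `futures::StreamExt::scan` has the same shape for async response streams. The names here are illustrative, not the crate's actual types:

```rust
// Sketch of accumulating restart state with a scan combinator.
// Each emitted item carries the full accumulated text so far, which
// is the state needed to rebuild a restart request if the stream
// dies mid-generation.
fn accumulate_responses(chunks: impl Iterator<Item = String>) -> Vec<String> {
    chunks
        .scan(String::new(), |acc, chunk| {
            // `acc` is the running state threaded through the stream.
            acc.push_str(&chunk);
            Some(acc.clone())
        })
        .collect()
}
```

The last element of the output is the accumulated response to splice back into a reissued request.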
I think your PR defines this EOS (end of stream) sentinel; however, that should be conditioned upon the response stream itself or the backend producing it.
For LLMs, we have a stop_condition, which should always be sent on the last message in the stream.
Seeing a stream terminate without seeing a stop_condition would be a possible trigger for a "restart".
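That trigger can be sketched as a check on the final received item. `Chunk` and `needs_restart` are hypothetical names for illustration, not the codebase's types:

```rust
// Sketch: a terminated stream whose final item carries no stop
// condition is treated as a restart trigger.
struct Chunk {
    text: String,
    // e.g. Some("eos") or Some("length") on the last message of a
    // properly finished generation.
    stop_condition: Option<String>,
}

fn needs_restart(received: &[Chunk]) -> bool {
    match received.last() {
        // Stream ended without producing any output: restart.
        None => true,
        // Restart iff the final chunk lacks a stop condition.
        Some(last) => last.stop_condition.is_none(),
    }
}
```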
Walkthrough: The changes introduce a standardized error handling trait (IsError).
Changes
Sequence Diagram(s):
sequenceDiagram
participant Client
participant PushRouter
participant AddressedPushRouter
participant Stream
participant Receiver
Client->>PushRouter: Send request
PushRouter->>AddressedPushRouter: Route and generate stream
AddressedPushRouter->>Stream: Wrap responses in StreamItemWrapper
Stream->>Receiver: Send StreamItemWrapper<U> (data or completion)
Receiver->>Client: Receive annotated data or error
Actionable comments posted: 0
🧹 Nitpick comments (2)
lib/llm/src/protocols/common/llm_backend.rs (1)
138-150: LGTM: Correct IsError implementation with minor suggestion. The implementation properly integrates with the existing LLMEngineOutput structure:
- from_err leverages the existing error() constructor
- err correctly maps FinishReason::Error to boxed errors
- Returns None for non-error finish reasons
Consider using err.to_string() instead of format!("{:?}", err) for more user-friendly error messages:
- LLMEngineOutput::error(format!("{:?}", err))
+ LLMEngineOutput::error(err.to_string())
lib/runtime/src/pipeline/network/egress/push_router.rs (1)
184-224: Fault detection correctly identifies unresponsive instances. The implementation properly handles NoResponders errors using the correct async-nats error handling pattern. However, the TODO and commented code suggest that stream-level fault detection is still pending. Would you like me to investigate alternative approaches for stream-level fault detection that might avoid the compiler crash?
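The detection pattern can be sketched with std's error downcasting. `RequestError` and `RequestErrorKind` here are local stand-ins for the async-nats types, defined so the snippet is self-contained; the real code would downcast to `async_nats::client::RequestError`:

```rust
use std::error::Error;
use std::fmt;

// Local stand-in for async_nats::client::RequestError and its kind,
// used to illustrate the downcast_ref pattern without the crate.
#[derive(Debug, PartialEq)]
enum RequestErrorKind {
    NoResponders,
    Other,
}

#[derive(Debug)]
struct RequestError {
    kind: RequestErrorKind,
}

impl fmt::Display for RequestError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "request error: {:?}", self.kind)
    }
}

impl Error for RequestError {}

// Returns true when the error is a "no responders" failure, i.e. the
// target instance is unresponsive and should be inhibited.
fn is_no_responders(err: &(dyn Error + 'static)) -> bool {
    err.downcast_ref::<RequestError>()
        .map(|e| e.kind == RequestErrorKind::NoResponders)
        .unwrap_or(false)
}
```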
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- lib/bindings/python/rust/lib.rs (3 hunks)
- lib/llm/src/protocols/common/llm_backend.rs (3 hunks)
- lib/runtime/src/pipeline/network.rs (1 hunk)
- lib/runtime/src/pipeline/network/egress/addressed_router.rs (3 hunks)
- lib/runtime/src/pipeline/network/egress/push_router.rs (5 hunks)
- lib/runtime/src/pipeline/network/ingress/push_handler.rs (1 hunk)
- lib/runtime/src/protocols.rs (1 hunk)
- lib/runtime/src/protocols/annotated.rs (4 hunks)
- lib/runtime/src/protocols/is_error.rs (1 hunk)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scheduler.rs:260-266
Timestamp: 2025-05-30T06:34:12.785Z
Learning: In the KV router scheduler code, PeaBrane prefers fail-fast behavior over silent failure handling. When accessing worker metrics data that could be out-of-bounds (like dp_rank indexing), explicit panics are preferred over graceful degradation with continue statements to ensure data integrity issues are caught early.
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
📓 Additional learnings (deduplicated; the async-nats 0.40 learning above also applies to the protocols, push_handler, is_error, Python bindings, llm_backend, addressed_router, and push_router files)
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.
Learnt from: oandreeva-nv
PR: ai-dynamo/dynamo#1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Learnt from: kthui
PR: ai-dynamo/dynamo#1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1236
File: lib/llm/src/mocker/engine.rs:140-161
Timestamp: 2025-06-17T00:50:44.845Z
Learning: In Rust async code, when an Arc<Mutex<_>> is used solely to transfer ownership of a resource (like a channel receiver) into a spawned task rather than for sharing between multiple tasks, holding the mutex lock across an await is not problematic since there's no actual contention.
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.
🧬 Code Graph Analysis (2)
lib/runtime/src/protocols.rs (1)
lib/runtime/src/protocols/annotated.rs (1)
is_error(132-134)
lib/runtime/src/pipeline/network/egress/addressed_router.rs (2)
lib/runtime/src/protocols/annotated.rs (3)
is_error (132-134), from_err (154-156), err (158-169)
lib/runtime/src/protocols/is_error.rs (4)
from_err (20-20), from_err (44-48), err (23-23), err (49-51)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (16)
lib/runtime/src/protocols.rs (1)
22-22: LGTM: Clean module declaration. The new is_error module is properly declared and exposed publicly, following the established pattern in the protocols module.
lib/runtime/src/pipeline/network.rs (1)
327-333: LGTM: Well-designed wrapper struct for stream completion signaling. The StreamItemWrapper<U> struct effectively addresses the immediate need for explicit stream completion detection. The design is sound:
- Optional data field allows for completion-only messages
- The skip_serializing_if attribute optimizes serialization
- Clear naming and documentation
The TODO comment appropriately indicates this is a temporary solution pending SSE implementation.
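The wrapper's shape, as described in this comment, can be sketched as follows. This is a simplified stand-in (serde attributes such as `skip_serializing_if` omitted, and the `describe` helper is purely illustrative); the actual definition lives in lib/runtime/src/pipeline/network.rs:

```rust
// Simplified sketch of the stream-item wrapper.
struct StreamItemWrapper<U> {
    /// The payload; None for a completion-only message.
    data: Option<U>,
    /// True on the sentinel item marking a properly closed stream.
    complete_final: bool,
}

// A receiver can distinguish three cases: data, clean completion,
// and (by the stream ending with no complete_final item) a fault.
fn describe<U>(item: &StreamItemWrapper<U>) -> &'static str {
    match (&item.data, item.complete_final) {
        (Some(_), false) => "data",
        (None, true) => "complete",
        _ => "unexpected",
    }
}
```

A stream that ends without ever yielding a `complete_final: true` item is what the router reports as a fault.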
lib/runtime/src/pipeline/network/ingress/push_handler.rs (2)
100-116: LGTM: Proper stream item wrapping with error handling. The implementation correctly wraps each response in StreamItemWrapper with appropriate completion flags. The error handling is well-designed:
- Sets send_complete_final = false on send failure
- Calls context.stop_generating() to halt upstream processing
- Breaks the loop to prevent further processing
This ensures proper cleanup when stream transmission fails.
117-130: LGTM: Explicit stream completion signaling. The completion signaling logic is correctly implemented:
- Only sends the completion signal if no errors occurred during streaming
- Uses data: None, complete_final: true to clearly indicate stream end
- Logs errors but doesn't propagate them (appropriate for the cleanup phase)
This provides the explicit end-of-stream detection needed for robust streaming.
lib/runtime/src/protocols/is_error.rs (2)
18-34: LGTM: Excellent trait design for standardized error handling. The IsError trait provides a clean, consistent interface for error handling across the codebase:
- from_err and err methods enable bidirectional error conversion
- Default implementations of is_ok and is_err reduce boilerplate
- Uses Box<dyn Error> for maximum flexibility
- Follows Rust conventions similar to Result<T, E>
This design will enable consistent error handling across different response types.
36-61: LGTM: Comprehensive test coverage validates trait behavior. The test implementation demonstrates proper usage of the trait:
- Tests both error and success states
- Verifies error message preservation through conversions
- Validates default implementations of is_ok and is_err
- Uses anyhow::Error for realistic error handling scenarios
The test coverage provides confidence in the trait's correctness.
lib/llm/src/protocols/common/llm_backend.rs (2)
21-21: LGTM: Clean import of IsError trait. The import is properly scoped and enables the trait implementation for LLMEngineOutput.
163-184: LGTM: Thorough test coverage validates the IsError implementation. The test comprehensively validates the IsError trait implementation:
- Tests success states (stop finish reason)
- Tests error states with message preservation
- Tests from_err constructor integration
- Validates all trait methods (is_ok, is_err, err)
This provides confidence that the implementation behaves correctly across all scenarios.
lib/runtime/src/protocols/annotated.rs (2)
150-170: LGTM! Clean implementation of the IsError trait. The implementation correctly handles error conversion and extraction, with appropriate fallback to "unknown error" when comments are missing.
193-215: Good test coverage for the IsError implementation. The tests comprehensively verify all paths: normal data, error from string, and error from boxed error.
lib/bindings/python/rust/lib.rs (2)
217-217: Type update aligns with the new error-aware streaming protocol. The change to use RsAnnotated<serde_json::Value> as the output type is consistent with the IsError trait requirement in PushRouter. Also applies to: 488-493
759-765: Simplified stream processing by removing redundant deserialization. Good optimization - the stream now provides annotated values directly, eliminating the need for deserialization.
lib/runtime/src/pipeline/network/egress/addressed_router.rs (2)
84-84: Trait bound addition is consistent with the error handling requirements. The IsError trait bound on U enables proper error construction in the stream processing logic.
164-208: Comprehensive stream processing with proper error handling. The implementation correctly handles all edge cases:
- Protocol violations (data after completion)
- Deserialization errors
- Empty responses
- Premature stream closure
The stateful tracking of is_complete_final works correctly within the filter_map closure.
98-98: Trait bound addition enables error handling in the routing layer.The
IsErrortrait bound onUis required by theAddressedPushRouterand enables proper error construction.Also applies to: 231-231
113-131: Clean refactoring that eliminates code duplication.The routing methods now have clear separation of concerns:
- Instance selection logic specific to each routing strategy
- Common fault detection delegated to
generate_with_fault_detectionAlso applies to: 135-152, 160-174
ryanolson
left a comment
I pushed a branch that has a Python wrapper with a protocol for the return type. Using this, we can evaluate the Python return object and decide to conditionally wrap it.
Let's set up a time to discuss
…rapper and IsError to MaybeError
Overview:
The router will detect if a stream is closed before all responses are received. If so, an error will be propagated to the stream consumer.
With this change, the Python script is informed when not all responses are received.
Additionally, if the error originates from networking, the instance is inhibited from receiving future requests until ETCD can update.
Details:
Added StreamItemWrapper for the client to determine whether all responses were received - this will be replaced with Server-Sent Events (SSE) for detecting proper stream ending in a future PR.
Testing setup with examples/fault_tolerance:
The client.py sends a request to processor.py, which relays the request to worker.py. The worker.py yields 9 responses for each request received, and the yielded responses are transmitted back to processor.py and then client.py, in order. The client.py only sends the next request after all 9 responses of the current request are received.
While worker.py was yielding responses, it was stopped prematurely. The processor.py is expected to be notified of the incomplete stream via a Python exception.
Before change:
The processor.py ended at response 4, believing it was the last response, but in reality the stream ended prematurely.
After change:
The processor received an exception stating "Stream ended before generation completed."
Note: the examples/fault_tolerance example can be found in the commit history.
Where should the reviewer start?
Start with the new is_error.rs containing the new IsError trait, and then move on to the Annotated<...> struct. Then, move on to streaming fault detection in the router implementation. Finally, check out the minor change on LLMEngineOutput.
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Relates to ai-dynamo/enhancements#15
Summary by CodeRabbit
New Features
Refactor
Tests