Skip to content

Commit 347620a

Browse files
authored
feat: Allow Python Engine to end stream before final (#2270)
1 parent b48d4c3 commit 347620a

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

docs/guides/backend.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,31 @@ Example 4: Multiple component in a pipeline.
109109

110110
In the P/D disaggregated setup you would have `deepseek-distill-llama8b.prefill.generate` (possibly multiple instances of this) and `deepseek-distill-llama8b.decode.generate`.
111111

112+
## Migrate Ongoing Requests
113+
114+
A Python worker may need to be shut down promptly, for example when the node running the worker is to be reclaimed and there isn't enough time to complete all ongoing requests before the shutdown deadline.
115+
116+
In such cases, you can signal incomplete responses by raising a `GeneratorExit` exception in your generate loop. This will immediately close the response stream, signaling to the frontend that the stream is incomplete. With request migration enabled (see the [`migration_limit`](../architecture/request_migration.md) parameter), the frontend will automatically migrate the partially completed request to another worker instance, if one is available, so the request can be completed there.
117+
118+
> [!WARNING]
119+
> We will update the `GeneratorExit` exception to a new Dynamo exception. Please expect a minor breaking code change in the near future.
120+
121+
Here's an example of how to implement this in your `RequestHandler`:
122+
123+
```python
124+
class RequestHandler:
125+
126+
async def generate(self, request):
127+
"""Generate response, with support for request migration"""
128+
for result in self.engine.generate_streaming(request):
129+
# Check if we need to migrate before yielding each token
130+
if is_shutting_down():
131+
# Raising GeneratorExit closes the stream and triggers migration
132+
raise GeneratorExit("Worker shutting down, migrating request")
133+
134+
yield result
135+
```
136+
137+
When `GeneratorExit` is raised, the frontend receives the incomplete response and can seamlessly continue generation on another available worker instance, preserving the user experience even during worker shutdowns.
138+
139+
For more information about how request migration works, see the [Request Migration Architecture](../architecture/request_migration.md) documentation.

lib/bindings/python/rust/engine.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ enum ResponseProcessingError {
134134
#[error("python exception: {0}")]
135135
PythonException(String),
136136

137+
#[error("python generator exit: {0}")]
138+
PyGeneratorExit(String),
139+
137140
#[error("deserialize error: {0}")]
138141
DeserializeError(String),
139142

@@ -225,6 +228,9 @@ where
225228
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
226229
msg
227230
}
231+
ResponseProcessingError::PyGeneratorExit(_) => {
232+
"Stream ended before generation completed".to_string()
233+
}
228234
ResponseProcessingError::PythonException(e) => {
229235
let msg = format!("a python exception was caught while processing the async generator: {}", e);
230236
msg
@@ -276,8 +282,16 @@ where
276282
{
277283
let item = item.map_err(|e| {
278284
println!();
279-
Python::with_gil(|py| e.display(py));
280-
ResponseProcessingError::PythonException(e.to_string())
285+
let mut is_py_generator_exit = false;
286+
Python::with_gil(|py| {
287+
e.display(py);
288+
is_py_generator_exit = e.is_instance_of::<pyo3::exceptions::PyGeneratorExit>(py);
289+
});
290+
if is_py_generator_exit {
291+
ResponseProcessingError::PyGeneratorExit(e.to_string())
292+
} else {
293+
ResponseProcessingError::PythonException(e.to_string())
294+
}
281295
})?;
282296
let response = tokio::task::spawn_blocking(move || {
283297
Python::with_gil(|py| depythonize::<Resp>(&item.into_bound(py)))

lib/runtime/src/pipeline/network/ingress/push_handler.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
use super::*;
17+
use crate::protocols::maybe_error::MaybeError;
1718
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
1819
use serde::{Deserialize, Serialize};
1920
use std::sync::Arc;
@@ -105,7 +106,7 @@ impl WorkHandlerMetrics {
105106
impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>>
106107
where
107108
T: Data + for<'de> Deserialize<'de> + std::fmt::Debug,
108-
U: Data + Serialize + std::fmt::Debug,
109+
U: Data + Serialize + MaybeError + std::fmt::Debug,
109110
{
110111
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
111112
// Call the Ingress-specific add_metrics implementation
@@ -220,6 +221,14 @@ where
220221
let mut send_complete_final = true;
221222
while let Some(resp) = stream.next().await {
222223
tracing::trace!("Sending response: {:?}", resp);
224+
if let Some(err) = resp.err() {
225+
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
226+
if format!("{:?}", err) == STREAM_ERR_MSG {
227+
tracing::warn!(STREAM_ERR_MSG);
228+
send_complete_final = false;
229+
break;
230+
}
231+
}
223232
let resp_wrapper = NetworkStreamWrapper {
224233
data: Some(resp),
225234
complete_final: false,

0 commit comments

Comments
 (0)