Skip to content

Commit c1254e3

Browse files
committed
[WIP] Implement instance inhibit for node failure while streaming response
1 parent 622f104 commit c1254e3

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

lib/bindings/python/rust/lib.rs

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -643,14 +643,15 @@ impl Client {
643643
let annotated = annotated.unwrap_or(false);
644644

645645
let (tx, rx) = tokio::sync::mpsc::channel(32);
646-
let client = self.router.clone();
646+
let router = self.router.clone();
647647

648648
pyo3_async_runtimes::tokio::future_into_py(py, async move {
649-
let stream = client.round_robin(request.into()).await.map_err(to_pyerr)?;
649+
let (stream, instance_id) = router.round_robin_ret_id(request.into()).await.map_err(to_pyerr)?;
650650
tokio::spawn(process_stream(stream, tx));
651651
Ok(AsyncResponseStream::new(
652652
Arc::new(Mutex::new(rx)),
653653
annotated,
654+
Some((router.clone(), instance_id)),
654655
))
655656
})
656657
}
@@ -667,14 +668,15 @@ impl Client {
667668
let annotated = annotated.unwrap_or(false);
668669

669670
let (tx, rx) = tokio::sync::mpsc::channel(32);
670-
let client = self.router.clone();
671+
let router = self.router.clone();
671672

672673
pyo3_async_runtimes::tokio::future_into_py(py, async move {
673-
let stream = client.random(request.into()).await.map_err(to_pyerr)?;
674+
let (stream, instance_id) = router.random_ret_id(request.into()).await.map_err(to_pyerr)?;
674675
tokio::spawn(process_stream(stream, tx));
675676
Ok(AsyncResponseStream::new(
676677
Arc::new(Mutex::new(rx)),
677678
annotated,
679+
Some((router.clone(), instance_id)),
678680
))
679681
})
680682
}
@@ -692,10 +694,10 @@ impl Client {
692694
let annotated = annotated.unwrap_or(false);
693695

694696
let (tx, rx) = tokio::sync::mpsc::channel(32);
695-
let client = self.router.clone();
697+
let router = self.router.clone();
696698

697699
pyo3_async_runtimes::tokio::future_into_py(py, async move {
698-
let stream = client
700+
let stream = router
699701
.direct(request.into(), instance_id)
700702
.await
701703
.map_err(to_pyerr)?;
@@ -705,6 +707,7 @@ impl Client {
705707
Ok(AsyncResponseStream::new(
706708
Arc::new(Mutex::new(rx)),
707709
annotated,
710+
Some((router.clone(), instance_id)),
708711
))
709712
})
710713
}
@@ -731,6 +734,7 @@ impl Client {
731734
Ok(AsyncResponseStream::new(
732735
Arc::new(Mutex::new(rx)),
733736
annotated,
737+
None,
734738
))
735739
})
736740
}
@@ -778,22 +782,27 @@ async fn process_stream(
778782

779783
#[pyclass]
780784
struct AsyncResponseStream {
785+
// Receiving half of the tokio mpsc channel that delivers responses to this stream.
781786
rx: Arc<Mutex<tokio::sync::mpsc::Receiver<RsAnnotated<PyObject>>>>,
782787
// When true, return each response in its Annotated wrapper.
783788
annotated: bool,
784-
// For iterator to track complete final revceival.
789+
// Tracks whether the complete final has been received; consulted when the stream closes to detect an unexpected early end.
785790
is_complete_final: Arc<AtomicBool>,
791+
// The (router, instance_id) serving this stream, used to report the instance down when the
792+
// stream closes before the complete final. None means nothing is reported.
routing: Option<(rs::pipeline::PushRouter<serde_json::Value, serde_json::Value>, i64)>,
786793
}
787794

788795
impl AsyncResponseStream {
789796
/// Builds a response stream around the receiver `rx`.
///
/// `annotated` and `routing` are stored as given; `routing` is `Some((router, instance_id))`
/// when the serving instance should be tracked, or `None` otherwise.
pub fn new(
790797
rx: Arc<Mutex<tokio::sync::mpsc::Receiver<RsAnnotated<PyObject>>>>,
791798
annotated: bool,
799+
routing: Option<(rs::pipeline::PushRouter<serde_json::Value, serde_json::Value>, i64)>,
792800
) -> Self {
793801
AsyncResponseStream {
794802
rx,
795803
annotated,
796804
// A new stream has not yet seen its complete final, so the flag starts false.
is_complete_final: Arc::new(AtomicBool::new(false)),
805+
routing,
797806
}
798807
}
799808
}
@@ -811,6 +820,7 @@ impl AsyncResponseStream {
811820
let rx = self.rx.clone();
812821
let annotated = self.annotated;
813822
let is_complete_final = self.is_complete_final.clone();
823+
let routing = self.routing.clone();
814824

815825
pyo3_async_runtimes::tokio::future_into_py(py, async move {
816826
loop {
@@ -849,6 +859,9 @@ impl AsyncResponseStream {
849859
}
850860
None => {
851861
if !(*is_complete_final).load(Ordering::Relaxed) {
862+
if let Some((router, instance_id)) = &routing {
863+
router.client.report_instance_down(*instance_id).await;
864+
}
852865
return Err(PyConnectionError::new_err(
853866
"Stream closed unexpectedly before complete final",
854867
));

lib/runtime/src/pipeline/network/egress/push_router.rs

Lines changed: 28 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,16 @@ where
109109

110110
/// Issue a request to the next available instance in a round-robin fashion.
///
/// Thin wrapper over `round_robin_ret_id` that drops the instance id from the result
/// and returns only the response stream; errors are passed through unchanged.
111111
pub async fn round_robin(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
112+
match self.round_robin_ret_id(request).await {
113+
Ok((stream, _)) => Ok(stream),
114+
Err(err) => Err(err),
115+
}
116+
}
117+
118+
pub async fn round_robin_ret_id(
119+
&self,
120+
request: SingleIn<T>,
121+
) -> anyhow::Result<(ManyOut<U>, i64)> {
112122
let slf = self;
113123
let routing_algorithm = move || async move {
114124
let counter = slf.round_robin_counter.fetch_add(1, Ordering::Relaxed);
@@ -135,6 +145,13 @@ where
135145

136146
/// Issue a request to a random endpoint.
///
/// Thin wrapper over `random_ret_id` that drops the instance id from the result
/// and returns only the response stream; errors are passed through unchanged.
137147
pub async fn random(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
148+
match self.random_ret_id(request).await {
149+
Ok((stream, _)) => Ok(stream),
150+
Err(err) => Err(err),
151+
}
152+
}
153+
154+
pub async fn random_ret_id(&self, request: SingleIn<T>) -> anyhow::Result<(ManyOut<U>, i64)> {
138155
let slf = self;
139156
let routing_algorithm = move || async move {
140157
let instance_id = {
@@ -178,8 +195,13 @@ where
178195
}
179196
Ok(instance_id)
180197
};
181-
self.generate_with_fault_tolerance(routing_algorithm, request)
198+
match self
199+
.generate_with_fault_tolerance(routing_algorithm, request)
182200
.await
201+
{
202+
Ok((stream, _)) => Ok(stream),
203+
Err(err) => Err(err),
204+
}
183205
}
184206

185207
pub async fn r#static(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
@@ -194,7 +216,7 @@ where
194216
&self,
195217
routing_algorithm: F,
196218
request: SingleIn<T>,
197-
) -> anyhow::Result<ManyOut<U>>
219+
) -> anyhow::Result<(ManyOut<U>, i64)>
198220
where
199221
F: FnOnce() -> R,
200222
R: Future<Output = anyhow::Result<i64>>,
@@ -212,7 +234,10 @@ where
212234
}
213235
}
214236
}
215-
stream
237+
match stream {
238+
Ok(stream) => Ok((stream, instance_id)),
239+
Err(err) => Err(err),
240+
}
216241
}
217242
}
218243

0 commit comments

Comments
 (0)