Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit e0edb21

Browse files
committed
handle replica proxy stream error
1 parent d29081a commit e0edb21

File tree

3 files changed

+17
-5
lines changed

3 files changed

+17
-5
lines changed

sqld/src/rpc/replica_proxy.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,20 @@ impl Proxy for ReplicaProxyService {
3939
&self,
4040
req: tonic::Request<tonic::Streaming<ExecReq>>,
4141
) -> Result<tonic::Response<Self::StreamExecStream>, tonic::Status> {
42-
let (meta, ext, stream) = req.into_parts();
43-
let mut req = tonic::Request::from_parts(meta, ext, stream.map(|r| r.unwrap())); // TODO: handle mapping error
42+
let (meta, ext, mut stream) = req.into_parts();
43+
let stream = async_stream::stream! {
44+
while let Some(it) = stream.next().await {
45+
match it {
46+
Ok(it) => yield it,
47+
Err(e) => {
48+
// close the stream on error
49+
tracing::error!("error proxying stream request: {e}");
50+
break
51+
},
52+
}
53+
}
54+
};
55+
let mut req = tonic::Request::from_parts(meta, ext, stream);
4456
self.do_auth(&mut req)?;
4557
let mut client = self.client.clone();
4658
client.stream_exec(req).await

sqld/tests/cluster/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
33
use super::common;
44

5-
use insta::assert_snapshot;
65
use libsql::{Database, Value};
7-
use serde_json::json;
86
use sqld::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
97
use tempfile::tempdir;
108
use tokio::{task::JoinSet, time::Duration};

sqld/tests/namespaces/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::path::PathBuf;
44

55
use crate::common::http::Client;
66
use crate::common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};
7+
use insta::assert_snapshot;
78
use libsql::{Database, Value};
89
use serde_json::json;
910
use sqld::config::{AdminApiConfig, RpcServerConfig, UserApiConfig};
@@ -44,7 +45,8 @@ fn make_primary(sim: &mut Sim, path: PathBuf) {
4445
#[test]
4546
fn create_namespace() {
4647
let mut sim = Builder::new().build();
47-
make_cluster(&mut sim, 0, false);
48+
let tmp = tempdir().unwrap();
49+
make_primary(&mut sim, tmp.path().into());
4850

4951
sim.client("client", async {
5052
let db =

0 commit comments

Comments
 (0)