From 7579dee565dcfdcb62cdbc5f2d8930e05166c2eb Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 13:42:48 +0200 Subject: [PATCH 01/16] fix(ws server): fix flaky shutdown test --- server/src/tests/ws.rs | 8 ++++++- server/src/transport/ws.rs | 48 ++++++++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index e77e7dffe0..9281ae0f6d 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -819,7 +819,13 @@ async fn drop_client_with_pending_calls_works() { init_logger(); let (handle, addr) = { - let server = ServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap(); + let server = ServerBuilder::default() + .ping_interval(std::time::Duration::from_secs(60 * 60)) + .build("127.0.0.1:0") + .with_default_timeout() + .await + .unwrap() + .unwrap(); let mut module = RpcModule::new(()); diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 6fcd07a6fc..2e89a49024 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -230,7 +230,7 @@ pub(crate) async fn background_task( sender: Sender, mut receiver: Receiver, svc: ServiceData, -) -> Result<(), Error> { +) -> Result { let ServiceData { methods, max_request_body_size, @@ -250,7 +250,7 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (mut conn_tx, conn_rx) = oneshot::channel(); + let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); let pending_calls = FuturesUnordered::new(); @@ -272,11 +272,11 @@ pub(crate) async fn background_task( stopped = stop; permit } - None => break Ok(()), + None => break Ok(Shutdown::ConnectionClosed), }; match try_recv(&mut receiver, &mut data, stopped).await { - Receive::Shutdown => break Ok(()), + Receive::Shutdown => break Ok(Shutdown::Stopped), Receive::Ok(stop) => { stopped = stop; } @@ -286,7 +286,7 @@ pub(crate) async fn background_task( match err { SokettoError::Closed => { tracing::debug!("WS transport: remote peer terminated the connection: {}", conn_id); - break Ok(()); + break Ok(Shutdown::ConnectionClosed); } SokettoError::MessageTooLarge { current, maximum } => { tracing::debug!( @@ -330,14 +330,33 @@ pub(crate) async fn background_task( // This is not strictly not needed because `tokio::spawn` will drive these the completion // but it's preferred that the `stop_handle.stopped()` should not return until all methods has been // executed and the connection has been closed. - tokio::select! { - // All pending calls executed. - _ = pending_calls.for_each(|_| async {}) => { - _ = conn_tx.send(()); + match result { + Ok(Shutdown::Stopped) | Err(_) => { + // Soketto doesn't have a way to signal when the connection is closed + // thus just throw the data and terminate the stream once the connection has + // been terminated. + // + // The receiver is not cancel-safe such that it used in stream to enforce that. + let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { + if let Err(SokettoError::Closed) = receiver.receive(&mut data).await { + None + } else { + Some(((), (receiver, data))) + } + }); + + let pending = pending_calls.for_each(|_| async {}); + let disconnect = disconnect_stream.for_each(|_| async {}); + + tokio::select! { + _ = pending => (), + _ = disconnect => (), + } } - // The connection was closed, no point of waiting for the pending calls. - _ = conn_tx.closed() => {} - } + Ok(Shutdown::ConnectionClosed) => (), + }; + + _ = conn_tx.send(()); logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); @@ -558,3 +577,8 @@ async fn execute_unchecked_call(params: ExecuteCallParams) { } }; } + +pub(crate) enum Shutdown { + Stopped, + ConnectionClosed, +} From 01997431b39e44960154d6b389f35b7dad766ae8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 13:44:47 +0200 Subject: [PATCH 02/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2e89a49024..47ea0ef413 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -333,7 +333,7 @@ pub(crate) async fn background_task( match result { Ok(Shutdown::Stopped) | Err(_) => { // Soketto doesn't have a way to signal when the connection is closed - // thus just throw the data and terminate the stream once the connection has + // thus just throw away the data and terminate the stream once the connection has // been terminated. // // The receiver is not cancel-safe such that it used in stream to enforce that. From 24d48d13d22af452c10bdef47e6d48e790ee4c6d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 13:45:12 +0200 Subject: [PATCH 03/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 47ea0ef413..005f0114de 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -336,7 +336,7 @@ pub(crate) async fn background_task( // thus just throw away the data and terminate the stream once the connection has // been terminated. // - // The receiver is not cancel-safe such that it used in stream to enforce that. + // The receiver is not cancel-safe such that it used in a stream to enforce that. let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { if let Err(SokettoError::Closed) = receiver.receive(&mut data).await { None From e2a8e31927bf00a82c0f79708638d350e20c8476 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 16:38:55 +0200 Subject: [PATCH 04/16] fix interval stream bug --- server/src/tests/ws.rs | 9 +++++++-- server/src/transport/ws.rs | 11 +++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 9281ae0f6d..5a32d19be4 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -24,6 +24,8 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::time::Duration; + use crate::server::BatchRequestConfig; use crate::tests::helpers::{deser_call, init_logger, server_with_context}; use crate::types::SubscriptionId; @@ -816,11 +818,14 @@ async fn notif_is_ignored() { #[tokio::test] async fn drop_client_with_pending_calls_works() { + const MAX_TIMEOUT: Duration = Duration::from_secs(60); + init_logger(); let (handle, addr) = { let server = ServerBuilder::default() - .ping_interval(std::time::Duration::from_secs(60 * 60)) + // Make sure that the ping_interval doesn't force the connection to be closed + .ping_interval(MAX_TIMEOUT.checked_mul(10).unwrap()) .build("127.0.0.1:0") .with_default_timeout() .await @@ -853,5 +858,5 @@ async fn drop_client_with_pending_calls_works() { // Stop the server and ensure that the server doesn't wait for futures to complete // when the connection has already been closed. handle.stop().unwrap(); - assert!(handle.stopped().with_default_timeout().await.is_ok()); + assert!(handle.stopped().with_timeout(MAX_TIMEOUT).await.is_ok()); } diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2e89a49024..d6ed3021ab 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -371,7 +371,11 @@ async fn send_task( stop: oneshot::Receiver<()>, ) { // Interval to send out continuously `pings`. - let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval)); + let mut ping_interval = tokio::time::interval(ping_interval); + // This returns immediately so make sure it doesn't resolve before the ping_interval has been elapsed. + ping_interval.tick().await; + + let ping_interval = IntervalStream::new(ping_interval); let rx = ReceiverStream::new(rx); tokio::pin!(ping_interval, rx, stop); @@ -403,15 +407,18 @@ async fn send_task( } // Handle timer intervals. - Either::Right((Either::Left((_, stop)), next_rx)) => { + Either::Right((Either::Left((Some(_instant), stop)), next_rx)) => { if let Err(err) = send_ping(&mut ws_sender).await { tracing::debug!("WS transport error: send ping failed: {}", err); break; } + rx_item = next_rx; futs = future::select(ping_interval.next(), stop); } + Either::Right((Either::Left((None, _)), _)) => unreachable!("IntervalStream never terminates"), + // Server is stopped. Either::Right((Either::Right(_), _)) => { break; From 1e35bb6fc306db8d10ab8f23dad07eea0a769594 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 17:56:17 +0200 Subject: [PATCH 05/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 76480c8d1c..8f46b99862 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -336,7 +336,7 @@ pub(crate) async fn background_task( // thus just throw away the data and terminate the stream once the connection has // been terminated. // - // The receiver is not cancel-safe such that it used in a stream to enforce that. + // The receiver is not cancel-safe such that it's used in a stream to enforce that. let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { if let Err(SokettoError::Closed) = receiver.receive(&mut data).await { None From 98626f5a62c7fa99c295d5f81ea06520d0b10780 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 17:57:15 +0200 Subject: [PATCH 06/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 8f46b99862..eb9a802730 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -345,7 +345,7 @@ pub(crate) async fn background_task( } }); - let pending = pending_calls.for_each(|_| async {}); + let graceful_shutdown = pending_calls.for_each(|_| async {}); let disconnect = disconnect_stream.for_each(|_| async {}); tokio::select! { From 3cd4c090778bffbbc14a31dce1d5a4db6f3ba3d4 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 17:57:39 +0200 Subject: [PATCH 07/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index eb9a802730..455a73fac8 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -349,7 +349,7 @@ pub(crate) async fn background_task( let disconnect = disconnect_stream.for_each(|_| async {}); tokio::select! { - _ = pending => (), + _ = graceful_shutdown => (), _ = disconnect => (), } } From 91cde244f458d44e49a5f156f19ca67cfad6dd6f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 18:00:55 +0200 Subject: [PATCH 08/16] Update server/src/transport/ws.rs --- server/src/transport/ws.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 455a73fac8..ecacec98d2 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -348,6 +348,8 @@ pub(crate) async fn background_task( let graceful_shutdown = pending_calls.for_each(|_| async {}); let disconnect = disconnect_stream.for_each(|_| async {}); + // All pending calls has been finished or the connection closed. + // Then it's fine to terminate tokio::select! { _ = graceful_shutdown => (), _ = disconnect => (), From 51b3a84d3f2c70dc5dc2cc7b3f31f20a4fa08834 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 18:07:27 +0200 Subject: [PATCH 09/16] check conn_tx.closed as well --- server/src/transport/ws.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 76480c8d1c..44719f3134 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,7 +250,7 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (conn_tx, conn_rx) = oneshot::channel(); + let (mut conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); let pending_calls = FuturesUnordered::new(); @@ -351,6 +351,7 @@ pub(crate) async fn background_task( tokio::select! { _ = pending => (), _ = disconnect => (), + _ = conn_tx.closed() => (), } } Ok(Shutdown::ConnectionClosed) => (), From b216f77c8c1463970bcc4987223be3adb25868df Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Apr 2023 20:20:49 +0200 Subject: [PATCH 10/16] add more tests + cleanup --- server/src/tests/ws.rs | 73 ++++++++++++++++++++----------- server/src/transport/ws.rs | 88 ++++++++++++++++++++++---------------- 2 files changed, 100 insertions(+), 61 deletions(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 5a32d19be4..bf413ef0c9 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -817,34 +817,11 @@ async fn notif_is_ignored() { } #[tokio::test] -async fn drop_client_with_pending_calls_works() { +async fn close_client_with_pending_calls_works() { const MAX_TIMEOUT: Duration = Duration::from_secs(60); - init_logger(); - let (handle, addr) = { - let server = ServerBuilder::default() - // Make sure that the ping_interval doesn't force the connection to be closed - .ping_interval(MAX_TIMEOUT.checked_mul(10).unwrap()) - .build("127.0.0.1:0") - .with_default_timeout() - .await - .unwrap() - .unwrap(); - - let mut module = RpcModule::new(()); - - module - .register_async_method("infinite_call", |_, _| async move { - futures_util::future::pending::<()>().await; - "ok" - }) - .unwrap(); - let addr = server.local_addr().unwrap(); - - (server.start(module).unwrap(), addr) - }; - + let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap()).await; let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); for _ in 0..10 { @@ -860,3 +837,49 @@ async fn drop_client_with_pending_calls_works() { handle.stop().unwrap(); assert!(handle.stopped().with_timeout(MAX_TIMEOUT).await.is_ok()); } + +#[tokio::test] +async fn drop_client_with_pending_calls_works() { + const MAX_TIMEOUT: Duration = Duration::from_secs(60); + + init_logger(); + let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap()).await; + + { + let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); + + for _ in 0..10 { + let req = r#"{"jsonrpc":"2.0","method":"infinite_call","id":1}"#; + client.send(req).with_default_timeout().await.unwrap().unwrap(); + } + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + + // Stop the server and ensure that the server doesn't wait for futures to complete + // when the connection has already been closed. + handle.stop().unwrap(); + assert!(handle.stopped().with_timeout(MAX_TIMEOUT).await.is_ok()); +} + +async fn server_with_infinite_call(timeout: Duration) -> (crate::ServerHandle, std::net::SocketAddr) { + let server = ServerBuilder::default() + // Make sure that the ping_interval doesn't force the connection to be closed + .ping_interval(timeout) + .build("127.0.0.1:0") + .with_default_timeout() + .await + .unwrap() + .unwrap(); + + let mut module = RpcModule::new(()); + + module + .register_async_method("infinite_call", |_, _| async move { + futures_util::future::pending::<()>().await; + "ok" + }) + .unwrap(); + let addr = server.local_addr().unwrap(); + + (server.start(module).unwrap(), addr) +} diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index a4f47ac64d..246ee90a3b 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -250,13 +250,13 @@ pub(crate) async fn background_task( } = svc; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let (mut conn_tx, conn_rx) = oneshot::channel(); + let (conn_tx, conn_rx) = oneshot::channel(); let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); let bounded_subscriptions = BoundedSubscriptions::new(max_subscriptions_per_connection); let pending_calls = FuturesUnordered::new(); // Spawn another task that sends out the responses on the Websocket. - tokio::spawn(send_task(rx, sender, ping_interval, conn_rx)); + let send_task_handle = tokio::spawn(send_task(rx, sender, ping_interval, conn_rx)); // Buffer for incoming data. let mut data = Vec::with_capacity(100); @@ -326,40 +326,8 @@ pub(crate) async fn background_task( // Drive all running methods to completion. // **NOTE** Do not return early in this function. This `await` needs to run to guarantee // proper drop behaviour. - // - // This is not strictly not needed because `tokio::spawn` will drive these the completion - // but it's preferred that the `stop_handle.stopped()` should not return until all methods has been - // executed and the connection has been closed. - match result { - Ok(Shutdown::Stopped) | Err(_) => { - // Soketto doesn't have a way to signal when the connection is closed - // thus just throw away the data and terminate the stream once the connection has - // been terminated. - // - // The receiver is not cancel-safe such that it's used in a stream to enforce that. - let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { - if let Err(SokettoError::Closed) = receiver.receive(&mut data).await { - None - } else { - Some(((), (receiver, data))) - } - }); - - let graceful_shutdown = pending_calls.for_each(|_| async {}); - let disconnect = disconnect_stream.for_each(|_| async {}); - - // All pending calls has been finished or the connection closed. - // Then it's fine to terminate - tokio::select! { - _ = graceful_shutdown => (), - _ = disconnect => (), - _ = conn_tx.closed() => (), - } - } - Ok(Shutdown::ConnectionClosed) => (), - }; - - _ = conn_tx.send(()); + graceful_shutdown(&result, pending_calls, receiver, data, conn_tx, send_task_handle).await; + tracing::trace!("ws conn task dropped"); logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); @@ -588,7 +556,55 @@ async fn execute_unchecked_call(params: ExecuteCallParams) { }; } +#[derive(Debug, Copy, Clone)] pub(crate) enum Shutdown { Stopped, ConnectionClosed, } + +/// Enforce a graceful shutdown. +/// +/// This will return once the connection has been terminated or all pending calls have been executed. +async fn graceful_shutdown( + result: &Result, + pending_calls: FuturesUnordered, + receiver: Receiver, + data: Vec, + mut conn_tx: oneshot::Sender<()>, + send_task_handle: tokio::task::JoinHandle<()>, +) { + match result { + Ok(Shutdown::ConnectionClosed) => (), + Ok(Shutdown::Stopped) | Err(_) => { + // Soketto doesn't have a way to signal when the connection is closed + // thus just throw away the data and terminate the stream once the connection has + // been terminated. + // + // The receiver is not cancel-safe such that it's used in a stream to enforce that. + let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { + let rx = receiver.receive(&mut data).await; + if let Err(SokettoError::Closed) = rx { + None + } else { + Some(((), (receiver, data))) + } + }); + + let graceful_shutdown = pending_calls.for_each(|_| async {}); + let disconnect = disconnect_stream.for_each(|_| async {}); + + // All pending calls has been finished or the connection closed. + // Fine to terminate + tokio::select! { + _ = graceful_shutdown => {} + _ = disconnect => {} + _ = conn_tx.closed() => {} + } + } + }; + + // Send a message to close down the "send task". + _ = conn_tx.send(()); + // Ensure that send task has been closed. + _ = send_task_handle.await; +} From 1115b9a311ea2ae62c6045c81910537e9e2d572c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 11:20:39 +0200 Subject: [PATCH 11/16] fix nit --- server/src/transport/ws.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 246ee90a3b..2f61828f11 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -260,7 +260,7 @@ pub(crate) async fn background_task( // Buffer for incoming data. let mut data = Vec::with_capacity(100); - let stopped = stop_handle.shutdown(); + let stopped = stop_handle.clone().shutdown(); tokio::pin!(stopped); @@ -327,10 +327,10 @@ pub(crate) async fn background_task( // **NOTE** Do not return early in this function. This `await` needs to run to guarantee // proper drop behaviour. graceful_shutdown(&result, pending_calls, receiver, data, conn_tx, send_task_handle).await; - tracing::trace!("ws conn task dropped"); logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); + drop(stop_handle); result } @@ -582,8 +582,7 @@ async fn graceful_shutdown( // // The receiver is not cancel-safe such that it's used in a stream to enforce that. let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async { - let rx = receiver.receive(&mut data).await; - if let Err(SokettoError::Closed) = rx { + if let Err(SokettoError::Closed) = receiver.receive(&mut data).await { None } else { Some(((), (receiver, data))) From 7b6b58e4fc19530c17bd55cb5f10532b6aeb6430 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 11:27:27 +0200 Subject: [PATCH 12/16] Update server/src/tests/ws.rs --- server/src/tests/ws.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index bf413ef0c9..fcfbc53f48 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -852,6 +852,7 @@ async fn drop_client_with_pending_calls_works() { let req = r#"{"jsonrpc":"2.0","method":"infinite_call","id":1}"#; client.send(req).with_default_timeout().await.unwrap().unwrap(); } + // Assumption: the calls would be ACK:ed by server after 10 seconds (no way knowing that) tokio::time::sleep(std::time::Duration::from_secs(10)).await; } From 19106c1634eaa670168e6b3315bdbd9af6659537 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 11:31:55 +0200 Subject: [PATCH 13/16] add comment in weird test --- server/src/tests/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index fcfbc53f48..640bd01df4 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -852,7 +852,7 @@ async fn drop_client_with_pending_calls_works() { let req = r#"{"jsonrpc":"2.0","method":"infinite_call","id":1}"#; client.send(req).with_default_timeout().await.unwrap().unwrap(); } - // Assumption: the calls would be ACK:ed by server after 10 seconds (no way knowing that) + // Assumption: the calls would be ACK:ed by server after 10 seconds (no way knowing that except parsing CLI output) tokio::time::sleep(std::time::Duration::from_secs(10)).await; } From d17676fb7b6971ebd203242c8d14c4d669e9d130 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 12:16:25 +0200 Subject: [PATCH 14/16] rewrite tests without sleeps --- server/src/tests/ws.rs | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 640bd01df4..cf220cb375 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -819,9 +819,12 @@ async fn notif_is_ignored() { #[tokio::test] async fn close_client_with_pending_calls_works() { const MAX_TIMEOUT: Duration = Duration::from_secs(60); + const CONCURRENT_CALLS: usize = 10; init_logger(); - let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap()).await; + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap(), tx).await; let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); for _ in 0..10 { @@ -829,6 +832,11 @@ async fn close_client_with_pending_calls_works() { client.send(req).with_default_timeout().await.unwrap().unwrap(); } + // Assert that the server has received the calls. + for _ in 0..CONCURRENT_CALLS { + assert!(rx.recv().await.is_some()); + } + client.close().await.unwrap(); assert!(client.receive().await.is_err()); @@ -841,19 +849,23 @@ async fn close_client_with_pending_calls_works() { #[tokio::test] async fn drop_client_with_pending_calls_works() { const MAX_TIMEOUT: Duration = Duration::from_secs(60); - + const CONCURRENT_CALLS: usize = 10; init_logger(); - let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap()).await; + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (handle, addr) = server_with_infinite_call(MAX_TIMEOUT.checked_mul(10).unwrap(), tx).await; { let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap(); - for _ in 0..10 { + for _ in 0..CONCURRENT_CALLS { let req = r#"{"jsonrpc":"2.0","method":"infinite_call","id":1}"#; client.send(req).with_default_timeout().await.unwrap().unwrap(); } - // Assumption: the calls would be ACK:ed by server after 10 seconds (no way knowing that except parsing CLI output) - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + // Assert that the server has received the calls. + for _ in 0..CONCURRENT_CALLS { + assert!(rx.recv().await.is_some()); + } } // Stop the server and ensure that the server doesn't wait for futures to complete @@ -862,7 +874,10 @@ async fn drop_client_with_pending_calls_works() { assert!(handle.stopped().with_timeout(MAX_TIMEOUT).await.is_ok()); } -async fn server_with_infinite_call(timeout: Duration) -> (crate::ServerHandle, std::net::SocketAddr) { +async fn server_with_infinite_call( + timeout: Duration, + tx: tokio::sync::mpsc::UnboundedSender<()>, +) -> (crate::ServerHandle, std::net::SocketAddr) { let server = ServerBuilder::default() // Make sure that the ping_interval doesn't force the connection to be closed .ping_interval(timeout) @@ -872,10 +887,12 @@ async fn server_with_infinite_call(timeout: Duration) -> (crate::ServerHandle, s .unwrap() .unwrap(); - let mut module = RpcModule::new(()); + let mut module = RpcModule::new(tx); module - .register_async_method("infinite_call", |_, _| async move { + .register_async_method("infinite_call", |_, mut ctx| async move { + let tx = std::sync::Arc::make_mut(&mut ctx); + tx.send(()).unwrap(); futures_util::future::pending::<()>().await; "ok" }) From 47abd27884fe782ebbfdb69f7222a5d3238cf518 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 12:51:39 +0200 Subject: [PATCH 15/16] remove needless result --- server/src/transport/ws.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 2f61828f11..63ef8faea2 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -226,11 +226,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData response } -pub(crate) async fn background_task( - sender: Sender, - mut receiver: Receiver, - svc: ServiceData, -) -> Result { +pub(crate) async fn background_task(sender: Sender, mut receiver: Receiver, svc: ServiceData) { let ServiceData { methods, max_request_body_size, @@ -300,7 +296,7 @@ pub(crate) async fn background_task( } err => { tracing::debug!("WS transport error: {}; terminate connection: {}", err, conn_id); - break Err(err.into()); + break Err(err); } }; } @@ -326,12 +322,11 @@ pub(crate) async fn background_task( // Drive all running methods to completion. // **NOTE** Do not return early in this function. This `await` needs to run to guarantee // proper drop behaviour. - graceful_shutdown(&result, pending_calls, receiver, data, conn_tx, send_task_handle).await; + graceful_shutdown(result, pending_calls, receiver, data, conn_tx, send_task_handle).await; logger.on_disconnect(remote_addr, TransportProtocol::WebSocket); drop(conn); drop(stop_handle); - result } /// A task that waits for new messages via the `rx channel` and sends them out on the `WebSocket`. @@ -566,7 +561,7 @@ pub(crate) enum Shutdown { /// /// This will return once the connection has been terminated or all pending calls have been executed. async fn graceful_shutdown( - result: &Result, + result: Result, pending_calls: FuturesUnordered, receiver: Receiver, data: Vec, @@ -574,7 +569,7 @@ async fn graceful_shutdown( send_task_handle: tokio::task::JoinHandle<()>, ) { match result { - Ok(Shutdown::ConnectionClosed) => (), + Ok(Shutdown::ConnectionClosed) | Err(SokettoError::Closed) => (), Ok(Shutdown::Stopped) | Err(_) => { // Soketto doesn't have a way to signal when the connection is closed // thus just throw away the data and terminate the stream once the connection has From d58a496ce10eda2be67ff4dc2f6db1bd55dd7f21 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 27 Apr 2023 13:03:20 +0200 Subject: [PATCH 16/16] fix compile warn --- server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/server.rs b/server/src/server.rs index eecde7abae..9f65b2d169 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -654,7 +654,7 @@ impl hyper::service::Service> for TowerSe ws_builder.set_max_message_size(data.max_request_body_size as usize); let (sender, receiver) = ws_builder.finish(); - let _ = ws::background_task::(sender, receiver, data).await; + ws::background_task::(sender, receiver, data).await; } .in_current_span(), );