From 63cd39234888ff97e327c52003b0e4a7eb88e41b Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 13 Dec 2021 08:09:00 +1000 Subject: [PATCH 01/17] Drop peer services if their cancel handles are dropped --- zebra-network/src/peer_set/set.rs | 8 ++++++++ zebra-network/src/peer_set/unready_service.rs | 17 ++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 9735be21a45..6f6641a28ed 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -433,6 +433,14 @@ where "service was canceled, dropping service" ); } + Poll::Ready(Some(Err((key, UnreadyError::CancelHandleDropped(_))))) => { + // Similarly, services with dropped cancel handes can have duplicates. + trace!( + ?key, + duplicate_connection = self.cancel_handles.contains_key(&key), + "cancel handle was dropped, dropping service" + ); + } // Unready -> Errored Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs index 4881dcd8446..cab94405e16 100644 --- a/zebra-network/src/peer_set/unready_service.rs +++ b/zebra-network/src/peer_set/unready_service.rs @@ -29,6 +29,7 @@ pub(super) struct UnreadyService { pub(super) enum Error { Inner(E), Canceled, + CancelHandleDropped(oneshot::Canceled), } impl, Req> Future for UnreadyService { @@ -37,12 +38,22 @@ impl, Req> Future for UnreadyService { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if let Poll::Ready(Ok(CancelClientWork)) = this.cancel.poll(cx) { + if let Poll::Ready(oneshot_result) = this.cancel.poll(cx) { let key = this.key.take().expect("polled after ready"); - return Poll::Ready(Err((key, Error::Canceled))); + + // # Correctness + // + // Return an error if the service is explicitly canceled, + // or its cancel handle is dropped, implicitly cancelling it. + match oneshot_result { + Ok(CancelClientWork) => return Poll::Ready(Err((key, Error::Canceled))), + Err(canceled_error) => { + return Poll::Ready(Err((key, Error::CancelHandleDropped(canceled_error)))) + } + } } - // CORRECTNESS + // # Correctness // // The current task must be scheduled for wakeup every time we return // `Poll::Pending`. From 136ddcb07b083e9a7644773226c822e90bfc5792 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 13 Dec 2021 08:41:48 +1000 Subject: [PATCH 02/17] Exit the client task if the heartbeat task exits --- zebra-network/src/peer/client.rs | 29 +++++++++++++++++++++-------- zebra-network/src/peer/error.rs | 7 ++++++- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 93ac3b1879c..a702cf36f77 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -68,8 +68,6 @@ pub(super) struct ClientRequestReceiver { /// A message from the `peer::Client` to the `peer::Server`, /// after it has been received by the `peer::Server`. -/// -/// #[derive(Debug)] #[must_use = "tx.send() must be called before drop"] pub(super) struct InProgressClientRequest { @@ -234,7 +232,7 @@ impl Service for Client { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // CORRECTNESS + // # Correctness // // The current task must be scheduled for wakeup every time we return // `Poll::Pending`. @@ -242,11 +240,26 @@ impl Service for Client { //`ready!` returns `Poll::Pending` when `server_tx` is unready, and // schedules this task for wakeup. // - // Since `shutdown_tx` is used for oneshot communication to the heartbeat - // task, it will never be `Pending`. - // - // TODO: should the Client exit if the heartbeat task exits and drops - // `shutdown_tx`? + // `shutdown_tx` is used for oneshot communication to the heartbeat task. + // If the heartbeat task exits, we want to drop the associated connection. + // So we use `poll_canceled` to schedule the client task for wakeup, + // if the heartbeat task exits and drops the cancel handle. + if let Poll::Ready(()) = self + .shutdown_tx + .as_mut() + .expect("only taken on drop") + .poll_canceled(cx) + { + let original_error = self + .error_slot + .try_update_error(PeerError::HeartbeatTaskExited.into()); + info!( + ?original_error, + latest_error = ?PeerError::HeartbeatTaskExited, + "client heartbeat task exited" + ); + } + if ready!(self.server_tx.poll_ready(cx)).is_err() { Poll::Ready(Err(self .error_slot diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 2f4743b1a39..99b29d126ef 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -33,6 +33,10 @@ pub enum PeerError { #[error("Internal connection dropped")] ConnectionDropped, + /// Zebra's internal heartbeat task exited. + #[error("Internal heartbeat task exited")] + HeartbeatTaskExited, + /// The remote peer did not respond to a [`peer::Client`] request in time. #[error("Client request timed out")] ClientRequestTimeout, @@ -62,6 +66,7 @@ impl PeerError { match self { PeerError::ConnectionClosed => "ConnectionClosed".into(), PeerError::ConnectionDropped => "ConnectionDropped".into(), + PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(), PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(), // TODO: add error kinds or summaries to `SerializationError` PeerError::Serialization(inner) => format!("Serialization({})", inner).into(), @@ -129,7 +134,7 @@ impl ErrorSlot { } } -/// Error used when the `ErrorSlot` already contains an error. +/// Error returned when the `ErrorSlot` already contains an error. #[derive(Clone, Debug)] pub struct AlreadyErrored { /// The original error in the error slot. From 9f24792af30e0a661816e41e3e300f2bf803f08c Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 11:27:41 +1000 Subject: [PATCH 03/17] Allow multiple errors on a connection without panicking --- zebra-network/src/peer/connection.rs | 24 ++++++------------------ zebra-network/src/peer/handshake.rs | 6 +++--- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index f5e29ad154b..91006bfbcfc 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -698,11 +698,7 @@ where } } - /// Marks the peer as having failed with error `e`. - /// - /// # Panics - /// - /// If `self` has already failed with a previous error. + /// Marks the peer as having failed with error `e`, and starts connection cleanup. fn fail_with(&mut self, e: E) where E: Into, @@ -719,21 +715,13 @@ where // // Error slots use a threaded `std::sync::Mutex`, so accessing the slot // can block the async task's current thread. We only perform a single - // slot update per `Client`, and panic to enforce this constraint. - // - // This assertion typically fails due to these bugs: - // * we mark a connection as failed without using fail_with - // * we call fail_with without checking for a failed connection - // state - // * we continue processing messages after calling fail_with - // - // See the original bug #1510 and PR #1531, and the later bug #1599 - // and PR #1600. + // slot update per `Client`. We ignore subsequent error slot updates. let error_result = self.error_slot.try_update_error(e.clone()); if let Err(AlreadyErrored { original_error }) = error_result { - panic!( - "multiple failures for connection: \n\ + // TODO: downgrade log to debug? + info!( + "multiple errors on connection: \n\ failed connections should stop processing pending requests and responses, \n\ then close the connection. \n\ state: {:?} \n\ @@ -760,7 +748,7 @@ where // // Accessing the error slot locks a threaded std::sync::Mutex, which // can block the current async task thread. We briefly lock the mutex - // to get a reference to the error. + // to clone the error. let e = self.error_slot.try_get_error().unwrap(); let _ = tx.send(Err(e)); } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 17d7845477d..337b918aae3 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -789,12 +789,12 @@ where // in this block, see constants.rs for more. let (server_tx, server_rx) = mpsc::channel(0); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let slot = ErrorSlot::default(); + let error_slot = ErrorSlot::default(); let client = Client { shutdown_tx: Some(shutdown_tx), server_tx: server_tx.clone(), - error_slot: slot.clone(), + error_slot: error_slot.clone(), version: remote_version, }; @@ -921,7 +921,7 @@ where request_timer: None, svc: inbound_service, client_rx: server_rx.into(), - error_slot: slot, + error_slot, peer_tx, connection_tracker, metrics_label: connected_addr.get_transient_addr_label(), From 9892f340deb22b2c05d7c85aac55b50c5738982a Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 11:44:32 +1000 Subject: [PATCH 04/17] Explain why we don't need to send an error when the request is cancelled --- zebra-network/src/peer/connection.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 91006bfbcfc..45cd71e63c0 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -663,6 +663,7 @@ where }; } Either::Left((Either::Left(_), _peer_fut)) => { + // The client receiver was dropped, so we don't need to send on `tx` here. trace!(parent: &span, "client request was cancelled"); self.state = State::AwaitingRequest; } From 0a64cbf854388909c2b04d4f2373892f0630f592 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 11:59:23 +1000 Subject: [PATCH 05/17] Document connection fields --- zebra-network/src/peer/connection.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 45cd71e63c0..41f00008026 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -420,9 +420,9 @@ pub struct Connection { /// The `inbound` service, used to answer requests from this connection's peer. pub(super) svc: S, - /// A channel that receives network requests from the rest of Zebra. + /// A channel for requests that Zebra's internal services want to send to remote peers. /// - /// This channel produces `InProgressClientRequest`s. + /// This channel accepts [`Request`]s, and produces [`InProgressClientRequest`]s. pub(super) client_rx: ClientRequestReceiver, /// A slot for an error shared between the Connection and the Client that uses it. @@ -430,7 +430,11 @@ pub struct Connection { /// `None` unless the connection or client have errored. pub(super) error_slot: ErrorSlot, - /// A channel for sending requests to the connected peer. + /// A channel for sending Zcash messages to the connected peer. + /// + /// This channel accepts [`Message`]s. + /// + /// The corresponding peer message receiver is passed to [`Connection::run`]. pub(super) peer_tx: Tx, /// A connection tracker that reduces the open connection count when dropped. @@ -442,8 +446,7 @@ pub struct Connection { /// /// If this connection tracker or `Connection`s are leaked, /// the number of active connections will appear higher than it actually is. - /// - /// Eventually, Zebra could stop making connections entirely. + /// If enough connections leak, Zebra will stop making new connections. #[allow(dead_code)] pub(super) connection_tracker: ConnectionTracker, @@ -461,6 +464,9 @@ where Tx: Sink + Unpin, { /// Consume this `Connection` to form a spawnable future containing its event loop. + /// + /// `peer_rx` is a channel for receiving Zcash [`Message`]s from the connected peer. + /// The corresponding peer message receiver is [`Connection.peer_tx`]. pub async fn run(mut self, mut peer_rx: Rx) where Rx: Stream> + Unpin, From 3cf8e0f88c3ad4c2ac42ed1d7221eca93d122ab3 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 12:16:43 +1000 Subject: [PATCH 06/17] Make sure connections don't hang due to spurious timer or channel usage --- zebra-network/src/peer/connection.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 41f00008026..7ff4050be98 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -678,6 +678,8 @@ where // We've failed, but we need to flush all pending client // requests before we can return and complete the future. State::Failed => { + self.client_rx.close(); + match self.client_rx.next().await { Some(InProgressClientRequest { tx, span, .. }) => { trace!( @@ -745,6 +747,8 @@ where // we need to deal with it first if it exists. self.client_rx.close(); let old_state = std::mem::replace(&mut self.state, State::Failed); + self.request_timer = None; + self.update_state_metrics(None); if let State::AwaitingResponse { tx, .. } = old_state { From 1c2329e2c75a184673e7429b95e7e619a38fb35d Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 15:25:13 +1000 Subject: [PATCH 07/17] Actually shut down the client when the heartbeat task exits --- zebra-network/src/peer/client.rs | 41 ++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index a702cf36f77..ba2cf25073a 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -11,9 +11,12 @@ use futures::{ }; use tower::Service; -use crate::protocol::{ - external::types::Version, - internal::{Request, Response}, +use crate::{ + peer::error::AlreadyErrored, + protocol::{ + external::types::Version, + internal::{Request, Response}, + }, }; use super::{ErrorSlot, PeerError, SharedPeerError}; @@ -237,30 +240,42 @@ impl Service for Client { // The current task must be scheduled for wakeup every time we return // `Poll::Pending`. // + // `poll_canceled` schedules the client task for wakeup + // if the heartbeat task exits and drops the cancel handle. + // //`ready!` returns `Poll::Pending` when `server_tx` is unready, and // schedules this task for wakeup. - // - // `shutdown_tx` is used for oneshot communication to the heartbeat task. - // If the heartbeat task exits, we want to drop the associated connection. - // So we use `poll_canceled` to schedule the client task for wakeup, - // if the heartbeat task exits and drops the cancel handle. + + // Check if this connection's heartbeat task has exited. if let Poll::Ready(()) = self .shutdown_tx .as_mut() .expect("only taken on drop") .poll_canceled(cx) { - let original_error = self - .error_slot - .try_update_error(PeerError::HeartbeatTaskExited.into()); - info!( + let heartbeat_error: SharedPeerError = PeerError::HeartbeatTaskExited.into(); + + let original_error = self.error_slot.try_update_error(heartbeat_error.clone()); + debug!( ?original_error, - latest_error = ?PeerError::HeartbeatTaskExited, + latest_error = ?heartbeat_error, "client heartbeat task exited" ); + + // Prevent any senders from sending more messages to this peer. + self.server_tx.close_channel(); + + if let Err(AlreadyErrored { original_error }) = original_error { + return Poll::Ready(Err(original_error)); + } else { + return Poll::Ready(Err(heartbeat_error)); + } } + // Now check if there is space in the channel to send a request. if ready!(self.server_tx.poll_ready(cx)).is_err() { + // TODO: should we shut down the heartbeat task as soon as the connection fails? + Poll::Ready(Err(self .error_slot .try_get_error() From ca2ae016b6b13057d6de150742e424b5ee5920a9 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 16:11:12 +1000 Subject: [PATCH 08/17] Add tests for unready services --- zebra-network/src/peer_set/unready_service.rs | 6 +- .../src/peer_set/unready_service/tests.rs | 3 + .../peer_set/unready_service/tests/vectors.rs | 86 +++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 zebra-network/src/peer_set/unready_service/tests.rs create mode 100644 zebra-network/src/peer_set/unready_service/tests/vectors.rs diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs index cab94405e16..97fc8d7e0b9 100644 --- a/zebra-network/src/peer_set/unready_service.rs +++ b/zebra-network/src/peer_set/unready_service.rs @@ -12,6 +12,9 @@ use tower::Service; use crate::peer_set::set::CancelClientWork; +#[cfg(test)] +mod tests; + /// A Future that becomes satisfied when an `S`-typed service is ready. /// /// May fail due to cancellation, i.e. if the service is removed from discovery. @@ -26,6 +29,7 @@ pub(super) struct UnreadyService { pub(super) _req: PhantomData, } +#[derive(Debug, Eq, PartialEq)] pub(super) enum Error { Inner(E), Canceled, @@ -65,7 +69,7 @@ impl, Req> Future for UnreadyService { let res = ready!(this .service .as_mut() - .expect("poll after ready") + .expect("polled after ready") .poll_ready(cx)); let key = this.key.take().expect("polled after ready"); diff --git a/zebra-network/src/peer_set/unready_service/tests.rs b/zebra-network/src/peer_set/unready_service/tests.rs new file mode 100644 index 00000000000..19e513250e2 --- /dev/null +++ b/zebra-network/src/peer_set/unready_service/tests.rs @@ -0,0 +1,3 @@ +//! Tests for unready services. + +mod vectors; diff --git a/zebra-network/src/peer_set/unready_service/tests/vectors.rs b/zebra-network/src/peer_set/unready_service/tests/vectors.rs new file mode 100644 index 00000000000..7d89882d779 --- /dev/null +++ b/zebra-network/src/peer_set/unready_service/tests/vectors.rs @@ -0,0 +1,86 @@ +//! Fixed test vectors for unready services. +//! +//! TODO: test that inner service errors are handled correctly (#3204) + +use std::marker::PhantomData; + +use futures::channel::oneshot; + +use zebra_test::mock_service::MockService; + +use crate::{ + peer_set::{ + set::CancelClientWork, + unready_service::{Error, UnreadyService}, + }, + Request, Response, SharedPeerError, +}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +struct MockKey; + +#[tokio::test] +async fn unready_service_result_ok() { + zebra_test::init(); + + let (_cancel_sender, cancel) = oneshot::channel(); + + let mock_client: MockService> = + MockService::build().for_unit_tests(); + let unready_service = UnreadyService { + key: Some(MockKey), + cancel, + service: Some(mock_client), + _req: PhantomData::default(), + }; + + let result = unready_service.await; + assert!(matches!(result, Ok((MockKey, MockService { .. })))); +} + +#[tokio::test] +async fn unready_service_result_canceled() { + zebra_test::init(); + + let (cancel_sender, cancel) = oneshot::channel(); + + let mock_client: MockService> = + MockService::build().for_unit_tests(); + let unready_service = UnreadyService { + key: Some(MockKey), + cancel, + service: Some(mock_client), + _req: PhantomData::default(), + }; + + cancel_sender + .send(CancelClientWork) + .expect("unexpected oneshot send failure in tests"); + + let result = unready_service.await; + assert!(matches!(result, Err((MockKey, Error::Canceled)))); +} + +#[tokio::test] +async fn unready_service_result_cancel_handle_dropped() { + zebra_test::init(); + + let (cancel_sender, cancel) = oneshot::channel(); + + let mock_client: MockService> = + MockService::build().for_unit_tests(); + let unready_service = UnreadyService { + key: Some(MockKey), + cancel, + service: Some(mock_client), + _req: PhantomData::default(), + }; + + std::mem::drop(cancel_sender); + + let result = unready_service.await; + assert!(matches!( + result, + Err((MockKey, Error::CancelHandleDropped(_))) + )); +} From 1f7fa35e37c9c047592942db57840a61e744c193 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 16:22:08 +1000 Subject: [PATCH 09/17] Close all senders to peer when `Client` is dropped --- zebra-network/src/peer/client.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index ba2cf25073a..95a0de412dc 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -325,6 +325,9 @@ impl Service for Client { impl Drop for Client { fn drop(&mut self) { + // Prevent any senders from sending more messages to this peer. + self.server_tx.close_channel(); + let _ = self .shutdown_tx .take() From 6b46e83ac491b0d7cc419ec3976686c7c5f28abc Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 16:54:53 +1000 Subject: [PATCH 10/17] Return a Client error if the error slot has an error --- zebra-network/src/peer/client.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 95a0de412dc..c5abb165ba7 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -280,6 +280,11 @@ impl Service for Client { .error_slot .try_get_error() .expect("failed servers must set their error slot"))) + } else if let Some(error) = self.error_slot.try_get_error() { + // Prevent any senders from sending more messages to this peer. + self.server_tx.close_channel(); + + Poll::Ready(Err(error)) } else { Poll::Ready(Ok(())) } From d0b010303895fd19b20aa7f69f43d71805ed2d7b Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 16:56:09 +1000 Subject: [PATCH 11/17] Add tests for peer Client service errors --- zebra-network/src/peer/client.rs | 3 + zebra-network/src/peer/client/tests.rs | 3 + .../src/peer/client/tests/vectors.rs | 158 ++++++++++++++++++ 3 files changed, 164 insertions(+) create mode 100644 zebra-network/src/peer/client/tests.rs create mode 100644 zebra-network/src/peer/client/tests/vectors.rs diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index c5abb165ba7..59b839df40e 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -21,6 +21,9 @@ use crate::{ use super::{ErrorSlot, PeerError, SharedPeerError}; +#[cfg(test)] +mod tests; + /// The "client" duplex half of a peer connection. pub struct Client { /// Used to shut down the corresponding heartbeat. diff --git a/zebra-network/src/peer/client/tests.rs b/zebra-network/src/peer/client/tests.rs new file mode 100644 index 00000000000..78babc73190 --- /dev/null +++ b/zebra-network/src/peer/client/tests.rs @@ -0,0 +1,3 @@ +//! Tests for the [`Client`] part of peer connections + +mod vectors; diff --git a/zebra-network/src/peer/client/tests/vectors.rs b/zebra-network/src/peer/client/tests/vectors.rs new file mode 100644 index 00000000000..e319cf87e91 --- /dev/null +++ b/zebra-network/src/peer/client/tests/vectors.rs @@ -0,0 +1,158 @@ +//! Fixed peer [`Client`] test vectors. + +use futures::{ + channel::{mpsc, oneshot}, + FutureExt, +}; +use tower::ServiceExt; + +use crate::{ + peer::{Client, ErrorSlot}, + protocol::external::types::Version, + PeerError, +}; + +#[tokio::test] +async fn client_service_ready_ok() { + zebra_test::init(); + + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (server_tx, _server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot, + version: Version(0), + }; + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Ok(Client { .. })))); +} + +#[tokio::test] +async fn client_service_ready_heartbeat_exit() { + zebra_test::init(); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (server_tx, _server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + shared_error_slot + .try_update_error(PeerError::HeartbeatTaskExited.into()) + .expect("unexpected earlier error in tests"); + std::mem::drop(shutdown_rx); + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Err(_)))); +} + +#[tokio::test] +async fn client_service_ready_request_drop() { + zebra_test::init(); + + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (server_tx, server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + shared_error_slot + .try_update_error(PeerError::ConnectionDropped.into()) + .expect("unexpected earlier error in tests"); + std::mem::drop(server_rx); + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Err(_)))); +} + +#[tokio::test] +async fn client_service_ready_request_close() { + zebra_test::init(); + + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (server_tx, mut server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + shared_error_slot + .try_update_error(PeerError::ConnectionClosed.into()) + .expect("unexpected earlier error in tests"); + server_rx.close(); + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Err(_)))); +} + +#[tokio::test] +async fn client_service_ready_error_in_slot() { + zebra_test::init(); + + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (server_tx, _server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + shared_error_slot + .try_update_error(PeerError::Overloaded.into()) + .expect("unexpected earlier error in tests"); + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Err(_)))); +} + +#[tokio::test] +async fn client_service_ready_multiple_errors() { + zebra_test::init(); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (server_tx, mut server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let mut client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + shared_error_slot + .try_update_error(PeerError::DuplicateHandshake.into()) + .expect("unexpected earlier error in tests"); + std::mem::drop(shutdown_rx); + server_rx.close(); + + let result = client.ready().now_or_never(); + assert!(matches!(result, Some(Err(_)))); +} From b6b0f7f400c2ea15140a89c66faf41c8379403a0 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 17:20:28 +1000 Subject: [PATCH 12/17] Make Client drop and error cleanups consistent --- zebra-network/src/peer/client.rs | 94 +++++++++++++++++++------------- 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 59b839df40e..4130cdbb79c 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -231,25 +231,9 @@ impl Drop for MustUseOneshotSender { } } -impl Service for Client { - type Response = Response; - type Error = SharedPeerError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // # Correctness - // - // The current task must be scheduled for wakeup every time we return - // `Poll::Pending`. - // - // `poll_canceled` schedules the client task for wakeup - // if the heartbeat task exits and drops the cancel handle. - // - //`ready!` returns `Poll::Pending` when `server_tx` is unready, and - // schedules this task for wakeup. - - // Check if this connection's heartbeat task has exited. +impl Client { + /// Check if this connection's heartbeat task has exited. + fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { if let Poll::Ready(()) = self .shutdown_tx .as_mut() @@ -265,27 +249,70 @@ impl Service for Client { "client heartbeat task exited" ); - // Prevent any senders from sending more messages to this peer. - self.server_tx.close_channel(); - if let Err(AlreadyErrored { original_error }) = original_error { - return Poll::Ready(Err(original_error)); + Err(original_error) } else { - return Poll::Ready(Err(heartbeat_error)); + Err(heartbeat_error) } + } else { + Ok(()) } + } - // Now check if there is space in the channel to send a request. + /// Poll for space in the shared request sender channel. + fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll> { if ready!(self.server_tx.poll_ready(cx)).is_err() { - // TODO: should we shut down the heartbeat task as soon as the connection fails? - Poll::Ready(Err(self .error_slot .try_get_error() .expect("failed servers must set their error slot"))) } else if let Some(error) = self.error_slot.try_get_error() { - // Prevent any senders from sending more messages to this peer. - self.server_tx.close_channel(); + Poll::Ready(Err(error)) + } else { + Poll::Ready(Ok(())) + } + } + + /// Shut down the resources held by the client half of this peer connection. + /// + /// Stops further requests to the remote peer, and stops the heartbeat task. + fn shutdown(&mut self) { + // Prevent any senders from sending more messages to this peer. + self.server_tx.close_channel(); + + // Stop the heartbeat task + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(CancelHeartbeatTask); + } + } +} + +impl Service for Client { + type Response = Response; + type Error = SharedPeerError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // # Correctness + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // `poll_canceled` schedules the client task for wakeup + // if the heartbeat task exits and drops the cancel handle. + // + //`ready!` returns `Poll::Pending` when `server_tx` is unready, and + // schedules this task for wakeup. + + let mut result = self.check_heartbeat(cx); + + if result.is_ok() { + result = ready!(self.poll_request(cx)); + } + + if let Err(error) = result { + self.shutdown(); Poll::Ready(Err(error)) } else { @@ -333,13 +360,6 @@ impl Service for Client { impl Drop for Client { fn drop(&mut self) { - // Prevent any senders from sending more messages to this peer. - self.server_tx.close_channel(); - - let _ = self - .shutdown_tx - .take() - .expect("must not drop twice") - .send(CancelHeartbeatTask); + self.shutdown(); } } From 988a8f4393a55640da54400ca18e2ddd0c34e4d3 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 17:25:30 +1000 Subject: [PATCH 13/17] Use a ClientDropped error when the Client struct is dropped --- zebra-network/src/peer/client.rs | 11 ++++++++++- zebra-network/src/peer/error.rs | 5 +++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 4130cdbb79c..181f184a1f3 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -240,8 +240,8 @@ impl Client { .expect("only taken on drop") .poll_canceled(cx) { + // Make sure there is an error in the slot let heartbeat_error: SharedPeerError = PeerError::HeartbeatTaskExited.into(); - let original_error = self.error_slot.try_update_error(heartbeat_error.clone()); debug!( ?original_error, @@ -360,6 +360,15 @@ impl Service for Client { impl Drop for Client { fn drop(&mut self) { + // Make sure there is an error in the slot + let drop_error: SharedPeerError = PeerError::ClientDropped.into(); + let original_error = self.error_slot.try_update_error(drop_error.clone()); + debug!( + ?original_error, + latest_error = ?drop_error, + "client struct dropped" + ); + self.shutdown(); } } diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 99b29d126ef..21053660e45 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -33,6 +33,10 @@ pub enum PeerError { #[error("Internal connection dropped")] ConnectionDropped, + /// Zebra dropped the [`Client`]. + #[error("Internal client dropped")] + ClientDropped, + /// Zebra's internal heartbeat task exited. #[error("Internal heartbeat task exited")] HeartbeatTaskExited, @@ -66,6 +70,7 @@ impl PeerError { match self { PeerError::ConnectionClosed => "ConnectionClosed".into(), PeerError::ConnectionDropped => "ConnectionDropped".into(), + PeerError::ClientDropped => "ClientDropped".into(), PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(), PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(), // TODO: add error kinds or summaries to `SerializationError` From 0240fca2ef9a4e200780acfe4343a36ec14a3de1 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Dec 2021 18:47:28 +1000 Subject: [PATCH 14/17] Test channel and error state in peer Client tests --- .../src/peer/client/tests/vectors.rs | 93 +++++++++++++++++-- 1 file changed, 84 insertions(+), 9 deletions(-) diff --git a/zebra-network/src/peer/client/tests/vectors.rs b/zebra-network/src/peer/client/tests/vectors.rs index e319cf87e91..3d18d790e71 100644 --- a/zebra-network/src/peer/client/tests/vectors.rs +++ b/zebra-network/src/peer/client/tests/vectors.rs @@ -7,7 +7,7 @@ use futures::{ use tower::ServiceExt; use crate::{ - peer::{Client, ErrorSlot}, + peer::{CancelHeartbeatTask, Client, ErrorSlot}, protocol::external::types::Version, PeerError, }; @@ -16,20 +16,30 @@ use crate::{ async fn client_service_ready_ok() { zebra_test::init(); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let (server_tx, _server_rx) = mpsc::channel(1); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let (server_tx, mut server_rx) = mpsc::channel(1); let shared_error_slot = ErrorSlot::default(); let mut client = Client { shutdown_tx: Some(shutdown_tx), server_tx, - error_slot: shared_error_slot, + error_slot: shared_error_slot.clone(), version: Version(0), }; let result = client.ready().now_or_never(); assert!(matches!(result, Some(Ok(Client { .. })))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, None)); + + let result = shutdown_rx.try_recv(); + assert!(matches!(result, Ok(None))); + + // Unlike oneshots, open futures::mpsc channels return Err when empty + let result = server_rx.try_next(); + assert!(matches!(result, Err(_))); } #[tokio::test] @@ -37,7 +47,7 @@ async fn client_service_ready_heartbeat_exit() { zebra_test::init(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (server_tx, _server_rx) = mpsc::channel(1); + let (server_tx, mut server_rx) = mpsc::channel(1); let shared_error_slot = ErrorSlot::default(); @@ -55,13 +65,20 @@ async fn client_service_ready_heartbeat_exit() { let result = client.ready().now_or_never(); assert!(matches!(result, Some(Err(_)))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + // Unlike oneshots, closed futures::mpsc channels return None + let result = server_rx.try_next(); + assert!(matches!(result, Ok(None))); } #[tokio::test] async fn client_service_ready_request_drop() { zebra_test::init(); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (server_tx, server_rx) = mpsc::channel(1); let shared_error_slot = ErrorSlot::default(); @@ -80,13 +97,19 @@ async fn client_service_ready_request_drop() { let result = client.ready().now_or_never(); assert!(matches!(result, Some(Err(_)))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + let result = shutdown_rx.try_recv(); + assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); } #[tokio::test] async fn client_service_ready_request_close() { zebra_test::init(); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (server_tx, mut server_rx) = mpsc::channel(1); let shared_error_slot = ErrorSlot::default(); @@ -105,14 +128,23 @@ async fn client_service_ready_request_close() { let result = client.ready().now_or_never(); assert!(matches!(result, Some(Err(_)))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + let result = shutdown_rx.try_recv(); + assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); + + let result = server_rx.try_next(); + assert!(matches!(result, Ok(None))); } #[tokio::test] async fn client_service_ready_error_in_slot() { zebra_test::init(); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let (server_tx, _server_rx) = mpsc::channel(1); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let (server_tx, mut server_rx) = mpsc::channel(1); let shared_error_slot = ErrorSlot::default(); @@ -129,6 +161,15 @@ async fn client_service_ready_error_in_slot() { let result = client.ready().now_or_never(); assert!(matches!(result, Some(Err(_)))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + let result = shutdown_rx.try_recv(); + assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); + + let result = server_rx.try_next(); + assert!(matches!(result, Ok(None))); } #[tokio::test] @@ -155,4 +196,38 @@ async fn client_service_ready_multiple_errors() { let result = client.ready().now_or_never(); assert!(matches!(result, Some(Err(_)))); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + let result = server_rx.try_next(); + assert!(matches!(result, Ok(None))); +} + +#[tokio::test] +async fn client_service_drop_cleanup() { + zebra_test::init(); + + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let (server_tx, mut server_rx) = mpsc::channel(1); + + let shared_error_slot = ErrorSlot::default(); + + let client = Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot: shared_error_slot.clone(), + version: Version(0), + }; + + std::mem::drop(client); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + let result = shutdown_rx.try_recv(); + assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); + + let result = server_rx.try_next(); + assert!(matches!(result, Ok(None))); } From 2b8403a766c2634b4ed77b1c82b78c41f3ba323d Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 15 Dec 2021 12:38:44 +1000 Subject: [PATCH 15/17] Move all Connection cleanup into a single method --- zebra-network/src/peer/client.rs | 21 ++- zebra-network/src/peer/connection.rs | 193 ++++++++++++++------------- 2 files changed, 123 insertions(+), 91 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 181f184a1f3..6e55775b983 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -133,10 +133,29 @@ impl From for InProgressClientRequest { } impl ClientRequestReceiver { - /// Forwards to `inner.close()` + /// Forwards to `inner.close()`. pub fn close(&mut self) { self.inner.close() } + + /// Closes `inner`, then gets the next pending [`Request`]. + /// + /// Closing the channel ensures that: + /// - the request stream terminates, and + /// - task notifications are not required. + pub fn close_and_flush_next(&mut self) -> Option { + self.inner.close(); + + // # Correctness + // + // The request stream terminates, because the sender is closed, + // and the channel has a limited capacity. + // Task notifications are not required, because the sender is closed. + self.inner + .try_next() + .expect("channel is closed") + .map(Into::into) + } } impl Stream for ClientRequestReceiver { diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 7ff4050be98..525fb33a62e 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -490,6 +490,8 @@ where // // If there is a pending request, we wait only on an incoming peer message, and // check whether it can be interpreted as a response to the pending request. + // + // TODO: turn this comment into a module-level comment, after splitting the module. loop { self.update_state_metrics(None); @@ -522,7 +524,11 @@ where } Either::Right((None, _)) => { trace!("client_rx closed, ending connection"); - return; + + // There are no requests to be flushed, + // but we need to set an error and update metrics. + self.shutdown(PeerError::ClientDropped); + break; } Either::Right((Some(req), _)) => { let span = req.span.clone(); @@ -652,6 +658,8 @@ where tx, .. } => { + // We replaced the original state, which means `fail_with` won't see it. + // So we do the state request cleanup manually. let e = SharedPeerError::from(e); let _ = tx.send(Err(e.clone())); self.fail_with(e); @@ -675,94 +683,29 @@ where } } } - // We've failed, but we need to flush all pending client - // requests before we can return and complete the future. - State::Failed => { - self.client_rx.close(); - - match self.client_rx.next().await { - Some(InProgressClientRequest { tx, span, .. }) => { - trace!( - parent: &span, - "sending an error response to a pending request on a failed connection" - ); - // Correctness - // - // Error slots use a threaded `std::sync::Mutex`, so - // accessing the slot can block the async task's - // current thread. So we only hold the lock for long - // enough to get a reference to the error. - let e = self - .error_slot - .try_get_error() - .expect("cannot enter failed state without setting error slot"); - let _ = tx.send(Err(e)); - // Continue until we've errored all queued reqs - continue; - } - None => return, - } - } + // This connection has failed: stop the event loop, and complete the future. + State::Failed => break, } } + + assert!( + self.error_slot.try_get_error().is_some(), + "closing connections must call fail_with() or shutdown() to set the error slot" + ); } - /// Marks the peer as having failed with error `e`, and starts connection cleanup. - fn fail_with(&mut self, e: E) - where - E: Into, - { - let e = e.into(); - debug!(%e, - connection_state = ?self.state, + /// Fail this connection. + /// + /// If the connection has errored already, re-use the original error. + /// Otherwise, fail the connection with `error`. + fn fail_with(&mut self, error: impl Into) { + let error = error.into(); + + debug!(%error, client_receiver = ?self.client_rx, "failing peer service with error"); - // Update the shared error slot - // - // # Correctness - // - // Error slots use a threaded `std::sync::Mutex`, so accessing the slot - // can block the async task's current thread. We only perform a single - // slot update per `Client`. We ignore subsequent error slot updates. - let error_result = self.error_slot.try_update_error(e.clone()); - - if let Err(AlreadyErrored { original_error }) = error_result { - // TODO: downgrade log to debug? - info!( - "multiple errors on connection: \n\ - failed connections should stop processing pending requests and responses, \n\ - then close the connection. \n\ - state: {:?} \n\ - client receiver: {:?} \n\ - original error: {:?} \n\ - new error: {:?}", - self.state, self.client_rx, original_error, e, - ); - } - - // We want to close the client channel and set State::Failed so - // that we can flush any pending client requests. However, we may have - // an outstanding client request in State::AwaitingResponse, so - // we need to deal with it first if it exists. - self.client_rx.close(); - let old_state = std::mem::replace(&mut self.state, State::Failed); - self.request_timer = None; - - self.update_state_metrics(None); - - if let State::AwaitingResponse { tx, .. } = old_state { - // # Correctness - // - // We know the slot has Some(e) because we just set it above, - // and the error slot is never unset. - // - // Accessing the error slot locks a threaded std::sync::Mutex, which - // can block the current async task thread. We briefly lock the mutex - // to clone the error. - let e = self.error_slot.try_get_error().unwrap(); - let _ = tx.send(Err(e)); - } + self.shutdown(error); } /// Handle an incoming client request, possibly generating outgoing messages to the @@ -1272,20 +1215,90 @@ impl Connection { ); } } -} -impl Drop for Connection { - fn drop(&mut self) { + /// Marks the peer as having failed with `error`, and performs connection cleanup. + /// + /// If the connection has errored already, re-use the original error. + /// Otherwise, fail the connection with `error`. + fn shutdown(&mut self, error: impl Into) { + let mut error = error.into(); + + // Close channels first, so other tasks can start shutting down. + // + // TODO: close peer_tx and peer_rx, after: + // - adapting them using a struct with a Stream impl, rather than closures + // - making the struct forward `close` to the inner channel + self.client_rx.close(); + + // Update the shared error slot + // + // # Correctness + // + // Error slots use a threaded `std::sync::Mutex`, so accessing the slot + // can block the async task's current thread. We only perform a single + // slot update per `Client`. We ignore subsequent error slot updates. + let slot_result = self.error_slot.try_update_error(error.clone()); + + if let Err(AlreadyErrored { original_error }) = slot_result { + debug!( + new_error = %error, + %original_error, + connection_state = ?self.state, + "multiple errors on connection: \ + failed connections should stop processing pending requests and responses, \ + then close the connection" + ); + + error = original_error; + } else { + debug!(%error, + connection_state = ?self.state, + "shutting down peer service with error"); + } + + // Prepare to flush any pending client requests. + // + // We've already closed the client channel, so setting State::Failed + // will make the main loop flush any pending requests. + // + // However, we may have an outstanding client request in State::AwaitingResponse, + // so we need to deal with it first. if let State::AwaitingResponse { tx, .. } = std::mem::replace(&mut self.state, State::Failed) { - if let Some(error) = self.error_slot.try_get_error() { - let _ = tx.send(Err(error)); - } else { - let _ = tx.send(Err(PeerError::ConnectionDropped.into())); - } + // # Correctness + // + // We know the slot has Some(errot), because we just set it above, + // and the error slot is never unset. + // + // Accessing the error slot locks a threaded std::sync::Mutex, which + // can block the current async task thread. We briefly lock the mutex + // to clone the error. + let _ = tx.send(Err(error.clone())); } + // Make the timer and metrics consistent with the Failed state. + self.request_timer = None; + self.update_state_metrics(None); + + // Finally, flush pending client requests. + while let Some(InProgressClientRequest { tx, span, .. }) = + self.client_rx.close_and_flush_next() + { + trace!( + parent: &span, + %error, + "sending an error response to a pending request on a failed connection" + ); + let _ = tx.send(Err(error.clone())); + } + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.shutdown(PeerError::ConnectionDropped); + self.erase_state_metrics(); } } From da0100ad37ddec36a524dc8b3dbab18862fb5bc4 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 15 Dec 2021 16:13:45 +1000 Subject: [PATCH 16/17] Add tests for Connection --- zebra-network/src/peer/connection.rs | 5 + zebra-network/src/peer/connection/tests.rs | 3 + .../src/peer/connection/tests/vectors.rs | 410 ++++++++++++++++++ 3 files changed, 418 insertions(+) create mode 100644 zebra-network/src/peer/connection/tests.rs create mode 100644 zebra-network/src/peer/connection/tests/vectors.rs diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 525fb33a62e..0303ae59620 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -38,6 +38,9 @@ use crate::{ BoxError, }; +#[cfg(test)] +mod tests; + #[derive(Debug)] pub(super) enum Handler { /// Indicates that the handler has finished processing the request. @@ -435,6 +438,8 @@ pub struct Connection { /// This channel accepts [`Message`]s. /// /// The corresponding peer message receiver is passed to [`Connection::run`]. + /// + /// TODO: add a timeout when sending messages to the remote peer (#3234) pub(super) peer_tx: Tx, /// A connection tracker that reduces the open connection count when dropped. diff --git a/zebra-network/src/peer/connection/tests.rs b/zebra-network/src/peer/connection/tests.rs new file mode 100644 index 00000000000..d82199e8ec3 --- /dev/null +++ b/zebra-network/src/peer/connection/tests.rs @@ -0,0 +1,3 @@ +//! Tests for peer connections + +mod vectors; diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs new file mode 100644 index 00000000000..b98b3e45c43 --- /dev/null +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -0,0 +1,410 @@ +//! Fixed test vectors for peer connections. +//! +//! TODO: +//! - connection tests when awaiting requests (#3232) +//! - connection tests with closed/dropped peer_outbound_tx (#3233) + +use futures::{channel::mpsc, FutureExt}; +use tokio_util::codec::FramedWrite; +use tower::service_fn; +use zebra_chain::parameters::Network; + +use crate::{ + peer::{client::ClientRequestReceiver, connection::State, Connection, ErrorSlot}, + peer_set::ActiveConnectionCounter, + protocol::external::Codec, + PeerError, +}; + +#[tokio::test] +async fn connection_run_loop_ok() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + // The real stream and sink are from a split TCP connection, + // but that doesn't change how the state machine behaves. + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // The run loop will wait forever for a request from Zebra or the peer, + // without any errors, channel closes, or bytes written. + // + // But the connection closes if we drop the future, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + assert_eq!(result, None); + + let error = shared_error_slot.try_get_error(); + assert!( + matches!(error, None), + "unexpected connection error: {:?}", + error + ); + + assert!(!client_tx.is_closed()); + assert!(!peer_inbound_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_future_drop() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // now_or_never implicitly drops the connection future. + let result = connection.now_or_never(); + assert_eq!(result, None); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(client_tx.is_closed()); + assert!(peer_inbound_tx.is_closed()); + + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_client_close() { + zebra_test::init(); + + let (mut client_tx, client_rx) = mpsc::channel(1); + + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // Explicitly close the client channel. + client_tx.close_channel(); + + // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + assert_eq!(result, Some(())); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(client_tx.is_closed()); + assert!(peer_inbound_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_client_drop() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // Drop the client channel. + std::mem::drop(client_tx); + + // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + assert_eq!(result, Some(())); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(peer_inbound_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_inbound_close() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // Explicitly close the inbound peer channel. + peer_inbound_tx.close_channel(); + + // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + assert_eq!(result, Some(())); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(client_tx.is_closed()); + assert!(peer_inbound_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_inbound_drop() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + let connection = Connection { + state: State::AwaitingRequest, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // Drop the inbound peer channel. + std::mem::drop(peer_inbound_tx); + + // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + assert_eq!(result, Some(())); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(client_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} + +#[tokio::test] +async fn connection_run_loop_failed() { + zebra_test::init(); + + let (client_tx, client_rx) = mpsc::channel(1); + + let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1); + + let mut peer_outbound_bytes = Vec::::new(); + let peer_outbound_tx = FramedWrite::new( + &mut peer_outbound_bytes, + Codec::builder() + .for_network(Network::Mainnet) + .with_metrics_addr_label("test".into()) + .finish(), + ); + + let unused_inbound_service = + service_fn(|_| async { unreachable!("inbound service should never be called") }); + + let shared_error_slot = ErrorSlot::default(); + + // Simulate an internal connection error. + shared_error_slot + .try_update_error(PeerError::ClientRequestTimeout.into()) + .expect("unexpected previous error in tests"); + + let connection = Connection { + state: State::Failed, + request_timer: None, + svc: unused_inbound_service, + client_rx: ClientRequestReceiver::from(client_rx), + error_slot: shared_error_slot.clone(), + peer_tx: peer_outbound_tx, + connection_tracker: ActiveConnectionCounter::new_counter().track_connection(), + metrics_label: "test".to_string(), + last_metrics_state: None, + }; + + let connection = connection.run(peer_inbound_rx); + + // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it. + let connection = connection.shared(); + let connection_guard = connection.clone(); + let result = connection.now_or_never(); + // Because the peer error mutex is a sync mutex, + // the connection can't exit until it reaches the outer async loop. + assert_eq!(result, Some(())); + + let error = shared_error_slot.try_get_error(); + assert!(matches!(error, Some(_))); + + assert!(client_tx.is_closed()); + assert!(peer_inbound_tx.is_closed()); + + // We need to drop the future, because it holds a mutable reference to the bytes. + std::mem::drop(connection_guard); + assert_eq!(peer_outbound_bytes, Vec::::new()); +} From a209254b981ac056766e74ec0bdf0bb5d605e632 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 15 Dec 2021 11:22:52 -0300 Subject: [PATCH 17/17] fix typo in comment Co-authored-by: Conrado Gouvea --- zebra-network/src/peer/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 0303ae59620..26305ceff89 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -1273,7 +1273,7 @@ impl Connection { { // # Correctness // - // We know the slot has Some(errot), because we just set it above, + // We know the slot has Some(error), because we just set it above, // and the error slot is never unset. // // Accessing the error slot locks a threaded std::sync::Mutex, which