From 64f40e21faa79ef2e307db4a52b4a25dc8f647e2 Mon Sep 17 00:00:00 2001 From: jstuczyn Date: Mon, 2 Nov 2020 14:07:37 +0000 Subject: [PATCH 1/3] Maximum number of allowed reconnection attempts --- .../client-libs/mixnet-client/src/client.rs | 28 ++++++++--- .../src/connection_manager/mod.rs | 49 +++++++++++++------ .../src/connection_manager/reconnector.rs | 14 +++++- .../mixnet-client/src/forwarder.rs | 10 +++- gateway/src/config/mod.rs | 10 ++++ gateway/src/node/mod.rs | 1 + mixnode/src/config/mod.rs | 10 ++++ mixnode/src/node/mod.rs | 1 + 8 files changed, 99 insertions(+), 24 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 2a69ae7bb9d..c18e895ede0 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -28,6 +28,7 @@ pub struct Config { initial_reconnection_backoff: Duration, maximum_reconnection_backoff: Duration, initial_connection_timeout: Duration, + maximum_reconnection_attempts: u32, } impl Config { @@ -35,11 +36,13 @@ impl Config { initial_reconnection_backoff: Duration, maximum_reconnection_backoff: Duration, initial_connection_timeout: Duration, + maximum_reconnection_attempts: u32, ) -> Self { Config { initial_reconnection_backoff, maximum_reconnection_backoff, initial_connection_timeout, + maximum_reconnection_attempts, } } } @@ -49,6 +52,7 @@ pub struct Client { maximum_reconnection_backoff: Duration, initial_reconnection_backoff: Duration, initial_connection_timeout: Duration, + maximum_reconnection_attempts: u32, } impl Client { @@ -58,6 +62,7 @@ impl Client { initial_reconnection_backoff: config.initial_reconnection_backoff, maximum_reconnection_backoff: config.maximum_reconnection_backoff, initial_connection_timeout: config.initial_connection_timeout, + maximum_reconnection_attempts: config.maximum_reconnection_attempts, } } @@ -70,6 +75,7 @@ impl Client { self.initial_reconnection_backoff, 
self.maximum_reconnection_backoff, self.initial_connection_timeout, + self.maximum_reconnection_attempts, ) .await .spawn_abortable(); @@ -106,15 +112,25 @@ impl Client { let framed_packet = FramedSphinxPacket::new(packet, packet_mode); - if wait_for_response { + let (res_tx, res_rx) = if wait_for_response { let (res_tx, res_rx) = oneshot::channel(); - manager - .0 - .unbounded_send((framed_packet, Some(res_tx))) - .unwrap(); + (Some(res_tx), Some(res_rx)) + } else { + (None, None) + }; + + if let Err(err) = manager.0.unbounded_send((framed_packet, res_tx)) { + warn!( + "Connection manager to {} has failed - {}", + socket_address, err + ); + self.connections_managers.remove(&socket_address); + return Err(io::Error::new(io::ErrorKind::BrokenPipe, err)); + } + + if let Some(res_rx) = res_rx { res_rx.await.unwrap() } else { - manager.0.unbounded_send((framed_packet, None)).unwrap(); Ok(()) } } diff --git a/common/client-libs/mixnet-client/src/connection_manager/mod.rs b/common/client-libs/mixnet-client/src/connection_manager/mod.rs index 56fb76ba7be..2f01fde75c1 100644 --- a/common/client-libs/mixnet-client/src/connection_manager/mod.rs +++ b/common/client-libs/mixnet-client/src/connection_manager/mod.rs @@ -46,6 +46,7 @@ pub(crate) struct ConnectionManager<'a> { maximum_reconnection_backoff: Duration, reconnection_backoff: Duration, + maximum_reconnection_attempts: u32, state: ConnectionState<'a>, } @@ -62,6 +63,7 @@ impl<'a> ConnectionManager<'static> { reconnection_backoff: Duration, maximum_reconnection_backoff: Duration, connection_timeout: Duration, + maximum_reconnection_attempts: u32, ) -> ConnectionManager<'a> { let (conn_tx, conn_rx) = mpsc::unbounded(); @@ -80,6 +82,7 @@ impl<'a> ConnectionManager<'static> { address, reconnection_backoff, maximum_reconnection_backoff, + maximum_reconnection_attempts, )) } }; @@ -90,6 +93,7 @@ impl<'a> ConnectionManager<'static> { address, maximum_reconnection_backoff, reconnection_backoff, + maximum_reconnection_attempts, 
state: initial_state, } } @@ -97,13 +101,24 @@ impl<'a> ConnectionManager<'static> { async fn run(mut self) { while let Some(msg) = self.conn_rx.next().await { let (framed_packet, res_ch) = msg; - let res = self.handle_new_packet(framed_packet).await; - if let Some(res_ch) = res_ch { - if let Err(e) = res_ch.send(res) { - error!( - "failed to send response on the channel to the caller! - {:?}", - e + + match self.handle_new_packet(framed_packet).await { + None => { + warn!( + "We reached maximum number of attempts trying to reconnect to {}", + self.address ); + return; + } + Some(res) => { + if let Some(res_ch) = res_ch { + if let Err(e) = res_ch.send(res) { + error!( + "failed to send response on the channel to the caller! - {:?}", + e + ); + } + } } } } @@ -122,7 +137,7 @@ impl<'a> ConnectionManager<'static> { // Possible future TODO: `Framed<...>` is both a Sink and a Stream, // so it is possible to read any responses we might receive (it is also duplex, so that could be // done while writing packets themselves). But it'd require slight additions to `SphinxCodec` - async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> io::Result<()> { + async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> Option<io::Result<()>> { // we don't do a match here as it's possible to transition from ConnectionState::Reconnecting to ConnectionState::Writing // in this function call. And if that happens, we want to send the packet we have received. 
if let ConnectionState::Reconnecting(conn_reconnector) = &mut self.state { @@ -130,16 +145,21 @@ impl<'a> ConnectionManager<'static> { let new_connection = match futures::poll(conn_reconnector).await { Poll::Pending => { debug!("The packet is getting dropped - there's nowhere to send it"); - return Err(io::Error::new( + return Some(Err(io::Error::new( io::ErrorKind::BrokenPipe, "connection is broken - reconnection is in progress", - )); + ))); } Poll::Ready(conn) => conn, }; - debug!("Managed to reconnect to {}!", self.address); - self.state = ConnectionState::Writing(ConnectionWriter::new(new_connection)); + match new_connection { + Ok(new_conn) => { + debug!("Managed to reconnect to {}!", self.address); + self.state = ConnectionState::Writing(ConnectionWriter::new(new_conn)); + } + Err(_) => return None, + } } // we must be in writing state if we are here, either by being here from beginning or just @@ -154,13 +174,14 @@ impl<'a> ConnectionManager<'static> { self.address, self.reconnection_backoff, self.maximum_reconnection_backoff, + self.maximum_reconnection_attempts, )); - Err(io::Error::new( + Some(Err(io::Error::new( io::ErrorKind::BrokenPipe, "connection is broken - reconnection is in progress", - )) + ))) } else { - Ok(()) + Some(Ok(())) }; } diff --git a/common/client-libs/mixnet-client/src/connection_manager/reconnector.rs b/common/client-libs/mixnet-client/src/connection_manager/reconnector.rs index 035a5828ac7..b57b09f03a6 100644 --- a/common/client-libs/mixnet-client/src/connection_manager/reconnector.rs +++ b/common/client-libs/mixnet-client/src/connection_manager/reconnector.rs @@ -22,11 +22,14 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +pub(crate) struct MaximumReconnectionCountReached; + pub(crate) struct ConnectionReconnector<'a> { address: SocketAddr, connection: BoxFuture<'a, io::Result<tokio::net::TcpStream>>, current_retry_attempt: u32, + maximum_reconnection_attempts: u32, current_backoff_delay: tokio::time::Delay, 
maximum_reconnection_backoff: Duration, @@ -39,12 +42,14 @@ impl<'a> ConnectionReconnector<'a> { address: SocketAddr, initial_reconnection_backoff: Duration, maximum_reconnection_backoff: Duration, + maximum_reconnection_attempts: u32, ) -> ConnectionReconnector<'a> { ConnectionReconnector { address, connection: tokio::net::TcpStream::connect(address).boxed(), current_backoff_delay: tokio::time::delay_for(Duration::new(0, 0)), // if we can re-establish connection on first try without any backoff that's perfect current_retry_attempt: 0, + maximum_reconnection_attempts, maximum_reconnection_backoff, initial_reconnection_backoff, } @@ -52,7 +57,7 @@ impl<'a> ConnectionReconnector<'a> { } impl<'a> Future for ConnectionReconnector<'a> { - type Output = tokio::net::TcpStream; + type Output = Result<tokio::net::TcpStream, MaximumReconnectionCountReached>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // see if we are still in exponential backoff @@ -72,6 +77,11 @@ impl<'a> Future for ConnectionReconnector<'a> { self.address, e, self.current_retry_attempt ); + // check if we reached the maximum attempt count + if self.current_retry_attempt == self.maximum_reconnection_attempts { + return Poll::Ready(Err(MaximumReconnectionCountReached)); + } + // we failed to re-establish connection - continue exponential backoff // according to https://github.com/tokio-rs/tokio/issues/1953 there's an undocumented @@ -102,7 +112,7 @@ impl<'a> Future for ConnectionReconnector<'a> { Poll::Pending } - Poll::Ready(Ok(conn)) => Poll::Ready(conn), + Poll::Ready(Ok(conn)) => Poll::Ready(Ok(conn)), } } } diff --git a/common/client-libs/mixnet-client/src/forwarder.rs b/common/client-libs/mixnet-client/src/forwarder.rs index d68d9e5c80f..0dbb3b24f0f 100644 --- a/common/client-libs/mixnet-client/src/forwarder.rs +++ b/common/client-libs/mixnet-client/src/forwarder.rs @@ -34,11 +34,13 @@ impl PacketForwarder { initial_reconnection_backoff: Duration, maximum_reconnection_backoff: Duration, initial_connection_timeout: Duration, + 
maximum_reconnection_attempts: u32, ) -> (PacketForwarder, MixForwardingSender) { let client_config = Config::new( initial_reconnection_backoff, maximum_reconnection_backoff, initial_connection_timeout, + maximum_reconnection_attempts, ); let (packet_sender, packet_receiver) = mpsc::unbounded(); @@ -61,10 +63,14 @@ impl PacketForwarder { let sphinx_packet = mix_packet.into_sphinx_packet(); // we don't care about responses, we just want to fire packets // as quickly as possible - self.mixnet_client + + if let Err(err) = self + .mixnet_client .send(next_hop, sphinx_packet, packet_mode, false) .await - .unwrap(); // if we're not waiting for response, we MUST get an Ok + { + debug!("failed to forward the packet - {}", err) + } } } } diff --git a/gateway/src/config/mod.rs b/gateway/src/config/mod.rs index a5ab1e96ebe..5c1b8447a6d 100644 --- a/gateway/src/config/mod.rs +++ b/gateway/src/config/mod.rs @@ -41,6 +41,7 @@ const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_milli const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000); const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); const DEFAULT_CACHE_ENTRY_TTL: Duration = Duration::from_millis(30_000); +const DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS: u32 = 20; const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16; const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: u16 = 5; @@ -444,6 +445,10 @@ impl Config { self.debug.initial_connection_timeout } + pub fn get_packet_forwarding_max_reconnections(&self) -> u32 { + self.debug.maximum_reconnection_attempts + } + pub fn get_message_retrieval_limit(&self) -> u16 { self.debug.message_retrieval_limit } @@ -651,6 +656,10 @@ pub struct Debug { )] presence_sending_delay: Duration, + /// Maximum number of retries node is going to attempt to re-establish existing connection + /// to another node when forwarding sphinx packets. 
+ maximum_reconnection_attempts: u32, + /// Length of filenames for new client messages. stored_messages_filename_length: u16, @@ -674,6 +683,7 @@ impl Default for Debug { packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF, initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT, presence_sending_delay: DEFAULT_PRESENCE_SENDING_DELAY, + maximum_reconnection_attempts: DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS, stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH, message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT, cache_entry_ttl: DEFAULT_CACHE_ENTRY_TTL, diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index adc5373b480..d17957d20a3 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -107,6 +107,7 @@ impl Gateway { self.config.get_packet_forwarding_initial_backoff(), self.config.get_packet_forwarding_maximum_backoff(), self.config.get_initial_connection_timeout(), + self.config.get_packet_forwarding_max_reconnections(), ); tokio::spawn(async move { packet_forwarder.run().await }); diff --git a/mixnode/src/config/mod.rs b/mixnode/src/config/mod.rs index 4f5fc9007ca..6996a09336b 100644 --- a/mixnode/src/config/mod.rs +++ b/mixnode/src/config/mod.rs @@ -40,6 +40,7 @@ const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_milli const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000); const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500); const DEFAULT_CACHE_ENTRY_TTL: Duration = Duration::from_millis(30_000); +const DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS: u32 = 20; #[derive(Debug, Default, Deserialize, PartialEq, Serialize)] #[serde(deny_unknown_fields)] @@ -330,6 +331,10 @@ impl Config { self.debug.initial_connection_timeout } + pub fn get_packet_forwarding_max_reconnections(&self) -> u32 { + self.debug.maximum_reconnection_attempts + } + pub fn get_cache_entry_ttl(&self) -> Duration { 
self.debug.cache_entry_ttl } @@ -490,6 +495,10 @@ pub struct Debug { )] initial_connection_timeout: Duration, + /// Maximum number of retries node is going to attempt to re-establish existing connection + /// to another node when forwarding sphinx packets. + maximum_reconnection_attempts: u32, + /// Duration for which a cached vpn processing result is going to get stored for. #[serde( deserialize_with = "deserialize_duration", @@ -505,6 +514,7 @@ impl Default for Debug { packet_forwarding_initial_backoff: DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF, packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF, initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT, + maximum_reconnection_attempts: DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS, cache_entry_ttl: DEFAULT_CACHE_ENTRY_TTL, } } diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index ba5301370ab..7b06b28a4f0 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -83,6 +83,7 @@ impl MixNode { self.config.get_packet_forwarding_initial_backoff(), self.config.get_packet_forwarding_maximum_backoff(), self.config.get_initial_connection_timeout(), + self.config.get_packet_forwarding_max_reconnections(), ); tokio::spawn(async move { packet_forwarder.run().await }); From 13cea39ef69fd64e7cf6b0267580e6785a7d7d34 Mon Sep 17 00:00:00 2001 From: jstuczyn Date: Mon, 2 Nov 2020 14:20:16 +0000 Subject: [PATCH 2/3] Requiring initial internode connection to be successful --- .../client-libs/mixnet-client/src/client.rs | 23 ++++++++++++++----- .../src/connection_manager/mod.rs | 19 ++++++--------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index c18e895ede0..29af10b55d4 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -69,18 +69,19 @@ impl Client { async fn start_new_connection_manager( &mut self, 
address: SocketAddr, - ) -> (ConnectionManagerSender, AbortHandle) { - let (sender, abort_handle) = ConnectionManager::new( + ) -> Result<(ConnectionManagerSender, AbortHandle), io::Error> { + let conn_manager = ConnectionManager::new( address, self.initial_reconnection_backoff, self.maximum_reconnection_backoff, self.initial_connection_timeout, self.maximum_reconnection_attempts, ) - .await - .spawn_abortable(); + .await?; - (sender, abort_handle) + let (sender, abort_handle) = conn_manager.spawn_abortable(); + + Ok((sender, abort_handle)) } // if wait_for_response is set to true, we will get information about any possible IO errors @@ -103,7 +104,17 @@ impl Client { ); let (new_manager_sender, abort_handle) = - self.start_new_connection_manager(socket_address).await; + match self.start_new_connection_manager(socket_address).await { + Ok(res) => res, + Err(err) => { + warn!( + "failed to establish initial connection to {} - {}", + socket_address, err + ); + return Err(err); + } + }; + self.connections_managers .insert(socket_address, (new_manager_sender, abort_handle)); } diff --git a/common/client-libs/mixnet-client/src/connection_manager/mod.rs b/common/client-libs/mixnet-client/src/connection_manager/mod.rs index 2f01fde75c1..f5b87765d5f 100644 --- a/common/client-libs/mixnet-client/src/connection_manager/mod.rs +++ b/common/client-libs/mixnet-client/src/connection_manager/mod.rs @@ -64,30 +64,25 @@ impl<'a> ConnectionManager<'static> { maximum_reconnection_backoff: Duration, connection_timeout: Duration, maximum_reconnection_attempts: u32, - ) -> ConnectionManager<'a> { + ) -> Result<ConnectionManager<'a>, io::Error> { let (conn_tx, conn_rx) = mpsc::unbounded(); // the blocking call here is fine as initially we want to wait the timeout interval (at most) anyway: let tcp_stream_res = std::net::TcpStream::connect_timeout(&address, connection_timeout); + // we MUST succeed in making initial connection. 
We don't want to end up in reconnection + // loop to something we have never managed to connect (and possibly never will) + let initial_state = match tcp_stream_res { Ok(stream) => { let tokio_stream = tokio::net::TcpStream::from_std(stream).unwrap(); debug!("managed to establish initial connection to {}", address); ConnectionState::Writing(ConnectionWriter::new(tokio_stream)) } - Err(e) => { - warn!("failed to establish initial connection to {} within {:?} ({}). Going into reconnection mode", address, connection_timeout, e); - ConnectionState::Reconnecting(ConnectionReconnector::new( - address, - reconnection_backoff, - maximum_reconnection_backoff, - maximum_reconnection_attempts, - )) - } + Err(err) => return Err(err), }; - ConnectionManager { + Ok(ConnectionManager { conn_tx, conn_rx, address, @@ -95,7 +90,7 @@ impl<'a> ConnectionManager<'static> { reconnection_backoff, maximum_reconnection_attempts, state: initial_state, - } + }) } async fn run(mut self) { From e674ca2351e5306c2a02406ac73b96eaa9749100 Mon Sep 17 00:00:00 2001 From: jstuczyn Date: Mon, 2 Nov 2020 14:29:06 +0000 Subject: [PATCH 3/3] Decreased logging level for failing to establish initial connection --- common/client-libs/mixnet-client/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 29af10b55d4..4944f0c8a15 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -107,7 +107,7 @@ impl Client { match self.start_new_connection_manager(socket_address).await { Ok(res) => res, Err(err) => { - warn!( + debug!( "failed to establish initial connection to {} - {}", socket_address, err );