Skip to content

Commit 72496ad

Browse files
authored
Feature/max retry (#409)
* Maximum number of allowed reconnection attempts * Requiring initial internode connection to be successful * Decreased logging level for failing to establish initial connection
1 parent d7985ef commit 72496ad

File tree

8 files changed

+122
-41
lines changed

8 files changed

+122
-41
lines changed

common/client-libs/mixnet-client/src/client.rs

+39-12
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,21 @@ pub struct Config {
2828
initial_reconnection_backoff: Duration,
2929
maximum_reconnection_backoff: Duration,
3030
initial_connection_timeout: Duration,
31+
maximum_reconnection_attempts: u32,
3132
}
3233

3334
impl Config {
3435
pub fn new(
3536
initial_reconnection_backoff: Duration,
3637
maximum_reconnection_backoff: Duration,
3738
initial_connection_timeout: Duration,
39+
maximum_reconnection_attempts: u32,
3840
) -> Self {
3941
Config {
4042
initial_reconnection_backoff,
4143
maximum_reconnection_backoff,
4244
initial_connection_timeout,
45+
maximum_reconnection_attempts,
4346
}
4447
}
4548
}
@@ -49,6 +52,7 @@ pub struct Client {
4952
maximum_reconnection_backoff: Duration,
5053
initial_reconnection_backoff: Duration,
5154
initial_connection_timeout: Duration,
55+
maximum_reconnection_attempts: u32,
5256
}
5357

5458
impl Client {
@@ -58,23 +62,26 @@ impl Client {
5862
initial_reconnection_backoff: config.initial_reconnection_backoff,
5963
maximum_reconnection_backoff: config.maximum_reconnection_backoff,
6064
initial_connection_timeout: config.initial_connection_timeout,
65+
maximum_reconnection_attempts: config.maximum_reconnection_attempts,
6166
}
6267
}
6368

6469
async fn start_new_connection_manager(
6570
&mut self,
6671
address: SocketAddr,
67-
) -> (ConnectionManagerSender, AbortHandle) {
68-
let (sender, abort_handle) = ConnectionManager::new(
72+
) -> Result<(ConnectionManagerSender, AbortHandle), io::Error> {
73+
let conn_manager = ConnectionManager::new(
6974
address,
7075
self.initial_reconnection_backoff,
7176
self.maximum_reconnection_backoff,
7277
self.initial_connection_timeout,
78+
self.maximum_reconnection_attempts,
7379
)
74-
.await
75-
.spawn_abortable();
80+
.await?;
7681

77-
(sender, abort_handle)
82+
let (sender, abort_handle) = conn_manager.spawn_abortable();
83+
84+
Ok((sender, abort_handle))
7885
}
7986

8087
// if wait_for_response is set to true, we will get information about any possible IO errors
@@ -97,7 +104,17 @@ impl Client {
97104
);
98105

99106
let (new_manager_sender, abort_handle) =
100-
self.start_new_connection_manager(socket_address).await;
107+
match self.start_new_connection_manager(socket_address).await {
108+
Ok(res) => res,
109+
Err(err) => {
110+
debug!(
111+
"failed to establish initial connection to {} - {}",
112+
socket_address, err
113+
);
114+
return Err(err);
115+
}
116+
};
117+
101118
self.connections_managers
102119
.insert(socket_address, (new_manager_sender, abort_handle));
103120
}
@@ -106,15 +123,25 @@ impl Client {
106123

107124
let framed_packet = FramedSphinxPacket::new(packet, packet_mode);
108125

109-
if wait_for_response {
126+
let (res_tx, res_rx) = if wait_for_response {
110127
let (res_tx, res_rx) = oneshot::channel();
111-
manager
112-
.0
113-
.unbounded_send((framed_packet, Some(res_tx)))
114-
.unwrap();
128+
(Some(res_tx), Some(res_rx))
129+
} else {
130+
(None, None)
131+
};
132+
133+
if let Err(err) = manager.0.unbounded_send((framed_packet, res_tx)) {
134+
warn!(
135+
"Connection manager to {} has failed - {}",
136+
socket_address, err
137+
);
138+
self.connections_managers.remove(&socket_address);
139+
return Err(io::Error::new(io::ErrorKind::BrokenPipe, err));
140+
}
141+
142+
if let Some(res_rx) = res_rx {
115143
res_rx.await.unwrap()
116144
} else {
117-
manager.0.unbounded_send((framed_packet, None)).unwrap();
118145
Ok(())
119146
}
120147
}

common/client-libs/mixnet-client/src/connection_manager/mod.rs

+41-25
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub(crate) struct ConnectionManager<'a> {
4646

4747
maximum_reconnection_backoff: Duration,
4848
reconnection_backoff: Duration,
49+
maximum_reconnection_attempts: u32,
4950

5051
state: ConnectionState<'a>,
5152
}
@@ -62,48 +63,57 @@ impl<'a> ConnectionManager<'static> {
6263
reconnection_backoff: Duration,
6364
maximum_reconnection_backoff: Duration,
6465
connection_timeout: Duration,
65-
) -> ConnectionManager<'a> {
66+
maximum_reconnection_attempts: u32,
67+
) -> Result<ConnectionManager<'a>, io::Error> {
6668
let (conn_tx, conn_rx) = mpsc::unbounded();
6769

6870
// the blocking call here is fine as initially we want to wait the timeout interval (at most) anyway:
6971
let tcp_stream_res = std::net::TcpStream::connect_timeout(&address, connection_timeout);
7072

73+
// we MUST succeed in making initial connection. We don't want to end up in reconnection
74+
// loop to something we have never managed to connect (and possibly never will)
75+
7176
let initial_state = match tcp_stream_res {
7277
Ok(stream) => {
7378
let tokio_stream = tokio::net::TcpStream::from_std(stream).unwrap();
7479
debug!("managed to establish initial connection to {}", address);
7580
ConnectionState::Writing(ConnectionWriter::new(tokio_stream))
7681
}
77-
Err(e) => {
78-
warn!("failed to establish initial connection to {} within {:?} ({}). Going into reconnection mode", address, connection_timeout, e);
79-
ConnectionState::Reconnecting(ConnectionReconnector::new(
80-
address,
81-
reconnection_backoff,
82-
maximum_reconnection_backoff,
83-
))
84-
}
82+
Err(err) => return Err(err),
8583
};
8684

87-
ConnectionManager {
85+
Ok(ConnectionManager {
8886
conn_tx,
8987
conn_rx,
9088
address,
9189
maximum_reconnection_backoff,
9290
reconnection_backoff,
91+
maximum_reconnection_attempts,
9392
state: initial_state,
94-
}
93+
})
9594
}
9695

9796
async fn run(mut self) {
9897
while let Some(msg) = self.conn_rx.next().await {
9998
let (framed_packet, res_ch) = msg;
100-
let res = self.handle_new_packet(framed_packet).await;
101-
if let Some(res_ch) = res_ch {
102-
if let Err(e) = res_ch.send(res) {
103-
error!(
104-
"failed to send response on the channel to the caller! - {:?}",
105-
e
99+
100+
match self.handle_new_packet(framed_packet).await {
101+
None => {
102+
warn!(
103+
"We reached maximum number of attempts trying to reconnect to {}",
104+
self.address
106105
);
106+
return;
107+
}
108+
Some(res) => {
109+
if let Some(res_ch) = res_ch {
110+
if let Err(e) = res_ch.send(res) {
111+
error!(
112+
"failed to send response on the channel to the caller! - {:?}",
113+
e
114+
);
115+
}
116+
}
107117
}
108118
}
109119
}
@@ -122,24 +132,29 @@ impl<'a> ConnectionManager<'static> {
122132
// Possible future TODO: `Framed<...>` is both a Sink and a Stream,
123133
// so it is possible to read any responses we might receive (it is also duplex, so that could be
124134
// done while writing packets themselves). But it'd require slight additions to `SphinxCodec`
125-
async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> io::Result<()> {
135+
async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> Option<io::Result<()>> {
126136
// we don't do a match here as it's possible to transition from ConnectionState::Reconnecting to ConnectionState::Writing
127137
// in this function call. And if that happens, we want to send the packet we have received.
128138
if let ConnectionState::Reconnecting(conn_reconnector) = &mut self.state {
129139
// do a single poll rather than await for future to completely resolve
130140
let new_connection = match futures::poll(conn_reconnector).await {
131141
Poll::Pending => {
132142
debug!("The packet is getting dropped - there's nowhere to send it");
133-
return Err(io::Error::new(
143+
return Some(Err(io::Error::new(
134144
io::ErrorKind::BrokenPipe,
135145
"connection is broken - reconnection is in progress",
136-
));
146+
)));
137147
}
138148
Poll::Ready(conn) => conn,
139149
};
140150

141-
debug!("Managed to reconnect to {}!", self.address);
142-
self.state = ConnectionState::Writing(ConnectionWriter::new(new_connection));
151+
match new_connection {
152+
Ok(new_conn) => {
153+
debug!("Managed to reconnect to {}!", self.address);
154+
self.state = ConnectionState::Writing(ConnectionWriter::new(new_conn));
155+
}
156+
Err(_) => return None,
157+
}
143158
}
144159

145160
// we must be in writing state if we are here, either by being here from beginning or just
@@ -154,13 +169,14 @@ impl<'a> ConnectionManager<'static> {
154169
self.address,
155170
self.reconnection_backoff,
156171
self.maximum_reconnection_backoff,
172+
self.maximum_reconnection_attempts,
157173
));
158-
Err(io::Error::new(
174+
Some(Err(io::Error::new(
159175
io::ErrorKind::BrokenPipe,
160176
"connection is broken - reconnection is in progress",
161-
))
177+
)))
162178
} else {
163-
Ok(())
179+
Some(Ok(()))
164180
};
165181
}
166182

common/client-libs/mixnet-client/src/connection_manager/reconnector.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ use std::pin::Pin;
2222
use std::task::{Context, Poll};
2323
use std::time::Duration;
2424

25+
pub(crate) struct MaximumReconnectionCountReached;
26+
2527
pub(crate) struct ConnectionReconnector<'a> {
2628
address: SocketAddr,
2729
connection: BoxFuture<'a, io::Result<tokio::net::TcpStream>>,
2830

2931
current_retry_attempt: u32,
32+
maximum_reconnection_attempts: u32,
3033

3134
current_backoff_delay: tokio::time::Delay,
3235
maximum_reconnection_backoff: Duration,
@@ -39,20 +42,22 @@ impl<'a> ConnectionReconnector<'a> {
3942
address: SocketAddr,
4043
initial_reconnection_backoff: Duration,
4144
maximum_reconnection_backoff: Duration,
45+
maximum_reconnection_attempts: u32,
4246
) -> ConnectionReconnector<'a> {
4347
ConnectionReconnector {
4448
address,
4549
connection: tokio::net::TcpStream::connect(address).boxed(),
4650
current_backoff_delay: tokio::time::delay_for(Duration::new(0, 0)), // if we can re-establish connection on first try without any backoff that's perfect
4751
current_retry_attempt: 0,
52+
maximum_reconnection_attempts,
4853
maximum_reconnection_backoff,
4954
initial_reconnection_backoff,
5055
}
5156
}
5257
}
5358

5459
impl<'a> Future for ConnectionReconnector<'a> {
55-
type Output = tokio::net::TcpStream;
60+
type Output = Result<tokio::net::TcpStream, MaximumReconnectionCountReached>;
5661

5762
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5863
// see if we are still in exponential backoff
@@ -72,6 +77,11 @@ impl<'a> Future for ConnectionReconnector<'a> {
7277
self.address, e, self.current_retry_attempt
7378
);
7479

80+
// checked if we reached the maximum attempt count
81+
if self.current_retry_attempt == self.maximum_reconnection_attempts {
82+
return Poll::Ready(Err(MaximumReconnectionCountReached));
83+
}
84+
7585
// we failed to re-establish connection - continue exponential backoff
7686

7787
// according to https://github.com/tokio-rs/tokio/issues/1953 there's an undocumented
@@ -102,7 +112,7 @@ impl<'a> Future for ConnectionReconnector<'a> {
102112

103113
Poll::Pending
104114
}
105-
Poll::Ready(Ok(conn)) => Poll::Ready(conn),
115+
Poll::Ready(Ok(conn)) => Poll::Ready(Ok(conn)),
106116
}
107117
}
108118
}

common/client-libs/mixnet-client/src/forwarder.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ impl PacketForwarder {
3434
initial_reconnection_backoff: Duration,
3535
maximum_reconnection_backoff: Duration,
3636
initial_connection_timeout: Duration,
37+
maximum_reconnection_attempts: u32,
3738
) -> (PacketForwarder, MixForwardingSender) {
3839
let client_config = Config::new(
3940
initial_reconnection_backoff,
4041
maximum_reconnection_backoff,
4142
initial_connection_timeout,
43+
maximum_reconnection_attempts,
4244
);
4345

4446
let (packet_sender, packet_receiver) = mpsc::unbounded();
@@ -61,10 +63,14 @@ impl PacketForwarder {
6163
let sphinx_packet = mix_packet.into_sphinx_packet();
6264
// we don't care about responses, we just want to fire packets
6365
// as quickly as possible
64-
self.mixnet_client
66+
67+
if let Err(err) = self
68+
.mixnet_client
6569
.send(next_hop, sphinx_packet, packet_mode, false)
6670
.await
67-
.unwrap(); // if we're not waiting for response, we MUST get an Ok
71+
{
72+
debug!("failed to forward the packet - {}", err)
73+
}
6874
}
6975
}
7076
}

gateway/src/config/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_milli
4141
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
4242
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
4343
const DEFAULT_CACHE_ENTRY_TTL: Duration = Duration::from_millis(30_000);
44+
const DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS: u32 = 20;
4445

4546
const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
4647
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: u16 = 5;
@@ -444,6 +445,10 @@ impl Config {
444445
self.debug.initial_connection_timeout
445446
}
446447

448+
pub fn get_packet_forwarding_max_reconnections(&self) -> u32 {
449+
self.debug.maximum_reconnection_attempts
450+
}
451+
447452
pub fn get_message_retrieval_limit(&self) -> u16 {
448453
self.debug.message_retrieval_limit
449454
}
@@ -651,6 +656,10 @@ pub struct Debug {
651656
)]
652657
presence_sending_delay: Duration,
653658

659+
/// Maximum number of retries node is going to attempt to re-establish existing connection
660+
/// to another node when forwarding sphinx packets.
661+
maximum_reconnection_attempts: u32,
662+
654663
/// Length of filenames for new client messages.
655664
stored_messages_filename_length: u16,
656665

@@ -674,6 +683,7 @@ impl Default for Debug {
674683
packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
675684
initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT,
676685
presence_sending_delay: DEFAULT_PRESENCE_SENDING_DELAY,
686+
maximum_reconnection_attempts: DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS,
677687
stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH,
678688
message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT,
679689
cache_entry_ttl: DEFAULT_CACHE_ENTRY_TTL,

gateway/src/node/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl Gateway {
107107
self.config.get_packet_forwarding_initial_backoff(),
108108
self.config.get_packet_forwarding_maximum_backoff(),
109109
self.config.get_initial_connection_timeout(),
110+
self.config.get_packet_forwarding_max_reconnections(),
110111
);
111112

112113
tokio::spawn(async move { packet_forwarder.run().await });

0 commit comments

Comments
 (0)