Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/max retry #409

Merged
merged 3 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions common/client-libs/mixnet-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ pub struct Config {
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_reconnection_attempts: u32,
}

impl Config {
pub fn new(
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_reconnection_attempts: u32,
) -> Self {
Config {
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_reconnection_attempts,
}
}
}
Expand All @@ -49,6 +52,7 @@ pub struct Client {
maximum_reconnection_backoff: Duration,
initial_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_reconnection_attempts: u32,
}

impl Client {
Expand All @@ -58,23 +62,26 @@ impl Client {
initial_reconnection_backoff: config.initial_reconnection_backoff,
maximum_reconnection_backoff: config.maximum_reconnection_backoff,
initial_connection_timeout: config.initial_connection_timeout,
maximum_reconnection_attempts: config.maximum_reconnection_attempts,
}
}

async fn start_new_connection_manager(
&mut self,
address: SocketAddr,
) -> (ConnectionManagerSender, AbortHandle) {
let (sender, abort_handle) = ConnectionManager::new(
) -> Result<(ConnectionManagerSender, AbortHandle), io::Error> {
let conn_manager = ConnectionManager::new(
address,
self.initial_reconnection_backoff,
self.maximum_reconnection_backoff,
self.initial_connection_timeout,
self.maximum_reconnection_attempts,
)
.await
.spawn_abortable();
.await?;

(sender, abort_handle)
let (sender, abort_handle) = conn_manager.spawn_abortable();

Ok((sender, abort_handle))
}

// if wait_for_response is set to true, we will get information about any possible IO errors
Expand All @@ -97,7 +104,17 @@ impl Client {
);

let (new_manager_sender, abort_handle) =
self.start_new_connection_manager(socket_address).await;
match self.start_new_connection_manager(socket_address).await {
Ok(res) => res,
Err(err) => {
debug!(
"failed to establish initial connection to {} - {}",
socket_address, err
);
return Err(err);
}
};

self.connections_managers
.insert(socket_address, (new_manager_sender, abort_handle));
}
Expand All @@ -106,15 +123,25 @@ impl Client {

let framed_packet = FramedSphinxPacket::new(packet, packet_mode);

if wait_for_response {
let (res_tx, res_rx) = if wait_for_response {
let (res_tx, res_rx) = oneshot::channel();
manager
.0
.unbounded_send((framed_packet, Some(res_tx)))
.unwrap();
(Some(res_tx), Some(res_rx))
} else {
(None, None)
};

if let Err(err) = manager.0.unbounded_send((framed_packet, res_tx)) {
warn!(
"Connection manager to {} has failed - {}",
socket_address, err
);
self.connections_managers.remove(&socket_address);
return Err(io::Error::new(io::ErrorKind::BrokenPipe, err));
}

if let Some(res_rx) = res_rx {
res_rx.await.unwrap()
} else {
manager.0.unbounded_send((framed_packet, None)).unwrap();
Ok(())
}
}
Expand Down
66 changes: 41 additions & 25 deletions common/client-libs/mixnet-client/src/connection_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) struct ConnectionManager<'a> {

maximum_reconnection_backoff: Duration,
reconnection_backoff: Duration,
maximum_reconnection_attempts: u32,

state: ConnectionState<'a>,
}
Expand All @@ -62,48 +63,57 @@ impl<'a> ConnectionManager<'static> {
reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
connection_timeout: Duration,
) -> ConnectionManager<'a> {
maximum_reconnection_attempts: u32,
) -> Result<ConnectionManager<'a>, io::Error> {
let (conn_tx, conn_rx) = mpsc::unbounded();

// the blocking call here is fine as initially we want to wait the timeout interval (at most) anyway:
let tcp_stream_res = std::net::TcpStream::connect_timeout(&address, connection_timeout);

// we MUST succeed in making initial connection. We don't want to end up in reconnection
// loop to something we have never managed to connect (and possibly never will)

let initial_state = match tcp_stream_res {
Ok(stream) => {
let tokio_stream = tokio::net::TcpStream::from_std(stream).unwrap();
debug!("managed to establish initial connection to {}", address);
ConnectionState::Writing(ConnectionWriter::new(tokio_stream))
}
Err(e) => {
warn!("failed to establish initial connection to {} within {:?} ({}). Going into reconnection mode", address, connection_timeout, e);
ConnectionState::Reconnecting(ConnectionReconnector::new(
address,
reconnection_backoff,
maximum_reconnection_backoff,
))
}
Err(err) => return Err(err),
};

ConnectionManager {
Ok(ConnectionManager {
conn_tx,
conn_rx,
address,
maximum_reconnection_backoff,
reconnection_backoff,
maximum_reconnection_attempts,
state: initial_state,
}
})
}

async fn run(mut self) {
while let Some(msg) = self.conn_rx.next().await {
let (framed_packet, res_ch) = msg;
let res = self.handle_new_packet(framed_packet).await;
if let Some(res_ch) = res_ch {
if let Err(e) = res_ch.send(res) {
error!(
"failed to send response on the channel to the caller! - {:?}",
e

match self.handle_new_packet(framed_packet).await {
None => {
warn!(
"We reached maximum number of attempts trying to reconnect to {}",
self.address
);
return;
}
Some(res) => {
if let Some(res_ch) = res_ch {
if let Err(e) = res_ch.send(res) {
error!(
"failed to send response on the channel to the caller! - {:?}",
e
);
}
}
}
}
}
Expand All @@ -122,24 +132,29 @@ impl<'a> ConnectionManager<'static> {
// Possible future TODO: `Framed<...>` is both a Sink and a Stream,
// so it is possible to read any responses we might receive (it is also duplex, so that could be
// done while writing packets themselves). But it'd require slight additions to `SphinxCodec`
async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> io::Result<()> {
async fn handle_new_packet(&mut self, packet: FramedSphinxPacket) -> Option<io::Result<()>> {
// we don't do a match here as it's possible to transition from ConnectionState::Reconnecting to ConnectionState::Writing
// in this function call. And if that happens, we want to send the packet we have received.
if let ConnectionState::Reconnecting(conn_reconnector) = &mut self.state {
// do a single poll rather than await for future to completely resolve
let new_connection = match futures::poll(conn_reconnector).await {
Poll::Pending => {
debug!("The packet is getting dropped - there's nowhere to send it");
return Err(io::Error::new(
return Some(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"connection is broken - reconnection is in progress",
));
)));
}
Poll::Ready(conn) => conn,
};

debug!("Managed to reconnect to {}!", self.address);
self.state = ConnectionState::Writing(ConnectionWriter::new(new_connection));
match new_connection {
Ok(new_conn) => {
debug!("Managed to reconnect to {}!", self.address);
self.state = ConnectionState::Writing(ConnectionWriter::new(new_conn));
}
Err(_) => return None,
}
}

// we must be in writing state if we are here, either by being here from beginning or just
Expand All @@ -154,13 +169,14 @@ impl<'a> ConnectionManager<'static> {
self.address,
self.reconnection_backoff,
self.maximum_reconnection_backoff,
self.maximum_reconnection_attempts,
));
Err(io::Error::new(
Some(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"connection is broken - reconnection is in progress",
))
)))
} else {
Ok(())
Some(Ok(()))
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pub(crate) struct MaximumReconnectionCountReached;

pub(crate) struct ConnectionReconnector<'a> {
address: SocketAddr,
connection: BoxFuture<'a, io::Result<tokio::net::TcpStream>>,

current_retry_attempt: u32,
maximum_reconnection_attempts: u32,

current_backoff_delay: tokio::time::Delay,
maximum_reconnection_backoff: Duration,
Expand All @@ -39,20 +42,22 @@ impl<'a> ConnectionReconnector<'a> {
address: SocketAddr,
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
maximum_reconnection_attempts: u32,
) -> ConnectionReconnector<'a> {
ConnectionReconnector {
address,
connection: tokio::net::TcpStream::connect(address).boxed(),
current_backoff_delay: tokio::time::delay_for(Duration::new(0, 0)), // if we can re-establish connection on first try without any backoff that's perfect
current_retry_attempt: 0,
maximum_reconnection_attempts,
maximum_reconnection_backoff,
initial_reconnection_backoff,
}
}
}

impl<'a> Future for ConnectionReconnector<'a> {
type Output = tokio::net::TcpStream;
type Output = Result<tokio::net::TcpStream, MaximumReconnectionCountReached>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// see if we are still in exponential backoff
Expand All @@ -72,6 +77,11 @@ impl<'a> Future for ConnectionReconnector<'a> {
self.address, e, self.current_retry_attempt
);

// checked if we reached the maximum attempt count
if self.current_retry_attempt == self.maximum_reconnection_attempts {
return Poll::Ready(Err(MaximumReconnectionCountReached));
}

// we failed to re-establish connection - continue exponential backoff

// according to https://github.com/tokio-rs/tokio/issues/1953 there's an undocumented
Expand Down Expand Up @@ -102,7 +112,7 @@ impl<'a> Future for ConnectionReconnector<'a> {

Poll::Pending
}
Poll::Ready(Ok(conn)) => Poll::Ready(conn),
Poll::Ready(Ok(conn)) => Poll::Ready(Ok(conn)),
}
}
}
10 changes: 8 additions & 2 deletions common/client-libs/mixnet-client/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ impl PacketForwarder {
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_reconnection_attempts: u32,
) -> (PacketForwarder, MixForwardingSender) {
let client_config = Config::new(
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_reconnection_attempts,
);

let (packet_sender, packet_receiver) = mpsc::unbounded();
Expand All @@ -61,10 +63,14 @@ impl PacketForwarder {
let sphinx_packet = mix_packet.into_sphinx_packet();
// we don't care about responses, we just want to fire packets
// as quickly as possible
self.mixnet_client

if let Err(err) = self
.mixnet_client
.send(next_hop, sphinx_packet, packet_mode, false)
.await
.unwrap(); // if we're not waiting for response, we MUST get an Ok
{
debug!("failed to forward the packet - {}", err)
}
}
}
}
10 changes: 10 additions & 0 deletions gateway/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_milli
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_CACHE_ENTRY_TTL: Duration = Duration::from_millis(30_000);
const DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS: u32 = 20;

const DEFAULT_STORED_MESSAGE_FILENAME_LENGTH: u16 = 16;
const DEFAULT_MESSAGE_RETRIEVAL_LIMIT: u16 = 5;
Expand Down Expand Up @@ -444,6 +445,10 @@ impl Config {
self.debug.initial_connection_timeout
}

pub fn get_packet_forwarding_max_reconnections(&self) -> u32 {
self.debug.maximum_reconnection_attempts
}

pub fn get_message_retrieval_limit(&self) -> u16 {
self.debug.message_retrieval_limit
}
Expand Down Expand Up @@ -651,6 +656,10 @@ pub struct Debug {
)]
presence_sending_delay: Duration,

/// Maximum number of retries node is going to attempt to re-establish existing connection
/// to another node when forwarding sphinx packets.
maximum_reconnection_attempts: u32,

/// Length of filenames for new client messages.
stored_messages_filename_length: u16,

Expand All @@ -674,6 +683,7 @@ impl Default for Debug {
packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT,
presence_sending_delay: DEFAULT_PRESENCE_SENDING_DELAY,
maximum_reconnection_attempts: DEFAULT_MAXIMUM_RECONNECTION_ATTEMPTS,
stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH,
message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT,
cache_entry_ttl: DEFAULT_CACHE_ENTRY_TTL,
Expand Down
1 change: 1 addition & 0 deletions gateway/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl Gateway {
self.config.get_packet_forwarding_initial_backoff(),
self.config.get_packet_forwarding_maximum_backoff(),
self.config.get_initial_connection_timeout(),
self.config.get_packet_forwarding_max_reconnections(),
);

tokio::spawn(async move { packet_forwarder.run().await });
Expand Down
Loading