From 9f69cc92fef05915fc720fb63cf2dd58ee1c3f03 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:04:20 +0000 Subject: [PATCH 01/27] Moved MixTrafficController to separate module --- nym-client/src/client/mix_traffic.rs | 38 ++++++++++++++++++++ nym-client/src/{clients => client}/mod.rs | 43 +++-------------------- nym-client/src/commands/tcpsocket.rs | 2 +- nym-client/src/commands/websocket.rs | 2 +- nym-client/src/lib.rs | 2 +- nym-client/src/main.rs | 2 +- nym-client/src/persistence/pathfinder.rs | 2 +- nym-client/src/sockets/tcp.rs | 10 +++--- nym-client/src/sockets/ws.rs | 4 +-- 9 files changed, 55 insertions(+), 50 deletions(-) create mode 100644 nym-client/src/client/mix_traffic.rs rename nym-client/src/{clients => client}/mod.rs (89%) diff --git a/nym-client/src/client/mix_traffic.rs b/nym-client/src/client/mix_traffic.rs new file mode 100644 index 00000000000..ef436453119 --- /dev/null +++ b/nym-client/src/client/mix_traffic.rs @@ -0,0 +1,38 @@ +use futures::channel::mpsc; +use futures::StreamExt; +use log::*; +use sphinx::SphinxPacket; +use std::io; +use std::io::prelude::*; +use std::net::SocketAddr; + +pub(crate) struct MixMessage(SocketAddr, SphinxPacket); + +impl MixMessage { + pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self { + MixMessage(address, packet) + } +} + +pub(crate) struct MixTrafficController; + +impl MixTrafficController { + // this was way more difficult to implement than what this code may suggest... + pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver) { + let mix_client = mix_client::MixClient::new(); + while let Some(mix_message) = rx.next().await { + info!( + "[MIX TRAFFIC CONTROL] - got a mix_message for {:?}", + mix_message.0 + ); + let send_res = mix_client.send(mix_message.1, mix_message.0).await; + match send_res { + Ok(_) => { + print!("."); + io::stdout().flush().ok().expect("Could not flush stdout"); + } + Err(e) => error!("We failed to send the message :( - {:?}", e), + }; + } + } +} diff --git a/nym-client/src/clients/mod.rs b/nym-client/src/client/mod.rs similarity index 89% rename from nym-client/src/clients/mod.rs rename to nym-client/src/client/mod.rs index 06d3c79ff15..bfd9e7198dd 100644 --- a/nym-client/src/clients/mod.rs +++ b/nym-client/src/client/mod.rs @@ -1,4 +1,5 @@ use crate::built_info; +use crate::client::mix_traffic::{MixMessage, MixTrafficController}; use crate::sockets::tcp; use crate::sockets::ws; use crate::utils; @@ -11,54 +12,20 @@ use futures::{SinkExt, StreamExt}; use log::*; use sfw_provider_requests::AuthToken; use sphinx::route::{Destination, DestinationAddressBytes}; -use sphinx::SphinxPacket; -use std::io; -use std::io::prelude::*; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; use topology::NymTopology; +mod mix_traffic; + const LOOP_COVER_AVERAGE_DELAY: f64 = 0.5; // seconds const MESSAGE_SENDING_AVERAGE_DELAY: f64 = 0.5; // seconds; const FETCH_MESSAGES_DELAY: f64 = 1.0; // seconds; -// provider-poller sends polls service provider; receives messages -// provider-poller sends (TX) to ReceivedBufferController (RX) -// ReceivedBufferController sends (TX) to ... ??Client?? -// outQueueController sends (TX) to TrafficStreamController (RX) -// TrafficStreamController sends messages to mixnet -// ... ??Client?? sends (TX) to outQueueController (RX) -// Loop cover traffic stream just sends messages to mixnet without any channel communication - -struct MixMessage(SocketAddr, SphinxPacket); - -struct MixTrafficController; - -impl MixTrafficController { - // this was way more difficult to implement than what this code may suggest... - async fn run(mut rx: mpsc::UnboundedReceiver) { - let mix_client = mix_client::MixClient::new(); - while let Some(mix_message) = rx.next().await { - info!( - "[MIX TRAFFIC CONTROL] - got a mix_message for {:?}", - mix_message.0 - ); - let send_res = mix_client.send(mix_message.1, mix_message.0).await; - match send_res { - Ok(_) => { - print!("."); - io::stdout().flush().ok().expect("Could not flush stdout"); - } - Err(e) => error!("We failed to send the message :( - {:?}", e), - }; - } - } -} - pub type BufferResponse = oneshot::Sender>>; struct ReceivedMessagesBuffer { @@ -163,7 +130,7 @@ impl NymClient { tokio::time::delay_for(delay_duration).await; let cover_message = utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology); - tx.send(MixMessage(cover_message.0, cover_message.1)) + tx.send(MixMessage::new(cover_message.0, cover_message.1)) .await .unwrap(); } @@ -197,7 +164,7 @@ impl NymClient { }; mix_tx - .send(MixMessage(traffic_message.0, traffic_message.1)) + .send(MixMessage::new(traffic_message.0, traffic_message.1)) .await .unwrap(); diff --git a/nym-client/src/commands/tcpsocket.rs b/nym-client/src/commands/tcpsocket.rs index 3ed3662c7b9..053981d446e 100644 --- a/nym-client/src/commands/tcpsocket.rs +++ b/nym-client/src/commands/tcpsocket.rs @@ -1,4 +1,4 @@ -use crate::clients::{NymClient, SocketType}; +use crate::client::{NymClient, SocketType}; use crate::persistence::pemstore; use clap::ArgMatches; diff --git a/nym-client/src/commands/websocket.rs b/nym-client/src/commands/websocket.rs index 99dbc1ecc62..0a66193a6fe 100644 --- a/nym-client/src/commands/websocket.rs +++ b/nym-client/src/commands/websocket.rs @@ -1,4 +1,4 @@ -use crate::clients::{NymClient, SocketType}; +use crate::client::{NymClient, SocketType}; use crate::persistence::pemstore; use clap::ArgMatches; diff --git a/nym-client/src/lib.rs b/nym-client/src/lib.rs index 5fa9078897a..a7760d6e362 100644 --- a/nym-client/src/lib.rs +++ b/nym-client/src/lib.rs @@ -1,7 +1,7 @@ #![recursion_limit = "256"] pub mod built_info; -pub mod clients; +pub mod client; pub mod commands; pub mod persistence; pub mod sockets; diff --git a/nym-client/src/main.rs b/nym-client/src/main.rs index 8455bc7287b..64daf4e5156 100644 --- a/nym-client/src/main.rs +++ b/nym-client/src/main.rs @@ -3,7 +3,7 @@ use clap::{App, Arg, ArgMatches, SubCommand}; pub mod built_info; -pub mod clients; +pub mod client; mod commands; mod persistence; mod sockets; diff --git a/nym-client/src/persistence/pathfinder.rs b/nym-client/src/persistence/pathfinder.rs index 08110b9ae85..3e5d82bb782 100644 --- a/nym-client/src/persistence/pathfinder.rs +++ b/nym-client/src/persistence/pathfinder.rs @@ -9,7 +9,7 @@ pub struct Pathfinder { impl Pathfinder { pub fn new(id: String) -> Pathfinder { let os_config_dir = dirs::config_dir().unwrap(); // grabs the OS default config dir - let config_dir = os_config_dir.join("nym").join("clients").join(id); + let config_dir = os_config_dir.join("nym").join("client").join(id); let private_mix_key = config_dir.join("private.pem"); let public_mix_key = config_dir.join("public.pem"); Pathfinder { diff --git a/nym-client/src/sockets/tcp.rs b/nym-client/src/sockets/tcp.rs index 66d5a3dbf6d..a6cf018becc 100644 --- a/nym-client/src/sockets/tcp.rs +++ b/nym-client/src/sockets/tcp.rs @@ -1,5 +1,5 @@ -use crate::clients::BufferResponse; -use crate::clients::InputMessage; +use crate::client::BufferResponse; +use crate::client::InputMessage; use directory_client::presence::Topology; use futures::channel::{mpsc, oneshot}; use futures::future::FutureExt; @@ -130,7 +130,7 @@ impl ClientRequest { } async fn handle_get_clients(topology: &Topology) -> ServerResponse { - println!("get clients handle"); + println!("get client handle"); let clients = topology .mix_provider_nodes .iter() @@ -189,8 +189,8 @@ fn encode_fetched_messages(messages: Vec>) -> Vec { } fn encode_list_of_clients(clients: Vec>) -> Vec { - println!("clients: {:?}", clients); - // we can just concat all clients since all of them got to be 32 bytes long + println!("client: {:?}", clients); + // we can just concat all client since all of them got to be 32 bytes long // (if not, then we have bigger problem somewhere up the line) // converts [[1,2,3],[4,5,6],...] into [1,2,3,4,5,6,...] diff --git a/nym-client/src/sockets/ws.rs b/nym-client/src/sockets/ws.rs index ab01fca0117..685964c8986 100644 --- a/nym-client/src/sockets/ws.rs +++ b/nym-client/src/sockets/ws.rs @@ -1,5 +1,5 @@ -use crate::clients::BufferResponse; -use crate::clients::InputMessage; +use crate::client::BufferResponse; +use crate::client::InputMessage; use directory_client::presence::Topology; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::{mpsc, oneshot}; From abd88c0c01498ce1cfee765da02d65345da732e9 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:07:22 +0000 Subject: [PATCH 02/27] Fixed accidentally broken pathfinder --- nym-client/src/persistence/pathfinder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nym-client/src/persistence/pathfinder.rs b/nym-client/src/persistence/pathfinder.rs index 3e5d82bb782..08110b9ae85 100644 --- a/nym-client/src/persistence/pathfinder.rs +++ b/nym-client/src/persistence/pathfinder.rs @@ -9,7 +9,7 @@ pub struct Pathfinder { impl Pathfinder { pub fn new(id: String) -> Pathfinder { let os_config_dir = dirs::config_dir().unwrap(); // grabs the OS default config dir - let config_dir = os_config_dir.join("nym").join("client").join(id); + let config_dir = os_config_dir.join("nym").join("clients").join(id); let private_mix_key = config_dir.join("private.pem"); let public_mix_key = config_dir.join("public.pem"); Pathfinder { From 6625f6f7fee562db1c92d467f035ebbba59aa576 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:16:43 +0000 Subject: [PATCH 03/27] Slightly decreases the aggressive logging in mix traffic controller --- nym-client/src/client/mix_traffic.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/nym-client/src/client/mix_traffic.rs b/nym-client/src/client/mix_traffic.rs index ef436453119..bfcd3ddfc6b 100644 --- a/nym-client/src/client/mix_traffic.rs +++ b/nym-client/src/client/mix_traffic.rs @@ -1,9 +1,7 @@ use futures::channel::mpsc; use futures::StreamExt; -use log::*; +use log::{debug, error, info, trace}; use sphinx::SphinxPacket; -use std::io; -use std::io::prelude::*; use std::net::SocketAddr; pub(crate) struct MixMessage(SocketAddr, SphinxPacket); @@ -17,21 +15,22 @@ impl MixMessage { pub(crate) struct MixTrafficController; impl MixTrafficController { - // this was way more difficult to implement than what this code may suggest... pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver) { + info!("Mix Traffic Controller started!"); let mix_client = mix_client::MixClient::new(); while let Some(mix_message) = rx.next().await { - info!( - "[MIX TRAFFIC CONTROL] - got a mix_message for {:?}", - mix_message.0 - ); + debug!("Got a mix_message for {:?}", mix_message.0); let send_res = mix_client.send(mix_message.1, mix_message.0).await; match send_res { Ok(_) => { - print!("."); - io::stdout().flush().ok().expect("Could not flush stdout"); + trace!("sent a mix message"); } - Err(e) => error!("We failed to send the message :( - {:?}", e), + // TODO: should there be some kind of threshold of failed messages + // that if reached, the application blows? + Err(e) => error!( + "We failed to send the message to {} :( - {:?}", + mix_message.0, e + ), }; } } From e9e5c83ab358737f46c96b82678fbcbac2fb8ab2 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:33:26 +0000 Subject: [PATCH 04/27] Moved ReceivedMessagesBuffer to seperate module --- nym-client/src/client/mod.rs | 56 ++---------------------- nym-client/src/client/received_buffer.rs | 55 +++++++++++++++++++++++ nym-client/src/sockets/tcp.rs | 2 +- nym-client/src/sockets/ws.rs | 2 +- 4 files changed, 60 insertions(+), 55 deletions(-) create mode 100644 nym-client/src/client/received_buffer.rs diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index bfd9e7198dd..376e3a102c5 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -4,21 +4,21 @@ use crate::sockets::tcp; use crate::sockets::ws; use crate::utils; use directory_client::presence::Topology; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use futures::join; -use futures::lock::Mutex as FMutex; use futures::select; use futures::{SinkExt, StreamExt}; use log::*; use sfw_provider_requests::AuthToken; use sphinx::route::{Destination, DestinationAddressBytes}; use std::net::SocketAddr; -use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; use topology::NymTopology; +use crate::client::received_buffer::ReceivedMessagesBuffer; mod mix_traffic; +pub mod received_buffer; const LOOP_COVER_AVERAGE_DELAY: f64 = 0.5; // seconds @@ -26,56 +26,6 @@ const MESSAGE_SENDING_AVERAGE_DELAY: f64 = 0.5; // seconds; const FETCH_MESSAGES_DELAY: f64 = 1.0; // seconds; -pub type BufferResponse = oneshot::Sender>>; - -struct ReceivedMessagesBuffer { - messages: Vec>, -} - -impl ReceivedMessagesBuffer { - fn add_arc_futures_mutex(self) -> Arc> { - Arc::new(FMutex::new(self)) - } - - fn new() -> Self { - ReceivedMessagesBuffer { - messages: Vec::new(), - } - } - - async fn add_new_messages(buf: Arc>, msgs: Vec>) { - info!("Adding new messages to the buffer! {:?}", msgs); - let mut unlocked = buf.lock().await; - unlocked.messages.extend(msgs); - } - - async fn run_poller_input_controller( - buf: Arc>, - mut poller_rx: mpsc::UnboundedReceiver>>, - ) { - while let Some(new_messages) = poller_rx.next().await { - ReceivedMessagesBuffer::add_new_messages(buf.clone(), new_messages).await; - } - } - - async fn acquire_and_empty(buf: Arc>) -> Vec> { - let mut unlocked = buf.lock().await; - std::mem::replace(&mut unlocked.messages, Vec::new()) - } - - async fn run_query_output_controller( - buf: Arc>, - mut query_receiver: mpsc::UnboundedReceiver, - ) { - while let Some(request) = query_receiver.next().await { - let messages = ReceivedMessagesBuffer::acquire_and_empty(buf.clone()).await; - // if this fails, the whole application needs to blow - // because currently only this thread would fail - request.send(messages).unwrap(); - } - } -} - pub enum SocketType { TCP, WebSocket, diff --git a/nym-client/src/client/received_buffer.rs b/nym-client/src/client/received_buffer.rs new file mode 100644 index 00000000000..4c6d460290d --- /dev/null +++ b/nym-client/src/client/received_buffer.rs @@ -0,0 +1,55 @@ +use futures::channel::{mpsc, oneshot}; +use futures::lock::Mutex as FMutex; +use futures::StreamExt; +use log::info; +use std::sync::Arc; + +pub type BufferResponse = oneshot::Sender>>; + +pub(crate) struct ReceivedMessagesBuffer { + messages: Vec>, +} + +impl ReceivedMessagesBuffer { + pub(crate) fn new() -> Self { + ReceivedMessagesBuffer { + messages: Vec::new(), + } + } + + pub(crate) fn add_arc_futures_mutex(self) -> Arc> { + Arc::new(FMutex::new(self)) + } + + pub(crate) async fn add_new_messages(buf: Arc>, msgs: Vec>) { + info!("Adding new messages to the buffer! {:?}", msgs); + let mut unlocked = buf.lock().await; + unlocked.messages.extend(msgs); + } + + pub(crate) async fn run_poller_input_controller( + buf: Arc>, + mut poller_rx: mpsc::UnboundedReceiver>>, + ) { + while let Some(new_messages) = poller_rx.next().await { + ReceivedMessagesBuffer::add_new_messages(buf.clone(), new_messages).await; + } + } + + pub(crate) async fn acquire_and_empty(buf: Arc>) -> Vec> { + let mut unlocked = buf.lock().await; + std::mem::replace(&mut unlocked.messages, Vec::new()) + } + + pub(crate) async fn run_query_output_controller( + buf: Arc>, + mut query_receiver: mpsc::UnboundedReceiver, + ) { + while let Some(request) = query_receiver.next().await { + let messages = ReceivedMessagesBuffer::acquire_and_empty(buf.clone()).await; + // if this fails, the whole application needs to blow + // because currently only this thread would fail + request.send(messages).unwrap(); + } + } +} diff --git a/nym-client/src/sockets/tcp.rs b/nym-client/src/sockets/tcp.rs index a6cf018becc..c063c7e4b84 100644 --- a/nym-client/src/sockets/tcp.rs +++ b/nym-client/src/sockets/tcp.rs @@ -1,4 +1,4 @@ -use crate::client::BufferResponse; +use crate::client::received_buffer::BufferResponse; use crate::client::InputMessage; use directory_client::presence::Topology; use futures::channel::{mpsc, oneshot}; diff --git a/nym-client/src/sockets/ws.rs b/nym-client/src/sockets/ws.rs index 685964c8986..14aacdcb5d1 100644 --- a/nym-client/src/sockets/ws.rs +++ b/nym-client/src/sockets/ws.rs @@ -1,4 +1,4 @@ -use crate::client::BufferResponse; +use crate::client::received_buffer::BufferResponse; use crate::client::InputMessage; use directory_client::presence::Topology; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; From af6a8f5546a4e12eda05708dca61d6e6f5195063 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:49:56 +0000 Subject: [PATCH 05/27] Better logging for ibid. --- nym-client/src/client/received_buffer.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nym-client/src/client/received_buffer.rs b/nym-client/src/client/received_buffer.rs index 4c6d460290d..40808bf66eb 100644 --- a/nym-client/src/client/received_buffer.rs +++ b/nym-client/src/client/received_buffer.rs @@ -1,7 +1,7 @@ use futures::channel::{mpsc, oneshot}; use futures::lock::Mutex as FMutex; use futures::StreamExt; -use log::info; +use log::{error, info, trace}; use std::sync::Arc; pub type BufferResponse = oneshot::Sender>>; @@ -22,7 +22,7 @@ impl ReceivedMessagesBuffer { } pub(crate) async fn add_new_messages(buf: Arc>, msgs: Vec>) { - info!("Adding new messages to the buffer! {:?}", msgs); + trace!("Adding new messages to the buffer! {:?}", msgs); let mut unlocked = buf.lock().await; unlocked.messages.extend(msgs); } @@ -31,12 +31,14 @@ impl ReceivedMessagesBuffer { buf: Arc>, mut poller_rx: mpsc::UnboundedReceiver>>, ) { + info!("Started Received Messages Buffer Input Controller"); while let Some(new_messages) = poller_rx.next().await { ReceivedMessagesBuffer::add_new_messages(buf.clone(), new_messages).await; } } pub(crate) async fn acquire_and_empty(buf: Arc>) -> Vec> { + trace!("Emptying the buffer and returning all messages"); let mut unlocked = buf.lock().await; std::mem::replace(&mut unlocked.messages, Vec::new()) } @@ -45,6 +47,8 @@ impl ReceivedMessagesBuffer { buf: Arc>, mut query_receiver: mpsc::UnboundedReceiver, ) { + info!("Started Received Messages Buffer Output Controller"); + while let Some(request) = query_receiver.next().await { let messages = ReceivedMessagesBuffer::acquire_and_empty(buf.clone()).await; // if this fails, the whole application needs to blow From e4c7d211b5acea584a55a6fa60b33cbcb7b935c0 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 10:50:34 +0000 Subject: [PATCH 06/27] Better handling of when messages failed to be sent to the requester Now rather than causing a panic, they will simply be readded back to the buffer --- nym-client/src/client/received_buffer.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nym-client/src/client/received_buffer.rs b/nym-client/src/client/received_buffer.rs index 40808bf66eb..3dcb0627f78 100644 --- a/nym-client/src/client/received_buffer.rs +++ b/nym-client/src/client/received_buffer.rs @@ -51,9 +51,12 @@ impl ReceivedMessagesBuffer { while let Some(request) = query_receiver.next().await { let messages = ReceivedMessagesBuffer::acquire_and_empty(buf.clone()).await; - // if this fails, the whole application needs to blow - // because currently only this thread would fail - request.send(messages).unwrap(); + if let Err(failed_messages) = request.send(messages) { + error!( + "Failed to send the messages to the requester. Adding them back to the buffer" + ); + ReceivedMessagesBuffer::add_new_messages(buf.clone(), failed_messages).await; + } } } } From ea383871ab6690be230b0dd10dc893a545daf96f Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 12:02:21 +0000 Subject: [PATCH 07/27] ReceivedBuffer is now responsible for starting both controllers --- nym-client/src/client/mod.rs | 22 +++----- nym-client/src/client/received_buffer.rs | 67 +++++++++++++++++------- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index 376e3a102c5..0be475c82ef 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -1,5 +1,6 @@ use crate::built_info; use crate::client::mix_traffic::{MixMessage, MixTrafficController}; +use crate::client::received_buffer::ReceivedMessagesBuffer; use crate::sockets::tcp; use crate::sockets::ws; use crate::utils; @@ -15,7 +16,6 @@ use std::net::SocketAddr; use std::time::Duration; use tokio::runtime::Runtime; use topology::NymTopology; -use crate::client::received_buffer::ReceivedMessagesBuffer; mod mix_traffic; pub mod received_buffer; @@ -223,18 +223,10 @@ impl NymClient { let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = mpsc::unbounded(); - let received_messages_buffer = ReceivedMessagesBuffer::new().add_arc_futures_mutex(); - - let received_messages_buffer_input_controller_future = - rt.spawn(ReceivedMessagesBuffer::run_poller_input_controller( - received_messages_buffer.clone(), - poller_input_rx, - )); - let received_messages_buffer_output_controller_future = - rt.spawn(ReceivedMessagesBuffer::run_query_output_controller( - received_messages_buffer, - received_messages_buffer_output_rx, - )); + let received_messages_buffer_controllers_future = rt.spawn( + ReceivedMessagesBuffer::new() + .start_controllers(poller_input_rx, received_messages_buffer_output_rx), + ); let mix_traffic_future = rt.spawn(MixTrafficController::run(mix_rx)); let loop_cover_traffic_future = rt.spawn(NymClient::start_loop_cover_traffic_stream( @@ -279,8 +271,7 @@ impl NymClient { rt.block_on(async { let future_results = join!( - received_messages_buffer_input_controller_future, - received_messages_buffer_output_controller_future, + received_messages_buffer_controllers_future, mix_traffic_future, loop_cover_traffic_future, out_queue_control_future, @@ -293,7 +284,6 @@ impl NymClient { && future_results.2.is_ok() && future_results.3.is_ok() && future_results.4.is_ok() - && future_results.5.is_ok() ); }); diff --git a/nym-client/src/client/received_buffer.rs b/nym-client/src/client/received_buffer.rs index 3dcb0627f78..a6df580c9f0 100644 --- a/nym-client/src/client/received_buffer.rs +++ b/nym-client/src/client/received_buffer.rs @@ -7,56 +7,83 @@ use std::sync::Arc; pub type BufferResponse = oneshot::Sender>>; pub(crate) struct ReceivedMessagesBuffer { - messages: Vec>, + inner: Arc>, } impl ReceivedMessagesBuffer { pub(crate) fn new() -> Self { ReceivedMessagesBuffer { - messages: Vec::new(), + inner: Arc::new(FMutex::new(Inner::new())), } } - pub(crate) fn add_arc_futures_mutex(self) -> Arc> { - Arc::new(FMutex::new(self)) - } + pub(crate) async fn start_controllers( + self, + poller_rx: mpsc::UnboundedReceiver>>, // to receive new messages + query_receiver: mpsc::UnboundedReceiver, // to receive requests to acquire all stored messages + ) { + let input_controller_future = tokio::spawn(Self::run_poller_input_controller( + self.inner.clone(), + poller_rx, + )); + let output_controller_future = tokio::spawn(Self::run_query_output_controller( + self.inner, + query_receiver, + )); - pub(crate) async fn add_new_messages(buf: Arc>, msgs: Vec>) { - trace!("Adding new messages to the buffer! {:?}", msgs); - let mut unlocked = buf.lock().await; - unlocked.messages.extend(msgs); + futures::future::select(input_controller_future, output_controller_future).await; + panic!("One of the received buffer controllers failed!") } pub(crate) async fn run_poller_input_controller( - buf: Arc>, + buf: Arc>, mut poller_rx: mpsc::UnboundedReceiver>>, ) { info!("Started Received Messages Buffer Input Controller"); + while let Some(new_messages) = poller_rx.next().await { - ReceivedMessagesBuffer::add_new_messages(buf.clone(), new_messages).await; + Inner::add_new_messages(&*buf, new_messages).await; } } - pub(crate) async fn acquire_and_empty(buf: Arc>) -> Vec> { - trace!("Emptying the buffer and returning all messages"); - let mut unlocked = buf.lock().await; - std::mem::replace(&mut unlocked.messages, Vec::new()) - } - pub(crate) async fn run_query_output_controller( - buf: Arc>, + buf: Arc>, mut query_receiver: mpsc::UnboundedReceiver, ) { info!("Started Received Messages Buffer Output Controller"); while let Some(request) = query_receiver.next().await { - let messages = ReceivedMessagesBuffer::acquire_and_empty(buf.clone()).await; + let messages = Inner::acquire_and_empty(&*buf).await; if let Err(failed_messages) = request.send(messages) { error!( "Failed to send the messages to the requester. Adding them back to the buffer" ); - ReceivedMessagesBuffer::add_new_messages(buf.clone(), failed_messages).await; + Inner::add_new_messages(&*buf, failed_messages).await; } } } } + +pub(crate) struct Inner { + messages: Vec>, +} + +impl Inner { + fn new() -> Self { + Inner { + messages: Vec::new(), + } + } + + async fn add_new_messages(buf: &FMutex, msgs: Vec>) { + trace!("Adding new messages to the buffer! {:?}", msgs); + let mut unlocked = buf.lock().await; + unlocked.messages.extend(msgs); + } + + async fn acquire_and_empty(buf: &FMutex) -> Vec> { + trace!("Emptying the buffer and returning all messages"); + let mut unlocked = buf.lock().await; + std::mem::replace(&mut unlocked.messages, Vec::new()) + } +} From 2af18a85e551160d35342fbe71d7061892a28b90 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 12:26:10 +0000 Subject: [PATCH 08/27] Added "is_registered" method to provider client --- common/clients/provider-client/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/clients/provider-client/src/lib.rs b/common/clients/provider-client/src/lib.rs index 643770c1dc0..a3faf056170 100644 --- a/common/clients/provider-client/src/lib.rs +++ b/common/clients/provider-client/src/lib.rs @@ -111,4 +111,8 @@ impl ProviderClient { Ok(parsed_response.auth_token) } + + pub fn is_registered(&self) -> bool { + self.auth_token.is_some() + } } From 7889ab4778f0e8f7802843882485e470b210e5f9 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 13:07:51 +0000 Subject: [PATCH 09/27] Removed redundant logging statement --- common/clients/provider-client/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/common/clients/provider-client/src/lib.rs b/common/clients/provider-client/src/lib.rs index a3faf056170..5ed8a8a4f04 100644 --- a/common/clients/provider-client/src/lib.rs +++ b/common/clients/provider-client/src/lib.rs @@ -92,7 +92,6 @@ impl ProviderClient { let bytes = pull_request.to_bytes(); let response = self.send_request(bytes).await?; - info!("Received the following response: {:?}", response); let parsed_response = PullResponse::from_bytes(&response)?; Ok(parsed_response.messages) From 06b96bd2c1755cb8c85a0bb209c1e6ee62aad8ec Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 13:08:26 +0000 Subject: [PATCH 10/27] Moved provider-polling related functionalities to new module --- nym-client/src/client/mod.rs | 55 ++++----------- nym-client/src/client/provider_poller.rs | 89 ++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 41 deletions(-) create mode 100644 nym-client/src/client/provider_poller.rs diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index 0be475c82ef..ae69b29795a 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -18,8 +18,10 @@ use tokio::runtime::Runtime; use topology::NymTopology; mod mix_traffic; +mod provider_poller; pub mod received_buffer; +// TODO: all of those constants should probably be moved to config file const LOOP_COVER_AVERAGE_DELAY: f64 = 0.5; // seconds const MESSAGE_SENDING_AVERAGE_DELAY: f64 = 0.5; @@ -123,26 +125,6 @@ impl NymClient { } } - async fn start_provider_polling( - provider_client: provider_client::ProviderClient, - mut poller_tx: mpsc::UnboundedSender>>, - ) { - let loop_message = &utils::sphinx::LOOP_COVER_MESSAGE_PAYLOAD.to_vec(); - let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec(); - loop { - let delay_duration = Duration::from_secs_f64(FETCH_MESSAGES_DELAY); - tokio::time::delay_for(delay_duration).await; - info!("[FETCH MSG] - Polling provider..."); - let messages = provider_client.retrieve_messages().await.unwrap(); - let good_messages = messages - .into_iter() - .filter(|message| message != loop_message && message != dummy_message) - .collect(); - // if any of those fails, whole application should blow... - poller_tx.send(good_messages).await.unwrap(); - } - } - pub fn start(self) -> Result<(), Box> { let score_threshold = 0.0; println!("Starting nym client"); @@ -192,6 +174,12 @@ impl NymClient { panic!("No valid path exists in the topology"); } + // channels for intercomponent communication + let (mix_tx, mix_rx) = mpsc::unbounded(); + let (poller_input_tx, poller_input_rx) = mpsc::unbounded(); + let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = + mpsc::unbounded(); + // this is temporary and assumes there exists only a single provider. let provider_client_listener_address: SocketAddr = versioned_healthy_topology .get_mix_provider_nodes() @@ -199,29 +187,17 @@ impl NymClient { .expect("Could not get a provider from the supplied network topology, are you using the right directory server?") .client_listener; - let mut provider_client = provider_client::ProviderClient::new( + let mut provider_poller = provider_poller::ProviderPoller::new( + poller_input_tx, provider_client_listener_address, self.address, self.auth_token, ); // registration - rt.block_on(async { - match self.auth_token { - None => { - let auth_token = provider_client.register().await.unwrap(); - provider_client.update_token(auth_token); - info!("Obtained new token! - {:?}", auth_token); - } - Some(token) => println!("Already got the token! - {:?}", token), - } - }); - - // channels for intercomponent communication - let (mix_tx, mix_rx) = mpsc::unbounded(); - let (poller_input_tx, poller_input_rx) = mpsc::unbounded(); - let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = - mpsc::unbounded(); + if let Err(err) = rt.block_on(provider_poller.perform_initial_registration()) { + panic!("Failed to perform initial registration: {:?}", err); + }; let received_messages_buffer_controllers_future = rt.spawn( ReceivedMessagesBuffer::new() @@ -242,10 +218,7 @@ impl NymClient { versioned_healthy_topology.clone(), )); - let provider_polling_future = rt.spawn(NymClient::start_provider_polling( - provider_client, - poller_input_tx, - )); + let provider_polling_future = rt.spawn(provider_poller.start_provider_polling()); match self.socket_type { SocketType::WebSocket => { diff --git a/nym-client/src/client/provider_poller.rs b/nym-client/src/client/provider_poller.rs new file mode 100644 index 00000000000..f55b11cde00 --- /dev/null +++ b/nym-client/src/client/provider_poller.rs @@ -0,0 +1,89 @@ +use crate::client::FETCH_MESSAGES_DELAY; +use crate::utils; +use futures::channel::mpsc; +use log::{debug, error, info, trace, warn}; +use provider_client::ProviderClientError; +use sfw_provider_requests::AuthToken; +use sphinx::route::DestinationAddressBytes; +use std::net::SocketAddr; +use std::time::Duration; + +pub(crate) struct ProviderPoller { + provider_client: provider_client::ProviderClient, + poller_tx: mpsc::UnboundedSender>>, +} + +impl ProviderPoller { + pub(crate) fn new( + poller_tx: mpsc::UnboundedSender>>, + provider_client_listener_address: SocketAddr, + client_address: DestinationAddressBytes, + auth_token: Option, + ) -> Self { + ProviderPoller { + provider_client: provider_client::ProviderClient::new( + provider_client_listener_address, + client_address, + auth_token, + ), + poller_tx, + } + } + + // This method is only temporary until registration is moved to `client init` + pub(crate) async fn perform_initial_registration(&mut self) -> Result<(), ProviderClientError> { + debug!("performing initial provider registration"); + + if !self.provider_client.is_registered() { + let auth_token = match self.provider_client.register().await { + // in this particular case we can ignore this error + Err(ProviderClientError::ClientAlreadyRegisteredError) => return Ok(()), + Err(err) => return Err(err), + Ok(token) => token, + }; + + self.provider_client.update_token(auth_token) + } else { + warn!("did not perform registration - we were already registered") + } + + Ok(()) + } + + pub(crate) async fn start_provider_polling(mut self) { + info!("Starting provider poller"); + + let loop_message = &utils::sphinx::LOOP_COVER_MESSAGE_PAYLOAD.to_vec(); + let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec(); + + let delay_duration = Duration::from_secs_f64(FETCH_MESSAGES_DELAY); + loop { + debug!("Polling provider..."); + + let messages = match self.provider_client.retrieve_messages().await { + Ok(messages) => messages, + Err(err) => { + let extended_delay_duration = + Duration::from_secs_f64(FETCH_MESSAGES_DELAY * 10.0); + error!("Failed to query the provider for messages... Going to wait {:?} before retrying", extended_delay_duration); + tokio::time::delay_for(extended_delay_duration).await; + continue; + } + }; + + let good_messages = messages + .into_iter() + .filter(|message| message != loop_message && message != dummy_message) + .collect(); + trace!("Obtained the following messages: {:?}", good_messages); + + // if this one fails, there's no retrying because it means that either: + // - we run out of memory + // - the receiver channel is closed + // in either case there's no recovery and we can only panic + self.poller_tx.unbounded_send(good_messages).unwrap(); + + tokio::time::delay_for(delay_duration).await; + } + } +} From f1debb91c6ec6199825b616d21b041aa31480ae0 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 13:29:59 +0000 Subject: [PATCH 11/27] Obtaining initial compatible topology in separate method --- nym-client/src/client/mod.rs | 51 +++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index ae69b29795a..71d9928544d 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -12,6 +12,7 @@ use futures::{SinkExt, StreamExt}; use log::*; use sfw_provider_requests::AuthToken; use sphinx::route::{Destination, DestinationAddressBytes}; +use std::error::Error; use std::net::SocketAddr; use std::time::Duration; use tokio::runtime::Runtime; @@ -46,6 +47,13 @@ pub struct NymClient { socket_type: SocketType, } +// TODO: this will be moved into module responsible for refreshing topology +#[derive(Debug)] +enum TopologyError { + HealthCheckError, + NoValidPathsError, +} + #[derive(Debug)] pub struct InputMessage(pub Destination, pub Vec); @@ -125,12 +133,11 @@ impl NymClient { } } - pub fn start(self) -> Result<(), Box> { + // TODO: this will be moved into module responsible for refreshing topology + async fn get_compatible_topology(&self) -> Result { let score_threshold = 0.0; - println!("Starting nym client"); - let mut rt = Runtime::new()?; + info!("Trying to obtain valid, healthy, topology"); - println!("Trying to obtain valid, healthy, topology"); let full_topology = Topology::new(self.directory.clone()); // run a healthcheck to determine healthy-ish nodes: @@ -143,15 +150,11 @@ impl NymClient { num_test_packets: 2, }; let healthcheck = healthcheck::HealthChecker::new(healthcheck_config); - let healthcheck_result = rt.block_on(healthcheck.do_check()); + let healthcheck_result = healthcheck.do_check().await; let healthcheck_scores = match healthcheck_result { Err(err) => { - error!( - "failed to perform healthcheck to determine healthy topology - {:?}", - err - ); - return Err(Box::new(err)); + return Err(TopologyError::HealthCheckError); } Ok(scores) => scores, }; @@ -169,19 +172,31 @@ impl NymClient { // make sure you can still send a packet through the network: if !versioned_healthy_topology.can_construct_path_through() { - error!("No valid path exists in the topology"); - // TODO: replace panic with proper return type - panic!("No valid path exists in the topology"); + return Err(TopologyError::NoValidPathsError); } + Ok(versioned_healthy_topology) + } + + pub fn start(self) -> Result<(), Box> { + info!("Starting nym client"); + let mut rt = Runtime::new()?; + // channels for intercomponent communication let (mix_tx, mix_rx) = mpsc::unbounded(); let (poller_input_tx, poller_input_rx) = mpsc::unbounded(); let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = mpsc::unbounded(); + let initial_topology = match rt.block_on(self.get_compatible_topology()) { + Ok(topology) => topology, + Err(err) => { + panic!("Failed to obtain initial network topology: {:?}", err); + } + }; + // this is temporary and assumes there exists only a single provider. - let provider_client_listener_address: SocketAddr = versioned_healthy_topology + let provider_client_listener_address: SocketAddr = initial_topology .get_mix_provider_nodes() .first() .expect("Could not get a provider from the supplied network topology, are you using the right directory server?") @@ -208,14 +223,14 @@ impl NymClient { let loop_cover_traffic_future = rt.spawn(NymClient::start_loop_cover_traffic_stream( mix_tx.clone(), Destination::new(self.address, Default::default()), - versioned_healthy_topology.clone(), + initial_topology.clone(), )); let out_queue_control_future = rt.spawn(NymClient::control_out_queue( mix_tx, self.input_rx, Destination::new(self.address, Default::default()), - versioned_healthy_topology.clone(), + initial_topology.clone(), )); let provider_polling_future = rt.spawn(provider_poller.start_provider_polling()); @@ -227,7 +242,7 @@ impl NymClient { self.input_tx, received_messages_buffer_output_tx, self.address, - versioned_healthy_topology, + initial_topology, )); } SocketType::TCP => { @@ -236,7 +251,7 @@ impl NymClient { self.input_tx, received_messages_buffer_output_tx, self.address, - versioned_healthy_topology, + initial_topology, )); } SocketType::None => (), From a57708d9362c552964003504f5bbeb632dd1e4dd Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 14:21:41 +0000 Subject: [PATCH 12/27] Separate module for cover traffic stream --- nym-client/src/client/cover_traffic_stream.rs | 33 +++++++++++++++++++ nym-client/src/client/mod.rs | 31 ++++------------- 2 files changed, 40 insertions(+), 24 deletions(-) create mode 100644 nym-client/src/client/cover_traffic_stream.rs diff --git a/nym-client/src/client/cover_traffic_stream.rs b/nym-client/src/client/cover_traffic_stream.rs new file mode 100644 index 00000000000..3405cc2cf9e --- /dev/null +++ b/nym-client/src/client/cover_traffic_stream.rs @@ -0,0 +1,33 @@ +use crate::client::mix_traffic::MixMessage; +use crate::client::LOOP_COVER_AVERAGE_DELAY; +use crate::utils; +use futures::channel::mpsc; +use log::{info, trace}; +use sphinx::route::Destination; +use std::time::Duration; +use topology::NymTopology; + +pub(crate) async fn start_loop_cover_traffic_stream( + tx: mpsc::UnboundedSender, + our_info: Destination, + topology: T, +) where + T: NymTopology, +{ + info!("Starting loop cover traffic stream"); + loop { + trace!("next cover message!"); + let delay = utils::poisson::sample(LOOP_COVER_AVERAGE_DELAY); + let delay_duration = Duration::from_secs_f64(delay); + tokio::time::delay_for(delay_duration).await; + let cover_message = + utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology); + + // if this one fails, there's no retrying because it means that either: + // - we run out of memory + // - the receiver channel is closed + // in either case there's no recovery and we can only panic + tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1)) + .unwrap(); + } +} diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index 71d9928544d..d992e0e110f 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -12,12 +12,12 @@ use futures::{SinkExt, StreamExt}; use log::*; use sfw_provider_requests::AuthToken; use sphinx::route::{Destination, DestinationAddressBytes}; -use std::error::Error; use std::net::SocketAddr; use std::time::Duration; use tokio::runtime::Runtime; use topology::NymTopology; +mod cover_traffic_stream; mod mix_traffic; mod provider_poller; pub mod received_buffer; @@ -78,24 +78,6 @@ impl NymClient { } } - async fn start_loop_cover_traffic_stream( - mut tx: mpsc::UnboundedSender, - our_info: Destination, - topology: Topology, - ) { - loop { - info!("[LOOP COVER TRAFFIC STREAM] - next cover message!"); - let delay = utils::poisson::sample(LOOP_COVER_AVERAGE_DELAY); - let delay_duration = Duration::from_secs_f64(delay); - tokio::time::delay_for(delay_duration).await; - let cover_message = - utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology); - tx.send(MixMessage::new(cover_message.0, cover_message.1)) - .await - .unwrap(); - } - } - async fn control_out_queue( mut mix_tx: mpsc::UnboundedSender, mut input_rx: mpsc::UnboundedReceiver, @@ -220,11 +202,12 @@ impl NymClient { ); let mix_traffic_future = rt.spawn(MixTrafficController::run(mix_rx)); - let loop_cover_traffic_future = rt.spawn(NymClient::start_loop_cover_traffic_stream( - mix_tx.clone(), - Destination::new(self.address, Default::default()), - initial_topology.clone(), - )); + let loop_cover_traffic_future = + rt.spawn(cover_traffic_stream::start_loop_cover_traffic_stream( + mix_tx.clone(), + Destination::new(self.address, Default::default()), + initial_topology.clone(), + )); let out_queue_control_future = rt.spawn(NymClient::control_out_queue( mix_tx, From fe48c11cd6ffe157e66d43e22f9afad78958212c Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 14:29:56 +0000 Subject: [PATCH 13/27] Moved real traffic controller to separate module --- nym-client/src/client/mod.rs | 50 +++---------------- nym-client/src/client/real_traffic_stream.rs | 51 ++++++++++++++++++++ 2 files changed, 57 insertions(+), 44 deletions(-) create mode 100644 nym-client/src/client/real_traffic_stream.rs diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index d992e0e110f..3ee95debd7a 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -1,25 +1,22 @@ use crate::built_info; -use crate::client::mix_traffic::{MixMessage, MixTrafficController}; +use crate::client::mix_traffic::MixTrafficController; use crate::client::received_buffer::ReceivedMessagesBuffer; use crate::sockets::tcp; use crate::sockets::ws; -use crate::utils; use directory_client::presence::Topology; use futures::channel::mpsc; use futures::join; -use futures::select; -use futures::{SinkExt, StreamExt}; use log::*; use sfw_provider_requests::AuthToken; use sphinx::route::{Destination, DestinationAddressBytes}; use std::net::SocketAddr; -use std::time::Duration; use tokio::runtime::Runtime; use topology::NymTopology; mod cover_traffic_stream; mod mix_traffic; mod provider_poller; +mod real_traffic_stream; pub mod received_buffer; // TODO: all of those constants should probably be moved to config file @@ -38,8 +35,10 @@ pub enum SocketType { pub struct NymClient { // to be replaced by something else I guess address: DestinationAddressBytes, - pub input_tx: mpsc::UnboundedSender, + // to be used by "send" function or socket, etc + pub input_tx: mpsc::UnboundedSender, + input_rx: mpsc::UnboundedReceiver, socket_listening_address: SocketAddr, directory: String, @@ -78,43 +77,6 @@ impl NymClient { } } - async fn control_out_queue( - mut mix_tx: mpsc::UnboundedSender, - mut input_rx: mpsc::UnboundedReceiver, - our_info: Destination, - topology: Topology, - ) { - loop { - info!("[OUT QUEUE] here I will be sending real traffic (or loop cover if nothing is available)"); - // TODO: consider replacing select macro with our own proper future definition with polling - let traffic_message = select! { - real_message = input_rx.next() => { - info!("[OUT QUEUE] - we got a real message!"); - if real_message.is_none() { - error!("Unexpected 'None' real message!"); - std::process::exit(1); - } - let real_message = real_message.unwrap(); - println!("real: {:?}", real_message); - utils::sphinx::encapsulate_message(real_message.0, real_message.1, &topology) - }, - - default => { - info!("[OUT QUEUE] - no real message - going to send extra loop cover"); - utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology) - } - }; - - mix_tx - .send(MixMessage::new(traffic_message.0, traffic_message.1)) - .await - .unwrap(); - - let delay_duration = Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY); - tokio::time::delay_for(delay_duration).await; - } - } - // TODO: this will be moved into module responsible for refreshing topology async fn get_compatible_topology(&self) -> Result { let score_threshold = 0.0; @@ -209,7 +171,7 @@ impl NymClient { initial_topology.clone(), )); - let out_queue_control_future = rt.spawn(NymClient::control_out_queue( + let out_queue_control_future = rt.spawn(real_traffic_stream::control_out_queue( mix_tx, self.input_rx, Destination::new(self.address, Default::default()), diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs new file mode 100644 index 00000000000..fa1d3efdc9b --- /dev/null +++ b/nym-client/src/client/real_traffic_stream.rs @@ -0,0 +1,51 @@ +use crate::client::mix_traffic::MixMessage; +use crate::client::{InputMessage, MESSAGE_SENDING_AVERAGE_DELAY}; +use crate::utils; +use futures::channel::mpsc; +use futures::{select, StreamExt}; +use log::{debug, error, info, trace}; +use sphinx::route::Destination; +use std::time::Duration; +use topology::NymTopology; + +pub(crate) async fn control_out_queue( + mix_tx: mpsc::UnboundedSender, + mut input_rx: mpsc::UnboundedReceiver, + our_info: Destination, + topology: T, +) where + T: NymTopology, +{ + info!("Starting out queue controller where real traffic (or loop cover if nothing is available) will be sent"); + loop { + // TODO: consider replacing select macro with our own proper future definition with polling + let traffic_message = select! { + real_message = input_rx.next() => { + debug!("we got a real message!"); + if real_message.is_none() { + error!("Unexpected 'None' real message!"); + std::process::exit(1); + } + let real_message = real_message.unwrap(); + trace!("real message: {:?}", real_message); + utils::sphinx::encapsulate_message(real_message.0, real_message.1, &topology) + }, + + default => { + debug!("no real message - going to send extra loop cover"); + utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology) + } + }; + + // if this one fails, there's no retrying because it means that either: + // - we run out of memory + // - the receiver channel is closed + // in either case there's no recovery and we can only panic + mix_tx + .unbounded_send(MixMessage::new(traffic_message.0, traffic_message.1)) + .unwrap(); + + let delay_duration = Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY); + tokio::time::delay_for(delay_duration).await; + } +} From ee812a86c6c34fda62ad9f77b1a6ceb6c02883aa Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 14:30:17 +0000 Subject: [PATCH 14/27] Moved definition of extended duration to before the loop --- nym-client/src/client/provider_poller.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nym-client/src/client/provider_poller.rs b/nym-client/src/client/provider_poller.rs index f55b11cde00..20d88a049dd 100644 --- a/nym-client/src/client/provider_poller.rs +++ b/nym-client/src/client/provider_poller.rs @@ -57,18 +57,18 @@ impl ProviderPoller { let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec(); let delay_duration = Duration::from_secs_f64(FETCH_MESSAGES_DELAY); + let extended_delay_duration = Duration::from_secs_f64(FETCH_MESSAGES_DELAY * 10.0); + loop { debug!("Polling provider..."); let messages = match self.provider_client.retrieve_messages().await { - Ok(messages) => messages, Err(err) => { - let extended_delay_duration = - Duration::from_secs_f64(FETCH_MESSAGES_DELAY * 10.0); error!("Failed to query the provider for messages... Going to wait {:?} before retrying", extended_delay_duration); tokio::time::delay_for(extended_delay_duration).await; continue; } + Ok(messages) => messages, }; let good_messages = messages From 641dc78903a6cb9a83d6159c0cf7060ec413e7b3 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:07:21 +0000 Subject: [PATCH 15/27] Removed too aggressive logging --- common/clients/mix-client/src/lib.rs | 3 +-- common/clients/provider-client/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/common/clients/mix-client/src/lib.rs b/common/clients/mix-client/src/lib.rs index 431623178ae..bbdfa9bc0b0 100644 --- a/common/clients/mix-client/src/lib.rs +++ b/common/clients/mix-client/src/lib.rs @@ -17,8 +17,7 @@ impl MixClient { mix_addr: SocketAddr, ) -> Result<(), Box> { let bytes = packet.to_bytes(); - - info!("socket addr: {:?}", mix_addr); + debug!("Sending to the following address: {:?}", mix_addr); let mut stream = tokio::net::TcpStream::connect(mix_addr).await?; stream.write_all(&bytes[..]).await?; diff --git a/common/clients/provider-client/src/lib.rs b/common/clients/provider-client/src/lib.rs index 5ed8a8a4f04..1200a18fcdd 100644 --- a/common/clients/provider-client/src/lib.rs +++ b/common/clients/provider-client/src/lib.rs @@ -65,7 +65,7 @@ impl ProviderClient { pub async fn send_request(&self, bytes: Vec) -> Result, ProviderClientError> { let mut socket = tokio::net::TcpStream::connect(self.provider_network_address).await?; - info!("keep alive: {:?}", socket.keepalive()); + socket.set_keepalive(Some(Duration::from_secs(2))).unwrap(); socket.write_all(&bytes[..]).await?; if let Err(_e) = socket.shutdown(Shutdown::Write) { From c92177402d53b8f6832ba16e2a0679e9b4f0166f Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:08:19 +0000 Subject: [PATCH 16/27] Implemented OutQueueControl as a proper Stream to get rid of the select! macro --- nym-client/src/client/real_traffic_stream.rs | 116 +++++++++++++------ 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs index fa1d3efdc9b..cc00ea7ef00 100644 --- a/nym-client/src/client/real_traffic_stream.rs +++ b/nym-client/src/client/real_traffic_stream.rs @@ -1,51 +1,95 @@ use crate::client::mix_traffic::MixMessage; use crate::client::{InputMessage, MESSAGE_SENDING_AVERAGE_DELAY}; use crate::utils; +use directory_client::presence::Topology; use futures::channel::mpsc; -use futures::{select, StreamExt}; -use log::{debug, error, info, trace}; +use futures::task::{Context, Poll}; +use futures::{select, Stream, StreamExt}; +use log::{debug, error, info, trace, warn}; use sphinx::route::Destination; +use sphinx::SphinxPacket; +use std::net::SocketAddr; +use std::pin::Pin; use std::time::Duration; +use tokio::time; use topology::NymTopology; -pub(crate) async fn control_out_queue( +pub(crate) struct OutQueueControl { + interval: time::Interval, mix_tx: mpsc::UnboundedSender, - mut input_rx: mpsc::UnboundedReceiver, + input_rx: mpsc::UnboundedReceiver, our_info: Destination, - topology: T, -) where - T: NymTopology, -{ - info!("Starting out queue controller where real traffic (or loop cover if nothing is available) will be sent"); - loop { - // TODO: consider replacing select macro with our own proper future definition with polling - let traffic_message = select! { - real_message = input_rx.next() => { - debug!("we got a real message!"); - if real_message.is_none() { - error!("Unexpected 'None' real message!"); - std::process::exit(1); - } - let real_message = real_message.unwrap(); - trace!("real message: {:?}", real_message); - utils::sphinx::encapsulate_message(real_message.0, real_message.1, &topology) - }, - - default => { - debug!("no real message - going to send extra loop cover"); - utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology) - } + + // due to pinning, DerefMut trait, futures, etc its way easier to + // just have concrete implementation here rather than generic NymTopology + // considering that it will be replaced with refreshing topology within few days anyway + topology: Topology, +} + +impl Stream for OutQueueControl { + type Item = (SocketAddr, SphinxPacket); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // it is not yet time to return a message + if Stream::poll_next(Pin::new(&mut self.interval), cx).is_pending() { + return Poll::Pending; }; - // if this one fails, there's no retrying because it means that either: - // - we run out of memory - // - the receiver channel is closed - // in either case there's no recovery and we can only panic - mix_tx - .unbounded_send(MixMessage::new(traffic_message.0, traffic_message.1)) - .unwrap(); + match Stream::poll_next(Pin::new(&mut self.input_rx), cx) { + // in the case our real message channel stream was closed, we should also indicate we are closed + // (and whoever is using the stream should panic) + Poll::Ready(None) => Poll::Ready(None), + + // if there's an actual message - return it + Poll::Ready(Some(real_message)) => { + trace!("real message"); + Poll::Ready(Some(utils::sphinx::encapsulate_message( + real_message.0, + real_message.1, + &self.topology, + ))) + } + + // otherwise construct a dummy one + _ => { + trace!("loop cover message"); + Poll::Ready(Some(utils::sphinx::loop_cover_message( + self.our_info.address, + self.our_info.identifier, + &self.topology, + ))) + } + } + } +} + +impl OutQueueControl { + pub(crate) fn new( + mix_tx: mpsc::UnboundedSender, + input_rx: mpsc::UnboundedReceiver, + our_info: Destination, + topology: Topology, + ) -> Self { + OutQueueControl { + interval: time::interval(Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY)), + mix_tx, + input_rx, + our_info, + topology, + } + } - let delay_duration = Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY); - tokio::time::delay_for(delay_duration).await; + pub(crate) async fn run_out_queue_control(mut self) { + info!("starting out queue controller"); + while let Some(next_message) = self.next().await { + debug!("created new message"); + // if this one fails, there's no retrying because it means that either: + // - we run out of memory + // - the receiver channel is closed + // in either case there's no recovery and we can only panic + self.mix_tx + .unbounded_send(MixMessage::new(next_message.0, next_message.1)) + .unwrap(); + } } } From 73438b672a8fd2da72f2282ddbdcc68fa91cb8e3 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:09:12 +0000 Subject: [PATCH 17/27] Change in how OutQueueControl is spawned - not entirely sure at this point why its required --- nym-client/src/client/mod.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index 3ee95debd7a..786c36e7bc0 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -171,13 +171,20 @@ impl NymClient { initial_topology.clone(), )); - let out_queue_control_future = rt.spawn(real_traffic_stream::control_out_queue( - mix_tx, - self.input_rx, - Destination::new(self.address, Default::default()), - initial_topology.clone(), - )); - + let topology_clone = initial_topology.clone(); + let self_address = self.address; + let input_rx = self.input_rx; + + let out_queue_control_future = rt.spawn(async move { + real_traffic_stream::OutQueueControl::new( + mix_tx, + input_rx, + Destination::new(self_address, Default::default()), + topology_clone, + ) + .run_out_queue_control() + .await + }); let provider_polling_future = rt.spawn(provider_poller.start_provider_polling()); match self.socket_type { From 2e817a9a6b401409bcf88803cdbfb0560ef686b2 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:09:28 +0000 Subject: [PATCH 18/27] Comments explaining purposes of different futures --- nym-client/src/client/mod.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index 786c36e7bc0..b02ba0bd0b1 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -126,12 +126,13 @@ impl NymClient { info!("Starting nym client"); let mut rt = Runtime::new()?; - // channels for intercomponent communication + // channels for inter-component communication let (mix_tx, mix_rx) = mpsc::unbounded(); let (poller_input_tx, poller_input_rx) = mpsc::unbounded(); let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = mpsc::unbounded(); + // get initial topology; already filtered by health and version let initial_topology = match rt.block_on(self.get_compatible_topology()) { Ok(topology) => topology, Err(err) => { @@ -158,12 +159,20 @@ impl NymClient { panic!("Failed to perform initial registration: {:?}", err); }; + // setup all of futures for the components running on the client + + // buffer controlling all messages fetched from provider + // required so that other components would be able to use them (say the websocket) let received_messages_buffer_controllers_future = rt.spawn( ReceivedMessagesBuffer::new() .start_controllers(poller_input_rx, received_messages_buffer_output_rx), ); + // controller for sending sphinx packets to mixnet (either real traffic or cover traffic) let mix_traffic_future = rt.spawn(MixTrafficController::run(mix_rx)); + + // future constantly pumping loop cover traffic at some specified average rate + // the pumped traffic goes to the MixTrafficController let loop_cover_traffic_future = rt.spawn(cover_traffic_stream::start_loop_cover_traffic_stream( mix_tx.clone(), @@ -171,10 +180,15 @@ impl NymClient { initial_topology.clone(), )); + // cloning arguments required by OutQueueControl; required due to move let topology_clone = initial_topology.clone(); let self_address = self.address; let input_rx = self.input_rx; + // future constantly pumping traffic at some specified average rate + // if a real message is available on 'input_rx' that might have been received from say + // the websocket, the real message is used, otherwise a loop cover message is generated + // the pumped traffic goes to the MixTrafficController let out_queue_control_future = rt.spawn(async move { real_traffic_stream::OutQueueControl::new( mix_tx, @@ -185,8 +199,13 @@ impl NymClient { .run_out_queue_control() .await }); + + // future constantly trying to fetch any received messages from the provider + // the received messages are sent to ReceivedMessagesBuffer to be available to rest of the system let provider_polling_future = rt.spawn(provider_poller.start_provider_polling()); + // a temporary workaround for starting socket listener of specified type + // in the future the actual socket handler should start THIS client instead match self.socket_type { SocketType::WebSocket => { rt.spawn(ws::start_websocket( From 551d0eb161e59494bb109c2db4591841c8eecfdb Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:16:22 +0000 Subject: [PATCH 19/27] More comments regarding uses of specific channels --- nym-client/src/client/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index b02ba0bd0b1..d443d323d1e 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -127,8 +127,19 @@ impl NymClient { let mut rt = Runtime::new()?; // channels for inter-component communication + + // mix_tx is the transmitter for any component generating sphinx packets that are to be sent to the mixnet + // they are used by cover traffic stream and real traffic stream + // mix_rx is the receiver used by MixTrafficController that sends the actual traffic let (mix_tx, mix_rx) = mpsc::unbounded(); + + // poller_input_tx is the transmitter of messages fetched from the provider - used by ProviderPoller + // poller_input_rx is the receiver for said messages - used by ReceivedMessagesBuffer let (poller_input_tx, poller_input_rx) = mpsc::unbounded(); + + // received_messages_buffer_output_tx is the transmitter for *REQUESTS* for messages contained in ReceivedMessagesBuffer - used by sockets + // the requests contain a oneshot channel to send a reply on + // received_messages_buffer_output_rx is the received for the said requests - used by ReceivedMessagesBuffer let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) = mpsc::unbounded(); From ac988c62e99d91b394ce12f4e33dd0bfc752033d Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:21:02 +0000 Subject: [PATCH 20/27] Removed 'toml' from client-persistence as it will be module-specific --- nym-client/src/persistence/toml.rs | 1 - 1 file changed, 1 deletion(-) delete mode 100644 nym-client/src/persistence/toml.rs diff --git a/nym-client/src/persistence/toml.rs b/nym-client/src/persistence/toml.rs deleted file mode 100644 index f4f5578ad0d..00000000000 --- a/nym-client/src/persistence/toml.rs +++ /dev/null @@ -1 +0,0 @@ -// TODO: we can put all the TOML config templating code in here once we get to that. From 55e473d04ed8f4d109dc256b930653cdad60b365 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:46:34 +0000 Subject: [PATCH 21/27] Moved Pemstore as separate crate + defined pathfinder as trait --- Cargo.lock | 9 ++++++++ Cargo.toml | 1 + common/pemstore/Cargo.toml | 13 +++++++++++ .../mod.rs => common/pemstore/src/lib.rs | 0 common/pemstore/src/pathfinder.rs | 15 ++++++++++++ .../pemstore/src}/pemstore.rs | 19 ++++----------- nym-client/Cargo.toml | 1 + nym-client/src/commands/init.rs | 6 ++--- nym-client/src/commands/tcpsocket.rs | 9 ++++---- nym-client/src/commands/websocket.rs | 10 ++++---- nym-client/src/config/mod.rs | 1 + nym-client/src/config/persistance/mod.rs | 1 + .../persistance}/pathfinder.rs | 23 +++++++++++++++---- nym-client/src/main.rs | 2 +- 14 files changed, 80 insertions(+), 30 deletions(-) create mode 100644 common/pemstore/Cargo.toml rename nym-client/src/persistence/mod.rs => common/pemstore/src/lib.rs (100%) create mode 100644 common/pemstore/src/pathfinder.rs rename {nym-client/src/persistence => common/pemstore/src}/pemstore.rs (84%) create mode 100644 nym-client/src/config/mod.rs create mode 100644 nym-client/src/config/persistance/mod.rs rename nym-client/src/{persistence => config/persistance}/pathfinder.rs (52%) diff --git a/Cargo.lock b/Cargo.lock index 6aa1d181a49..3e9d9da323a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1356,6 +1356,7 @@ dependencies = [ "log", "mix-client", "pem", + "pemstore", "provider-client", "rand 0.7.2", "rand_distr", @@ -1502,6 +1503,14 @@ dependencies = [ "regex", ] +[[package]] +name = "pemstore" +version = "0.1.0" +dependencies = [ + "crypto", + "pem", +] + [[package]] name = "percent-encoding" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index d03d085fa8b..e233914c0dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "common/addressing", "common/crypto", "common/healthcheck", + "common/pemstore", "common/topology", "mixnode", "nym-client", diff --git a/common/pemstore/Cargo.toml b/common/pemstore/Cargo.toml new file mode 100644 index 00000000000..2774ec2ccda --- /dev/null +++ b/common/pemstore/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "pemstore" +version = "0.1.0" +authors = ["Jedrzej Stuczynski "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pem = "0.7.0" + +## internal +crypto = {path = "../crypto"} diff --git a/nym-client/src/persistence/mod.rs b/common/pemstore/src/lib.rs similarity index 100% rename from nym-client/src/persistence/mod.rs rename to common/pemstore/src/lib.rs diff --git a/common/pemstore/src/pathfinder.rs b/common/pemstore/src/pathfinder.rs new file mode 100644 index 00000000000..8633f151032 --- /dev/null +++ b/common/pemstore/src/pathfinder.rs @@ -0,0 +1,15 @@ +use std::path::PathBuf; + +pub trait PathFinder { + fn config_dir(&self) -> PathBuf; + fn private_identity_key(&self) -> PathBuf; + fn public_identity_key(&self) -> PathBuf; + + // Optional: + fn private_encryption_key(&self) -> Option { + None + } + fn public_encryption_key(&self) -> Option { + None + } +} diff --git a/nym-client/src/persistence/pemstore.rs b/common/pemstore/src/pemstore.rs similarity index 84% rename from nym-client/src/persistence/pemstore.rs rename to common/pemstore/src/pemstore.rs index 19f6bb07df9..d16f8574fac 100644 --- a/nym-client/src/persistence/pemstore.rs +++ b/common/pemstore/src/pemstore.rs @@ -1,18 +1,9 @@ -use crate::persistence::pathfinder::Pathfinder; +use crate::pathfinder::PathFinder; use pem::{encode, parse, Pem}; use std::fs::File; use std::io::prelude::*; use std::path::PathBuf; -pub fn read_mix_identity_keypair_from_disk( - id: String, -) -> crypto::identity::DummyMixIdentityKeyPair { - let pathfinder = Pathfinder::new(id); - let pem_store = PemStore::new(pathfinder); - let keypair = pem_store.read_identity(); - keypair -} - #[allow(dead_code)] pub fn read_mix_encryption_keypair_from_disk(_id: String) -> crypto::encryption::x25519::KeyPair { unimplemented!() @@ -25,11 +16,11 @@ pub struct PemStore { } impl PemStore { - pub fn new(pathfinder: Pathfinder) -> PemStore { + pub fn new(pathfinder: P) -> PemStore { PemStore { - config_dir: pathfinder.config_dir, - private_mix_key: pathfinder.private_mix_key, - public_mix_key: pathfinder.public_mix_key, + config_dir: pathfinder.config_dir(), + private_mix_key: pathfinder.private_identity_key(), + public_mix_key: pathfinder.public_identity_key(), } } diff --git a/nym-client/Cargo.toml b/nym-client/Cargo.toml index f336d11b5aa..86e3073c6e6 100644 --- a/nym-client/Cargo.toml +++ b/nym-client/Cargo.toml @@ -35,6 +35,7 @@ crypto = {path = "../common/crypto"} directory-client = { path = "../common/clients/directory-client" } healthcheck = { path = "../common/healthcheck" } mix-client = { path = "../common/clients/mix-client" } +pemstore = {path = "../common/pemstore"} provider-client = { path = "../common/clients/provider-client" } sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" } topology = {path = "../common/topology" } diff --git a/nym-client/src/commands/init.rs b/nym-client/src/commands/init.rs index f6d675ea028..4ac9c47db40 100644 --- a/nym-client/src/commands/init.rs +++ b/nym-client/src/commands/init.rs @@ -1,13 +1,13 @@ -use crate::persistence::pathfinder::Pathfinder; -use crate::persistence::pemstore::PemStore; +use crate::config::persistance::pathfinder::ClientPathfinder; use clap::ArgMatches; use crypto::identity::MixnetIdentityKeyPair; +use pemstore::pemstore::PemStore; pub fn execute(matches: &ArgMatches) { println!("Initialising client..."); let id = matches.value_of("id").unwrap().to_string(); // required for now - let pathfinder = Pathfinder::new(id); + let pathfinder = ClientPathfinder::new(id); println!("Writing keypairs to {:?}...", pathfinder.config_dir); let mix_keys = crypto::identity::DummyMixIdentityKeyPair::new(); diff --git a/nym-client/src/commands/tcpsocket.rs b/nym-client/src/commands/tcpsocket.rs index 053981d446e..81e0ef72d5c 100644 --- a/nym-client/src/commands/tcpsocket.rs +++ b/nym-client/src/commands/tcpsocket.rs @@ -1,8 +1,8 @@ use crate::client::{NymClient, SocketType}; -use crate::persistence::pemstore; - +use crate::config::persistance::pathfinder::ClientPathfinder; use clap::ArgMatches; -use crypto::identity::{MixnetIdentityKeyPair, MixnetIdentityPublicKey}; +use crypto::identity::{DummyMixIdentityKeyPair, MixnetIdentityKeyPair, MixnetIdentityPublicKey}; +use pemstore::pemstore::PemStore; use std::net::ToSocketAddrs; pub fn execute(matches: &ArgMatches) { @@ -26,7 +26,8 @@ pub fn execute(matches: &ArgMatches) { .next() .expect("Failed to extract the socket address from the iterator"); - let keypair = pemstore::read_mix_identity_keypair_from_disk(id); + // TODO: currently we know we are reading the 'DummyMixIdentityKeyPair', but how to properly assert the type? + let keypair: DummyMixIdentityKeyPair = PemStore::new(ClientPathfinder::new(id)).read_identity(); // TODO: reading auth_token from disk (if exists); println!("Public key: {}", keypair.public_key.to_b64_string()); diff --git a/nym-client/src/commands/websocket.rs b/nym-client/src/commands/websocket.rs index 0a66193a6fe..644e023306a 100644 --- a/nym-client/src/commands/websocket.rs +++ b/nym-client/src/commands/websocket.rs @@ -1,8 +1,8 @@ use crate::client::{NymClient, SocketType}; -use crate::persistence::pemstore; - +use crate::config::persistance::pathfinder::ClientPathfinder; use clap::ArgMatches; -use crypto::identity::{MixnetIdentityKeyPair, MixnetIdentityPublicKey}; +use crypto::identity::{DummyMixIdentityKeyPair, MixnetIdentityKeyPair, MixnetIdentityPublicKey}; +use pemstore::pemstore::PemStore; use std::net::ToSocketAddrs; pub fn execute(matches: &ArgMatches) { @@ -26,7 +26,9 @@ pub fn execute(matches: &ArgMatches) { .next() .expect("Failed to extract the socket address from the iterator"); - let keypair = pemstore::read_mix_identity_keypair_from_disk(id); + // TODO: currently we know we are reading the 'DummyMixIdentityKeyPair', but how to properly assert the type? + let keypair: DummyMixIdentityKeyPair = PemStore::new(ClientPathfinder::new(id)).read_identity(); + // TODO: reading auth_token from disk (if exists); println!("Public key: {}", keypair.public_key.to_b64_string()); diff --git a/nym-client/src/config/mod.rs b/nym-client/src/config/mod.rs new file mode 100644 index 00000000000..4bf0f952092 --- /dev/null +++ b/nym-client/src/config/mod.rs @@ -0,0 +1 @@ +pub mod persistance; diff --git a/nym-client/src/config/persistance/mod.rs b/nym-client/src/config/persistance/mod.rs new file mode 100644 index 00000000000..5692e9bc8b1 --- /dev/null +++ b/nym-client/src/config/persistance/mod.rs @@ -0,0 +1 @@ +pub mod pathfinder; diff --git a/nym-client/src/persistence/pathfinder.rs b/nym-client/src/config/persistance/pathfinder.rs similarity index 52% rename from nym-client/src/persistence/pathfinder.rs rename to nym-client/src/config/persistance/pathfinder.rs index 08110b9ae85..c71aab5b84b 100644 --- a/nym-client/src/persistence/pathfinder.rs +++ b/nym-client/src/config/persistance/pathfinder.rs @@ -1,21 +1,36 @@ +use pemstore::pathfinder::PathFinder; use std::path::PathBuf; -pub struct Pathfinder { +pub struct ClientPathfinder { pub config_dir: PathBuf, pub private_mix_key: PathBuf, pub public_mix_key: PathBuf, } -impl Pathfinder { - pub fn new(id: String) -> Pathfinder { +impl ClientPathfinder { + pub fn new(id: String) -> Self { let os_config_dir = dirs::config_dir().unwrap(); // grabs the OS default config dir let config_dir = os_config_dir.join("nym").join("clients").join(id); let private_mix_key = config_dir.join("private.pem"); let public_mix_key = config_dir.join("public.pem"); - Pathfinder { + ClientPathfinder { config_dir, private_mix_key, public_mix_key, } } } + +impl PathFinder for ClientPathfinder { + fn config_dir(&self) -> PathBuf { + self.config_dir.clone() + } + + fn private_identity_key(&self) -> PathBuf { + self.private_mix_key.clone() + } + + fn public_identity_key(&self) -> PathBuf { + self.public_mix_key.clone() + } +} diff --git a/nym-client/src/main.rs b/nym-client/src/main.rs index 64daf4e5156..d1e12a85d42 100644 --- a/nym-client/src/main.rs +++ b/nym-client/src/main.rs @@ -5,7 +5,7 @@ use clap::{App, Arg, ArgMatches, SubCommand}; pub mod built_info; pub mod client; mod commands; -mod persistence; +pub mod config; mod sockets; pub mod utils; From da575f365790d0cb61041e5f104c04323e416189 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:49:48 +0000 Subject: [PATCH 22/27] ibid --- nym-client/src/lib.rs | 2 +- nym-client/src/utils/bytes.rs | 63 ----------------------------------- 2 files changed, 1 insertion(+), 64 deletions(-) delete mode 100644 nym-client/src/utils/bytes.rs diff --git a/nym-client/src/lib.rs b/nym-client/src/lib.rs index a7760d6e362..a79f75bf0cd 100644 --- a/nym-client/src/lib.rs +++ b/nym-client/src/lib.rs @@ -1,8 +1,8 @@ #![recursion_limit = "256"] pub mod built_info; +pub mod config; pub mod client; pub mod commands; -pub mod persistence; pub mod sockets; pub mod utils; diff --git a/nym-client/src/utils/bytes.rs b/nym-client/src/utils/bytes.rs deleted file mode 100644 index 6079fa8385d..00000000000 --- a/nym-client/src/utils/bytes.rs +++ /dev/null @@ -1,63 +0,0 @@ -pub fn zero_pad_to_32(mut bytes: Vec) -> [u8; 32] { - assert!(bytes.len() <= 32); - if bytes.len() != 32 { - bytes.resize(32, 0); - } - let mut padded_bytes = [0; 32]; - padded_bytes.copy_from_slice(&bytes[..]); - padded_bytes -} - -#[cfg(test)] -mod zero_padding_to_32_bytes { - use super::*; - - #[cfg(test)] - mod with_empty_input { - use super::*; - - #[test] - fn it_returns_32_zeros() { - let input = vec![]; - let result = zero_pad_to_32(input); - assert_eq!([0u8; 32], result); - } - } - - #[cfg(test)] - mod with_all_bytes_set_to_1 { - use super::*; - #[test] - fn it_returns_32_ones() { - let input = vec![1u8; 32]; - let result = zero_pad_to_32(input); - assert_eq!([1u8; 32], result); - } - } - - #[cfg(test)] - mod with_3_bytes_set { - use super::*; - #[test] - fn it_returns_input_zero_padded_to_32_bytes() { - let input = vec![1u8; 3]; - let result = zero_pad_to_32(input); - let expected_content = vec![ - 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, - ]; - assert_eq!(expected_content, result.to_vec()); - } - } - - #[cfg(test)] - mod with_oversized_input { - use super::*; - #[test] - #[should_panic] - fn it_panics() { - let input = vec![1u8; 33]; - zero_pad_to_32(input); - } - } -} From 98610e20dcdfa32a056686757bb4cffea376dd22 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 17:50:07 +0000 Subject: [PATCH 23/27] Removed unused utils/bytes module --- nym-client/src/utils/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/nym-client/src/utils/mod.rs b/nym-client/src/utils/mod.rs index d213166bdc0..3e2ed40a1bb 100644 --- a/nym-client/src/utils/mod.rs +++ b/nym-client/src/utils/mod.rs @@ -1,3 +1,2 @@ -pub mod bytes; pub mod poisson; pub mod sphinx; From b4c15eae12526454bc2a24736cb03b256a1df43a Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Tue, 21 Jan 2020 18:02:35 +0000 Subject: [PATCH 24/27] Sending real traffic at poisson rate rather than at uniform rate --- nym-client/src/client/real_traffic_stream.rs | 22 ++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs index cc00ea7ef00..8750df7727c 100644 --- a/nym-client/src/client/real_traffic_stream.rs +++ b/nym-client/src/client/real_traffic_stream.rs @@ -4,7 +4,7 @@ use crate::utils; use directory_client::presence::Topology; use futures::channel::mpsc; use futures::task::{Context, Poll}; -use futures::{select, Stream, StreamExt}; +use futures::{select, Future, Stream, StreamExt}; use log::{debug, error, info, trace, warn}; use sphinx::route::Destination; use sphinx::SphinxPacket; @@ -15,7 +15,7 @@ use tokio::time; use topology::NymTopology; pub(crate) struct OutQueueControl { - interval: time::Interval, + delay: time::Delay, mix_tx: mpsc::UnboundedSender, input_rx: mpsc::UnboundedReceiver, our_info: Destination, @@ -31,10 +31,23 @@ impl Stream for OutQueueControl { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // it is not yet time to return a message - if Stream::poll_next(Pin::new(&mut self.interval), cx).is_pending() { + if Future::poll(Pin::new(&mut self.delay), cx).is_pending() { return Poll::Pending; }; + // we know it's time to send a message, so let's prepare delay for the next one + // Get the `now` by looking at the current `delay` deadline + let now = self.delay.deadline(); + + let next_poisson_delay = + Duration::from_secs_f64(utils::poisson::sample(MESSAGE_SENDING_AVERAGE_DELAY)); + + // The next interval value is `next_poisson_delay` after the one that just + // yielded. + let next = now + next_poisson_delay; + self.delay.reset(next); + + // decide what kind of message to send match Stream::poll_next(Pin::new(&mut self.input_rx), cx) { // in the case our real message channel stream was closed, we should also indicate we are closed // (and whoever is using the stream should panic) @@ -70,8 +83,9 @@ impl OutQueueControl { our_info: Destination, topology: Topology, ) -> Self { + let initial_delay = time::delay_for(Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY)); OutQueueControl { - interval: time::interval(Duration::from_secs_f64(MESSAGE_SENDING_AVERAGE_DELAY)), + delay: initial_delay, mix_tx, input_rx, our_info, From 2364fbdc093869518d23187e5fee0171f3e05a2d Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Wed, 22 Jan 2020 10:50:03 +0000 Subject: [PATCH 25/27] Moved sphinx packet encapsulation and poisson delay sampling to mix client --- Cargo.lock | 6 ++++-- common/clients/mix-client/Cargo.toml | 7 +++++++ common/clients/mix-client/src/lib.rs | 3 +++ .../sphinx.rs => common/clients/mix-client/src/packet.rs | 8 ++++---- .../utils => common/clients/mix-client/src}/poisson.rs | 0 nym-client/src/lib.rs | 9 +++------ nym-client/src/main.rs | 3 --- nym-client/src/utils/mod.rs | 2 -- 8 files changed, 21 insertions(+), 17 deletions(-) rename nym-client/src/utils/sphinx.rs => common/clients/mix-client/src/packet.rs (89%) rename {nym-client/src/utils => common/clients/mix-client/src}/poisson.rs (100%) delete mode 100644 nym-client/src/utils/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 3e9d9da323a..66d9693e547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1256,9 +1256,13 @@ dependencies = [ name = "mix-client" version = "0.1.0" dependencies = [ + "addressing", "log", + "rand 0.7.2", + "rand_distr", "sphinx", "tokio 0.2.9", + "topology", ] [[package]] @@ -1358,8 +1362,6 @@ dependencies = [ "pem", "pemstore", "provider-client", - "rand 0.7.2", - "rand_distr", "reqwest", "serde", "serde_json", diff --git a/common/clients/mix-client/Cargo.toml b/common/clients/mix-client/Cargo.toml index 89b497375df..362fad3f0a5 100644 --- a/common/clients/mix-client/Cargo.toml +++ b/common/clients/mix-client/Cargo.toml @@ -8,7 +8,14 @@ edition = "2018" [dependencies] log = "0.4.8" +rand = "0.7.2" +rand_distr = "0.2.2" tokio = { version = "0.2", features = ["full"] } +## internal +addressing = {path = "../../addressing"} +topology = {path = "../../topology"} + ## will be moved to proper dependencies once released sphinx = { git = "https://github.com/nymtech/sphinx", rev="1d8cefcb6a0cb8e87d00d89eb1ccf2839e92aa1f" } + diff --git a/common/clients/mix-client/src/lib.rs b/common/clients/mix-client/src/lib.rs index bbdfa9bc0b0..fd883a54a40 100644 --- a/common/clients/mix-client/src/lib.rs +++ b/common/clients/mix-client/src/lib.rs @@ -3,6 +3,9 @@ use sphinx::SphinxPacket; use std::net::SocketAddr; use tokio::prelude::*; +pub mod packet; +pub mod poisson; + pub struct MixClient {} impl MixClient { diff --git a/nym-client/src/utils/sphinx.rs b/common/clients/mix-client/src/packet.rs similarity index 89% rename from nym-client/src/utils/sphinx.rs rename to common/clients/mix-client/src/packet.rs index 0f3e69f36b3..356d2bbd2e4 100644 --- a/nym-client/src/utils/sphinx.rs +++ b/common/clients/mix-client/src/packet.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use topology::NymTopology; pub const LOOP_COVER_MESSAGE_PAYLOAD: &[u8] = b"The cake is a lie!"; +pub const LOOP_COVER_MESSAGE_AVERAGE_DELAY: f64 = 2.0; pub fn loop_cover_message( our_address: DestinationAddressBytes, @@ -13,21 +14,20 @@ pub fn loop_cover_message( ) -> (SocketAddr, SphinxPacket) { let destination = Destination::new(our_address, surb_id); - encapsulate_message(destination, LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), topology) + encapsulate_message(destination, LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), topology, LOOP_COVER_MESSAGE_AVERAGE_DELAY) } pub fn encapsulate_message( recipient: Destination, message: Vec, topology: &T, + average_delay: f64, ) -> (SocketAddr, SphinxPacket) { let mut providers = topology.get_mix_provider_nodes(); let provider = providers.pop().unwrap().into(); let route = topology.route_to(provider).unwrap(); - - // Set average packet delay to an arbitrary but at least not super-slow value for testing. - let average_delay = 0.1; + let delays = sphinx::header::delays::generate(route.len(), average_delay); // build the packet diff --git a/nym-client/src/utils/poisson.rs b/common/clients/mix-client/src/poisson.rs similarity index 100% rename from nym-client/src/utils/poisson.rs rename to common/clients/mix-client/src/poisson.rs diff --git a/nym-client/src/lib.rs b/nym-client/src/lib.rs index a79f75bf0cd..477c106b346 100644 --- a/nym-client/src/lib.rs +++ b/nym-client/src/lib.rs @@ -1,8 +1,5 @@ -#![recursion_limit = "256"] - pub mod built_info; -pub mod config; pub mod client; -pub mod commands; -pub mod sockets; -pub mod utils; +mod commands; +pub mod config; +mod sockets; diff --git a/nym-client/src/main.rs b/nym-client/src/main.rs index d1e12a85d42..a3f20596b0b 100644 --- a/nym-client/src/main.rs +++ b/nym-client/src/main.rs @@ -1,5 +1,3 @@ -#![recursion_limit = "256"] - use clap::{App, Arg, ArgMatches, SubCommand}; pub mod built_info; @@ -7,7 +5,6 @@ pub mod client; mod commands; pub mod config; mod sockets; -pub mod utils; fn main() { env_logger::init(); diff --git a/nym-client/src/utils/mod.rs b/nym-client/src/utils/mod.rs deleted file mode 100644 index 3e2ed40a1bb..00000000000 --- a/nym-client/src/utils/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod poisson; -pub mod sphinx; From 7045db7d70dc926cd86f1316755d2db1e01a1d62 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Wed, 22 Jan 2020 10:50:22 +0000 Subject: [PATCH 26/27] ibid. --- common/clients/mix-client/src/packet.rs | 9 +++++++-- nym-client/Cargo.toml | 2 -- nym-client/src/client/cover_traffic_stream.rs | 10 ++++++---- nym-client/src/client/provider_poller.rs | 5 ++--- nym-client/src/client/real_traffic_stream.rs | 16 +++++++++------- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/common/clients/mix-client/src/packet.rs b/common/clients/mix-client/src/packet.rs index 356d2bbd2e4..cc1a027cb68 100644 --- a/common/clients/mix-client/src/packet.rs +++ b/common/clients/mix-client/src/packet.rs @@ -14,7 +14,12 @@ pub fn loop_cover_message( ) -> (SocketAddr, SphinxPacket) { let destination = Destination::new(our_address, surb_id); - encapsulate_message(destination, LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), topology, LOOP_COVER_MESSAGE_AVERAGE_DELAY) + encapsulate_message( + destination, + LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), + topology, + LOOP_COVER_MESSAGE_AVERAGE_DELAY, + ) } pub fn encapsulate_message( @@ -27,7 +32,7 @@ pub fn encapsulate_message( let provider = providers.pop().unwrap().into(); let route = topology.route_to(provider).unwrap(); - + let delays = sphinx::header::delays::generate(route.len(), average_delay); // build the packet diff --git a/nym-client/Cargo.toml b/nym-client/Cargo.toml index 86e3073c6e6..d80d6fcddea 100644 --- a/nym-client/Cargo.toml +++ b/nym-client/Cargo.toml @@ -21,8 +21,6 @@ futures = "0.3.1" hex = "0.4.0" log = "0.4.8" pem = "0.7.0" -rand = "0.7.2" -rand_distr = "0.2.2" reqwest = "0.9.22" serde = { version = "1.0.104", features = ["derive"] } serde_json = "1.0.44" diff --git a/nym-client/src/client/cover_traffic_stream.rs b/nym-client/src/client/cover_traffic_stream.rs index 3405cc2cf9e..82f904eec7f 100644 --- a/nym-client/src/client/cover_traffic_stream.rs +++ b/nym-client/src/client/cover_traffic_stream.rs @@ -1,6 +1,5 @@ use crate::client::mix_traffic::MixMessage; use crate::client::LOOP_COVER_AVERAGE_DELAY; -use crate::utils; use futures::channel::mpsc; use log::{info, trace}; use sphinx::route::Destination; @@ -17,11 +16,14 @@ pub(crate) async fn start_loop_cover_traffic_stream( info!("Starting loop cover traffic stream"); loop { trace!("next cover message!"); - let delay = utils::poisson::sample(LOOP_COVER_AVERAGE_DELAY); + let delay = mix_client::poisson::sample(LOOP_COVER_AVERAGE_DELAY); let delay_duration = Duration::from_secs_f64(delay); tokio::time::delay_for(delay_duration).await; - let cover_message = - utils::sphinx::loop_cover_message(our_info.address, our_info.identifier, &topology); + let cover_message = mix_client::packet::loop_cover_message( + our_info.address, + our_info.identifier, + &topology, + ); // if this one fails, there's no retrying because it means that either: // - we run out of memory diff --git a/nym-client/src/client/provider_poller.rs b/nym-client/src/client/provider_poller.rs index 20d88a049dd..aa3924f9914 100644 --- a/nym-client/src/client/provider_poller.rs +++ b/nym-client/src/client/provider_poller.rs @@ -1,5 +1,4 @@ use crate::client::FETCH_MESSAGES_DELAY; -use crate::utils; use futures::channel::mpsc; use log::{debug, error, info, trace, warn}; use provider_client::ProviderClientError; @@ -53,7 +52,7 @@ impl ProviderPoller { pub(crate) async fn start_provider_polling(mut self) { info!("Starting provider poller"); - let loop_message = &utils::sphinx::LOOP_COVER_MESSAGE_PAYLOAD.to_vec(); + let loop_message = &mix_client::packet::LOOP_COVER_MESSAGE_PAYLOAD.to_vec(); let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec(); let delay_duration = Duration::from_secs_f64(FETCH_MESSAGES_DELAY); @@ -64,7 +63,7 @@ impl ProviderPoller { let messages = match self.provider_client.retrieve_messages().await { Err(err) => { - error!("Failed to query the provider for messages... Going to wait {:?} before retrying", extended_delay_duration); + error!("Failed to query the provider for messages: {:?}, ... Going to wait {:?} before retrying", err, extended_delay_duration); tokio::time::delay_for(extended_delay_duration).await; continue; } diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs index 8750df7727c..f89257747b7 100644 --- a/nym-client/src/client/real_traffic_stream.rs +++ b/nym-client/src/client/real_traffic_stream.rs @@ -1,18 +1,19 @@ use crate::client::mix_traffic::MixMessage; use crate::client::{InputMessage, MESSAGE_SENDING_AVERAGE_DELAY}; -use crate::utils; use directory_client::presence::Topology; use futures::channel::mpsc; use futures::task::{Context, Poll}; -use futures::{select, Future, Stream, StreamExt}; -use log::{debug, error, info, trace, warn}; +use futures::{Future, Stream, StreamExt}; +use log::{debug, info, trace}; use sphinx::route::Destination; use sphinx::SphinxPacket; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; use tokio::time; -use topology::NymTopology; + +// have a rather low value for test sake +const AVERAGE_PACKET_DELAY: f64 = 0.1; pub(crate) struct OutQueueControl { delay: time::Delay, @@ -40,7 +41,7 @@ impl Stream for OutQueueControl { let now = self.delay.deadline(); let next_poisson_delay = - Duration::from_secs_f64(utils::poisson::sample(MESSAGE_SENDING_AVERAGE_DELAY)); + Duration::from_secs_f64(mix_client::poisson::sample(MESSAGE_SENDING_AVERAGE_DELAY)); // The next interval value is `next_poisson_delay` after the one that just // yielded. @@ -56,17 +57,18 @@ impl Stream for OutQueueControl { // if there's an actual message - return it Poll::Ready(Some(real_message)) => { trace!("real message"); - Poll::Ready(Some(utils::sphinx::encapsulate_message( + Poll::Ready(Some(mix_client::packet::encapsulate_message( real_message.0, real_message.1, &self.topology, + AVERAGE_PACKET_DELAY, ))) } // otherwise construct a dummy one _ => { trace!("loop cover message"); - Poll::Ready(Some(utils::sphinx::loop_cover_message( + Poll::Ready(Some(mix_client::packet::loop_cover_message( self.our_info.address, self.our_info.identifier, &self.topology, From 974ac5ef655840d9976af0c071e256b4b1534ad3 Mon Sep 17 00:00:00 2001 From: Jedrzej Stuczynski Date: Wed, 22 Jan 2020 10:52:30 +0000 Subject: [PATCH 27/27] compiler warnings --- nym-client/src/client/mod.rs | 1 + nym-client/src/client/provider_poller.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs index d443d323d1e..c9685af0b95 100644 --- a/nym-client/src/client/mod.rs +++ b/nym-client/src/client/mod.rs @@ -98,6 +98,7 @@ impl NymClient { let healthcheck_scores = match healthcheck_result { Err(err) => { + error!("Error while performing the healtcheck: {:?}", err); return Err(TopologyError::HealthCheckError); } Ok(scores) => scores, diff --git a/nym-client/src/client/provider_poller.rs b/nym-client/src/client/provider_poller.rs index aa3924f9914..71ccd41fdd3 100644 --- a/nym-client/src/client/provider_poller.rs +++ b/nym-client/src/client/provider_poller.rs @@ -49,7 +49,7 @@ impl ProviderPoller { Ok(()) } - pub(crate) async fn start_provider_polling(mut self) { + pub(crate) async fn start_provider_polling(self) { info!("Starting provider poller"); let loop_message = &mix_client::packet::LOOP_COVER_MESSAGE_PAYLOAD.to_vec();