diff --git a/.travis.yml b/.travis.yml
index 7041ae8d65c..bf149680d5a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,5 +11,5 @@ before_script:
   - rustup component add rustfmt
 script:
   - cargo build
-  - cargo test
+  - cargo test -- --test-threads=1
   - cargo fmt -- --check
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 15f3faaa2ee..2a6b9e79c6a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1464,6 +1464,7 @@ dependencies = [
  "healthcheck",
  "log",
  "mix-client",
+ "multi-tcp-client",
  "pem",
  "pemstore",
  "pretty_env_logger",
diff --git a/common/clients/mix-client/src/packet.rs b/common/clients/mix-client/src/packet.rs
index 5132d4cb92b..6b25afd6b1d 100644
--- a/common/clients/mix-client/src/packet.rs
+++ b/common/clients/mix-client/src/packet.rs
@@ -39,6 +39,7 @@ impl From for SphinxPacketEncapsulationError {
     }
 }
 
+#[deprecated(note = "please use loop_cover_message_route instead")]
 pub fn loop_cover_message(
     our_address: DestinationAddressBytes,
     surb_id: SURBIdentifier,
@@ -47,6 +48,7 @@
 ) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
     let destination = Destination::new(our_address, surb_id);
 
+    #[allow(deprecated)]
     encapsulate_message(
         destination,
         LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
@@ -55,6 +57,23 @@
     )
 }
 
+pub fn loop_cover_message_route(
+    our_address: DestinationAddressBytes,
+    surb_id: SURBIdentifier,
+    route: Vec<sphinx::route::Node>,
+    average_delay: time::Duration,
+) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
+    let destination = Destination::new(our_address, surb_id);
+
+    encapsulate_message_route(
+        destination,
+        LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
+        route,
+        average_delay,
+    )
+}
+
+#[deprecated(note = "please use encapsulate_message_route instead")]
 pub fn encapsulate_message(
     recipient: Destination,
     message: Vec<u8>,
@@ -81,3 +100,20 @@
 
     Ok((first_node_address, packet))
 }
+
+pub fn encapsulate_message_route(
+    recipient: Destination,
+    message: Vec<u8>,
+    route: Vec<sphinx::route::Node>,
+    average_delay: time::Duration,
+) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
+    let delays =
+        sphinx::header::delays::generate_from_average_duration(route.len(), average_delay);
+
+    // build the packet
+    let packet = sphinx::SphinxPacket::new(message, &route[..], &recipient, &delays)?;
+
+    let first_node_address =
+        addressing::socket_address_from_encoded_bytes(route.first().unwrap().address.to_bytes())?;
+
+    Ok((first_node_address, packet))
+}
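The new `*_route` variants shift route selection to the caller: instead of handing a whole topology to the packet layer, the caller picks a route up front (e.g. via the `TopologyAccessor::random_route` introduced later in this diff) and passes it in. A minimal caller sketch, assuming the route was obtained elsewhere; the recipient, payload, and delay values are placeholders:

```rust
use std::time::Duration;
use sphinx::route::{Destination, Node};

// Hypothetical caller: `route` is assumed to have been obtained elsewhere,
// e.g. from TopologyAccessor::random_route().
fn send_real_message(
    recipient: Destination,
    payload: Vec<u8>,
    route: Vec<Node>,
) -> Result<(), mix_client::packet::SphinxPacketEncapsulationError> {
    let (first_hop, sphinx_packet) = mix_client::packet::encapsulate_message_route(
        recipient,
        payload,
        route,
        Duration::from_millis(200), // average per-hop delay (placeholder)
    )?;
    // the pair would then be handed to the mix traffic controller
    let _ = (first_hop, sphinx_packet);
    Ok(())
}
```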
diff --git a/common/clients/multi-tcp-client/src/connection_manager/mod.rs b/common/clients/multi-tcp-client/src/connection_manager/mod.rs
index a695ca05c98..bbd0f8bcd78 100644
--- a/common/clients/multi-tcp-client/src/connection_manager/mod.rs
+++ b/common/clients/multi-tcp-client/src/connection_manager/mod.rs
@@ -77,7 +77,7 @@ impl<'a> ConnectionManager<'a> {
         return match conn_writer.write_all(msg).await {
             // if we failed to write to the connection we should reconnect
             // TODO: is this true? can we fail to write to a connection while it still remains open and valid?
-            Ok(res) => Ok(res),
+            Ok(_) => Ok(()),
             Err(e) => {
                 trace!("Creating connection reconnector!");
                 self.state = ConnectionState::Reconnecting(ConnectionReconnector::new(
diff --git a/common/crypto/src/identity/mod.rs b/common/crypto/src/identity/mod.rs
index 192b4911df9..9b7a396671e 100644
--- a/common/crypto/src/identity/mod.rs
+++ b/common/crypto/src/identity/mod.rs
@@ -121,8 +121,8 @@ impl MixIdentityPrivateKey {
 
 // TODO: this will be implemented differently by using the proper trait
 impl MixIdentityPrivateKey {
-    pub fn as_scalar(self) -> Scalar {
-        let encryption_key = self.0;
+    pub fn as_scalar(&self) -> Scalar {
+        let encryption_key = &self.0;
         encryption_key.0
     }
 }
diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs
index 5edba93d7d0..eb81cdfd421 100644
--- a/common/topology/src/lib.rs
+++ b/common/topology/src/lib.rs
@@ -10,7 +10,9 @@ mod filter;
 pub mod mix;
 pub mod provider;
 
-pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync {
+// TODO: figure out why 'Clone' was required to get 'TopologyAccessor' working,
+// even though it only contains an Arc
+pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
     fn new(directory_server: String) -> Self;
     fn new_from_nodes(
         mix_nodes: Vec<mix::Node>,
diff --git a/mixnode/src/node/listener.rs b/mixnode/src/node/listener.rs
index 7146318ad02..0409a1754de 100644
--- a/mixnode/src/node/listener.rs
+++ b/mixnode/src/node/listener.rs
@@ -55,7 +55,7 @@ async fn process_socket_connection(
 
             // we must be able to handle multiple packets from the same connection independently
             tokio::spawn(process_received_packet(
-                buf.clone(),
+                buf,
                 // note: processing_data is relatively cheap (and safe) to clone -
                 // it contains an arc to the private key and the metrics reporter (which is just
                 // a single mpsc unbounded sender)
diff --git a/nym-client/Cargo.toml b/nym-client/Cargo.toml
index 87ba8de7330..ac4bde4fc88 100644
--- a/nym-client/Cargo.toml
+++ b/nym-client/Cargo.toml
@@ -34,6 +34,7 @@ crypto = {path = "../common/crypto"}
 directory-client = { path = "../common/clients/directory-client" }
 healthcheck = { path = "../common/healthcheck" }
 mix-client = { path = "../common/clients/mix-client" }
+multi-tcp-client = { path = "../common/clients/multi-tcp-client" }
 pemstore = {path = "../common/pemstore"}
 provider-client = { path = "../common/clients/provider-client" }
 sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" }
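A note on the new `Clone` bound on `NymTopology`: it stays cheap as long as implementors keep their node data behind an `Arc`, which is exactly what the TODO comment above hints at. A self-contained sketch (not the real `presence::Topology` implementation) illustrating the cost model:

```rust
use std::sync::Arc;

// Cloning only bumps the reference count; the node list itself is never copied.
#[derive(Debug, Clone)]
struct CheapTopology {
    inner: Arc<Vec<String>>, // stand-in for real mix/provider node data
}

fn main() {
    let topology = CheapTopology {
        inner: Arc::new(vec!["mix1".to_string(), "mix2".to_string()]),
    };
    let handle = topology.clone();
    assert!(Arc::ptr_eq(&topology.inner, &handle.inner)); // same allocation
}
```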
diff --git a/nym-client/src/client/cover_traffic_stream.rs b/nym-client/src/client/cover_traffic_stream.rs
index d23ca59747e..c0e4e8dc57a 100644
--- a/nym-client/src/client/cover_traffic_stream.rs
+++ b/nym-client/src/client/cover_traffic_stream.rs
@@ -1,38 +1,86 @@
-use crate::client::mix_traffic::MixMessage;
-use crate::client::topology_control::TopologyInnerRef;
-use futures::channel::mpsc;
-use log::{error, info, trace, warn};
+use crate::client::mix_traffic::{MixMessage, MixMessageSender};
+use crate::client::topology_control::TopologyAccessor;
+use futures::task::{Context, Poll};
+use futures::{Future, Stream, StreamExt};
+use log::*;
 use sphinx::route::Destination;
-use std::time;
+use std::pin::Pin;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
+use tokio::time;
 use topology::NymTopology;
 
-pub(crate) async fn start_loop_cover_traffic_stream<T: NymTopology>(
-    tx: mpsc::UnboundedSender<MixMessage>,
+pub(crate) struct LoopCoverTrafficStream<T: NymTopology> {
+    average_packet_delay: Duration,
+    average_cover_message_sending_delay: Duration,
+    next_delay: time::Delay,
+    mix_tx: MixMessageSender,
     our_info: Destination,
-    topology_ctrl_ref: TopologyInnerRef<T>,
-    average_cover_message_delay_duration: time::Duration,
-    average_packet_delay_duration: time::Duration,
-) {
-    info!("Starting loop cover traffic stream");
-    loop {
-        trace!("next cover message!");
-        let delay_duration = mix_client::poisson::sample(average_cover_message_delay_duration);
-        tokio::time::delay_for(delay_duration).await;
+    topology_access: TopologyAccessor<T>,
+}
+
+impl<T: NymTopology> Stream for LoopCoverTrafficStream<T> {
+    // Item is only used to indicate we should create a new message, rather than being an actual
+    // cover message; the reason is to not introduce unnecessary complexity by having to hold the
+    // topology mutex while trying to acquire it. So right now the Stream trait serves as a
+    // glorified timer. Perhaps this should be changed in the future.
+    type Item = ();
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // it is not yet time to return a message
+        if Pin::new(&mut self.next_delay).poll(cx).is_pending() {
+            return Poll::Pending;
+        };
+
+        // we know it's time to send a message, so let's prepare the delay for the next one.
+        // Get the `now` by looking at the current `delay` deadline
+        let now = self.next_delay.deadline();
+        let next_poisson_delay =
+            mix_client::poisson::sample(self.average_cover_message_sending_delay);
+
+        // The next interval value is `next_poisson_delay` after the one that just
+        // yielded.
+        let next = now + next_poisson_delay;
+        self.next_delay.reset(next);
+
+        Poll::Ready(Some(()))
+    }
+}
+
+impl<T: 'static + NymTopology> LoopCoverTrafficStream<T> {
+    pub(crate) fn new(
+        mix_tx: MixMessageSender,
+        our_info: Destination,
+        topology_access: TopologyAccessor<T>,
+        average_cover_message_sending_delay: time::Duration,
+        average_packet_delay: time::Duration,
+    ) -> Self {
+        LoopCoverTrafficStream {
+            average_packet_delay,
+            average_cover_message_sending_delay,
+            next_delay: time::delay_for(Default::default()),
+            mix_tx,
+            our_info,
+            topology_access,
+        }
+    }
 
-        let read_lock = topology_ctrl_ref.read().await;
-        let topology = match read_lock.topology.as_ref() {
+    async fn on_new_message(&mut self) {
+        trace!("next cover message!");
+        let route = match self.topology_access.random_route().await {
             None => {
                 warn!("No valid topology detected - won't send any loop cover message this time");
-                continue;
+                return;
             }
-            Some(topology) => topology,
+            Some(route) => route,
         };
 
-        let cover_message = match mix_client::packet::loop_cover_message(
-            our_info.address.clone(),
-            our_info.identifier,
-            topology,
-            average_packet_delay_duration,
+        let cover_message = match mix_client::packet::loop_cover_message_route(
+            self.our_info.address.clone(),
+            self.our_info.identifier,
+            route,
+            self.average_packet_delay,
         ) {
             Ok(message) => message,
             Err(err) => {
@@ -40,7 +88,7 @@
                     "Somehow we managed to create an invalid cover message - {:?}",
                     err
                 );
-                continue;
+                return;
             }
         };
 
@@ -48,7 +96,25 @@
         // - we run out of memory
         // - the receiver channel is closed
        // in either case there's no recovery and we can only panic
-        tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
+        self.mix_tx
+            .unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
             .unwrap();
     }
+
+    async fn run(&mut self) {
+        // we should set the initial delay only when we actually start the stream
+        self.next_delay = time::delay_for(mix_client::poisson::sample(
+            self.average_cover_message_sending_delay,
+        ));
+
+        while let Some(_) = self.next().await {
+            self.on_new_message().await;
+        }
+    }
+
+    pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move {
+            self.run().await;
+        })
+    }
 }
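As the comment in `poll_next` says, the `Stream` impl above is a glorified timer: each `Poll::Ready(Some(()))` only signals "time to build a cover message". The pattern, reduced to a fixed interval and stripped of the nym-specific types, looks like this — a sketch assuming the tokio 0.2 `Delay` API used throughout this diff:

```rust
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use std::pin::Pin;
use std::time::Duration;
use tokio::time;

struct Ticker {
    next_delay: time::Delay,
    interval: Duration,
}

impl Stream for Ticker {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if Pin::new(&mut self.next_delay).poll(cx).is_pending() {
            return Poll::Pending;
        }
        // reschedule relative to the deadline that just fired rather than
        // Instant::now(), so the schedule does not accumulate drift
        let next = self.next_delay.deadline() + self.interval;
        self.next_delay.reset(next);
        Poll::Ready(Some(()))
    }
}
```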
diff --git a/nym-client/src/client/mix_traffic.rs b/nym-client/src/client/mix_traffic.rs
index bfcd3ddfc6b..c71a7c3be6e 100644
--- a/nym-client/src/client/mix_traffic.rs
+++ b/nym-client/src/client/mix_traffic.rs
@@ -1,10 +1,15 @@
 use futures::channel::mpsc;
 use futures::StreamExt;
-use log::{debug, error, info, trace};
+use log::*;
 use sphinx::SphinxPacket;
 use std::net::SocketAddr;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
 
 pub(crate) struct MixMessage(SocketAddr, SphinxPacket);
+pub(crate) type MixMessageSender = mpsc::UnboundedSender<MixMessage>;
+pub(crate) type MixMessageReceiver = mpsc::UnboundedReceiver<MixMessage>;
 
 impl MixMessage {
     pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self {
@@ -12,26 +17,57 @@ impl MixMessage {
     }
 }
 
-pub(crate) struct MixTrafficController;
-
-impl MixTrafficController {
-    pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver<MixMessage>) {
-        info!("Mix Traffic Controller started!");
-        let mix_client = mix_client::MixClient::new();
-        while let Some(mix_message) = rx.next().await {
-            debug!("Got a mix_message for {:?}", mix_message.0);
-            let send_res = mix_client.send(mix_message.1, mix_message.0).await;
-            match send_res {
-                Ok(_) => {
-                    trace!("sent a mix message");
-                }
-                // TODO: should there be some kind of threshold of failed messages
-                // that, if reached, makes the application blow up?
-                Err(e) => error!(
-                    "We failed to send the message to {} :( - {:?}",
-                    mix_message.0, e
-                ),
-            };
+// TODO: put our TCP client here
+pub(crate) struct MixTrafficController<'a> {
+    tcp_client: multi_tcp_client::Client<'a>,
+    mix_rx: MixMessageReceiver,
+}
+
+impl MixTrafficController<'static> {
+    pub(crate) async fn new(
+        initial_endpoints: Vec<SocketAddr>,
+        initial_reconnection_backoff: Duration,
+        maximum_reconnection_backoff: Duration,
+        mix_rx: MixMessageReceiver,
+    ) -> Self {
+        let tcp_client_config = multi_tcp_client::Config::new(
+            initial_endpoints,
+            initial_reconnection_backoff,
+            maximum_reconnection_backoff,
+        );
+
+        MixTrafficController {
+            tcp_client: multi_tcp_client::Client::new(tcp_client_config).await,
+            mix_rx,
+        }
+    }
+
+    async fn on_message(&mut self, mix_message: MixMessage) {
+        debug!("Got a mix_message for {:?}", mix_message.0);
+        match self
+            .tcp_client
+            .send(mix_message.0, &mix_message.1.to_bytes())
+            .await
+        {
+            Ok(_) => trace!("sent a mix message"),
+            // TODO: should there be some kind of threshold of failed messages
+            // that, if reached, makes the application blow up?
+            Err(e) => error!(
+                "We failed to send the packet to {} - {:?}",
+                mix_message.0, e
+            ),
+        };
+    }
+
+    pub(crate) async fn run(&mut self) {
+        while let Some(mix_message) = self.mix_rx.next().await {
+            self.on_message(mix_message).await;
         }
     }
+
+    pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move {
+            self.run().await;
+        })
+    }
 }
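Wiring the reworked controller now takes three steps: create the unbounded channel, construct the controller (async, because the TCP client is built eagerly), and spawn it on a runtime handle. An in-crate sketch mirroring what `start_mix_traffic_controller` in `mod.rs` does below; the backoff values here are the config defaults, not requirements:

```rust
async fn spawn_mix_traffic_controller(
    handle: &tokio::runtime::Handle,
) -> (MixMessageSender, tokio::task::JoinHandle<()>) {
    let (mix_tx, mix_rx) = futures::channel::mpsc::unbounded();
    let controller = MixTrafficController::new(
        Vec::new(),                                // no initial endpoints known yet
        std::time::Duration::from_millis(10_000),  // initial reconnection backoff
        std::time::Duration::from_millis(300_000), // maximum reconnection backoff
        mix_rx,
    )
    .await;
    (mix_tx, controller.start(handle))
}
```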
diff --git a/nym-client/src/client/mod.rs b/nym-client/src/client/mod.rs
index f2cc8b35d00..f4d5e5e3473 100644
--- a/nym-client/src/client/mod.rs
+++ b/nym-client/src/client/mod.rs
@@ -1,14 +1,19 @@
-use crate::client::mix_traffic::MixTrafficController;
-use crate::client::received_buffer::ReceivedMessagesBuffer;
-use crate::client::topology_control::TopologyInnerRef;
+use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
+use crate::client::mix_traffic::{MixMessageReceiver, MixMessageSender, MixTrafficController};
+use crate::client::provider_poller::{PolledMessagesReceiver, PolledMessagesSender};
+use crate::client::received_buffer::{
+    ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
+};
+use crate::client::topology_control::{
+    TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
+};
 use crate::config::persistence::pathfinder::ClientPathfinder;
 use crate::config::{Config, SocketType};
 use crate::sockets::tcp;
 use crate::sockets::ws;
 use crypto::identity::MixIdentityKeyPair;
-use directory_client::presence::Topology;
+use directory_client::presence;
 use futures::channel::mpsc;
-use futures::join;
 use log::*;
 use pemstore::pemstore::PemStore;
 use sfw_provider_requests::AuthToken;
@@ -21,21 +26,24 @@ mod cover_traffic_stream;
 mod mix_traffic;
 mod provider_poller;
 mod real_traffic_stream;
-pub mod received_buffer;
-pub mod topology_control;
+pub(crate) mod received_buffer;
+pub(crate) mod topology_control;
+
+pub(crate) type InputMessageSender = mpsc::UnboundedSender<InputMessage>;
+pub(crate) type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
 
 pub struct NymClient {
+    runtime: Runtime,
     config: Config,
     identity_keypair: MixIdentityKeyPair,
 
     // to be used by the "send" function or socket, etc.
-    pub input_tx: mpsc::UnboundedSender<InputMessage>,
-    // the other end of the above channel
-    input_rx: mpsc::UnboundedReceiver<InputMessage>,
+    input_tx: Option<InputMessageSender>,
 }
 
 #[derive(Debug)]
-pub struct InputMessage(pub Destination, pub Vec<u8>);
+// TODO: make fields private
+pub(crate) struct InputMessage(pub Destination, pub Vec<u8>);
 
 impl NymClient {
     fn load_identity_keys(config_file: &Config) -> MixIdentityKeyPair {
@@ -51,75 +59,112 @@ impl NymClient {
 
     pub fn new(config: Config) -> Self {
         let identity_keypair = Self::load_identity_keys(&config);
-        let (input_tx, input_rx) = mpsc::unbounded::<InputMessage>();
 
         NymClient {
+            runtime: Runtime::new().unwrap(),
             config,
             identity_keypair,
-            input_tx,
-            input_rx,
+            input_tx: None,
         }
     }
 
+    pub fn as_mix_destination(&self) -> Destination {
+        Destination::new(
+            self.identity_keypair.public_key().derive_address(),
+            // TODO: what about SURBs?
+            Default::default(),
+        )
+    }
+
     async fn get_provider_socket_address(
-        &self,
-        topology_ctrl_ref: TopologyInnerRef<Topology>,
+        provider_id: String,
+        mut topology_accessor: TopologyAccessor<presence::Topology>,
     ) -> SocketAddr {
-        // this is temporary and assumes there exists only a single provider.
-        topology_ctrl_ref.read().await.topology.as_ref().unwrap()
+        topology_accessor.get_current_topology_clone().await.as_ref().expect("The current network topology is empty - are you using the correct directory server?")
             .providers()
-            .first()
-            .expect("Could not get a compatible provider from the initial network topology, are you using the right directory server?")
+            .iter()
+            .find(|provider| provider.pub_key == provider_id)
+            .unwrap_or_else(|| panic!("Could not find provider with id {:?} - are you sure it is still online? Perhaps try to run `nym-client init` again to obtain a new provider", provider_id))
             .client_listener
     }
 
-    pub fn start(self) -> Result<(), Box<dyn std::error::Error>> {
-        info!("Starting nym client");
-        let mut rt = Runtime::new()?;
-
-        // channels for inter-component communication
-
-        // mix_tx is the transmitter for any component generating sphinx packets that are to be sent to the mixnet;
-        // it is used by the cover traffic stream and the real traffic stream
-        // mix_rx is the receiver used by the MixTrafficController that sends the actual traffic
-        let (mix_tx, mix_rx) = mpsc::unbounded();
-
-        // poller_input_tx is the transmitter of messages fetched from the provider - used by ProviderPoller
-        // poller_input_rx is the receiver for said messages - used by ReceivedMessagesBuffer
-        let (poller_input_tx, poller_input_rx) = mpsc::unbounded();
+    // future constantly pumping loop cover traffic at some specified average rate;
+    // the pumped traffic goes to the MixTrafficController
+    fn start_cover_traffic_stream(
+        &self,
+        topology_accessor: TopologyAccessor<presence::Topology>,
+        mix_tx: MixMessageSender,
+    ) {
+        info!("Starting loop cover traffic stream...");
+        // we need to explicitly enter the runtime due to "next_delay: time::delay_for(Default::default())"
+        // set in the constructor, which HAS TO be called within the context of a tokio runtime
+        self.runtime
+            .enter(|| {
+                LoopCoverTrafficStream::new(
+                    mix_tx,
+                    self.as_mix_destination(),
+                    topology_accessor,
+                    self.config.get_loop_cover_traffic_average_delay(),
+                    self.config.get_average_packet_delay(),
+                )
+            })
+            .start(self.runtime.handle());
+    }
 
-        // received_messages_buffer_output_tx is the transmitter for *REQUESTS* for messages contained in ReceivedMessagesBuffer - used by the sockets
-        // the requests contain a oneshot channel to send a reply on
-        // received_messages_buffer_output_rx is the receiver for said requests - used by ReceivedMessagesBuffer
-        let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) =
-            mpsc::unbounded();
+    fn start_real_traffic_stream(
+        &self,
+        topology_accessor: TopologyAccessor<presence::Topology>,
+        mix_tx: MixMessageSender,
+        input_rx: InputMessageReceiver,
+    ) {
+        info!("Starting real traffic stream...");
+        // we need to explicitly enter the runtime due to "next_delay: time::delay_for(Default::default())"
+        // set in the constructor, which HAS TO be called within the context of a tokio runtime
+        self.runtime
+            .enter(|| {
+                real_traffic_stream::OutQueueControl::new(
+                    mix_tx,
+                    input_rx,
+                    self.as_mix_destination(),
+                    topology_accessor,
+                    self.config.get_average_packet_delay(),
+                    self.config.get_message_sending_average_delay(),
+                )
+            })
+            .start(self.runtime.handle());
+    }
 
-        let self_address = self.identity_keypair.public_key().derive_address();
+    // buffer controlling all messages fetched from the provider;
+    // required so that other components would be able to use them (say the websocket)
+    fn start_received_messages_buffer_controller(
+        &self,
+        query_receiver: ReceivedBufferRequestReceiver,
+        poller_receiver: PolledMessagesReceiver,
+    ) {
+        info!("Starting 'received messages buffer controller'...");
+        ReceivedMessagesBufferController::new(query_receiver, poller_receiver)
+            .start(self.runtime.handle())
+    }
 
-        // generate the same type of keys we have as our identity
-        let healthcheck_keys = MixIdentityKeyPair::new();
+    // future constantly trying to fetch any received messages from the provider;
+    // the received messages are sent to ReceivedMessagesBuffer to be available to the rest of the system
+    fn start_provider_poller(
+        &mut self,
+        topology_accessor: TopologyAccessor<presence::Topology>,
+        poller_input_tx: PolledMessagesSender,
+    ) {
+        info!("Starting provider poller...");
+        // we already have our provider written in the config
+        let provider_id = self.config.get_provider_id();
 
-        info!("Trying to obtain initial compatible network topology before starting up rest of modules");
-        // TODO: when we switch to our graph topology, we need to remember to change the 'Topology' type
-        let topology_controller_config = topology_control::TopologyControlConfig::<Topology>::new(
-            self.config.get_directory_server(),
-            self.config.get_topology_refresh_rate(),
-            healthcheck_keys,
-            self.config.get_topology_resolution_timeout(),
-            self.config.get_number_of_healthcheck_test_packets() as usize,
+        let provider_client_listener_address = self.runtime.block_on(
+            Self::get_provider_socket_address(provider_id, topology_accessor),
         );
-        let topology_controller = rt.block_on(topology_control::TopologyControl::<Topology>::new(
-            topology_controller_config,
-        ));
-
-        let provider_client_listener_address =
-            rt.block_on(self.get_provider_socket_address(topology_controller.get_inner_ref()));
 
         let mut provider_poller = provider_poller::ProviderPoller::new(
             poller_input_tx,
             provider_client_listener_address,
-            self_address.clone(),
+            self.identity_keypair.public_key().derive_address(),
             self.config
                 .get_provider_auth_token()
                 .map(|str_token| AuthToken::try_from_base58_string(str_token).ok())
@@ -127,109 +172,152 @@ impl NymClient {
             self.config.get_fetch_message_delay(),
         );
 
-        // registration
-        if let Err(err) = rt.block_on(provider_poller.perform_initial_registration()) {
-            panic!("Failed to perform initial provider registration: {:?}", err);
-        };
+        if !provider_poller.is_registered() {
+            info!("Trying to perform initial provider registration...");
+            self.runtime
+                .block_on(provider_poller.perform_initial_registration())
+                .expect("Failed to perform initial provider registration");
+        }
+        provider_poller.start(self.runtime.handle());
+    }
 
-        // set up all of the futures for the components running on the client
+    // future responsible for periodically polling the directory server and updating
+    // the current global view of topology
+    fn start_topology_refresher(
+        &mut self,
+        topology_accessor: TopologyAccessor<presence::Topology>,
+    ) {
+        let healthcheck_keys = MixIdentityKeyPair::new();
 
-        // buffer controlling all messages fetched from the provider;
-        // required so that other components would be able to use them (say the websocket)
-        let received_messages_buffer_controllers_future = rt.spawn(
-            ReceivedMessagesBuffer::new()
-                .start_controllers(poller_input_rx, received_messages_buffer_output_rx),
+        let topology_refresher_config = TopologyRefresherConfig::new(
+            self.config.get_directory_server(),
+            self.config.get_topology_refresh_rate(),
+            healthcheck_keys,
+            self.config.get_topology_resolution_timeout(),
+            self.config.get_number_of_healthcheck_test_packets() as usize,
+            self.config.get_node_score_threshold(),
         );
+        let mut topology_refresher =
+            TopologyRefresher::new(topology_refresher_config, topology_accessor);
+        // before returning, block the entire runtime to refresh the current network view so that any
+        // components depending on topology would see a non-empty view
+        info!("Obtaining initial network topology...");
+        self.runtime.block_on(topology_refresher.refresh());
+        info!("Starting topology refresher...");
+        topology_refresher.start(self.runtime.handle());
+    }
 
-        // controller for sending sphinx packets to the mixnet (either real traffic or cover traffic)
-        let mix_traffic_future = rt.spawn(MixTrafficController::run(mix_rx));
-
-        // future constantly pumping loop cover traffic at some specified average rate;
-        // the pumped traffic goes to the MixTrafficController
-        let loop_cover_traffic_future =
-            rt.spawn(cover_traffic_stream::start_loop_cover_traffic_stream(
-                mix_tx.clone(),
-                Destination::new(self_address.clone(), Default::default()),
-                topology_controller.get_inner_ref(),
-                self.config.get_loop_cover_traffic_average_delay(),
-                self.config.get_average_packet_delay(),
-            ));
-
-        // cloning the arguments required by OutQueueControl; required due to move
-        let input_rx = self.input_rx;
-        let topology_ref = topology_controller.get_inner_ref();
-        let average_packet_delay = self.config.get_average_packet_delay();
-        let message_sending_average_delay = self.config.get_message_sending_average_delay();
-        let self_address_clone = self_address.clone();
-        // future constantly pumping traffic at some specified average rate;
-        // if a real message is available on 'input_rx' (that might have been received from, say,
-        // the websocket), the real message is used, otherwise a loop cover message is generated.
-        // the pumped traffic goes to the MixTrafficController
-        let out_queue_control_future = rt.spawn(async move {
-            real_traffic_stream::OutQueueControl::new(
-                mix_tx,
-                input_rx,
-                Destination::new(self_address_clone, Default::default()),
-                topology_ref,
-                average_packet_delay,
-                message_sending_average_delay,
-            )
-            .run_out_queue_control()
-            .await
-        });
-
-        // future constantly trying to fetch any received messages from the provider;
-        // the received messages are sent to ReceivedMessagesBuffer to be available to the rest of the system
-        let provider_polling_future = rt.spawn(provider_poller.start_provider_polling());
+    // controller for sending sphinx packets to the mixnet (either real traffic or cover traffic)
+    fn start_mix_traffic_controller(&mut self, mix_rx: MixMessageReceiver) {
+        info!("Starting mix traffic controller...");
+        // TODO: possible optimisation: set the initial endpoints to all known mixes from layer 1
+        let initial_mix_endpoints = Vec::new();
+        self.runtime
+            .block_on(MixTrafficController::new(
+                initial_mix_endpoints,
+                self.config.get_packet_forwarding_initial_backoff(),
+                self.config.get_packet_forwarding_maximum_backoff(),
+                mix_rx,
+            ))
+            .start(self.runtime.handle());
+    }
 
+    fn start_socket_listener(
+        &self,
+        topology_accessor: TopologyAccessor<presence::Topology>,
+        received_messages_buffer_output_tx: ReceivedBufferRequestSender,
+        input_tx: InputMessageSender,
+    ) {
         match self.config.get_socket_type() {
             SocketType::WebSocket => {
-                rt.spawn(ws::start_websocket(
+                ws::start_websocket(
+                    self.runtime.handle(),
                     self.config.get_listening_port(),
-                    self.input_tx,
+                    input_tx,
                     received_messages_buffer_output_tx,
-                    self_address,
-                    topology_controller.get_inner_ref(),
-                ));
+                    self.identity_keypair.public_key().derive_address(),
+                    topology_accessor,
+                );
             }
             SocketType::TCP => {
-                rt.spawn(tcp::start_tcpsocket(
+                tcp::start_tcpsocket(
+                    self.runtime.handle(),
                     self.config.get_listening_port(),
-                    self.input_tx,
+                    input_tx,
                    received_messages_buffer_output_tx,
-                    self_address,
-                    topology_controller.get_inner_ref(),
-                ));
+                    self.identity_keypair.public_key().derive_address(),
+                    topology_accessor,
+                );
             }
             SocketType::None => (),
         }
+    }
 
-        // future responsible for periodically polling the directory server and updating
-        // the current global view of topology
-        let topology_refresher_future = rt.spawn(topology_controller.run_refresher());
-
-        rt.block_on(async {
-            let future_results = join!(
-                received_messages_buffer_controllers_future,
-                mix_traffic_future,
-                loop_cover_traffic_future,
-                out_queue_control_future,
-                provider_polling_future,
-                topology_refresher_future,
-            );
+    /// EXPERIMENTAL DIRECT RUST API
+    /// It's entirely untested and there are absolutely no guarantees about it
+    pub fn send_message(&self, destination: Destination, message: Vec<u8>) {
+        self.input_tx
+            .as_ref()
+            .expect("start method was not called before!")
+            .unbounded_send(InputMessage(destination, message))
+            .unwrap()
+    }
 
-            assert!(
-                future_results.0.is_ok()
-                    && future_results.1.is_ok()
-                    && future_results.2.is_ok()
-                    && future_results.3.is_ok()
-                    && future_results.4.is_ok()
-                    && future_results.5.is_ok()
+    /// Blocking version of the `start` method. Will run forever (or until SIGINT is sent)
+    pub fn run_forever(&mut self) {
+        self.start();
+        if let Err(e) = self.runtime.block_on(tokio::signal::ctrl_c()) {
+            error!(
+                "There was an error while capturing SIGINT - {:?}. We will terminate regardless",
+                e
             );
-        });
+        }
+
+        println!(
+            "Received SIGINT - the client will terminate now (threads are not YET nicely stopped)"
+        );
+    }
+
+    pub fn start(&mut self) {
+        info!("Starting nym client");
+        // channels for inter-component communication
 
-        // this line in theory should never be reached as the runtime should be permanently blocked on traffic senders
-        error!("The client went kaput...");
-        Ok(())
+        // mix_tx is the transmitter for any component generating sphinx packets that are to be sent to the mixnet;
+        // it is used by the cover traffic stream and the real traffic stream
+        // mix_rx is the receiver used by the MixTrafficController that sends the actual traffic
+        let (mix_tx, mix_rx) = mpsc::unbounded();
+
+        // poller_input_tx is the transmitter of messages fetched from the provider - used by ProviderPoller
+        // poller_input_rx is the receiver for said messages - used by ReceivedMessagesBuffer
+        let (poller_input_tx, poller_input_rx) = mpsc::unbounded();
+
+        // received_messages_buffer_output_tx is the transmitter for *REQUESTS* for messages contained in ReceivedMessagesBuffer - used by the sockets
+        // the requests contain a oneshot channel to send a reply on
+        // received_messages_buffer_output_rx is the receiver for said requests - used by ReceivedMessagesBuffer
+        let (received_messages_buffer_output_tx, received_messages_buffer_output_rx) =
+            mpsc::unbounded();
+
+        // channels responsible for controlling real messages
+        let (input_tx, input_rx) = mpsc::unbounded::<InputMessage>();
+
+        // TODO: when we switch to our graph topology, we need to remember to change the 'presence::Topology' type
+        let shared_topology_accessor = TopologyAccessor::<presence::Topology>::new();
+        // the components are started in a very specific order. Unless you know what you are doing,
+        // do not change that.
+        self.start_topology_refresher(shared_topology_accessor.clone());
+        self.start_received_messages_buffer_controller(
+            received_messages_buffer_output_rx,
+            poller_input_rx,
+        );
+        self.start_provider_poller(shared_topology_accessor.clone(), poller_input_tx);
+        self.start_mix_traffic_controller(mix_rx);
+        self.start_cover_traffic_stream(shared_topology_accessor.clone(), mix_tx.clone());
+        self.start_real_traffic_stream(shared_topology_accessor.clone(), mix_tx, input_rx);
+        self.start_socket_listener(
+            shared_topology_accessor,
+            received_messages_buffer_output_tx,
+            input_tx.clone(),
+        );
+        self.input_tx = Some(input_tx);
     }
 }
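With `start()` now spawning every component on the client's own runtime, embedding the client reduces to the sketch below. Note that `run_forever()` calls `start()` internally, so an embedder uses one entry point or the other; `send_message` is the experimental API flagged in the diff:

```rust
// In-crate sketch; `destination` would be a sphinx::route::Destination.
fn run_blocking(config: Config) {
    // Option 1: start everything and block until SIGINT
    NymClient::new(config).run_forever();
}

fn run_embedded(config: Config, destination: sphinx::route::Destination) {
    // Option 2: start components and inject messages through the
    // experimental direct API (no delivery guarantees whatsoever)
    let mut client = NymClient::new(config);
    client.start();
    client.send_message(destination, b"hello mixnet".to_vec());
}
```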
diff --git a/nym-client/src/client/provider_poller.rs b/nym-client/src/client/provider_poller.rs
index 526acb9e4b4..24d329c9ae2 100644
--- a/nym-client/src/client/provider_poller.rs
+++ b/nym-client/src/client/provider_poller.rs
@@ -1,10 +1,15 @@
 use futures::channel::mpsc;
-use log::{debug, error, info, trace, warn};
+use log::*;
 use provider_client::ProviderClientError;
 use sfw_provider_requests::AuthToken;
 use sphinx::route::DestinationAddressBytes;
 use std::net::SocketAddr;
 use std::time;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
+
+pub(crate) type PolledMessagesSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
+pub(crate) type PolledMessagesReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
 
 pub(crate) struct ProviderPoller {
     polling_rate: time::Duration,
@@ -31,11 +36,15 @@ impl ProviderPoller {
         }
     }
 
+    pub(crate) fn is_registered(&self) -> bool {
+        self.provider_client.is_registered()
+    }
+
     // This method is only temporary until registration is moved to `client init`
     pub(crate) async fn perform_initial_registration(&mut self) -> Result<(), ProviderClientError> {
         debug!("performing initial provider registration");
 
-        if !self.provider_client.is_registered() {
+        if !self.is_registered() {
             let auth_token = match self.provider_client.register().await {
                 // in this particular case we can ignore this error
                 Err(ProviderClientError::ClientAlreadyRegisteredError) => return Ok(()),
@@ -52,8 +61,6 @@ impl ProviderPoller {
     }
 
     pub(crate) async fn start_provider_polling(self) {
-        info!("Starting provider poller");
-
         let loop_message = &mix_client::packet::LOOP_COVER_MESSAGE_PAYLOAD.to_vec();
         let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec();
 
@@ -86,4 +93,8 @@ impl ProviderPoller {
             tokio::time::delay_for(self.polling_rate).await;
         }
     }
+
+    pub(crate) fn start(self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move { self.start_provider_polling().await })
+    }
 }
diff --git a/nym-client/src/client/real_traffic_stream.rs b/nym-client/src/client/real_traffic_stream.rs
index 0fb9a198fe7..2e831966103 100644
--- a/nym-client/src/client/real_traffic_stream.rs
+++ b/nym-client/src/client/real_traffic_stream.rs
@@ -1,5 +1,5 @@
 use crate::client::mix_traffic::MixMessage;
-use crate::client::topology_control::TopologyInnerRef;
+use crate::client::topology_control::TopologyAccessor;
 use crate::client::InputMessage;
 use futures::channel::mpsc;
 use futures::task::{Context, Poll};
@@ -8,6 +8,8 @@ use log::{error, info, trace, warn};
 use sphinx::route::Destination;
 use std::pin::Pin;
 use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
 use tokio::time;
 use topology::NymTopology;
 
@@ -18,7 +20,7 @@ pub(crate) struct OutQueueControl<T: NymTopology> {
     mix_tx: mpsc::UnboundedSender<MixMessage>,
     input_rx: mpsc::UnboundedReceiver<InputMessage>,
     our_info: Destination,
-    topology_ctrl_ref: TopologyInnerRef<T>,
+    topology_access: TopologyAccessor<T>,
 }
 
 pub(crate) enum StreamMessage {
@@ -60,12 +62,12 @@ impl Stream for OutQueueControl {
     }
 }
 
-impl<T: NymTopology> OutQueueControl<T> {
+impl<T: 'static + NymTopology> OutQueueControl<T> {
     pub(crate) fn new(
         mix_tx: mpsc::UnboundedSender<MixMessage>,
         input_rx: mpsc::UnboundedReceiver<InputMessage>,
         our_info: Destination,
-        topology: TopologyInnerRef<T>,
+        topology_access: TopologyAccessor<T>,
         average_packet_delay: Duration,
         average_message_sending_delay: Duration,
     ) -> Self {
@@ -76,10 +78,57 @@ impl OutQueueControl {
             mix_tx,
             input_rx,
             our_info,
-            topology_ctrl_ref: topology,
+            topology_access,
         }
     }
 
+    async fn on_message(&mut self, next_message: StreamMessage) {
+        trace!("created new message");
+        let route = match self.topology_access.random_route().await {
+            None => {
+                warn!("No valid topology detected - won't send any real or loop message this time");
+                // TODO: this creates a potential problem: we can lose real messages if we were
+                // unable to get the topology; perhaps we should store them in some buffer?
+                return;
+            }
+            Some(route) => route,
+        };
+
+        let next_packet = match next_message {
+            StreamMessage::Cover => mix_client::packet::loop_cover_message_route(
+                self.our_info.address.clone(),
+                self.our_info.identifier,
+                route,
+                self.average_packet_delay,
+            ),
+            StreamMessage::Real(real_message) => mix_client::packet::encapsulate_message_route(
+                real_message.0,
+                real_message.1,
+                route,
+                self.average_packet_delay,
+            ),
+        };
+
+        let next_packet = match next_packet {
+            Ok(message) => message,
+            Err(err) => {
+                error!(
+                    "Somehow we managed to create an invalid traffic message - {:?}",
+                    err
+                );
+                return;
+            }
+        };
+
+        // if this one fails, there's no retrying because it means that either:
+        // - we run out of memory
+        // - the receiver channel is closed
+        // in either case there's no recovery and we can only panic
+        self.mix_tx
+            .unbounded_send(MixMessage::new(next_packet.0, next_packet.1))
+            .unwrap();
+    }
+
     pub(crate) async fn run_out_queue_control(mut self) {
         // we should set the initial delay only when we actually start the stream
         self.next_delay = time::delay_for(mix_client::poisson::sample(
@@ -88,51 +137,11 @@ impl OutQueueControl {
 
         info!("starting out queue controller");
         while let Some(next_message) = self.next().await {
-            trace!("created new message");
-            let read_lock = self.topology_ctrl_ref.read().await;
-            let topology = match read_lock.topology.as_ref() {
-                None => {
-                    warn!(
-                        "No valid topology detected - won't send any loop cover message this time"
-                    );
-                    continue;
-                }
-                Some(topology) => topology,
-            };
-
-            let next_packet = match next_message {
-                StreamMessage::Cover => mix_client::packet::loop_cover_message(
-                    self.our_info.address.clone(),
-                    self.our_info.identifier,
-                    topology,
-                    self.average_packet_delay,
-                ),
-                StreamMessage::Real(real_message) => mix_client::packet::encapsulate_message(
-                    real_message.0,
-                    real_message.1,
-                    topology,
-                    self.average_packet_delay,
-                ),
-            };
-
-            let next_packet = match next_packet {
-                Ok(message) => message,
-                Err(err) => {
-                    error!(
-                        "Somehow we managed to create an invalid traffic message - {:?}",
-                        err
-                    );
-                    continue;
-                }
-            };
-
-            // if this one fails, there's no retrying because it means that either:
-            // - we run out of memory
-            // - the receiver channel is closed
-            // in either case there's no recovery and we can only panic
-            self.mix_tx
-                .unbounded_send(MixMessage::new(next_packet.0, next_packet.1))
-                .unwrap();
+            self.on_message(next_message).await;
         }
     }
+
+    pub(crate) fn start(self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move { self.run_out_queue_control().await })
+    }
 }
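The out-queue makes the real-versus-cover decision at every tick: if an `InputMessage` is waiting it becomes `StreamMessage::Real`, otherwise the tick degrades to `StreamMessage::Cover`, so the observable traffic rate stays constant either way. Reduced to its essence (a sketch, not the actual `poll_next` body, which is unchanged by this diff):

```rust
// Simplified decision function: `queued` models the head of input_rx.
enum Tick {
    Cover,
    Real(Vec<u8>),
}

fn classify_tick(queued: Option<Vec<u8>>) -> Tick {
    match queued {
        Some(real_message) => Tick::Real(real_message),
        None => Tick::Cover,
    }
}
```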
diff --git a/nym-client/src/client/received_buffer.rs b/nym-client/src/client/received_buffer.rs
index a6df580c9f0..123fc928821 100644
--- a/nym-client/src/client/received_buffer.rs
+++ b/nym-client/src/client/received_buffer.rs
@@ -1,89 +1,124 @@
+use crate::client::provider_poller::PolledMessagesReceiver;
 use futures::channel::{mpsc, oneshot};
-use futures::lock::Mutex as FMutex;
+use futures::lock::Mutex;
 use futures::StreamExt;
-use log::{error, info, trace};
+use log::*;
 use std::sync::Arc;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
 
-pub type BufferResponse = oneshot::Sender<Vec<Vec<u8>>>;
+pub(crate) type ReceivedBufferResponse = oneshot::Sender<Vec<Vec<u8>>>;
+pub(crate) type ReceivedBufferRequestSender = mpsc::UnboundedSender<ReceivedBufferResponse>;
+pub(crate) type ReceivedBufferRequestReceiver = mpsc::UnboundedReceiver<ReceivedBufferResponse>;
 
-pub(crate) struct ReceivedMessagesBuffer {
-    inner: Arc<FMutex<Inner>>,
+struct ReceivedMessagesBufferInner {
+    messages: Vec<Vec<u8>>,
+}
+
+#[derive(Debug, Clone)]
+// Note: you should NEVER create more than a single instance of this using 'new()'.
+// You should always use .clone() to create additional instances
+struct ReceivedMessagesBuffer {
+    inner: Arc<Mutex<ReceivedMessagesBufferInner>>,
 }
 
 impl ReceivedMessagesBuffer {
-    pub(crate) fn new() -> Self {
+    fn new() -> Self {
         ReceivedMessagesBuffer {
-            inner: Arc::new(FMutex::new(Inner::new())),
+            inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
+                messages: Vec::new(),
+            })),
         }
     }
 
-    pub(crate) async fn start_controllers(
-        self,
-        poller_rx: mpsc::UnboundedReceiver<Vec<Vec<u8>>>, // to receive new messages
-        query_receiver: mpsc::UnboundedReceiver<BufferResponse>, // to receive requests to acquire all stored messages
-    ) {
-        let input_controller_future = tokio::spawn(Self::run_poller_input_controller(
-            self.inner.clone(),
-            poller_rx,
-        ));
-        let output_controller_future = tokio::spawn(Self::run_query_output_controller(
-            self.inner,
-            query_receiver,
-        ));
+    async fn add_new_messages(&mut self, msgs: Vec<Vec<u8>>) {
+        trace!("Adding new messages to the buffer! {:?}", msgs);
+        self.inner.lock().await.messages.extend(msgs)
+    }
 
-        futures::future::select(input_controller_future, output_controller_future).await;
-        panic!("One of the received buffer controllers failed!")
+    async fn acquire_and_empty(&mut self) -> Vec<Vec<u8>> {
+        trace!("Emptying the buffer and returning all messages");
+        let mut mutex_guard = self.inner.lock().await;
+        std::mem::replace(&mut mutex_guard.messages, Vec::new())
     }
+}
 
-    pub(crate) async fn run_poller_input_controller(
-        buf: Arc<FMutex<Inner>>,
-        mut poller_rx: mpsc::UnboundedReceiver<Vec<Vec<u8>>>,
-    ) {
-        info!("Started Received Messages Buffer Input Controller");
+struct RequestReceiver {
+    received_buffer: ReceivedMessagesBuffer,
+    query_receiver: ReceivedBufferRequestReceiver,
+}
 
-        while let Some(new_messages) = poller_rx.next().await {
-            Inner::add_new_messages(&*buf, new_messages).await;
+impl RequestReceiver {
+    fn new(
+        received_buffer: ReceivedMessagesBuffer,
+        query_receiver: ReceivedBufferRequestReceiver,
+    ) -> Self {
+        RequestReceiver {
+            received_buffer,
+            query_receiver,
         }
     }
 
-    pub(crate) async fn run_query_output_controller(
-        buf: Arc<FMutex<Inner>>,
-        mut query_receiver: mpsc::UnboundedReceiver<BufferResponse>,
-    ) {
-        info!("Started Received Messages Buffer Output Controller");
-
-        while let Some(request) = query_receiver.next().await {
-            let messages = Inner::acquire_and_empty(&*buf).await;
-            if let Err(failed_messages) = request.send(messages) {
-                error!(
-                    "Failed to send the messages to the requester. Adding them back to the buffer"
-                );
-                Inner::add_new_messages(&*buf, failed_messages).await;
+    fn start(mut self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move {
+            while let Some(request) = self.query_receiver.next().await {
+                let messages = self.received_buffer.acquire_and_empty().await;
+                if let Err(failed_messages) = request.send(messages) {
+                    error!(
+                        "Failed to send the messages to the requester. Adding them back to the buffer"
+                    );
+                    self.received_buffer.add_new_messages(failed_messages).await;
+                }
             }
-        }
+        })
     }
 }
 
-pub(crate) struct Inner {
-    messages: Vec<Vec<u8>>,
+struct MessageReceiver {
+    received_buffer: ReceivedMessagesBuffer,
+    poller_receiver: PolledMessagesReceiver,
 }
 
-impl Inner {
-    fn new() -> Self {
-        Inner {
-            messages: Vec::new(),
+impl MessageReceiver {
+    fn new(
+        received_buffer: ReceivedMessagesBuffer,
+        poller_receiver: PolledMessagesReceiver,
+    ) -> Self {
+        MessageReceiver {
+            received_buffer,
+            poller_receiver,
         }
     }
 
+    fn start(mut self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move {
+            while let Some(new_messages) = self.poller_receiver.next().await {
+                self.received_buffer.add_new_messages(new_messages).await;
+            }
+        })
+    }
+}
 
-    async fn add_new_messages(buf: &FMutex<Inner>, msgs: Vec<Vec<u8>>) {
-        trace!("Adding new messages to the buffer! {:?}", msgs);
-        let mut unlocked = buf.lock().await;
-        unlocked.messages.extend(msgs);
+pub(crate) struct ReceivedMessagesBufferController {
+    message_receiver: MessageReceiver,
+    request_receiver: RequestReceiver,
+}
+
+impl ReceivedMessagesBufferController {
+    pub(crate) fn new(
+        query_receiver: ReceivedBufferRequestReceiver,
+        poller_receiver: PolledMessagesReceiver,
+    ) -> Self {
+        let received_buffer = ReceivedMessagesBuffer::new();
+
+        ReceivedMessagesBufferController {
+            message_receiver: MessageReceiver::new(received_buffer.clone(), poller_receiver),
+            request_receiver: RequestReceiver::new(received_buffer, query_receiver),
+        }
     }
 
-    async fn acquire_and_empty(buf: &FMutex<Inner>) -> Vec<Vec<u8>> {
-        trace!("Emptying the buffer and returning all messages");
-        let mut unlocked = buf.lock().await;
-        std::mem::replace(&mut unlocked.messages, Vec::new())
+    pub(crate) fn start(self, handle: &Handle) {
+        // TODO: should we do anything with the JoinHandle(s) returned by the start methods?
+        self.message_receiver.start(handle);
+        self.request_receiver.start(handle);
     }
 }
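Each fetch request carries its own `oneshot::Sender`, so the buffer can reply without any shared response queue, and if the requester has gone away, `request.send` hands the messages back to be re-buffered. A hypothetical in-crate requester:

```rust
// Ask the controller for everything currently buffered and await the reply.
async fn fetch_all_messages(query_tx: &ReceivedBufferRequestSender) -> Vec<Vec<u8>> {
    let (res_tx, res_rx) = futures::channel::oneshot::channel();
    query_tx
        .unbounded_send(res_tx)
        .expect("the buffer controller has died");
    res_rx
        .await
        .expect("the buffer controller dropped our request")
}
```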
diff --git a/nym-client/src/client/topology_control.rs b/nym-client/src/client/topology_control.rs
index c7403503c3b..1b11641bb9c 100644
--- a/nym-client/src/client/topology_control.rs
+++ b/nym-client/src/client/topology_control.rs
@@ -1,23 +1,67 @@
 use crate::built_info;
 use crypto::identity::MixIdentityKeyPair;
+use futures::lock::Mutex;
 use healthcheck::HealthChecker;
-use log::{error, info, trace, warn};
-use std::marker::PhantomData;
+use log::*;
 use std::sync::Arc;
 use std::time;
-use tokio::sync::RwLock as FRwLock;
+use std::time::Duration;
+use tokio::runtime::Handle;
+// use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
 use topology::NymTopology;
 
-const NODE_HEALTH_THRESHOLD: f64 = 0.0;
+struct TopologyAccessorInner<T: NymTopology>(Option<T>);
 
-// auxiliary type for ease of use
-pub type TopologyInnerRef<T> = Arc<FRwLock<Inner<T>>>;
+impl<T: NymTopology> TopologyAccessorInner<T> {
+    fn new() -> Self {
+        TopologyAccessorInner(None)
+    }
 
-pub(crate) struct TopologyControl<T: NymTopology> {
-    directory_server: String,
-    inner: Arc<FRwLock<Inner<T>>>,
-    health_checker: HealthChecker,
-    refresh_rate: time::Duration,
+    fn update(&mut self, new: Option<T>) {
+        self.0 = new;
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct TopologyAccessor<T: NymTopology> {
+    // TODO: this requires some actual benchmarking to determine whether obtaining the mutex is
+    // going to cause a bottleneck and whether perhaps an RwLock would be better
+    inner: Arc<Mutex<TopologyAccessorInner<T>>>,
+}
+
+impl<T: NymTopology> TopologyAccessor<T> {
+    pub(crate) fn new() -> Self {
+        TopologyAccessor {
+            inner: Arc::new(Mutex::new(TopologyAccessorInner::new())),
+        }
+    }
+
+    async fn update_global_topology(&mut self, new_topology: Option<T>) {
+        self.inner.lock().await.update(new_topology);
+    }
+
+    // Unless you absolutely need the entire topology, use `random_route` instead
+    pub(crate) async fn get_current_topology_clone(&mut self) -> Option<T> {
+        self.inner.lock().await.0.clone()
+    }
+
+    // this is a rather temporary solution, as each client will have an associated provider;
+    // currently that is not implemented yet and there only exists one provider in the network
+    pub(crate) async fn random_route(&mut self) -> Option<Vec<sphinx::route::Node>> {
+        match &self.inner.lock().await.0 {
+            None => None,
+            Some(ref topology) => {
+                let mut providers = topology.providers();
+                if providers.is_empty() {
+                    return None;
+                }
+                // unwrap is fine here as we asserted there is at least a single provider
+                let provider = providers.pop().unwrap().into();
+                topology.route_to(provider).ok()
+            }
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -26,40 +70,48 @@ enum TopologyError {
     NoValidPathsError,
 }
 
-pub(crate) struct TopologyControlConfig<T: NymTopology> {
+pub(crate) struct TopologyRefresherConfig {
     directory_server: String,
     refresh_rate: time::Duration,
     identity_keypair: MixIdentityKeyPair,
     resolution_timeout: time::Duration,
     number_test_packets: usize,
-
-    // the only reason I put phantom data here is so that we would be able to infer the type
-    // of TopologyControl directly from the provided config rather than having to
-    // specify it during the TopologyControl::<T>::new() call
-    _topology_type_phantom: PhantomData<*const T>,
+    node_score_threshold: f64,
 }
 
-impl<T: NymTopology> TopologyControlConfig<T> {
+impl TopologyRefresherConfig {
     pub(crate) fn new(
         directory_server: String,
         refresh_rate: time::Duration,
         identity_keypair: MixIdentityKeyPair,
         resolution_timeout: time::Duration,
         number_test_packets: usize,
+        node_score_threshold: f64,
     ) -> Self {
-        TopologyControlConfig {
+        TopologyRefresherConfig {
             directory_server,
             refresh_rate,
            identity_keypair,
             resolution_timeout,
             number_test_packets,
-            _topology_type_phantom: PhantomData,
+            node_score_threshold,
         }
     }
 }
 
-impl<T: NymTopology> TopologyControl<T> {
-    pub(crate) async fn new(cfg: TopologyControlConfig<T>) -> Self {
+pub(crate) struct TopologyRefresher<T: NymTopology> {
+    directory_server: String,
+    topology_accessor: TopologyAccessor<T>,
+    health_checker: HealthChecker,
+    refresh_rate: Duration,
+    node_score_threshold: f64,
+}
+
+impl<T: 'static + NymTopology> TopologyRefresher<T> {
+    pub(crate) fn new(
+        cfg: TopologyRefresherConfig,
+        topology_accessor: TopologyAccessor<T>,
+    ) -> Self {
         // this is a temporary solution as the healthcheck will eventually be moved to validators
         let health_checker = healthcheck::HealthChecker::new(
             cfg.resolution_timeout,
@@ -67,26 +119,13 @@
             cfg.identity_keypair,
         );
 
-        let mut topology_control = TopologyControl {
+        TopologyRefresher {
             directory_server: cfg.directory_server,
-            refresh_rate: cfg.refresh_rate,
-            inner: Arc::new(FRwLock::new(Inner::new(None))),
+            topology_accessor,
             health_checker,
-        };
-
-        // best effort approach to try to get a valid topology after the call to 'new'
-        let initial_topology = match topology_control.get_current_compatible_topology().await {
-            Ok(topology) => Some(topology),
-            Err(err) => {
-                error!("Initial topology is invalid - {:?}. Right now it will be impossible to send any packets through the mixnet!", err);
-                None
-            }
-        };
-        topology_control
-            .update_global_topology(initial_topology)
-            .await;
-
-        topology_control
+            refresh_rate: cfg.refresh_rate,
+            node_score_threshold: cfg.node_score_threshold,
+        }
     }
 
     async fn get_current_compatible_topology(&self) -> Result<T, TopologyError> {
@@ -110,7 +149,7 @@
         };
 
         let healthy_topology = healthcheck_scores
-            .filter_topology_by_score(&version_filtered_topology, NODE_HEALTH_THRESHOLD);
+            .filter_topology_by_score(&version_filtered_topology, self.node_score_threshold);
 
         // make sure you can still send a packet through the network:
         if !healthy_topology.can_construct_path_through() {
@@ -120,42 +159,27 @@
         Ok(healthy_topology)
     }
 
-    pub(crate) fn get_inner_ref(&self) -> Arc<FRwLock<Inner<T>>> {
-        self.inner.clone()
-    }
-
-    async fn update_global_topology(&mut self, new_topology: Option<T>) {
-        // acquire the write lock
-        let mut write_lock = self.inner.write().await;
-        write_lock.topology = new_topology;
-    }
-
-    pub(crate) async fn run_refresher(mut self) {
-        info!("Starting topology refresher");
-        loop {
-            trace!("Refreshing the topology");
-            let new_topology_res = self.get_current_compatible_topology().await;
-
-            let new_topology = match new_topology_res {
-                Ok(topology) => Some(topology),
-                Err(err) => {
-                    warn!("the obtained topology seems to be invalid - {:?}, it will be impossible to send packets through", err);
-                    None
-                }
-            };
+    pub(crate) async fn refresh(&mut self) {
+        trace!("Refreshing the topology");
+        let new_topology = match self.get_current_compatible_topology().await {
+            Ok(topology) => Some(topology),
+            Err(err) => {
+                warn!("the obtained topology seems to be invalid - {:?}, it will be impossible to send packets through", err);
+                None
+            }
+        };
 
-            self.update_global_topology(new_topology).await;
-            tokio::time::delay_for(self.refresh_rate).await;
-        }
+        self.topology_accessor
+            .update_global_topology(new_topology)
+            .await;
     }
-}
 
-pub struct Inner<T: NymTopology> {
-    pub topology: Option<T>,
-}
-
-impl<T: NymTopology> Inner<T> {
-    fn new(topology: Option<T>) -> Self {
-        Inner { topology }
+    pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
+        handle.spawn(async move {
+            loop {
+                self.refresh().await;
+                tokio::time::delay_for(self.refresh_rate).await;
+            }
+        })
     }
 }
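Usage-wise, one `TopologyAccessor` is created up front and cheap clones (it is just an `Arc` around a mutex) are handed to every component; readers get `None` from `random_route` until the refresher's first successful `refresh()` lands. A small in-crate sketch:

```rust
// Generic over any NymTopology implementation, like the real call sites.
async fn try_pick_route<T: topology::NymTopology>(mut accessor: TopologyAccessor<T>) {
    match accessor.random_route().await {
        Some(route) => println!("constructed a {}-hop route", route.len()),
        None => println!("topology not refreshed yet - no route available"),
    }
}
```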
diff --git a/nym-client/src/commands/run.rs b/nym-client/src/commands/run.rs
index 9d2918a2044..9f59192bf56 100644
--- a/nym-client/src/commands/run.rs
+++ b/nym-client/src/commands/run.rs
@@ -50,7 +50,5 @@ pub fn execute(matches: &ArgMatches) {
         .expect("Failed to load config file");
 
     config = override_config(config, matches);
-
-    let client = NymClient::new(config);
-    client.start().unwrap();
+    NymClient::new(config).run_forever();
 }
diff --git a/nym-client/src/config/mod.rs b/nym-client/src/config/mod.rs
index 8b23dfa2245..207f5b222a8 100644
--- a/nym-client/src/config/mod.rs
+++ b/nym-client/src/config/mod.rs
@@ -18,6 +18,8 @@ const DEFAULT_AVERAGE_PACKET_DELAY: u64 = 200;
 const DEFAULT_FETCH_MESSAGES_DELAY: u64 = 1000;
 const DEFAULT_TOPOLOGY_REFRESH_RATE: u64 = 10_000;
 const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: u64 = 5_000;
+const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: u64 = 10_000; // 10s
+const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: u64 = 300_000; // 5min
 const DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS: u64 = 2;
 const DEFAULT_NODE_SCORE_THRESHOLD: f64 = 0.0;
 
@@ -203,6 +205,14 @@ impl Config {
     pub fn get_node_score_threshold(&self) -> f64 {
         self.debug.node_score_threshold
     }
+
+    pub fn get_packet_forwarding_initial_backoff(&self) -> time::Duration {
+        time::Duration::from_millis(self.debug.packet_forwarding_initial_backoff)
+    }
+
+    pub fn get_packet_forwarding_maximum_backoff(&self) -> time::Duration {
+        time::Duration::from_millis(self.debug.packet_forwarding_maximum_backoff)
+    }
 }
 
 fn de_option_string<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
@@ -356,6 +366,16 @@ pub struct Debug {
     /// node received during healthcheck. A node's score must be above that value to be
     /// considered healthy.
     node_score_threshold: f64,
+
+    /// Initial value of an exponential backoff to reconnect to a dropped TCP connection when
+    /// forwarding sphinx packets.
+    /// The provided value is interpreted as milliseconds.
+    packet_forwarding_initial_backoff: u64,
+
+    /// Maximum value of an exponential backoff to reconnect to a dropped TCP connection when
+    /// forwarding sphinx packets.
+    /// The provided value is interpreted as milliseconds.
+    packet_forwarding_maximum_backoff: u64,
 }
 
 impl Default for Debug {
@@ -370,6 +390,8 @@ impl Default for Debug {
             topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
             number_of_healthcheck_test_packets: DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS,
             node_score_threshold: DEFAULT_NODE_SCORE_THRESHOLD,
+            packet_forwarding_initial_backoff: DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF,
+            packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
         }
     }
 }
diff --git a/nym-client/src/config/template.rs b/nym-client/src/config/template.rs
index 561ae802add..c18730ed4b9 100644
--- a/nym-client/src/config/template.rs
+++ b/nym-client/src/config/template.rs
@@ -109,6 +109,15 @@ number_of_healthcheck_test_packets = {{ debug.number_of_healthcheck_test_packets
 # node received during healthcheck. A node's score must be above that value to be
 # considered healthy.
 node_score_threshold = {{ debug.node_score_threshold }}
-
+
+# Initial value of an exponential backoff to reconnect to a dropped TCP connection when
+# forwarding sphinx packets.
+# The provided value is interpreted as milliseconds.
+packet_forwarding_initial_backoff = {{ debug.packet_forwarding_initial_backoff }}
+
+# Maximum value of an exponential backoff to reconnect to a dropped TCP connection when
+# forwarding sphinx packets.
+# The provided value is interpreted as milliseconds.
+packet_forwarding_maximum_backoff = {{ debug.packet_forwarding_maximum_backoff }}
 "#
 }
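The two new debug options bound the reconnection schedule of the packet forwarder. The exact growth policy lives in `multi-tcp-client`; assuming a plain doubling backoff (an assumption, not something this diff shows), the configured defaults produce the schedule below:

```rust
use std::cmp::min;
use std::time::Duration;

// Doubling backoff clamped to a maximum - illustrative only.
fn backoff_schedule(initial: Duration, maximum: Duration, attempts: u32) -> Vec<Duration> {
    (0..attempts)
        .map(|attempt| min(initial * 2u32.pow(attempt), maximum))
        .collect()
}

fn main() {
    let schedule = backoff_schedule(
        Duration::from_millis(10_000),  // packet_forwarding_initial_backoff
        Duration::from_millis(300_000), // packet_forwarding_maximum_backoff
        6,
    );
    // 10s, 20s, 40s, 80s, 160s, capped at 300s
    println!("{:?}", schedule);
}
```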
diff --git a/nym-client/src/sockets/tcp.rs b/nym-client/src/sockets/tcp.rs
index c8b65c0df4c..abfe06b3c13 100644
--- a/nym-client/src/sockets/tcp.rs
+++ b/nym-client/src/sockets/tcp.rs
@@ -1,5 +1,5 @@
-use crate::client::received_buffer::BufferResponse;
-use crate::client::topology_control::TopologyInnerRef;
+use crate::client::received_buffer::ReceivedBufferResponse;
+use crate::client::topology_control::TopologyAccessor;
 use crate::client::InputMessage;
 use futures::channel::{mpsc, oneshot};
 use futures::future::FutureExt;
@@ -11,6 +11,8 @@
 use std::convert::TryFrom;
 use std::io;
 use std::net::SocketAddr;
 use tokio::prelude::*;
+use tokio::runtime::Handle;
+use tokio::task::JoinHandle;
 use topology::NymTopology;
 
 const SEND_REQUEST_PREFIX: u8 = 1;
@@ -110,7 +112,9 @@ impl ClientRequest {
         ServerResponse::Send
     }
 
-    async fn handle_fetch(mut msg_query: mpsc::UnboundedSender<BufferResponse>) -> ServerResponse {
+    async fn handle_fetch(
+        mut msg_query: mpsc::UnboundedSender<ReceivedBufferResponse>,
+    ) -> ServerResponse {
         trace!("handle_fetch called");
         let (res_tx, res_rx) = oneshot::channel();
         if msg_query.send(res_tx).await.is_err() {
@@ -133,9 +137,10 @@ impl ClientRequest {
         ServerResponse::Fetch { messages }
     }
 
-    async fn handle_get_clients(topology: &TopologyInnerRef<T>) -> ServerResponse {
-        let topology_data = &topology.read().await.topology;
-        match topology_data {
+    async fn handle_get_clients<T: NymTopology>(
+        mut topology_accessor: TopologyAccessor<T>,
+    ) -> ServerResponse {
+        match topology_accessor.get_current_topology_clone().await {
             Some(topology) => {
                 let clients = topology
                     .providers()
@@ -228,7 +233,7 @@ async fn handle_connection(
         }
         ClientRequest::Fetch => ClientRequest::handle_fetch(request_handling_data.msg_query).await,
         ClientRequest::GetClients => {
-            ClientRequest::handle_get_clients(&request_handling_data.topology).await
+            ClientRequest::handle_get_clients(request_handling_data.topology_accessor).await
        }
         ClientRequest::OwnDetails => {
             ClientRequest::handle_own_details(request_handling_data.self_address).await
@@ -240,17 +245,17 @@
 
 struct RequestHandlingData<T: NymTopology> {
     msg_input: mpsc::UnboundedSender<InputMessage>,
-    msg_query: mpsc::UnboundedSender<BufferResponse>,
+    msg_query: mpsc::UnboundedSender<ReceivedBufferResponse>,
     self_address: DestinationAddressBytes,
-    topology: TopologyInnerRef<T>,
+    topology_accessor: TopologyAccessor<T>,
 }
 
 async fn accept_connection<T: 'static + NymTopology>(
     mut socket: tokio::net::TcpStream,
     msg_input: mpsc::UnboundedSender<InputMessage>,
-    msg_query: mpsc::UnboundedSender<BufferResponse>,
+    msg_query: mpsc::UnboundedSender<ReceivedBufferResponse>,
     self_address: DestinationAddressBytes,
-    topology: TopologyInnerRef<T>,
+    topology_accessor: TopologyAccessor<T>,
 ) {
     let address = socket
         .peer_addr()
@@ -271,7 +276,7 @@
             }
             Ok(n) => {
                 let request_handling_data = RequestHandlingData {
-                    topology: topology.clone(),
+                    topology_accessor: topology_accessor.clone(),
                     msg_input: msg_input.clone(),
                     msg_query: msg_query.clone(),
                     self_address: self_address.clone(),
@@ -295,16 +300,16 @@
     }
 }
 
-pub async fn start_tcpsocket<T: 'static + NymTopology>(
+pub(crate) async fn run_tcpsocket<T: 'static + NymTopology>(
     listening_port: u16,
     message_tx: mpsc::UnboundedSender<InputMessage>,
-    received_messages_query_tx: mpsc::UnboundedSender<BufferResponse>,
+    received_messages_query_tx: mpsc::UnboundedSender<ReceivedBufferResponse>,
     self_address: DestinationAddressBytes,
-    topology: TopologyInnerRef<T>,
-) -> Result<(), TCPSocketError> {
+    topology_accessor: TopologyAccessor<T>,
+) {
     let address = SocketAddr::new("127.0.0.1".parse().unwrap(), listening_port);
     info!("Starting tcp socket listener at {:?}", address);
-    let mut listener = tokio::net::TcpListener::bind(address).await?;
+    let mut listener = tokio::net::TcpListener::bind(address).await.unwrap();
 
     while let Ok((stream, _)) = listener.accept().await {
         // it's fine to be cloning the channel on every new connection, because in principle
         tokio::spawn(accept_connection(
             message_tx.clone(),
             received_messages_query_tx.clone(),
             self_address.clone(),
-            topology.clone(),
+            topology_accessor.clone(),
         ));
     }
+}
 
-    error!("The tcpsocket went kaput...");
-    Ok(())
+pub(crate) fn start_tcpsocket<T: 'static + NymTopology>(
+    handle: &Handle,
+    listening_port: u16,
+    message_tx: mpsc::UnboundedSender<InputMessage>,
+    received_messages_query_tx: mpsc::UnboundedSender<ReceivedBufferResponse>,
+    self_address: DestinationAddressBytes,
+    topology_accessor: TopologyAccessor<T>,
+) -> JoinHandle<()> {
+    handle.spawn(async move {
+        run_tcpsocket(
+            listening_port,
+            message_tx,
+            received_messages_query_tx,
+            self_address,
+            topology_accessor,
+        )
+        .await;
+    })
 }
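Both sockets now follow the same `run_*`/`start_*` split: the async `run_*` function owns the accept loop, while `start_*` only spawns it onto a caller-provided handle and returns the `JoinHandle`. The skeleton of that pattern, with nym-specific parameters stripped:

```rust
use tokio::runtime::Handle;
use tokio::task::JoinHandle;

async fn run_listener(listening_port: u16) {
    // bind the TcpListener and loop over accept() here
    let _ = listening_port;
}

fn start_listener(handle: &Handle, listening_port: u16) -> JoinHandle<()> {
    handle.spawn(async move { run_listener(listening_port).await })
}
```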
diff --git a/sfw-provider/src/provider/client_handling/listener.rs b/sfw-provider/src/provider/client_handling/listener.rs
index f3cee1bf5ee..260c947b071 100644
--- a/sfw-provider/src/provider/client_handling/listener.rs
+++ b/sfw-provider/src/provider/client_handling/listener.rs
@@ -37,7 +37,7 @@ async fn process_request(
             .map(|c| c.into_tuple())
             .unzip();
         let response_bytes = PullResponse::new(messages).to_bytes();
-        if let Ok(_) = socket.write_all(&response_bytes).await {
+        if socket.write_all(&response_bytes).await.is_ok() {
             // only delete stored messages if we managed to actually send the response
             if let Err(e) = request_processor.delete_sent_messages(paths).await {
                 error!("Somehow failed to delete stored messages! - {:?}", e);
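The provider-side change is a clippy-style cleanup: `if let Ok(_) = expr` becomes `expr.is_ok()` (the registration code in the next file gets the matching `if let Some(_)` and `else { if }` treatment). What both versions preserve is the ordering guarantee spelled out in the comment: stored messages are deleted only after the response was actually written, so a failed send re-delivers rather than loses them. A schematic sketch, with hypothetical closures standing in for the socket write and the storage call:

```rust
use std::io;

// Before: if let Ok(_) = socket.write_all(&response_bytes).await { ... }
// After:  if socket.write_all(&response_bytes).await.is_ok() { ... }
fn deliver_then_delete(
    send: impl FnOnce() -> io::Result<()>,
    delete: impl FnOnce() -> io::Result<()>,
) {
    if send().is_ok() {
        // a crash between send and delete re-delivers the message later,
        // which is safer than deleting first and possibly losing it
        if let Err(e) = delete() {
            eprintln!("failed to delete sent messages: {:?}", e);
        }
    }
}

fn main() {
    deliver_then_delete(|| Ok(()), || Ok(()));
}
```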
- {:?}", e); diff --git a/sfw-provider/src/provider/client_handling/request_processing.rs b/sfw-provider/src/provider/client_handling/request_processing.rs index 6e7a36bb08d..57707c1b0dd 100644 --- a/sfw-provider/src/provider/client_handling/request_processing.rs +++ b/sfw-provider/src/provider/client_handling/request_processing.rs @@ -85,27 +85,26 @@ impl RequestProcessor { ); let auth_token = self.generate_new_auth_token(req.destination_address.clone()); - if let Some(_) = self + if self .client_ledger .insert_token(auth_token.clone(), req.destination_address.clone()) .await + .is_some() { info!( "Client {:?} was already registered before!", req.destination_address.to_base58_string() ) - } else { - if let Err(e) = self - .client_storage - .create_storage_dir(req.destination_address.clone()) - .await - { - error!("We failed to create inbox directory for the client -{:?}\nReverting issued token...", e); - // we must revert our changes if this operation failed - self.client_ledger - .remove_token(&req.destination_address) - .await; - } + } else if let Err(e) = self + .client_storage + .create_storage_dir(req.destination_address.clone()) + .await + { + error!("We failed to create inbox directory for the client -{:?}\nReverting issued token...", e); + // we must revert our changes if this operation failed + self.client_ledger + .remove_token(&req.destination_address) + .await; } Ok(ClientProcessingResult::RegisterResponse(auth_token)) diff --git a/sfw-provider/src/provider/mix_handling/listener.rs b/sfw-provider/src/provider/mix_handling/listener.rs index 00a6b970622..70a37c78411 100644 --- a/sfw-provider/src/provider/mix_handling/listener.rs +++ b/sfw-provider/src/provider/mix_handling/listener.rs @@ -42,10 +42,7 @@ async fn process_socket_connection( } // we must be able to handle multiple packets from same connection independently - tokio::spawn(process_received_packet( - buf.clone(), - packet_processor.clone(), - )) + tokio::spawn(process_received_packet(buf, packet_processor.clone())) } Err(e) => { warn!(