diff --git a/clients/desktop/src/client/mod.rs b/clients/desktop/src/client/mod.rs
index bae33a7149b..c9bf3168dae 100644
--- a/clients/desktop/src/client/mod.rs
+++ b/clients/desktop/src/client/mod.rs
@@ -200,7 +200,7 @@ impl NymClient {
     async fn get_gateway_address(
         gateway_id: String,
-        mut topology_accessor: TopologyAccessor,
+        topology_accessor: TopologyAccessor,
     ) -> url::Url {
         // we already have our gateway written in the config
         let gateway_address = topology_accessor
diff --git a/clients/desktop/src/client/real_traffic_stream.rs b/clients/desktop/src/client/real_traffic_stream.rs
index c07f6c882a9..31a6822ce5d 100644
--- a/clients/desktop/src/client/real_traffic_stream.rs
+++ b/clients/desktop/src/client/real_traffic_stream.rs
@@ -19,7 +19,7 @@
 use futures::channel::mpsc;
 use futures::task::{Context, Poll};
 use futures::{Future, Stream, StreamExt};
 use log::{error, info, trace, warn};
-use nymsphinx::Destination;
+use nymsphinx::{Destination, DestinationAddressBytes};
 use std::pin::Pin;
 use std::time::Duration;
 use tokio::runtime::Handle;
@@ -96,31 +96,48 @@ impl OutQueueControl {
         }
     }
 
+    async fn get_route(
+        &self,
+        client: Option<DestinationAddressBytes>,
+    ) -> Option<Vec<SphinxNode>> {
+        let route = match client {
+            None => self.topology_access.random_route().await,
+            Some(client) => self.topology_access.random_route_to_client(client).await,
+        };
+
+        route
+    }
+
     async fn on_message(&mut self, next_message: StreamMessage) {
         trace!("created new message");
-        let route = match self.topology_access.random_route().await {
-            None => {
-                warn!("No valid topology detected - won't send any real or loop message this time");
-                // TODO: this creates a potential problem: we can lose real messages if we were
-                // unable to get topology, perhaps we should store them in some buffer?
-                return;
-            }
-            Some(route) => route,
-        };
 
         let next_packet = match next_message {
-            StreamMessage::Cover => mix_client::packet::loop_cover_message_route(
-                self.our_info.address.clone(),
-                self.our_info.identifier,
-                route,
-                self.average_packet_delay,
-            ),
-            StreamMessage::Real(real_message) => mix_client::packet::encapsulate_message_route(
-                real_message.0,
-                real_message.1,
-                route,
-                self.average_packet_delay,
-            ),
+            StreamMessage::Cover => {
+                let route = self.get_route(None).await;
+                if route.is_none() {
+                    warn!("No valid topology detected - won't send any real or loop message this time");
+                }
+                let route = route.unwrap();
+                mix_client::packet::loop_cover_message_route(
+                    self.our_info.address.clone(),
+                    self.our_info.identifier,
+                    route,
+                    self.average_packet_delay,
+                )
+            }
+            StreamMessage::Real(real_message) => {
+                let route = self.get_route(Some(real_message.0.address.clone())).await;
+                if route.is_none() {
+                    warn!("No valid topology detected - won't send any real or loop message this time");
+                }
+                let route = route.unwrap();
+                mix_client::packet::encapsulate_message_route(
+                    real_message.0,
+                    real_message.1,
+                    route,
+                    self.average_packet_delay,
+                )
+            }
         };
 
         let next_packet = match next_packet {
@@ -143,6 +160,9 @@
             .unwrap();
         // JS: Not entirely sure why or how it fixes stuff, but without the yield call,
         // the UnboundedReceiver [of mix_rx] will not get a chance to read anything
+        // JS2: Basically it was the case that with high enough rate, the stream had already a next value
+        // ready and hence was immediately re-scheduled causing other tasks to be starved;
+        // yield makes it go back the scheduling queue regardless of its value availability
         tokio::task::yield_now().await;
     }
 
diff --git a/clients/desktop/src/client/topology_control.rs b/clients/desktop/src/client/topology_control.rs
index 21919edb33a..70d8bf05ef5 100644
--- a/clients/desktop/src/client/topology_control.rs
+++ b/clients/desktop/src/client/topology_control.rs
@@ -17,6 +17,7 @@
 use crypto::identity::MixIdentityKeyPair;
 use futures::lock::Mutex;
 use healthcheck::HealthChecker;
 use log::*;
+use nymsphinx::DestinationAddressBytes;
 use std::sync::Arc;
 use std::time;
 use std::time::Duration;
@@ -57,7 +58,7 @@ impl TopologyAccessor {
     }
 
     // not removed until healtchecker is not fully changed to use gateways instead of providers
-    pub(crate) async fn get_provider_socket_addr(&mut self, id: &str) -> Option<SocketAddr> {
+    pub(crate) async fn get_provider_socket_addr(&self, id: &str) -> Option<SocketAddr> {
         match &self.inner.lock().await.0 {
             None => None,
             Some(ref topology) => topology
@@ -68,7 +69,7 @@
         }
     }
 
-    pub(crate) async fn get_gateway_socket_url(&mut self, id: &str) -> Option<Url> {
+    pub(crate) async fn get_gateway_socket_url(&self, id: &str) -> Option<Url> {
         match &self.inner.lock().await.0 {
             None => None,
             Some(ref topology) => topology
@@ -89,11 +90,11 @@
     }
 
     // Unless you absolutely need the entire topology, use `random_route` instead
-    pub(crate) async fn get_current_topology_clone(&mut self) -> Option<T> {
+    pub(crate) async fn get_current_topology_clone(&self) -> Option<T> {
         self.inner.lock().await.0.clone()
     }
 
-    pub(crate) async fn get_all_clients(&mut self) -> Option<Vec<Client>> {
+    pub(crate) async fn get_all_clients(&self) -> Option<Vec<Client>> {
         // TODO: this will need to be modified to instead return pairs (provider, client)
         match &self.inner.lock().await.0 {
             None => None,
@@ -108,9 +109,26 @@
         }
     }
 
+    pub(crate) async fn random_route_to_client(
+        &self,
+        client_address: DestinationAddressBytes,
+    ) -> Option<Vec<SphinxNode>> {
+        let b58_address = client_address.to_base58_string();
+        let guard = self.inner.lock().await;
+        let topology = guard.0.as_ref()?;
+
+        let gateway = topology
+            .gateways()
+            .iter()
+            .cloned()
+            .find(|gateway| gateway.has_client(b58_address.clone()))?;
+
+        topology.random_route_to(gateway.into()).ok()
+    }
+
     // this is a rather temporary solution as each client will have an associated provider
     // currently that is not implemented yet and there only exists one provider in the network
-    pub(crate) async fn random_route(&mut self) -> Option<Vec<SphinxNode>> {
+    pub(crate) async fn random_route(&self) -> Option<Vec<SphinxNode>> {
         match &self.inner.lock().await.0 {
             None => None,
             Some(ref topology) => {
@@ -120,7 +138,7 @@
                 }
                 // unwrap is fine here as we asserted there is at least single provider
                 let provider = gateways.pop().unwrap().into();
-                topology.route_to(provider).ok()
+                topology.random_route_to(provider).ok()
             }
         }
     }
diff --git a/common/client-libs/mix-client/src/packet.rs b/common/client-libs/mix-client/src/packet.rs
index cb26b8e5015..31bca849737 100644
--- a/common/client-libs/mix-client/src/packet.rs
+++ b/common/client-libs/mix-client/src/packet.rs
@@ -97,7 +97,7 @@ pub fn encapsulate_message(
 
     // unwrap is fine here as we asserted there is at least single provider
     let provider = providers.pop().unwrap().into();
-    let route = topology.route_to(provider)?;
+    let route = topology.random_route_to(provider)?;
 
     let delays = delays::generate_from_average_duration(route.len(), average_delay);
diff --git a/common/topology/src/gateway.rs b/common/topology/src/gateway.rs
index aca4da3fcb4..c67b95ffe19 100644
--- a/common/topology/src/gateway.rs
+++ b/common/topology/src/gateway.rs
@@ -40,6 +40,13 @@ impl Node {
         bs58::decode(&self.pub_key).into(&mut key_bytes).unwrap();
         key_bytes
     }
+
+    pub fn has_client(&self, client_pub_key: String) -> bool {
+        self.registered_clients
+            .iter()
+            .find(|client| client.pub_key == client_pub_key)
+            .is_some()
+    }
 }
 
 impl filter::Versioned for Node {
diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs
index 790d683f27a..41b107ae821 100644
--- a/common/topology/src/lib.rs
+++ b/common/topology/src/lib.rs
@@ -72,7 +72,7 @@ pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
     }
 
     // Tries to get a route through the mix network
-    fn mix_route(&self) -> Result<Vec<SphinxNode>, NymTopologyError> {
+    fn random_mix_route(&self) -> Result<Vec<SphinxNode>, NymTopologyError> {
         let mut layered_topology = self.make_layered_topology()?;
         let num_layers = layered_topology.len();
         let route = (1..=num_layers as u64)
@@ -87,9 +87,12 @@
     }
 
     // Sets up a route to a specific provider
-    fn route_to(&self, provider_node: SphinxNode) -> Result<Vec<SphinxNode>, NymTopologyError> {
+    fn random_route_to(
+        &self,
+        provider_node: SphinxNode,
+    ) -> Result<Vec<SphinxNode>, NymTopologyError> {
         Ok(self
-            .mix_route()?
+            .random_mix_route()?
             .into_iter()
             .chain(std::iter::once(provider_node))
             .collect())