Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/send to correct gateway #213

Merged
merged 2 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/desktop/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl NymClient {

async fn get_gateway_address<T: NymTopology>(
gateway_id: String,
mut topology_accessor: TopologyAccessor<T>,
topology_accessor: TopologyAccessor<T>,
) -> url::Url {
// we already have our gateway written in the config
let gateway_address = topology_accessor
Expand Down
64 changes: 42 additions & 22 deletions clients/desktop/src/client/real_traffic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::{error, info, trace, warn};
use nymsphinx::Destination;
use nymsphinx::{Destination, DestinationAddressBytes};
use std::pin::Pin;
use std::time::Duration;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -96,31 +96,48 @@ impl<T: 'static + NymTopology> OutQueueControl<T> {
}
}

async fn get_route(
&self,
client: Option<DestinationAddressBytes>,
) -> Option<Vec<nymsphinx::Node>> {
let route = match client {
None => self.topology_access.random_route().await,
Some(client) => self.topology_access.random_route_to_client(client).await,
};

route
}

async fn on_message(&mut self, next_message: StreamMessage) {
trace!("created new message");
let route = match self.topology_access.random_route().await {
None => {
warn!("No valid topology detected - won't send any real or loop message this time");
// TODO: this creates a potential problem: we can lose real messages if we were
// unable to get topology, perhaps we should store them in some buffer?
return;
}
Some(route) => route,
};

let next_packet = match next_message {
StreamMessage::Cover => mix_client::packet::loop_cover_message_route(
self.our_info.address.clone(),
self.our_info.identifier,
route,
self.average_packet_delay,
),
StreamMessage::Real(real_message) => mix_client::packet::encapsulate_message_route(
real_message.0,
real_message.1,
route,
self.average_packet_delay,
),
StreamMessage::Cover => {
let route = self.get_route(None).await;
if route.is_none() {
warn!("No valid topology detected - won't send any real or loop message this time");
}
let route = route.unwrap();
mix_client::packet::loop_cover_message_route(
self.our_info.address.clone(),
self.our_info.identifier,
route,
self.average_packet_delay,
)
}
StreamMessage::Real(real_message) => {
let route = self.get_route(Some(real_message.0.address.clone())).await;
if route.is_none() {
warn!("No valid topology detected - won't send any real or loop message this time");
}
let route = route.unwrap();
mix_client::packet::encapsulate_message_route(
real_message.0,
real_message.1,
route,
self.average_packet_delay,
)
}
};

let next_packet = match next_packet {
Expand All @@ -143,6 +160,9 @@ impl<T: 'static + NymTopology> OutQueueControl<T> {
.unwrap();
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
// JS2: Basically it was the case that with high enough rate, the stream had already a next value
// ready and hence was immediately re-scheduled causing other tasks to be starved;
// yield makes it go back the scheduling queue regardless of its value availability
tokio::task::yield_now().await;
}

Expand Down
30 changes: 24 additions & 6 deletions clients/desktop/src/client/topology_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crypto::identity::MixIdentityKeyPair;
use futures::lock::Mutex;
use healthcheck::HealthChecker;
use log::*;
use nymsphinx::DestinationAddressBytes;
use std::sync::Arc;
use std::time;
use std::time::Duration;
Expand Down Expand Up @@ -57,7 +58,7 @@ impl<T: NymTopology> TopologyAccessor<T> {
}

// not removed until healtchecker is not fully changed to use gateways instead of providers
pub(crate) async fn get_provider_socket_addr(&mut self, id: &str) -> Option<SocketAddr> {
pub(crate) async fn get_provider_socket_addr(&self, id: &str) -> Option<SocketAddr> {
match &self.inner.lock().await.0 {
None => None,
Some(ref topology) => topology
Expand All @@ -68,7 +69,7 @@ impl<T: NymTopology> TopologyAccessor<T> {
}
}

pub(crate) async fn get_gateway_socket_url(&mut self, id: &str) -> Option<String> {
pub(crate) async fn get_gateway_socket_url(&self, id: &str) -> Option<String> {
match &self.inner.lock().await.0 {
None => None,
Some(ref topology) => topology
Expand All @@ -89,11 +90,11 @@ impl<T: NymTopology> TopologyAccessor<T> {
}

// Unless you absolutely need the entire topology, use `random_route` instead
pub(crate) async fn get_current_topology_clone(&mut self) -> Option<T> {
pub(crate) async fn get_current_topology_clone(&self) -> Option<T> {
self.inner.lock().await.0.clone()
}

pub(crate) async fn get_all_clients(&mut self) -> Option<Vec<provider::Client>> {
pub(crate) async fn get_all_clients(&self) -> Option<Vec<provider::Client>> {
// TODO: this will need to be modified to instead return pairs (provider, client)
match &self.inner.lock().await.0 {
None => None,
Expand All @@ -108,9 +109,26 @@ impl<T: NymTopology> TopologyAccessor<T> {
}
}

pub(crate) async fn random_route_to_client(
&self,
client_address: DestinationAddressBytes,
) -> Option<Vec<nymsphinx::Node>> {
let b58_address = client_address.to_base58_string();
let guard = self.inner.lock().await;
let topology = guard.0.as_ref()?;

let gateway = topology
.gateways()
.iter()
.cloned()
.find(|gateway| gateway.has_client(b58_address.clone()))?;

topology.random_route_to(gateway.into()).ok()
}

// this is a rather temporary solution as each client will have an associated provider
// currently that is not implemented yet and there only exists one provider in the network
pub(crate) async fn random_route(&mut self) -> Option<Vec<nymsphinx::Node>> {
pub(crate) async fn random_route(&self) -> Option<Vec<nymsphinx::Node>> {
match &self.inner.lock().await.0 {
None => None,
Some(ref topology) => {
Expand All @@ -120,7 +138,7 @@ impl<T: NymTopology> TopologyAccessor<T> {
}
// unwrap is fine here as we asserted there is at least single provider
let provider = gateways.pop().unwrap().into();
topology.route_to(provider).ok()
topology.random_route_to(provider).ok()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/client-libs/mix-client/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn encapsulate_message<T: NymTopology>(
// unwrap is fine here as we asserted there is at least single provider
let provider = providers.pop().unwrap().into();

let route = topology.route_to(provider)?;
let route = topology.random_route_to(provider)?;

let delays = delays::generate_from_average_duration(route.len(), average_delay);

Expand Down
7 changes: 7 additions & 0 deletions common/topology/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ impl Node {
bs58::decode(&self.pub_key).into(&mut key_bytes).unwrap();
key_bytes
}

pub fn has_client(&self, client_pub_key: String) -> bool {
self.registered_clients
.iter()
.find(|client| client.pub_key == client_pub_key)
.is_some()
}
}

impl filter::Versioned for Node {
Expand Down
9 changes: 6 additions & 3 deletions common/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
}

// Tries to get a route through the mix network
fn mix_route(&self) -> Result<Vec<SphinxNode>, NymTopologyError> {
fn random_mix_route(&self) -> Result<Vec<SphinxNode>, NymTopologyError> {
let mut layered_topology = self.make_layered_topology()?;
let num_layers = layered_topology.len();
let route = (1..=num_layers as u64)
Expand All @@ -87,9 +87,12 @@ pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
}

// Sets up a route to a specific provider
fn route_to(&self, provider_node: SphinxNode) -> Result<Vec<SphinxNode>, NymTopologyError> {
fn random_route_to(
&self,
provider_node: SphinxNode,
) -> Result<Vec<SphinxNode>, NymTopologyError> {
Ok(self
.mix_route()?
.random_mix_route()?
.into_iter()
.chain(std::iter::once(provider_node))
.collect())
Expand Down