From ebe929aaad3d63fc4e135c75b1141b7c4c73b252 Mon Sep 17 00:00:00 2001 From: jstuczyn <jedrzej.stuczynski@gmail.com> Date: Tue, 5 May 2020 12:45:32 +0100 Subject: [PATCH 1/4] Some clippy and compiler warning fixes --- clients/desktop/src/commands/init.rs | 2 +- gateway/src/node/storage/ledger.rs | 8 -------- validator/src/services/mixmining/mod.rs | 4 ++-- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/clients/desktop/src/commands/init.rs b/clients/desktop/src/commands/init.rs index 196bb12bf31..d3323669fa0 100644 --- a/clients/desktop/src/commands/init.rs +++ b/clients/desktop/src/commands/init.rs @@ -75,7 +75,7 @@ async fn try_gateway_registration( sphinx_tx.clone(), timeout, ); - if let Ok(_) = gateway_client.establish_connection().await { + if gateway_client.establish_connection().await.is_ok() { if let Ok(token) = gateway_client.register().await { return Some((gateway.pub_key, token)); } diff --git a/gateway/src/node/storage/ledger.rs b/gateway/src/node/storage/ledger.rs index df9d79d9d22..1aa9e4ab307 100644 --- a/gateway/src/node/storage/ledger.rs +++ b/gateway/src/node/storage/ledger.rs @@ -158,12 +158,4 @@ impl ClientLedger { Ok(client_vec) } - - #[cfg(test)] - pub(crate) fn create_temporary() -> Self { - let cfg = sled::Config::new().temporary(true); - ClientLedger { - db: cfg.open().unwrap(), - } - } } diff --git a/validator/src/services/mixmining/mod.rs b/validator/src/services/mixmining/mod.rs index c93cfa1d28c..d4c1e9190b2 100644 --- a/validator/src/services/mixmining/mod.rs +++ b/validator/src/services/mixmining/mod.rs @@ -135,8 +135,8 @@ mod mixnodes { service.add(node2.clone()); let nodes = service.topology().mixnodes; assert_eq!(2, nodes.len()); - assert_eq!(node1.clone(), nodes[0]); - assert_eq!(node2.clone(), nodes[1]); + assert_eq!(node1, nodes[0]); + assert_eq!(node2, nodes[1]); } } From 16b765c79ace945cf2a3abf2448c740fe5bcc7ff Mon Sep 17 00:00:00 2001 From: jstuczyn <jedrzej.stuczynski@gmail.com> Date: Tue, 5 May 2020 12:45:44 +0100 Subject: [PATCH 2/4] Removed healthchecker from the client --- Cargo.lock | 1 - clients/desktop/Cargo.toml | 1 - clients/desktop/src/client/mod.rs | 25 +++-- .../desktop/src/client/topology_control.rs | 104 ++---------------- clients/desktop/src/config/mod.rs | 32 ------ common/topology/src/lib.rs | 2 +- 6 files changed, 24 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a880398f976..2db7a5564de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1626,7 +1626,6 @@ dependencies = [ "futures 0.3.4", "gateway-client", "gateway-requests", - "healthcheck", "log 0.4.8", "mix-client", "multi-tcp-client", diff --git a/clients/desktop/Cargo.toml b/clients/desktop/Cargo.toml index 56d05ef0701..6fffd4af2e3 100644 --- a/clients/desktop/Cargo.toml +++ b/clients/desktop/Cargo.toml @@ -34,7 +34,6 @@ crypto = {path = "../../common/crypto"} directory-client = { path = "../../common/client-libs/directory-client" } gateway-client = { path = "../../common/client-libs/gateway-client" } gateway-requests = { path = "../../gateway/gateway-requests" } -healthcheck = { path = "../../common/healthcheck" } mix-client = { path = "../../common/client-libs/mix-client" } multi-tcp-client = { path = "../../common/client-libs/multi-tcp-client" } nymsphinx = { path = "../../common/nymsphinx" } diff --git a/clients/desktop/src/client/mod.rs b/clients/desktop/src/client/mod.rs index bae33a7149b..ad1386f3ba5 100644 --- a/clients/desktop/src/client/mod.rs +++ b/clients/desktop/src/client/mod.rs @@ -206,16 +206,15 @@ impl NymClient { let gateway_address = topology_accessor .get_gateway_socket_url(&gateway_id) .await - .expect( - format!( + .unwrap_or_else(|| { + panic!( "Could not find gateway with id {:?}.\ It does not seem to be present in the current network topology.\ Are you sure it is still online?\ Perhaps try to run `nym-client init` again to obtain a new gateway", gateway_id ) - .as_ref(), - ); + }); url::Url::parse(&gateway_address).expect("provided gateway address is invalid!") } @@ -226,16 +225,9 @@ impl NymClient { &mut self, topology_accessor: TopologyAccessor<T>, ) { - let healthcheck_keys = MixIdentityKeyPair::new(); - let topology_refresher_config = TopologyRefresherConfig::new( self.config.get_directory_server(), self.config.get_topology_refresh_rate(), - healthcheck_keys, - self.config.get_topology_resolution_timeout(), - self.config.get_healthcheck_connection_timeout(), - self.config.get_number_of_healthcheck_test_packets() as usize, - self.config.get_node_score_threshold(), ); let mut topology_refresher = TopologyRefresher::new(topology_refresher_config, topology_accessor); @@ -246,6 +238,15 @@ impl NymClient { self.config.get_directory_server() ); self.runtime.block_on(topology_refresher.refresh()); + + // TODO: a slightly more graceful termination here + if !self.runtime.block_on(topology_refresher.is_topology_routable()) { + panic!( + "The current network topology seem to be insufficient to route any packets through\ + - check if enough nodes and a gateway are online" + ); + } + info!("Starting topology refresher..."); topology_refresher.start(self.runtime.handle()); } @@ -385,7 +386,7 @@ impl NymClient { SocketType::WebSocket => self.start_websocket_listener( shared_topology_accessor, received_buffer_request_sender, - input_sender.clone(), + input_sender, ), SocketType::None => { // if we did not start the socket, it means we're running (supposedly) in the native mode diff --git a/clients/desktop/src/client/topology_control.rs b/clients/desktop/src/client/topology_control.rs index 21919edb33a..ca38c6af3ad 100644 --- a/clients/desktop/src/client/topology_control.rs +++ b/clients/desktop/src/client/topology_control.rs @@ -13,16 +13,13 @@ // limitations under the License. use crate::built_info; -use crypto::identity::MixIdentityKeyPair; use futures::lock::Mutex; -use healthcheck::HealthChecker; use log::*; use std::sync::Arc; use std::time; use std::time::Duration; use tokio::runtime::Handle; // use tokio::sync::RwLock; -use std::net::SocketAddr; use tokio::task::JoinHandle; use topology::{provider, NymTopology}; @@ -56,18 +53,6 @@ impl<T: NymTopology> TopologyAccessor<T> { self.inner.lock().await.update(new_topology); } - // not removed until healtchecker is not fully changed to use gateways instead of providers - pub(crate) async fn get_provider_socket_addr(&mut self, id: &str) -> Option<SocketAddr> { - match &self.inner.lock().await.0 { - None => None, - Some(ref topology) => topology - .providers() - .iter() - .find(|provider| provider.pub_key == id) - .map(|provider| provider.client_listener), - } - } - pub(crate) async fn get_gateway_socket_url(&mut self, id: &str) -> Option<String> { match &self.inner.lock().await.0 { None => None, @@ -88,11 +73,6 @@ impl<T: NymTopology> TopologyAccessor<T> { } } - // Unless you absolutely need the entire topology, use `random_route` instead - pub(crate) async fn get_current_topology_clone(&mut self) -> Option<T> { - self.inner.lock().await.0.clone() - } - pub(crate) async fn get_all_clients(&mut self) -> Option<Vec<provider::Client>> { // TODO: this will need to be modified to instead return pairs (provider, client) match &self.inner.lock().await.0 { @@ -126,40 +106,16 @@ impl<T: NymTopology> TopologyAccessor<T> { } } -#[derive(Debug)] -enum TopologyError { - HealthCheckError, - NoValidPathsError, -} - pub(crate) struct TopologyRefresherConfig { directory_server: String, refresh_rate: time::Duration, - identity_keypair: MixIdentityKeyPair, - resolution_timeout: time::Duration, - connection_timeout: time::Duration, - number_test_packets: usize, - node_score_threshold: f64, } impl TopologyRefresherConfig { - pub(crate) fn new( - directory_server: String, - refresh_rate: time::Duration, - identity_keypair: MixIdentityKeyPair, - resolution_timeout: time::Duration, - connection_timeout: time::Duration, - number_test_packets: usize, - node_score_threshold: f64, - ) -> Self { + pub(crate) fn new(directory_server: String, refresh_rate: time::Duration) -> Self { TopologyRefresherConfig { directory_server, refresh_rate, - identity_keypair, - resolution_timeout, - connection_timeout, - number_test_packets, - node_score_threshold, } } } @@ -167,9 +123,7 @@ impl TopologyRefresherConfig { pub(crate) struct TopologyRefresher<T: NymTopology> { directory_server: String, topology_accessor: TopologyAccessor<T>, - health_checker: HealthChecker, refresh_rate: Duration, - node_score_threshold: f64, } impl<T: 'static + NymTopology> TopologyRefresher<T> { @@ -177,71 +131,33 @@ impl<T: 'static + NymTopology> TopologyRefresher<T> { cfg: TopologyRefresherConfig, topology_accessor: TopologyAccessor<T>, ) -> Self { - // this is a temporary solution as the healthcheck will eventually be moved to validators - let health_checker = healthcheck::HealthChecker::new( - cfg.resolution_timeout, - cfg.connection_timeout, - cfg.number_test_packets, - cfg.identity_keypair, - ); - TopologyRefresher { directory_server: cfg.directory_server, topology_accessor, - health_checker, refresh_rate: cfg.refresh_rate, - node_score_threshold: cfg.node_score_threshold, } } - async fn get_current_compatible_topology(&self) -> Result<T, TopologyError> { + async fn get_current_compatible_topology(&self) -> T { let full_topology = T::new(self.directory_server.clone()); - let version_filtered_topology = - full_topology.filter_system_version(built_info::PKG_VERSION); - - // healthcheck needs some adjustments to work with gateways so for time being just dont run it - return Ok(version_filtered_topology); - - // let healthcheck_result = self - // .health_checker - // .do_check(&version_filtered_topology) - // .await; - // let healthcheck_scores = match healthcheck_result { - // Err(err) => { - // error!("Error while performing the healthcheck: {:?}", err); - // return Err(TopologyError::HealthCheckError); - // } - // Ok(scores) => scores, - // }; - // - // debug!("{}", healthcheck_scores); - // - // let healthy_topology = healthcheck_scores - // .filter_topology_by_score(&version_filtered_topology, self.node_score_threshold); - // - // // make sure you can still send a packet through the network: - // if !healthy_topology.can_construct_path_through() { - // return Err(TopologyError::NoValidPathsError); - // } - // - // Ok(healthy_topology) + // just filter by version and assume the validators will remove all bad behaving + // nodes with the staking + full_topology.filter_system_version(built_info::PKG_VERSION) } pub(crate) async fn refresh(&mut self) { trace!("Refreshing the topology"); - let new_topology = match self.get_current_compatible_topology().await { - Ok(topology) => Some(topology), - Err(err) => { - warn!("the obtained topology seems to be invalid - {:?}, it will be impossible to send packets through", err); - None - } - }; + let new_topology = Some(self.get_current_compatible_topology().await); self.topology_accessor .update_global_topology(new_topology) .await; } + pub(crate) async fn is_topology_routable(&self) -> bool { + self.topology_accessor.is_routable().await + } + pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> { handle.spawn(async move { loop { diff --git a/clients/desktop/src/config/mod.rs b/clients/desktop/src/config/mod.rs index 51d52cb2f81..24cde77272d 100644 --- a/clients/desktop/src/config/mod.rs +++ b/clients/desktop/src/config/mod.rs @@ -33,10 +33,6 @@ const DEFAULT_TOPOLOGY_REFRESH_RATE: u64 = 30_000; // 30s const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: u64 = 5_000; // 5s const DEFAULT_GATEWAY_RESPONSE_TIMEOUT: u64 = 1_500; // 1.5s -const DEFAULT_HEALTHCHECK_CONNECTION_TIMEOUT: u64 = 1_500; // 1.5s - -const DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS: u64 = 2; -const DEFAULT_NODE_SCORE_THRESHOLD: f64 = 0.0; #[derive(Debug, Deserialize, PartialEq, Serialize, Clone, Copy)] #[serde(deny_unknown_fields)] @@ -205,18 +201,6 @@ impl Config { pub fn get_topology_resolution_timeout(&self) -> time::Duration { time::Duration::from_millis(self.debug.topology_resolution_timeout) } - - pub fn get_number_of_healthcheck_test_packets(&self) -> u64 { - self.debug.number_of_healthcheck_test_packets - } - - pub fn get_node_score_threshold(&self) -> f64 { - self.debug.node_score_threshold - } - - pub fn get_healthcheck_connection_timeout(&self) -> time::Duration { - time::Duration::from_millis(self.debug.healthcheck_connection_timeout) - } } fn de_option_string<'de, D>(deserializer: D) -> Result<Option<String>, D::Error> @@ -357,19 +341,6 @@ pub struct Debug { /// did not reach its destination. /// The provided value is interpreted as milliseconds. topology_resolution_timeout: u64, - - /// How many packets should be sent through each path during the healthcheck - number_of_healthcheck_test_packets: u64, - - /// In the current healthcheck implementation, threshold indicating percentage of packets - /// node received during healthcheck. Node's score must be above that value to be - /// considered healthy. - node_score_threshold: f64, - - /// Timeout for establishing initial connection when trying to forward a sphinx packet - /// during healthcheck. - /// The provided value is interpreted as milliseconds. - healthcheck_connection_timeout: u64, } impl Default for Debug { @@ -381,9 +352,6 @@ impl Default for Debug { gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT, topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE, topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT, - number_of_healthcheck_test_packets: DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS, - node_score_threshold: DEFAULT_NODE_SCORE_THRESHOLD, - healthcheck_connection_timeout: DEFAULT_HEALTHCHECK_CONNECTION_TIMEOUT, } } } diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs index 790d683f27a..aac21e664a1 100644 --- a/common/topology/src/lib.rs +++ b/common/topology/src/lib.rs @@ -143,7 +143,7 @@ pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone { fn can_construct_path_through(&self) -> bool { !self.mix_nodes().is_empty() - && !self.providers().is_empty() + && !self.gateways().is_empty() && self.make_layered_topology().is_ok() } } From c0131cdb31ad6e3d95582e44040de8fa58b0c568 Mon Sep 17 00:00:00 2001 From: jstuczyn <jedrzej.stuczynski@gmail.com> Date: Tue, 5 May 2020 12:46:23 +0100 Subject: [PATCH 3/4] Cargo fmt issue after 100 explicit saves......... --- clients/desktop/src/client/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/desktop/src/client/mod.rs b/clients/desktop/src/client/mod.rs index ad1386f3ba5..dce03414328 100644 --- a/clients/desktop/src/client/mod.rs +++ b/clients/desktop/src/client/mod.rs @@ -240,7 +240,10 @@ impl NymClient { self.runtime.block_on(topology_refresher.refresh()); // TODO: a slightly more graceful termination here - if !self.runtime.block_on(topology_refresher.is_topology_routable()) { + if !self + .runtime + .block_on(topology_refresher.is_topology_routable()) + { panic!( "The current network topology seem to be insufficient to route any packets through\ - check if enough nodes and a gateway are online" From 2bd91090fe930146679d7812d19aae12c82bb290 Mon Sep 17 00:00:00 2001 From: jstuczyn <jedrzej.stuczynski@gmail.com> Date: Tue, 5 May 2020 14:17:02 +0100 Subject: [PATCH 4/4] Changes accidentally removed in previous PR --- clients/desktop/src/client/topology_control.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/desktop/src/client/topology_control.rs b/clients/desktop/src/client/topology_control.rs index 784aa475abb..cd13e8fc793 100644 --- a/clients/desktop/src/client/topology_control.rs +++ b/clients/desktop/src/client/topology_control.rs @@ -54,7 +54,7 @@ impl<T: NymTopology> TopologyAccessor<T> { self.inner.lock().await.update(new_topology); } - pub(crate) async fn get_gateway_socket_url(&mut self, id: &str) -> Option<String> { + pub(crate) async fn get_gateway_socket_url(&self, id: &str) -> Option<String> { match &self.inner.lock().await.0 { None => None, Some(ref topology) => topology @@ -74,7 +74,7 @@ impl<T: NymTopology> TopologyAccessor<T> { } } - pub(crate) async fn get_all_clients(&mut self) -> Option<Vec<provider::Client>> { + pub(crate) async fn get_all_clients(&self) -> Option<Vec<provider::Client>> { // TODO: this will need to be modified to instead return pairs (provider, client) match &self.inner.lock().await.0 { None => None,