Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/healthchecker removal #214

Merged
merged 5 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion clients/desktop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ crypto = {path = "../../common/crypto"}
directory-client = { path = "../../common/client-libs/directory-client" }
gateway-client = { path = "../../common/client-libs/gateway-client" }
gateway-requests = { path = "../../gateway/gateway-requests" }
healthcheck = { path = "../../common/healthcheck" }
mix-client = { path = "../../common/client-libs/mix-client" }
multi-tcp-client = { path = "../../common/client-libs/multi-tcp-client" }
nymsphinx = { path = "../../common/nymsphinx" }
Expand Down
28 changes: 16 additions & 12 deletions clients/desktop/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,15 @@ impl NymClient {
let gateway_address = topology_accessor
.get_gateway_socket_url(&gateway_id)
.await
.expect(
format!(
.unwrap_or_else(|| {
panic!(
"Could not find gateway with id {:?}.\
It does not seem to be present in the current network topology.\
Are you sure it is still online?\
Perhaps try to run `nym-client init` again to obtain a new gateway",
gateway_id
)
.as_ref(),
);
});

url::Url::parse(&gateway_address).expect("provided gateway address is invalid!")
}
Expand All @@ -226,16 +225,9 @@ impl NymClient {
&mut self,
topology_accessor: TopologyAccessor<T>,
) {
let healthcheck_keys = MixIdentityKeyPair::new();

let topology_refresher_config = TopologyRefresherConfig::new(
self.config.get_directory_server(),
self.config.get_topology_refresh_rate(),
healthcheck_keys,
self.config.get_topology_resolution_timeout(),
self.config.get_healthcheck_connection_timeout(),
self.config.get_number_of_healthcheck_test_packets() as usize,
self.config.get_node_score_threshold(),
);
let mut topology_refresher =
TopologyRefresher::new(topology_refresher_config, topology_accessor);
Expand All @@ -246,6 +238,18 @@ impl NymClient {
self.config.get_directory_server()
);
self.runtime.block_on(topology_refresher.refresh());

// TODO: a slightly more graceful termination here
if !self
.runtime
.block_on(topology_refresher.is_topology_routable())
{
panic!(
"The current network topology seem to be insufficient to route any packets through\
- check if enough nodes and a gateway are online"
);
}

info!("Starting topology refresher...");
topology_refresher.start(self.runtime.handle());
}
Expand Down Expand Up @@ -385,7 +389,7 @@ impl NymClient {
SocketType::WebSocket => self.start_websocket_listener(
shared_topology_accessor,
received_buffer_request_sender,
input_sender.clone(),
input_sender,
),
SocketType::None => {
// if we did not start the socket, it means we're running (supposedly) in the native mode
Expand Down
104 changes: 10 additions & 94 deletions clients/desktop/src/client/topology_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
// limitations under the License.

use crate::built_info;
use crypto::identity::MixIdentityKeyPair;
use futures::lock::Mutex;
use healthcheck::HealthChecker;
use log::*;
use nymsphinx::DestinationAddressBytes;
use std::sync::Arc;
use std::time;
use std::time::Duration;
use tokio::runtime::Handle;
// use tokio::sync::RwLock;
use std::net::SocketAddr;
use tokio::task::JoinHandle;
use topology::{provider, NymTopology};

Expand Down Expand Up @@ -57,18 +54,6 @@ impl<T: NymTopology> TopologyAccessor<T> {
self.inner.lock().await.update(new_topology);
}

// not removed until healtchecker is not fully changed to use gateways instead of providers
pub(crate) async fn get_provider_socket_addr(&self, id: &str) -> Option<SocketAddr> {
match &self.inner.lock().await.0 {
None => None,
Some(ref topology) => topology
.providers()
.iter()
.find(|provider| provider.pub_key == id)
.map(|provider| provider.client_listener),
}
}

pub(crate) async fn get_gateway_socket_url(&self, id: &str) -> Option<String> {
match &self.inner.lock().await.0 {
None => None,
Expand All @@ -89,11 +74,6 @@ impl<T: NymTopology> TopologyAccessor<T> {
}
}

// Unless you absolutely need the entire topology, use `random_route` instead
pub(crate) async fn get_current_topology_clone(&self) -> Option<T> {
self.inner.lock().await.0.clone()
}

pub(crate) async fn get_all_clients(&self) -> Option<Vec<provider::Client>> {
// TODO: this will need to be modified to instead return pairs (provider, client)
match &self.inner.lock().await.0 {
Expand Down Expand Up @@ -144,122 +124,58 @@ impl<T: NymTopology> TopologyAccessor<T> {
}
}

#[derive(Debug)]
enum TopologyError {
HealthCheckError,
NoValidPathsError,
}

pub(crate) struct TopologyRefresherConfig {
directory_server: String,
refresh_rate: time::Duration,
identity_keypair: MixIdentityKeyPair,
resolution_timeout: time::Duration,
connection_timeout: time::Duration,
number_test_packets: usize,
node_score_threshold: f64,
}

impl TopologyRefresherConfig {
pub(crate) fn new(
directory_server: String,
refresh_rate: time::Duration,
identity_keypair: MixIdentityKeyPair,
resolution_timeout: time::Duration,
connection_timeout: time::Duration,
number_test_packets: usize,
node_score_threshold: f64,
) -> Self {
pub(crate) fn new(directory_server: String, refresh_rate: time::Duration) -> Self {
TopologyRefresherConfig {
directory_server,
refresh_rate,
identity_keypair,
resolution_timeout,
connection_timeout,
number_test_packets,
node_score_threshold,
}
}
}

pub(crate) struct TopologyRefresher<T: NymTopology> {
directory_server: String,
topology_accessor: TopologyAccessor<T>,
health_checker: HealthChecker,
refresh_rate: Duration,
node_score_threshold: f64,
}

impl<T: 'static + NymTopology> TopologyRefresher<T> {
pub(crate) fn new(
cfg: TopologyRefresherConfig,
topology_accessor: TopologyAccessor<T>,
) -> Self {
// this is a temporary solution as the healthcheck will eventually be moved to validators
let health_checker = healthcheck::HealthChecker::new(
cfg.resolution_timeout,
cfg.connection_timeout,
cfg.number_test_packets,
cfg.identity_keypair,
);

TopologyRefresher {
directory_server: cfg.directory_server,
topology_accessor,
health_checker,
refresh_rate: cfg.refresh_rate,
node_score_threshold: cfg.node_score_threshold,
}
}

async fn get_current_compatible_topology(&self) -> Result<T, TopologyError> {
async fn get_current_compatible_topology(&self) -> T {
let full_topology = T::new(self.directory_server.clone());
let version_filtered_topology =
full_topology.filter_system_version(built_info::PKG_VERSION);

// healthcheck needs some adjustments to work with gateways so for time being just dont run it
return Ok(version_filtered_topology);

// let healthcheck_result = self
// .health_checker
// .do_check(&version_filtered_topology)
// .await;
// let healthcheck_scores = match healthcheck_result {
// Err(err) => {
// error!("Error while performing the healthcheck: {:?}", err);
// return Err(TopologyError::HealthCheckError);
// }
// Ok(scores) => scores,
// };
//
// debug!("{}", healthcheck_scores);
//
// let healthy_topology = healthcheck_scores
// .filter_topology_by_score(&version_filtered_topology, self.node_score_threshold);
//
// // make sure you can still send a packet through the network:
// if !healthy_topology.can_construct_path_through() {
// return Err(TopologyError::NoValidPathsError);
// }
//
// Ok(healthy_topology)
// just filter by version and assume the validators will remove all bad behaving
// nodes with the staking
full_topology.filter_system_version(built_info::PKG_VERSION)
}

pub(crate) async fn refresh(&mut self) {
trace!("Refreshing the topology");
let new_topology = match self.get_current_compatible_topology().await {
Ok(topology) => Some(topology),
Err(err) => {
warn!("the obtained topology seems to be invalid - {:?}, it will be impossible to send packets through", err);
None
}
};
let new_topology = Some(self.get_current_compatible_topology().await);

self.topology_accessor
.update_global_topology(new_topology)
.await;
}

pub(crate) async fn is_topology_routable(&self) -> bool {
self.topology_accessor.is_routable().await
}

pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
handle.spawn(async move {
loop {
Expand Down
2 changes: 1 addition & 1 deletion clients/desktop/src/commands/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn try_gateway_registration(
sphinx_tx.clone(),
timeout,
);
if let Ok(_) = gateway_client.establish_connection().await {
if gateway_client.establish_connection().await.is_ok() {
if let Ok(token) = gateway_client.register().await {
return Some((gateway.pub_key, token));
}
Expand Down
32 changes: 0 additions & 32 deletions clients/desktop/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ const DEFAULT_TOPOLOGY_REFRESH_RATE: u64 = 30_000; // 30s
const DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT: u64 = 5_000; // 5s

const DEFAULT_GATEWAY_RESPONSE_TIMEOUT: u64 = 1_500; // 1.5s
const DEFAULT_HEALTHCHECK_CONNECTION_TIMEOUT: u64 = 1_500; // 1.5s

const DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS: u64 = 2;
const DEFAULT_NODE_SCORE_THRESHOLD: f64 = 0.0;

#[derive(Debug, Deserialize, PartialEq, Serialize, Clone, Copy)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -205,18 +201,6 @@ impl Config {
pub fn get_topology_resolution_timeout(&self) -> time::Duration {
time::Duration::from_millis(self.debug.topology_resolution_timeout)
}

pub fn get_number_of_healthcheck_test_packets(&self) -> u64 {
self.debug.number_of_healthcheck_test_packets
}

pub fn get_node_score_threshold(&self) -> f64 {
self.debug.node_score_threshold
}

pub fn get_healthcheck_connection_timeout(&self) -> time::Duration {
time::Duration::from_millis(self.debug.healthcheck_connection_timeout)
}
}

fn de_option_string<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
Expand Down Expand Up @@ -358,19 +342,6 @@ pub struct Debug {
/// did not reach its destination.
/// The provided value is interpreted as milliseconds.
topology_resolution_timeout: u64,

/// How many packets should be sent through each path during the healthcheck
number_of_healthcheck_test_packets: u64,

/// In the current healthcheck implementation, threshold indicating percentage of packets
/// node received during healthcheck. Node's score must be above that value to be
/// considered healthy.
node_score_threshold: f64,

/// Timeout for establishing initial connection when trying to forward a sphinx packet
/// during healthcheck.
/// The provided value is interpreted as milliseconds.
healthcheck_connection_timeout: u64,
}

impl Default for Debug {
Expand All @@ -382,9 +353,6 @@ impl Default for Debug {
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
number_of_healthcheck_test_packets: DEFAULT_NUMBER_OF_HEALTHCHECK_TEST_PACKETS,
node_score_threshold: DEFAULT_NODE_SCORE_THRESHOLD,
healthcheck_connection_timeout: DEFAULT_HEALTHCHECK_CONNECTION_TIMEOUT,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {

fn can_construct_path_through(&self) -> bool {
!self.mix_nodes().is_empty()
&& !self.providers().is_empty()
&& !self.gateways().is_empty()
&& self.make_layered_topology().is_ok()
}
}
Expand Down
8 changes: 0 additions & 8 deletions gateway/src/node/storage/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,4 @@ impl ClientLedger {

Ok(client_vec)
}

#[cfg(test)]
pub(crate) fn create_temporary() -> Self {
let cfg = sled::Config::new().temporary(true);
ClientLedger {
db: cfg.open().unwrap(),
}
}
}
4 changes: 2 additions & 2 deletions validator/src/services/mixmining/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ mod mixnodes {
service.add(node2.clone());
let nodes = service.topology().mixnodes;
assert_eq!(2, nodes.len());
assert_eq!(node1.clone(), nodes[0]);
assert_eq!(node2.clone(), nodes[1]);
assert_eq!(node1, nodes[0]);
assert_eq!(node2, nodes[1]);
}
}

Expand Down