diff --git a/Cargo.lock b/Cargo.lock index 05cb7ad8504..6a3659a7752 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1682,6 +1682,24 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "network-monitor" +version = "0.9.0-dev" +dependencies = [ + "crypto", + "directory-client", + "futures 0.3.5", + "gateway-client", + "log 0.4.11", + "nymsphinx", + "pretty_env_logger", + "rand 0.7.3", + "serde", + "tokio 0.2.22", + "topology", + "version-checker", +] + [[package]] name = "nonexhaustive-delayqueue" version = "0.1.0" @@ -1720,7 +1738,7 @@ dependencies = [ [[package]] name = "nym-client" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "built", "clap", @@ -1751,7 +1769,7 @@ dependencies = [ [[package]] name = "nym-client-wasm" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "built", "console_error_panic_hook", @@ -1773,7 +1791,7 @@ dependencies = [ [[package]] name = "nym-gateway" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "built", "clap", @@ -1803,7 +1821,7 @@ dependencies = [ [[package]] name = "nym-mixnode" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "bs58", "built", @@ -1830,7 +1848,7 @@ dependencies = [ [[package]] name = "nym-socks5-client" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "built", "clap", @@ -1860,7 +1878,7 @@ dependencies = [ [[package]] name = "nym-validator" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "abci", "bodyparser", @@ -3055,7 +3073,7 @@ dependencies = [ [[package]] name = "sphinx-socks" -version = "0.8.1" +version = "0.9.0-dev" dependencies = [ "clap", "dirs 2.0.2", @@ -3906,11 +3924,11 @@ dependencies = [ [[package]] name = "x25519-dalek" -version = "0.6.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "637ff90c9540fa3073bb577e65033069e4bae7c79d49d74aa3ffdf5342a53217" +checksum = "bc614d95359fd7afc321b66d2107ede58b246b844cf5d8a0adcca413e439f088" dependencies = [ - "curve25519-dalek 2.1.0", + "curve25519-dalek 3.0.0", "rand_core 0.5.1", "zeroize", ] diff --git a/Cargo.toml b/Cargo.toml index df7e29cc904..b9c505d7a42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ members = [ "gateway/gateway-requests", "service-providers/sphinx-socks", "mixnode", + "network-monitor", "validator", ] @@ -51,4 +52,5 @@ default-members = [ "gateway", "mixnode", "validator", + "network-monitor", ] diff --git a/clients/native/Cargo.toml b/clients/native/Cargo.toml index 0f5086c9080..c70ac644737 100644 --- a/clients/native/Cargo.toml +++ b/clients/native/Cargo.toml @@ -1,7 +1,7 @@ [package] build = "build.rs" name = "nym-client" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn ", "Jędrzej Stuczyński "] edition = "2018" diff --git a/clients/socks5/Cargo.toml b/clients/socks5/Cargo.toml index 9bfed2c0359..c65f894a943 100644 --- a/clients/socks5/Cargo.toml +++ b/clients/socks5/Cargo.toml @@ -1,7 +1,7 @@ [package] build = "build.rs" name = "nym-socks5-client" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn "] edition = "2018" diff --git a/clients/webassembly/Cargo.toml b/clients/webassembly/Cargo.toml index db3004b0acc..39915e7b654 100644 --- a/clients/webassembly/Cargo.toml +++ b/clients/webassembly/Cargo.toml @@ -2,7 +2,7 @@ build = "build.rs" name = "nym-client-wasm" authors = ["Dave Hrycyszyn ", "Jedrzej Stuczynski "] -version = "0.8.1" +version = "0.9.0-dev" edition = "2018" keywords = ["nym", "sphinx", "wasm", "webassembly", "privacy", "client"] license = "Apache-2.0" diff --git a/common/client-libs/directory-client/models/src/lib.rs b/common/client-libs/directory-client/models/src/lib.rs index c577e00beab..df8b7ec0ebb 100644 --- a/common/client-libs/directory-client/models/src/lib.rs +++ b/common/client-libs/directory-client/models/src/lib.rs @@ -13,4 +13,5 @@ // limitations under the License. pub mod metrics; +pub mod mixmining; pub mod presence; diff --git a/common/client-libs/directory-client/models/src/mixmining.rs b/common/client-libs/directory-client/models/src/mixmining.rs new file mode 100644 index 00000000000..270d1bb7fda --- /dev/null +++ b/common/client-libs/directory-client/models/src/mixmining.rs @@ -0,0 +1,19 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +/// A notification sent to the validators to let them know whether a given mix is +/// currently up or down (based on whether it's mixing packets) +pub struct MixStatus { + pub pub_key: String, + pub ip_version: String, + pub up: bool, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +/// A notification sent to the validators to let them know whether a given set of mixes is +/// currently up or down (based on whether it's mixing packets) +pub struct BatchMixStatus { + pub status: Vec, +} diff --git a/common/client-libs/directory-client/models/src/presence/mixnodes.rs b/common/client-libs/directory-client/models/src/presence/mixnodes.rs index 9d44ae07c56..e7d5386f43f 100644 --- a/common/client-libs/directory-client/models/src/presence/mixnodes.rs +++ b/common/client-libs/directory-client/models/src/presence/mixnodes.rs @@ -36,7 +36,7 @@ impl From for ConversionError { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct MixNodePresence { pub location: String, diff --git a/common/client-libs/directory-client/src/lib.rs b/common/client-libs/directory-client/src/lib.rs index 2540157870a..7734d87499e 100644 --- a/common/client-libs/directory-client/src/lib.rs +++ b/common/client-libs/directory-client/src/lib.rs @@ -15,6 +15,8 @@ use crate::requests::health_check_get::Request as HealthCheckRequest; use crate::requests::metrics_mixes_get::Request as MetricsMixRequest; use crate::requests::metrics_mixes_post::Request as MetricsMixPost; +use crate::requests::mix_mining_batch_status_post::Request as MixMiningBatchStatusPost; +use crate::requests::mix_mining_status_post::Request as MixMiningStatusPost; use crate::requests::presence_coconodes_post::Request as PresenceCocoNodesPost; use crate::requests::presence_gateways_post::Request as PresenceGatewayPost; use crate::requests::presence_mixnodes_post::Request as PresenceMixNodesPost; @@ -25,10 +27,12 @@ use directory_client_models::presence::{ coconodes::CocoPresence, gateways::GatewayPresence, mixnodes::MixNodePresence, providers::MixProviderPresence, }; +use mixmining::MixStatus; use requests::{health_check_get::HealthCheckResponse, DirectoryGetRequest, DirectoryPostRequest}; +use directory_client_models::mixmining::BatchMixStatus; pub use directory_client_models::{ - metrics, + metrics, mixmining, presence::{self, Topology}, }; @@ -123,6 +127,22 @@ impl Client { self.post(req).await } + pub async fn post_mixmining_status( + &self, + status: MixStatus, + ) -> reqwest::Result { + let req = MixMiningStatusPost::new(&self.base_url, status); + self.post(req).await + } + + pub async fn post_batch_mixmining_status( + &self, + batch_status: BatchMixStatus, + ) -> reqwest::Result { + let req = MixMiningBatchStatusPost::new(&self.base_url, batch_status); + self.post(req).await + } + // this should be soft-deprecated as the whole concept of provider will // be removed in the next topology rework pub async fn post_provider_presence( diff --git a/common/client-libs/directory-client/src/requests/mix_mining_batch_status_post.rs b/common/client-libs/directory-client/src/requests/mix_mining_batch_status_post.rs new file mode 100644 index 00000000000..e4a5727f8b1 --- /dev/null +++ b/common/client-libs/directory-client/src/requests/mix_mining_batch_status_post.rs @@ -0,0 +1,91 @@ +use super::{DirectoryPostRequest, DirectoryRequest}; +use crate::mixmining::BatchMixStatus; + +const PATH: &str = "/api/mixmining/batch"; + +pub struct Request { + base_url: String, + path: String, + payload: BatchMixStatus, +} + +impl DirectoryRequest for Request { + fn url(&self) -> String { + format!("{}{}", self.base_url, self.path) + } +} + +impl DirectoryPostRequest for Request { + type Payload = BatchMixStatus; + fn new(base_url: &str, payload: Self::Payload) -> Self { + Request { + base_url: base_url.to_string(), + path: PATH.to_string(), + payload, + } + } + + fn json_payload(&self) -> &BatchMixStatus { + &self.payload + } +} + +#[cfg(test)] +mod batch_mix_status_post_request { + use super::*; + use crate::client_test_fixture; + use mockito::mock; + + #[cfg(test)] + mod on_a_400_status { + use super::*; + + #[tokio::test] + async fn it_returns_an_error() { + let _m = mock("POST", PATH).with_status(400).create(); + let client = client_test_fixture(&mockito::server_url()); + let result = client + .post_batch_mixmining_status(fixtures::new_status()) + .await; + assert_eq!(400, result.unwrap().status()); + _m.assert(); + } + } + + #[cfg(test)] + mod on_a_201 { + use super::*; + + #[tokio::test] + async fn it_returns_a_response_with_201() { + let json = r#"{ + "ok": true + }"#; + let _m = mock("POST", "/api/mixmining/batch") + .with_status(201) + .with_body(json) + .create(); + let client = client_test_fixture(&mockito::server_url()); + let result = client + .post_batch_mixmining_status(fixtures::new_status()) + .await; + assert!(result.is_ok()); + _m.assert(); + } + } + + #[cfg(test)] + mod fixtures { + use crate::mixmining::{BatchMixStatus, MixStatus}; + + pub fn new_status() -> BatchMixStatus { + BatchMixStatus { + status: vec![MixStatus { + pub_key: "abc".to_string(), + ip_version: "4".to_string(), + up: true, + }], + } + } + } +} diff --git a/common/client-libs/directory-client/src/requests/mix_mining_status_post.rs b/common/client-libs/directory-client/src/requests/mix_mining_status_post.rs new file mode 100644 index 00000000000..442170da7cb --- /dev/null +++ b/common/client-libs/directory-client/src/requests/mix_mining_status_post.rs @@ -0,0 +1,84 @@ +use super::{DirectoryPostRequest, DirectoryRequest}; +use crate::mixmining::MixStatus; + +const PATH: &str = "/api/mixmining"; + +pub struct Request { + base_url: String, + path: String, + payload: MixStatus, +} + +impl DirectoryRequest for Request { + fn url(&self) -> String { + format!("{}{}", self.base_url, self.path) + } +} + +impl DirectoryPostRequest for Request { + type Payload = MixStatus; + fn new(base_url: &str, payload: Self::Payload) -> Self { + Request { + base_url: base_url.to_string(), + path: PATH.to_string(), + payload, + } + } + + fn json_payload(&self) -> &MixStatus { + &self.payload + } +} + +#[cfg(test)] +mod mix_status_post_request { + use super::*; + use crate::client_test_fixture; + use mockito::mock; + + #[cfg(test)] + mod on_a_400_status { + use super::*; + + #[tokio::test] + async fn it_returns_an_error() { + let _m = mock("POST", PATH).with_status(400).create(); + let client = client_test_fixture(&mockito::server_url()); + let result = client.post_mixmining_status(fixtures::new_status()).await; + assert_eq!(400, result.unwrap().status()); + _m.assert(); + } + } + + #[cfg(test)] + mod on_a_201 { + use super::*; + #[tokio::test] + async fn it_returns_a_response_with_201() { + let json = r#"{ + "ok": true + }"#; + let _m = mock("POST", "/api/mixmining") + .with_status(201) + .with_body(json) + .create(); + let client = client_test_fixture(&mockito::server_url()); + let result = client.post_mixmining_status(fixtures::new_status()).await; + assert!(result.is_ok()); + _m.assert(); + } + } + + #[cfg(test)] + mod fixtures { + use directory_client_models::mixmining::MixStatus; + + pub fn new_status() -> MixStatus { + MixStatus { + pub_key: "abc".to_string(), + ip_version: "4".to_string(), + up: true, + } + } + } +} diff --git a/common/client-libs/directory-client/src/requests/mod.rs b/common/client-libs/directory-client/src/requests/mod.rs index 39b712dff97..e3623931d4b 100644 --- a/common/client-libs/directory-client/src/requests/mod.rs +++ b/common/client-libs/directory-client/src/requests/mod.rs @@ -15,6 +15,8 @@ pub mod health_check_get; pub mod metrics_mixes_get; pub mod metrics_mixes_post; +pub mod mix_mining_batch_status_post; +pub mod mix_mining_status_post; pub mod presence_coconodes_post; pub mod presence_gateways_post; pub mod presence_mixnodes_post; diff --git a/common/crypto/Cargo.toml b/common/crypto/Cargo.toml index f190c743c74..a6c3980e11e 100644 --- a/common/crypto/Cargo.toml +++ b/common/crypto/Cargo.toml @@ -15,11 +15,11 @@ generic-array = "0.14" hkdf = "0.9" hmac = "0.8" stream-cipher = "0.4" -x25519-dalek = "0.6" -ed25519-dalek = "1.0.0-pre.4" +x25519-dalek = "1.1" +ed25519-dalek = "1.0" log = "0.4" pretty_env_logger = "0.3" -rand = {version = "0.7.3", features = ["wasm-bindgen"]} +rand = { version = "0.7.3", features = ["wasm-bindgen"] } # internal nymsphinx-types = { path = "../nymsphinx/types" } diff --git a/common/crypto/src/asymmetric/encryption/mod.rs b/common/crypto/src/asymmetric/encryption/mod.rs index b8433efcd5f..8feeffef5c7 100644 --- a/common/crypto/src/asymmetric/encryption/mod.rs +++ b/common/crypto/src/asymmetric/encryption/mod.rs @@ -100,7 +100,7 @@ impl PemStorableKeyPair for KeyPair { } } -#[derive(Debug, Copy, Clone)] +#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)] pub struct PublicKey(x25519_dalek::PublicKey); impl PublicKey { diff --git a/common/topology/src/lib.rs b/common/topology/src/lib.rs index def926f96f1..2803d71ce16 100644 --- a/common/topology/src/lib.rs +++ b/common/topology/src/lib.rs @@ -35,7 +35,7 @@ pub enum NymTopologyError { pub type MixLayer = u8; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NymTopology { coco_nodes: Vec, mixes: HashMap>, @@ -63,6 +63,20 @@ impl NymTopology { &self.mixes } + pub fn mixes_as_vec(&self) -> Vec { + let mut mixes: Vec = vec![]; + + for layer in self.mixes().values() { + mixes.extend(layer.to_owned()) + } + mixes + } + + pub fn mixes_in_layer(&self, layer: u8) -> Vec { + assert!(vec![1, 2, 3].contains(&layer)); + self.mixes.get(&layer).unwrap().to_owned() + } + pub fn gateways(&self) -> &Vec { &self.gateways } @@ -134,6 +148,10 @@ impl NymTopology { .collect()) } + pub fn set_mixes_in_layer(&mut self, layer: u8, mixes: Vec) { + self.mixes.insert(layer, mixes); + } + pub fn can_construct_path_through(&self, num_mix_hops: u8) -> bool { // if there are no gateways present, we can't do anything if self.gateways.is_empty() { @@ -176,3 +194,64 @@ impl NymTopology { } } } + +#[cfg(test)] +mod converting_mixes_to_vec { + use super::*; + + #[cfg(test)] + mod when_nodes_exist { + use crypto::asymmetric::encryption; + + use super::*; + + #[test] + fn returns_a_vec_with_hashmap_values() { + let node1 = mix::Node { + location: "London".to_string(), + host: "3.3.3.3:1789".parse().unwrap(), + pub_key: encryption::PublicKey::from_base58_string( + "C7cown6dYCLZpLiMFC1PaBmhvLvmJmLDJGeRTbPD45bX", + ) + .unwrap(), + layer: 1, + last_seen: 123, + version: "0.x.0".to_string(), + }; + + let node2 = mix::Node { + location: "Thunder Bay".to_string(), + ..node1.clone() + }; + + let node3 = mix::Node { + location: "Warsaw".to_string(), + ..node1.clone() + }; + + let mut mixes: HashMap> = HashMap::new(); + mixes.insert(1, vec![node1, node2]); + mixes.insert(2, vec![node3]); + + let topology = NymTopology::new(vec![], mixes, vec![]); + let mixvec = topology.mixes_as_vec(); + assert!(mixvec + .iter() + .map(|node| node.location.clone()) + .collect::>() + .contains(&"London".to_string())); + } + } + + #[cfg(test)] + mod when_no_nodes_exist { + use super::*; + + #[test] + fn returns_an_empty_vec() { + let topology = NymTopology::new(vec![], HashMap::new(), vec![]); + let mixvec = topology.mixes_as_vec(); + assert!(mixvec.is_empty()); + } + } +} diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index be6e59c7086..48d063c72ba 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -1,7 +1,7 @@ [package] build = "build.rs" name = "nym-gateway" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn ", "Jędrzej Stuczyński "] edition = "2018" diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index 975e5a5080f..d85e211efe7 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -1,7 +1,7 @@ [package] build = "build.rs" name = "nym-mixnode" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn ", "Jędrzej Stuczyński "] edition = "2018" diff --git a/network-monitor/Cargo.toml b/network-monitor/Cargo.toml new file mode 100644 index 00000000000..a111104769b --- /dev/null +++ b/network-monitor/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "network-monitor" +version = "0.9.0-dev" +authors = ["Dave Hrycyszyn ", "Jędrzej Stuczyński "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3" +log = "0.4" +pretty_env_logger = "0.3" +rand = "0.7" +serde = "1.0" +tokio = { version = "0.2", features = ["signal", "rt-threaded", "macros"] } + +## internal +crypto = { path = "../common/crypto" } +directory-client = { path = "../common/client-libs/directory-client" } +gateway-client = { path = "../common/client-libs/gateway-client" } +nymsphinx = { path = "../common/nymsphinx" } +topology = { path = "../common/topology" } +version-checker = { path = "../common/version-checker" } + +[dev-dependencies] diff --git a/network-monitor/src/chunker.rs b/network-monitor/src/chunker.rs new file mode 100644 index 00000000000..5ae5873dc6b --- /dev/null +++ b/network-monitor/src/chunker.rs @@ -0,0 +1,74 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{DefRng, DEFAULT_RNG}; +use nymsphinx::forwarding::packet::MixPacket; +use nymsphinx::params::PacketMode; +use nymsphinx::{ + acknowledgements::AckKey, addressing::clients::Recipient, preparer::MessagePreparer, +}; +use std::time::Duration; +use topology::NymTopology; + +const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(200); +const DEFAULT_AVERAGE_ACK_DELAY: Duration = Duration::from_millis(200); + +pub(crate) struct Chunker { + rng: DefRng, + me: Recipient, + message_preparer: MessagePreparer, +} + +impl Chunker { + pub(crate) fn new(me: Recipient) -> Self { + Chunker { + rng: DEFAULT_RNG, + me: me.clone(), + message_preparer: MessagePreparer::new( + DEFAULT_RNG, + me, + DEFAULT_AVERAGE_PACKET_DELAY, + DEFAULT_AVERAGE_ACK_DELAY, + PacketMode::Mix, + None, + ), + } + } + + pub(crate) async fn prepare_messages( + &mut self, + message: Vec, + topology: &NymTopology, + ) -> Vec { + let ack_key: AckKey = AckKey::new(&mut self.rng); + + let (split_message, _reply_keys) = self + .message_preparer + .prepare_and_split_message(message, false, &topology) + .expect("failed to split the message"); + + let mut mix_packets = Vec::with_capacity(split_message.len()); + for message_chunk in split_message { + // don't bother with acks etc. for time being + let prepared_fragment = self + .message_preparer + .prepare_chunk_for_sending(message_chunk, &topology, &ack_key, &self.me) + .await + .unwrap(); + + mix_packets.push(prepared_fragment.mix_packet); + } + mix_packets + } +} diff --git a/network-monitor/src/main.rs b/network-monitor/src/main.rs new file mode 100644 index 00000000000..751671281ea --- /dev/null +++ b/network-monitor/src/main.rs @@ -0,0 +1,230 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::monitor::MixnetReceiver; +use crate::run_info::{TestRunUpdateReceiver, TestRunUpdateSender}; +use crate::tested_network::{good_topology, TestedNetwork}; +use crypto::asymmetric::{encryption, identity}; +use directory_client::DirectoryClient; +use futures::channel::mpsc; +use gateway_client::GatewayClient; +use monitor::{AckSender, MixnetSender, Monitor}; +use notifications::Notifier; +use nymsphinx::addressing::clients::Recipient; +use packet_sender::PacketSender; +use rand::rngs::OsRng; +use std::sync::Arc; +use std::time; +use topology::gateway; + +mod chunker; +mod monitor; +mod notifications; +mod packet_sender; +mod run_info; +mod test_packet; +mod tested_network; + +pub(crate) type DefRng = OsRng; +pub(crate) const DEFAULT_RNG: DefRng = OsRng; + +// CHANGE THIS TO GET COMPLETE LIST OF WHICH NODE IS WORKING OR BROKEN IN PARTICULAR WAY +// || +// \/ +pub const PRINT_DETAILED_REPORT: bool = false; +// /\ +// || +// CHANGE THIS TO GET COMPLETE LIST OF WHICH NODE IS WORKING OR BROKEN IN PARTICULAR WAY + +fn setup_logging() { + let mut log_builder = pretty_env_logger::formatted_timed_builder(); + if let Ok(s) = ::std::env::var("RUST_LOG") { + log_builder.parse_filters(&s); + } else { + // default to 'Info' + log_builder.filter(None, log::LevelFilter::Info); + } + + log_builder + .filter_module("hyper", log::LevelFilter::Warn) + .filter_module("tokio_reactor", log::LevelFilter::Warn) + .filter_module("reqwest", log::LevelFilter::Warn) + .filter_module("mio", log::LevelFilter::Warn) + .filter_module("want", log::LevelFilter::Warn) + .filter_module("sled", log::LevelFilter::Warn) + .filter_module("tungstenite", log::LevelFilter::Warn) + .filter_module("tokio_tungstenite", log::LevelFilter::Warn) + .init(); +} + +fn check_if_up_to_date() { + let monitor_version = env!("CARGO_PKG_VERSION"); + let good_v4_topology = good_topology::new_v4(); + for (_, layer_mixes) in good_v4_topology.mixes().into_iter() { + for mix in layer_mixes.into_iter() { + if !version_checker::is_minor_version_compatible(monitor_version, &*mix.version) { + panic!( + "Our good topology is not compatible with monitor! Mix runs {}, we have {}", + mix.version, monitor_version + ) + } + } + } + + for gateway in good_v4_topology.gateways().into_iter() { + if !version_checker::is_minor_version_compatible(monitor_version, &*gateway.version) { + panic!( + "Our good topology is not compatible with monitor! Gateway runs {}, we have {}", + gateway.version, monitor_version + ) + } + } + + let good_v6_topology = good_topology::new_v6(); + for (_, layer_mixes) in good_v6_topology.mixes().into_iter() { + for mix in layer_mixes.into_iter() { + if !version_checker::is_minor_version_compatible(monitor_version, &*mix.version) { + panic!( + "Our good topology is not compatible with monitor! Mix runs {}, we have {}", + mix.version, monitor_version + ) + } + } + } + + for gateway in good_v6_topology.gateways().into_iter() { + if !version_checker::is_minor_version_compatible(monitor_version, &*gateway.version) { + panic!( + "Our good topology is not compatible with monitor! Gateway runs {}, we have {}", + gateway.version, monitor_version + ) + } + } +} + +#[tokio::main] +async fn main() { + println!("Network monitor starting..."); + check_if_up_to_date(); + setup_logging(); + + // Set up topology + let directory_uri = "https://qa-directory.nymtech.net"; + println!("* directory server: {}", directory_uri); + + // TODO: this might change if it turns out we need both v4 and v6 gateway clients + let gateway = tested_network::v4_gateway(); + println!("* gateway: {}", gateway.identity_key.to_base58_string()); + + // Channels for task communication + let (ack_sender, _ack_receiver) = mpsc::unbounded(); + let (mixnet_sender, mixnet_receiver) = mpsc::unbounded(); + let (test_run_sender, test_run_receiver) = mpsc::unbounded(); + + // Generate a new set of identity keys. These are ephemeral, and change on each run. + // JS: do they? or rather should they? + let identity_keypair = identity::KeyPair::new(); + let encryption_keypair = encryption::KeyPair::new(); + + // We need our own address as a Recipient so we can send ourselves test packets + let self_address = Recipient::new( + identity_keypair.public_key().clone(), + encryption_keypair.public_key().clone(), + gateway.identity_key, + ); + + let directory_client = new_directory_client(directory_uri); + + let mut network_monitor = Monitor::new(); + + let notifier = new_notifier( + encryption_keypair, + Arc::clone(&directory_client), + mixnet_receiver, + test_run_receiver, + ); + + let gateway_client = new_gateway_client(gateway, identity_keypair, ack_sender, mixnet_sender); + let tested_network = new_tested_network(gateway_client).await; + + let packet_sender = new_packet_sender( + directory_client, + tested_network, + self_address, + test_run_sender, + ); + + network_monitor.run(notifier, packet_sender).await; +} + +async fn new_tested_network(gateway_client: GatewayClient) -> TestedNetwork { + // TODO: possibly change that if it turns out we need two clients (v4 and v6) + let mut tested_network = TestedNetwork::new_good(gateway_client); + tested_network.start_gateway_client().await; + tested_network +} + +fn new_packet_sender( + directory_client: Arc, + tested_network: TestedNetwork, + self_address: Recipient, + test_run_sender: TestRunUpdateSender, +) -> PacketSender { + PacketSender::new( + directory_client, + tested_network, + self_address, + test_run_sender, + ) +} + +/// Construct a new gateway client. +pub fn new_gateway_client( + gateway: gateway::Node, + identity_keypair: identity::KeyPair, + ack_sender: AckSender, + mixnet_messages_sender: MixnetSender, +) -> GatewayClient { + let timeout = time::Duration::from_millis(500); + let identity_arc = Arc::new(identity_keypair); + + gateway_client::GatewayClient::new( + gateway.client_listener, + identity_arc, + gateway.identity_key, + None, + mixnet_messages_sender, + ack_sender, + timeout, + ) +} + +fn new_directory_client(directory_uri: &str) -> Arc { + let config = directory_client::Config::new(directory_uri.to_string()); + Arc::new(DirectoryClient::new(config)) +} + +fn new_notifier( + encryption_keypair: encryption::KeyPair, + directory_client: Arc, + mixnet_receiver: MixnetReceiver, + test_run_receiver: TestRunUpdateReceiver, +) -> Notifier { + Notifier::new( + mixnet_receiver, + encryption_keypair, + directory_client, + test_run_receiver, + ) +} diff --git a/network-monitor/src/monitor.rs b/network-monitor/src/monitor.rs new file mode 100644 index 00000000000..1586e685346 --- /dev/null +++ b/network-monitor/src/monitor.rs @@ -0,0 +1,65 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{notifications::Notifier, packet_sender::PacketSender}; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use log::*; +use tokio::time::{self, Duration}; + +pub(crate) type MixnetReceiver = UnboundedReceiver>>; +pub(crate) type MixnetSender = UnboundedSender>>; +pub(crate) type AckSender = UnboundedSender>>; + +pub(crate) const MONITOR_RUN_INTERVAL: Duration = Duration::from_secs(60); +pub(crate) const NOTIFIER_DELIVERY_TIMEOUT: Duration = Duration::from_secs(20); + +pub struct Monitor; + +impl Monitor { + pub fn new() -> Monitor { + Monitor {} + } + + pub(crate) async fn run(&mut self, mut notifier: Notifier, mut packet_sender: PacketSender) { + println!("Network monitor running - note: 'good' nodes are hardcoded."); + println!("-----------------------------------------------------------"); + tokio::spawn(async move { + notifier.run().await; + }); + + tokio::spawn(async move { + let mut interval = time::interval(MONITOR_RUN_INTERVAL); + loop { + interval.tick().await; + info!(target: "Monitor", "Starting test run"); + + if let Err(err) = packet_sender.run_test().await { + error!("Test run failed! - {:?}", err); + } + } + }); + + self.wait_for_interrupt().await + } + + async fn wait_for_interrupt(&self) { + if let Err(e) = tokio::signal::ctrl_c().await { + error!( + "There was an error while capturing SIGINT - {:?}. We will terminate regardless", + e + ); + } + println!("Received SIGINT - the network monitor will terminate now"); + } +} diff --git a/network-monitor/src/notifications/mod.rs b/network-monitor/src/notifications/mod.rs new file mode 100644 index 00000000000..2e4f789de74 --- /dev/null +++ b/network-monitor/src/notifications/mod.rs @@ -0,0 +1,154 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::monitor::MixnetReceiver; +use crate::monitor::NOTIFIER_DELIVERY_TIMEOUT; +use crate::notifications::test_run::TestRun; +use crate::notifications::test_timeout::TestTimeout; +use crate::run_info::{RunInfo, TestRunUpdate, TestRunUpdateReceiver}; +use crate::PRINT_DETAILED_REPORT; +use crypto::asymmetric::encryption::KeyPair; +use directory_client::mixmining::BatchMixStatus; +use futures::StreamExt; +use log::*; +use nymsphinx::receiver::MessageReceiver; +use std::sync::Arc; + +mod test_run; +mod test_timeout; + +#[derive(Debug)] +enum NotifierError { + DirectoryError(String), + MalformedPacketReceived, + NonTestPacketReceived, +} + +pub(crate) struct Notifier { + client_encryption_keypair: KeyPair, + message_receiver: MessageReceiver, + mixnet_receiver: MixnetReceiver, + directory_client: Arc, + test_run_receiver: TestRunUpdateReceiver, + test_run_nonce: u64, + current_test_run: TestRun, + test_timeout: TestTimeout, +} + +impl Notifier { + pub(crate) fn new( + mixnet_receiver: MixnetReceiver, + client_encryption_keypair: KeyPair, + directory_client: Arc, + test_run_receiver: TestRunUpdateReceiver, + ) -> Notifier { + let message_receiver = MessageReceiver::new(); + let mut current_test_run = TestRun::new(0).with_report(); + if PRINT_DETAILED_REPORT { + current_test_run = current_test_run.with_detailed_report(); + } + Notifier { + client_encryption_keypair, + message_receiver, + mixnet_receiver, + directory_client, + test_run_receiver, + test_run_nonce: 0, + current_test_run, + test_timeout: TestTimeout::new(), + } + } + + async fn on_run_start(&mut self, run_info: RunInfo) { + self.test_run_nonce += 1; + + self.current_test_run.refresh(self.test_run_nonce); + self.current_test_run.start_run(run_info); + } + + async fn on_run_end(&mut self) { + let batch_status = self.current_test_run.finish_run(); + if let Err(err) = self.notify_validator(batch_status).await { + warn!("Failed to send batch status to validator - {:?}", err) + } + } + + fn on_sending_over(&mut self, nonce: u64) { + assert_eq!(nonce, self.test_run_nonce); + self.test_timeout.start(NOTIFIER_DELIVERY_TIMEOUT); + } + + async fn on_test_run_update(&mut self, run_update: TestRunUpdate) { + match run_update { + TestRunUpdate::StartSending(run_info) => self.on_run_start(run_info).await, + TestRunUpdate::DoneSending(nonce) => self.on_sending_over(nonce), + } + } + + fn on_mix_messages(&mut self, messages: Vec>) { + for message in messages { + if let Err(err) = self.on_message(message) { + error!(target: "Mix receiver", "failed to process received mix packet - {:?}", err) + } + } + } + + pub(crate) async fn run(&mut self) { + debug!("Started MixnetListener"); + loop { + tokio::select! { + mix_messages = &mut self.mixnet_receiver.next() => { + self.on_mix_messages(mix_messages.expect("mix channel has failed!")); + }, + run_update = &mut self.test_run_receiver.next() => { + self.on_test_run_update(run_update.expect("packet sender has died!")).await; + } + _ = &mut self.test_timeout => { + self.on_run_end().await; + self.test_timeout.clear(); + } + } + } + } + + fn on_message(&mut self, message: Vec) -> Result<(), NotifierError> { + let encrypted_bytes = self + .message_receiver + .recover_plaintext(self.client_encryption_keypair.private_key(), message) + .map_err(|_| NotifierError::MalformedPacketReceived)?; + let fragment = self + .message_receiver + .recover_fragment(&encrypted_bytes) + .map_err(|_| NotifierError::MalformedPacketReceived)?; + let (recovered, _) = self + .message_receiver + .insert_new_fragment(fragment) + .map_err(|_| NotifierError::MalformedPacketReceived)? + .ok_or_else(|| NotifierError::NonTestPacketReceived)?; // if it's a test packet it MUST BE reconstructed with single fragment + + let all_received = self.current_test_run.received_packet(recovered.message); + if all_received { + self.test_timeout.fire(); + } + Ok(()) + } + + async fn notify_validator(&self, status: BatchMixStatus) -> Result<(), NotifierError> { + self.directory_client + .post_batch_mixmining_status(status) + .await + .map_err(|err| NotifierError::DirectoryError(err.to_string()))?; + Ok(()) + } +} diff --git a/network-monitor/src/notifications/test_run.rs b/network-monitor/src/notifications/test_run.rs new file mode 100644 index 00000000000..9567d74a5d5 --- /dev/null +++ b/network-monitor/src/notifications/test_run.rs @@ -0,0 +1,290 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::run_info::RunInfo; +use crate::test_packet::TestPacket; +use directory_client::mixmining::{BatchMixStatus, MixStatus}; +use log::*; +use std::collections::{HashMap, HashSet}; +use std::mem; + +pub(super) struct TestRun { + print_report: bool, + print_detailed_report: bool, + test_report: TestReport, + + run_nonce: u64, + expected_run_packets: HashSet, + received_packets: Vec, +} + +#[derive(Default)] +struct NodeResult { + ip_v4_compatible: bool, + ip_v6_compatible: bool, +} + +#[derive(Default)] +struct TestReport { + total_sent: usize, + total_received: usize, + malformed: Vec, + outdated: Vec<(String, String)>, + + // below are only populated if we're going to be printing the report + only_ipv4_compatible: Vec, // can't speak v6, but can speak v4 + only_ipv6_compatible: Vec, // can't speak v4, but can speak v6 + completely_unroutable: Vec, // can't speak either v4 or v6 + fully_working: Vec, +} + +impl TestReport { + fn print(&self, detailed: bool) { + info!(target: "Test Report", "Sent total of {} packets", self.total_sent); + info!(target: "Test Report", "Received total of {} packets", self.total_received); + info!(target: "Test Report", "{} nodes are malformed", self.malformed.len()); + info!(target: "Test Report", "{} nodes are outdated", self.outdated.len()); + info!(target: "Test Report", "{} nodes speak ONLY IPv4 (NO IPv6 connectivity)", self.only_ipv4_compatible.len()); + info!(target: "Test Report", "{} nodes speak ONLY IPv6 (NO IPv4 connectivity)", self.only_ipv6_compatible.len()); + info!(target: "Test Report", "{} nodes are totally unroutable!", self.completely_unroutable.len()); + info!(target: "Test Report", "{} nodes work fine!", self.fully_working.len()); + + if detailed { + info!(target: "Detailed report", "full summary:"); + for malformed in self.malformed.iter() { + info!(target: "Malformed node", "{}", malformed) + } + for outdated in self.outdated.iter() { + info!(target: "Outdated node", "{} (runs v{})", outdated.0, outdated.1) + } + for v4_node in self.only_ipv4_compatible.iter() { + info!(target: "IPv4-only node", "{}", v4_node) + } + + for v6_node in self.only_ipv6_compatible.iter() { + info!(target: "IPv6-only node", "{}", v6_node) + } + + for unroutable in self.completely_unroutable.iter() { + info!(target: "Unroutable node", "{}", unroutable) + } + + for working in self.fully_working.iter() { + info!(target: "Fully working node", "{}", working) + } + } + } +} + +impl TestRun { + pub(super) fn new(run_nonce: u64) -> Self { + TestRun { + print_report: false, + print_detailed_report: false, + test_report: Default::default(), + run_nonce, + expected_run_packets: Default::default(), + received_packets: vec![], + } + } + + pub(super) fn with_report(mut self) -> Self { + self.print_report = true; + self + } + + pub(super) fn with_detailed_report(mut self) -> Self { + self.print_report = true; + self.print_detailed_report = true; + self + } + + pub(super) fn refresh(&mut self, new_nonce: u64) { + self.test_report = Default::default(); + self.run_nonce = new_nonce; + self.expected_run_packets = Default::default(); + self.received_packets = Default::default(); + } + + fn down_status(&self, pub_key: String) -> Vec { + let v4_status = MixStatus { + pub_key: pub_key.clone(), + ip_version: "4".to_string(), + up: false, + }; + + let v6_status = MixStatus { + pub_key, + ip_version: "6".to_string(), + up: false, + }; + + let mut vec = Vec::with_capacity(2); + vec.push(v4_status); + vec.push(v6_status); + vec + } + + /// Update state of self based on the received `RunInfo` + pub(super) fn start_run(&mut self, run_info: RunInfo) { + if run_info.nonce != self.run_nonce { + error!( + "Received unexpected test run info! Got {}, expected: {}", + self.run_nonce, run_info.nonce + ); + return; + } + + // notify about malformed nodes: + for malformed_mix in run_info.malformed_mixes { + debug!( + target: "test-run", + "{} is malformed", malformed_mix.clone() + ); + self.test_report.malformed.push(malformed_mix); + } + + for old_mix in run_info.incompatible_mixes { + debug!( + target: "test-run", + "{} is outdated! It's on {} version", + old_mix.0.clone(), + old_mix.1 + ); + self.test_report.outdated.push(old_mix); + } + + self.test_report.total_sent = run_info.test_packets.len(); + + // store information about packets that are currently being sent + self.expected_run_packets + .reserve(run_info.test_packets.len()); + for test_packet in run_info.test_packets { + self.expected_run_packets.insert(test_packet); + } + } + + pub(super) fn received_packet(&mut self, message: Vec) -> bool { + let test_packet = match TestPacket::try_from_bytes(&message) { + Ok(packet) => packet, + Err(err) => { + warn!("Invalid test packet received - {:?}", err); + return false; + } + }; + + if test_packet.nonce() == self.run_nonce { + self.received_packets.push(test_packet); + } else { + warn!( + "Received test packet for different test run! (Got {}, expected {})", + test_packet.nonce(), + self.run_nonce + ); + } + + if self.received_packets.len() == self.expected_run_packets.len() { + true + } else { + false + } + } + + fn produce_summary(&self) -> HashMap { + // contains map of all (seemingly valid) nodes and whether they speak ipv4/ipv6 + let mut summary: HashMap = HashMap::new(); + + // update based on data we actually get + for received_status in self.received_packets.iter() { + let entry = summary.entry(received_status.pub_key_string()).or_default(); + if received_status.ip_version().is_v4() { + entry.ip_v4_compatible = true + } else { + entry.ip_v6_compatible = true + } + } + + // and then insert entries we didn't get but should have + for expected in self.expected_run_packets.iter() { + summary.entry(expected.pub_key_string()).or_default(); + } + + summary + } + + fn finalize_report(&mut self) { + let mut fully_working = Vec::new(); + let mut only_v4_compatible = Vec::new(); + let mut only_v6_compatible = Vec::new(); + let mut unroutable_nodes = Vec::new(); + + let summary = self.produce_summary(); + for (node, result) in summary.into_iter() { + if result.ip_v4_compatible && result.ip_v6_compatible { + fully_working.push(node) + } else if result.ip_v4_compatible { + only_v4_compatible.push(node) + } else if result.ip_v6_compatible { + only_v6_compatible.push(node) + } else { + unroutable_nodes.push(node) + } + } + + self.test_report.fully_working = fully_working; + self.test_report.only_ipv4_compatible = only_v4_compatible; + self.test_report.only_ipv6_compatible = only_v6_compatible; + self.test_report.completely_unroutable = unroutable_nodes; + } + + pub(super) fn finish_run(&mut self) -> BatchMixStatus { + self.test_report.total_received = self.received_packets.len(); + + if self.print_report { + self.finalize_report(); + self.test_report.print(self.print_detailed_report); + } + + let mut mix_status = Vec::with_capacity( + 2 * (self.test_report.malformed.len() + self.test_report.outdated.len()) + + self.expected_run_packets.len(), + ); + + // firstly we know all malformed and outdated nodes are definitely down - we haven't sent + // any test packets for those + for malformed in self.test_report.malformed.iter() { + let mut down_status = self.down_status(malformed.clone()); + mix_status.append(&mut down_status); + } + for outdated in self.test_report.outdated.iter() { + let mut down_status = self.down_status(outdated.0.clone()); + mix_status.append(&mut down_status); + } + + let mut undelivered = mem::replace(&mut self.expected_run_packets, HashSet::new()); + + // then create status for packets we actually received + for received in mem::replace(&mut self.received_packets, Vec::new()) { + undelivered.remove(&received); + mix_status.push(received.into_up_mixstatus()) + } + + // and finally create status for packets we sent but never received + for undelivered_packet in undelivered.into_iter() { + mix_status.push(undelivered_packet.into_down_mixstatus()) + } + + BatchMixStatus { status: mix_status } + } +} diff --git a/network-monitor/src/notifications/test_timeout.rs b/network-monitor/src/notifications/test_timeout.rs new file mode 100644 index 00000000000..655d2accf4e --- /dev/null +++ b/network-monitor/src/notifications/test_timeout.rs @@ -0,0 +1,57 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use log::*; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{delay_for, Delay, Duration, Instant}; + +pub(super) struct TestTimeout { + delay: Option, +} + +impl TestTimeout { + pub(super) fn new() -> Self { + TestTimeout { delay: None } + } + + pub(super) fn start(&mut self, duration: Duration) { + self.delay = Some(delay_for(duration)) + } + + pub(super) fn clear(&mut self) { + self.delay = None + } + + /// Forces self to fire regardless of internal Delay state + pub(super) fn fire(&mut self) { + match self.delay.as_mut() { + None => error!("Tried to fire non-existent delay!"), + // just set the next delay to 0 so it will be polled immediately and be already elapsed + Some(delay) => delay.reset(Instant::now()), + } + } +} + +impl Future for TestTimeout { + type Output = ::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.delay.as_mut() { + None => Poll::Pending, + Some(delay) => Pin::new(delay).poll(cx), + } + } +} diff --git a/network-monitor/src/packet_sender.rs b/network-monitor/src/packet_sender.rs new file mode 100644 index 00000000000..c8a9c38893f --- /dev/null +++ b/network-monitor/src/packet_sender.rs @@ -0,0 +1,197 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::chunker::Chunker; +use crate::run_info::{RunInfo, TestRunUpdate, TestRunUpdateSender}; +use crate::test_packet::{IpVersion, TestPacket}; +use crate::tested_network::{TestMix, TestedNetwork}; +use directory_client::presence::mixnodes::MixNodePresence; +use gateway_client::error::GatewayClientError; +use log::*; +use nymsphinx::addressing::clients::Recipient; +use nymsphinx::forwarding::packet::MixPacket; +use std::convert::TryInto; +use std::sync::Arc; +use topology::mix; + +#[derive(Debug)] +pub(crate) enum PacketSenderError { + DirectoryError(String), + GatewayError(GatewayClientError), +} + +impl From for PacketSenderError { + fn from(err: GatewayClientError) -> Self { + PacketSenderError::GatewayError(err) + } +} + +pub struct PacketSender { + chunker: Chunker, + directory_client: Arc, + tested_network: TestedNetwork, + test_run_sender: TestRunUpdateSender, + nonce: u64, +} + +impl PacketSender { + pub(crate) fn new( + directory_client: Arc, + tested_network: TestedNetwork, + self_address: Recipient, + test_run_sender: TestRunUpdateSender, + ) -> Self { + PacketSender { + chunker: Chunker::new(self_address), + directory_client, + tested_network, + test_run_sender, + nonce: 0, + } + } + + fn make_test_mix(&self, presence: MixNodePresence) -> TestMix { + // the reason for that conversion is that I want to operate on concrete types + // rather than on "String" everywhere and also this way we remove obviously wrong + // mixnodes where somebody is sending bullshit presence data. + let mix_id = presence.pub_key.clone(); + let mix: Result = presence.try_into(); + match mix { + Err(err) => { + error!("mix {} is malformed - {:?}", mix_id, err); + TestMix::MalformedMix(mix_id) + } + Ok(mix) => { + if version_checker::is_minor_version_compatible( + &mix.version, + self.tested_network.system_version(), + ) { + let v4_test_packet = TestPacket::new(mix.pub_key, IpVersion::V4, self.nonce); + let v6_test_packet = TestPacket::new(mix.pub_key, IpVersion::V6, self.nonce); + + TestMix::ValidMix(mix, [v4_test_packet, v6_test_packet]) + } else { + TestMix::IncompatibleMix(mix) + } + } + } + } + + async fn get_test_mixes(&self) -> Result, PacketSenderError> { + Ok(self + .directory_client + .get_topology() + .await + .map_err(|err| PacketSenderError::DirectoryError(err.to_string()))? + .mix_nodes + .into_iter() + .map(|presence| self.make_test_mix(presence)) + .collect()) + } + + fn prepare_run_info(&self, test_mixes: &[TestMix]) -> RunInfo { + let num_valid = test_mixes.iter().filter(|mix| mix.is_valid()).count(); + let mut test_packets = Vec::with_capacity(num_valid * 2); + let mut malformed_mixes = Vec::new(); + let mut incompatible_mixes = Vec::new(); + + for test_mix in test_mixes { + match test_mix { + TestMix::ValidMix(.., mix_test_packets) => { + test_packets.push(mix_test_packets[0]); + test_packets.push(mix_test_packets[1]); + } + TestMix::MalformedMix(pub_key) => malformed_mixes.push(pub_key.clone()), + TestMix::IncompatibleMix(mix) => { + incompatible_mixes.push((mix.pub_key.to_base58_string(), mix.version.clone())) + } + } + } + RunInfo { + nonce: self.nonce, + test_packets, + malformed_mixes, + incompatible_mixes, + } + } + + async fn prepare_node_mix_packets( + &mut self, + mixnode: mix::Node, + test_packets: [TestPacket; 2], + ) -> Vec { + let mut packets = Vec::with_capacity(2); + for test_packet in test_packets.iter() { + let topology_to_test = self + .tested_network + .substitute_node(mixnode.clone(), test_packet.ip_version()); + let mix_message = test_packet.to_bytes(); + let mut mix_packet = self + .chunker + .prepare_messages(mix_message, &topology_to_test) + .await; + debug_assert_eq!(mix_packet.len(), 1); + packets.push(mix_packet.pop().unwrap()); + } + packets + } + + async fn prepare_mix_packets(&mut self, test_mixes: Vec) -> Vec { + let num_valid = test_mixes.iter().filter(|mix| mix.is_valid()).count(); + let mut mix_packets = Vec::with_capacity(num_valid); + + for test_mix in test_mixes { + match test_mix { + TestMix::ValidMix(mixnode, test_packets) => { + let mut node_mix_packets = + self.prepare_node_mix_packets(mixnode, test_packets).await; + mix_packets.append(&mut node_mix_packets); + } + _ => continue, + } + } + mix_packets + } + + async fn send_messages( + &mut self, + mix_packets: Vec, + ) -> Result<(), PacketSenderError> { + self.tested_network.send_messages(mix_packets).await?; + Ok(()) + } + + pub(crate) async fn run_test(&mut self) -> Result<(), PacketSenderError> { + self.nonce += 1; + + let test_mixes = self.get_test_mixes().await?; + info!(target: "Monitor", "Going to test {} mixes", test_mixes.len()); + let run_info = self.prepare_run_info(&test_mixes); + let mix_packets = self.prepare_mix_packets(test_mixes).await; + + // inform notifier that we're about to start the test + self.test_run_sender + .unbounded_send(TestRunUpdate::StartSending(run_info)) + .expect("notifier has crashed!"); + + self.send_messages(mix_packets).await?; + + // inform the notifier we're done sending (so that it should start its timeout) + self.test_run_sender + .unbounded_send(TestRunUpdate::DoneSending(self.nonce)) + .expect("notifier has crashed!"); + + Ok(()) + } +} diff --git a/network-monitor/src/run_info.rs b/network-monitor/src/run_info.rs new file mode 100644 index 00000000000..4fbe4630064 --- /dev/null +++ b/network-monitor/src/run_info.rs @@ -0,0 +1,31 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::test_packet::TestPacket; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; + +pub(crate) type TestRunUpdateSender = UnboundedSender; +pub(crate) type TestRunUpdateReceiver = UnboundedReceiver; + +pub(crate) struct RunInfo { + pub nonce: u64, + pub test_packets: Vec, + pub malformed_mixes: Vec, + pub incompatible_mixes: Vec<(String, String)>, +} + +pub(crate) enum TestRunUpdate { + StartSending(RunInfo), + DoneSending(u64), +} diff --git a/network-monitor/src/test_packet.rs b/network-monitor/src/test_packet.rs new file mode 100644 index 00000000000..a104bea8a25 --- /dev/null +++ b/network-monitor/src/test_packet.rs @@ -0,0 +1,157 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crypto::asymmetric::encryption; +use crypto::asymmetric::encryption::EncryptionKeyError; +use directory_client::mixmining::MixStatus; +use std::convert::{TryFrom, TryInto}; +use std::fmt::{self, Display, Formatter}; +use std::mem; + +#[derive(Debug)] +pub(crate) enum TestPacketError { + IncompletePacket, + InvalidIpVersion, + InvalidNodeKey, +} + +impl From for TestPacketError { + fn from(_: EncryptionKeyError) -> Self { + TestPacketError::InvalidNodeKey + } +} + +#[repr(u8)] +#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)] +pub(crate) enum IpVersion { + V4 = 4, + V6 = 6, +} + +impl TryFrom for IpVersion { + type Error = TestPacketError; + + fn try_from(value: u8) -> Result { + match value { + _ if value == (Self::V4 as u8) => Ok(Self::V4), + _ if value == (Self::V6 as u8) => Ok(Self::V6), + _ => Err(TestPacketError::InvalidIpVersion), + } + } +} + +impl IpVersion { + pub(crate) fn is_v4(&self) -> bool { + *self == IpVersion::V4 + } +} + +impl Into for IpVersion { + fn into(self) -> String { + format!("{}", self) + } +} + +impl Display for IpVersion { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", *self as u8) + } +} + +#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)] +pub(crate) struct TestPacket { + ip_version: IpVersion, + nonce: u64, + pub_key: encryption::PublicKey, // TODO: eventually this will get replaced with identity::PublicKey +} + +impl Display for TestPacket { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "TestPacket {{ ip: {}, pub_key: {}, nonce: {} }}", + self.ip_version, + self.pub_key.to_base58_string(), + self.nonce + ) + } +} + +impl TestPacket { + pub(crate) fn new(pub_key: encryption::PublicKey, ip_version: IpVersion, nonce: u64) -> Self { + TestPacket { + pub_key, + ip_version, + nonce, + } + } + + pub(crate) fn nonce(&self) -> u64 { + self.nonce + } + + pub(crate) fn ip_version(&self) -> IpVersion { + self.ip_version + } + + pub(crate) fn pub_key_string(&self) -> String { + self.pub_key.to_base58_string() + } + + pub(crate) fn to_bytes(&self) -> Vec { + self.nonce + .to_be_bytes() + .iter() + .cloned() + .chain(std::iter::once(self.ip_version as u8)) + .chain(self.pub_key.to_bytes().iter().cloned()) + .collect() + } + + pub(crate) fn try_from_bytes(b: &[u8]) -> Result { + // nonce size + let n = mem::size_of::(); + + if b.len() != n + 1 + encryption::PUBLIC_KEY_SIZE { + return Err(TestPacketError::IncompletePacket); + } + + // this unwrap can't fail as we've already checked for the size + let nonce = u64::from_be_bytes(b[0..n].try_into().unwrap()); + let ip_version = IpVersion::try_from(b[n])?; + let pub_key = encryption::PublicKey::from_bytes(&b[n + 1..])?; + + Ok(TestPacket { + ip_version, + nonce, + pub_key, + }) + } + + pub(crate) fn into_up_mixstatus(self) -> MixStatus { + MixStatus { + pub_key: self.pub_key.to_base58_string(), + ip_version: self.ip_version.into(), + up: true, + } + } + + pub(crate) fn into_down_mixstatus(self) -> MixStatus { + MixStatus { + pub_key: self.pub_key.to_base58_string(), + ip_version: self.ip_version.into(), + up: false, + } + } +} diff --git a/network-monitor/src/tested_network/good_topology.rs b/network-monitor/src/tested_network/good_topology.rs new file mode 100644 index 00000000000..24a9f28afc9 --- /dev/null +++ b/network-monitor/src/tested_network/good_topology.rs @@ -0,0 +1,223 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crypto::asymmetric::{ + encryption::{self, PublicKey}, + identity, +}; +use std::collections::HashMap; +use topology::{gateway, mix, NymTopology}; + +pub(crate) fn v4_mixnodes() -> Vec { + let goodnode1 = mix::Node { + location: "London".to_string(), + host: "213.52.129.218:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("EJHwrLafqygqctkBCntVZfUkMSDErGUStJjZniQoRoJr") + .unwrap(), + last_seen: 1600276206950298819, + layer: 1, + version: "0.8.0".to_string(), + }; + + let goodnode2 = mix::Node { + location: "Frankfurt".to_string(), + host: "172.104.244.117:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("BW7xskYvZyHt8rGFzsmG5bEQ9ViCYYxpFsEWDcNtSYvX") + .unwrap(), + last_seen: 1600276206950298819, + layer: 2, + version: "0.8.0".to_string(), + }; + + let goodnode3 = mix::Node { + location: "London".to_string(), + host: "178.79.136.231:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("BqBGpP4YDH5fRDVKB97Ru7aq2Wbarb3SNfZL5LGaH83e") + .unwrap(), + layer: 3, + last_seen: 1600276206950298819, + version: "0.8.0".to_string(), + }; + + vec![goodnode1, goodnode2, goodnode3] +} + +pub(crate) fn v6_mixnodes() -> Vec { + let goodnode1 = mix::Node { + location: "London".to_string(), + host: "[2a01:7e00::f03c:92ff:fe16:3dc2]:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("EJHwrLafqygqctkBCntVZfUkMSDErGUStJjZniQoRoJr") + .unwrap(), + last_seen: 1600276206950298819, + layer: 1, + version: "0.8.0".to_string(), + }; + + let goodnode2 = mix::Node { + location: "Frankfurt".to_string(), + host: "[2a01:7e01::f03c:92ff:fe16:3d11]:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("BW7xskYvZyHt8rGFzsmG5bEQ9ViCYYxpFsEWDcNtSYvX") + .unwrap(), + last_seen: 1600276206950298819, + layer: 2, + version: "0.8.0".to_string(), + }; + + let goodnode3 = mix::Node { + location: "London".to_string(), + host: "[2a01:7e00::f03c:92ff:fe16:3d7b]:1789".parse().unwrap(), + pub_key: PublicKey::from_base58_string("BqBGpP4YDH5fRDVKB97Ru7aq2Wbarb3SNfZL5LGaH83e") + .unwrap(), + layer: 3, + last_seen: 1600276206950298819, + version: "0.8.0".to_string(), + }; + + vec![goodnode1, goodnode2, goodnode3] +} + +pub(crate) fn v4_gateway() -> gateway::Node { + gateway::Node { + location: "unknown".to_string(), + client_listener: "ws://139.162.246.48:9000".to_string(), + mixnet_listener: "139.162.246.48:1789".parse().unwrap(), + identity_key: identity::PublicKey::from_base58_string( + "D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr", + ) + .unwrap(), + sphinx_key: encryption::PublicKey::from_base58_string( + "6snGVMCatcTnvjGPaf8Ye7kCnVn6ThEDdCs4TZ7DbDVj", + ) + .unwrap(), + last_seen: 1600424297774836793, + version: "0.8.0".to_string(), + } +} + +pub(crate) fn v6_gateway() -> gateway::Node { + gateway::Node { + location: "unknown".to_string(), + client_listener: "ws://[2a01:7e00::f03c:92ff:fe16:49f1]:9000".to_string(), + mixnet_listener: "[2a01:7e00::f03c:92ff:fe16:49f1]:1789".parse().unwrap(), + identity_key: identity::PublicKey::from_base58_string( + "D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr", + ) + .unwrap(), + sphinx_key: encryption::PublicKey::from_base58_string( + "6snGVMCatcTnvjGPaf8Ye7kCnVn6ThEDdCs4TZ7DbDVj", + ) + .unwrap(), + last_seen: 1600424297774836793, + version: "0.8.0".to_string(), + } +} + +/// Returns a new ipv4 NymTopology composed of known good nodes +pub(crate) fn new_v4() -> NymTopology { + let mut layered_mixes = HashMap::new(); + + for (i, node) in v4_mixnodes().into_iter().enumerate() { + layered_mixes.insert((i + 1) as u8, vec![node]); + } + + NymTopology::new(Vec::new(), layered_mixes, vec![v4_gateway()]) +} + +/// Returns a new ipv6 NymTopology composed of known good nodes +pub(crate) fn new_v6() -> NymTopology { + let mut layered_mixes = HashMap::new(); + + for (i, node) in v6_mixnodes().into_iter().enumerate() { + layered_mixes.insert((i + 1) as u8, vec![node]); + } + + NymTopology::new(Vec::new(), layered_mixes, vec![v6_gateway()]) +} + +// Returns a new topology of known good nodes, with one good node replaced with a test node +pub(crate) fn new_v4_with_node(test_node: mix::Node) -> NymTopology { + let mut topology = self::new_v4(); + topology.set_mixes_in_layer(test_node.layer as u8, vec![test_node]); + topology +} + +#[cfg(test)] +mod good_topology_test { + use super::*; + + mod subbing_in_a_node_to_test { + use super::*; + + #[test] + fn returns_good_topology_with_test_node_in_desired_layer() { + let topology = expected_topology_with_test_node(); + let expected_gateway_key = topology.gateways().first().unwrap().identity_key; + let expected_layer_1_mixnode_pubkey = + topology.mixes_in_layer(1)[0].pub_key.to_base58_string(); + let expected_layer_2_mixnode_pubkey = + topology.mixes_in_layer(2)[0].pub_key.to_base58_string(); + let expected_layer_3_mixnode_pubkey = + topology.mixes_in_layer(3)[0].pub_key.to_base58_string(); + let result = new_v4_with_node(test_node()); + let actual_gateway_key = result.gateways().first().unwrap().identity_key; + let actual_layer_1_mixnode_pubkey = + result.mixes_in_layer(1)[0].pub_key.to_base58_string(); + let actual_layer_2_mixnode_pubkey = + result.mixes_in_layer(2)[0].pub_key.to_base58_string(); + let actual_layer_3_mixnode_pubkey = + result.mixes_in_layer(3)[0].pub_key.to_base58_string(); + + assert_eq!(expected_gateway_key, actual_gateway_key); + assert_eq!( + expected_layer_1_mixnode_pubkey, + actual_layer_1_mixnode_pubkey + ); + assert_eq!( + expected_layer_2_mixnode_pubkey, + actual_layer_2_mixnode_pubkey + ); + assert_eq!( + expected_layer_3_mixnode_pubkey, + actual_layer_3_mixnode_pubkey + ); + } + } + + fn expected_topology_with_test_node() -> NymTopology { + let mut mixes = HashMap::new(); + let mixnodes = v4_mixnodes(); + let mix1 = test_node(); // this is the one we will test + let mix2 = mixnodes[1].clone(); + let mix3 = mixnodes[2].clone(); + + mixes.insert(1, vec![mix1]); + mixes.insert(2, vec![mix2]); + mixes.insert(3, vec![mix3]); + NymTopology::new(vec![], mixes, vec![v4_gateway()]) + } + + fn test_node() -> mix::Node { + mix::Node { + location: "Thunder Bay".to_string(), + host: "1.2.3.4:1234".parse().unwrap(), + pub_key: encryption::PublicKey::from_base58_string( + "9fX1rMaQdBEzjuv6kT7oyPfEabt73QTM5cfuQ9kaxrRQ", + ) + .unwrap(), + layer: 1, + last_seen: 1234, + version: "0.8.1".to_string(), + } + } +} diff --git a/network-monitor/src/tested_network/mod.rs b/network-monitor/src/tested_network/mod.rs new file mode 100644 index 00000000000..b434b37da07 --- /dev/null +++ b/network-monitor/src/tested_network/mod.rs @@ -0,0 +1,91 @@ +// Copyright 2020 Nym Technologies SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::test_packet::{IpVersion, TestPacket}; +use gateway_client::error::GatewayClientError; +use gateway_client::GatewayClient; +use nymsphinx::forwarding::packet::MixPacket; +use topology::{gateway, mix, NymTopology}; + +pub(crate) mod good_topology; + +pub(crate) enum TestMix { + ValidMix(mix::Node, [TestPacket; 2]), + IncompatibleMix(mix::Node), + MalformedMix(String), +} + +impl TestMix { + pub(crate) fn is_valid(&self) -> bool { + match self { + TestMix::ValidMix(..) => true, + _ => false, + } + } +} + +pub(crate) struct TestedNetwork { + system_version: String, + gateway_client: GatewayClient, + good_v4_topology: NymTopology, + good_v6_topology: NymTopology, +} + +pub(crate) fn v4_gateway() -> gateway::Node { + good_topology::v4_gateway() +} + +impl TestedNetwork { + pub(crate) fn new_good(gateway_client: GatewayClient) -> Self { + let good_v4_topology = good_topology::new_v4(); + + TestedNetwork { + system_version: good_v4_topology.mixes()[&1][0].version.clone(), + gateway_client, + good_v4_topology, + good_v6_topology: good_topology::new_v6(), + } + } + + pub(crate) fn system_version(&self) -> &str { + &self.system_version + } + + pub(crate) async fn start_gateway_client(&mut self) { + self.gateway_client + .authenticate_and_start() + .await + .expect("Couldn't authenticate with gateway node."); + } + + pub(crate) async fn send_messages( + &mut self, + mix_packets: Vec, + ) -> Result<(), GatewayClientError> { + self.gateway_client + .batch_send_mix_packets(mix_packets) + .await?; + Ok(()) + } + + pub(crate) fn substitute_node(&self, node: mix::Node, ip_version: IpVersion) -> NymTopology { + let mut good_topology = match ip_version { + IpVersion::V4 => self.good_v4_topology.clone(), + IpVersion::V6 => self.good_v6_topology.clone(), + }; + + good_topology.set_mixes_in_layer(node.layer as u8, vec![node]); + good_topology + } +} diff --git a/service-providers/sphinx-socks/Cargo.toml b/service-providers/sphinx-socks/Cargo.toml index 2a03293dfba..84c33b2d285 100644 --- a/service-providers/sphinx-socks/Cargo.toml +++ b/service-providers/sphinx-socks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sphinx-socks" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn ", "Jędrzej Stuczyński "] edition = "2018" diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 7d5829c00c4..cefe4c7ef47 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -1,7 +1,7 @@ [package] build = "build.rs" name = "nym-validator" -version = "0.8.1" +version = "0.9.0-dev" authors = ["Dave Hrycyszyn ", "Jedrzej Stuczynski "] edition = "2018"