diff --git a/Cargo.lock b/Cargo.lock index 13494070c78..0422534b91d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1101,6 +1101,7 @@ name = "explorer-api" version = "0.1.0" dependencies = [ "chrono", + "humantime-serde", "isocountry", "log", "mixnet-contract", @@ -1108,6 +1109,7 @@ dependencies = [ "pretty_env_logger", "reqwest", "rocket", + "rocket_cors", "rocket_okapi", "schemars", "serde", diff --git a/explorer-api/Cargo.toml b/explorer-api/Cargo.toml index b8e87fde868..6487b1a3932 100644 --- a/explorer-api/Cargo.toml +++ b/explorer-api/Cargo.toml @@ -9,7 +9,9 @@ edition = "2018" isocountry = "0.3.2" reqwest = "0.11.4" rocket = {version = "0.5.0-rc.1", features=["json"] } +rocket_cors = { git="https://github.com/lawliet89/rocket_cors", rev="dfd3662c49e2f6fc37df35091cb94d82f7fb5915" } serde = "1.0.126" +humantime-serde = "1.0" serde_json = "1.0.66" tokio = {version = "1.9.0", features = ["full"] } chrono = { version = "0.4.19", features = ["serde"] } diff --git a/explorer-api/src/country_statistics/http.rs b/explorer-api/src/country_statistics/http.rs index 6b0b807ac01..74be62cb13a 100644 --- a/explorer-api/src/country_statistics/http.rs +++ b/explorer-api/src/country_statistics/http.rs @@ -3,7 +3,7 @@ use crate::state::ExplorerApiStateContext; use rocket::serde::json::Json; use rocket::{Route, State}; -pub fn make_default_routes() -> Vec { +pub fn country_statistics_make_default_routes() -> Vec { routes_with_openapi![index] } diff --git a/explorer-api/src/country_statistics/mod.rs b/explorer-api/src/country_statistics/mod.rs index 6639a70a989..31fe1fc81bb 100644 --- a/explorer-api/src/country_statistics/mod.rs +++ b/explorer-api/src/country_statistics/mod.rs @@ -1,8 +1,6 @@ use isocountry::CountryCode; use log::{info, trace, warn}; -use mixnet_contract::MixNodeBond; use reqwest::Error as ReqwestError; -use validator_client::Config; use models::GeoLocation; @@ -39,7 +37,15 @@ impl CountryStatistics { /// Retrieves the current list of mixnodes from the validators and calculates how many nodes are in each country async fn calculate_nodes_per_country(&mut self) { - let mixnode_bonds = retrieve_mixnodes().await; + // force the mixnode cache to invalidate + let mixnode_bonds = self + .state + .inner + .mix_nodes + .clone() + .refresh_and_get() + .await + .value; let mut distribution = CountryNodesDistribution::new(); @@ -103,22 +109,3 @@ async fn locate(ip: &str) -> Result { let location = response.json::().await?; Ok(location) } - -async fn retrieve_mixnodes() -> Vec { - let client = new_validator_client(); - - info!("About to retrieve mixnode bonds..."); - - let bonds: Vec = match client.get_cached_mix_nodes().await { - Ok(result) => result, - Err(e) => panic!("Unable to retrieve mixnode bonds: {:?}", e), - }; - info!("Fetched {} mixnode bonds", bonds.len()); - bonds -} - -// TODO: inject constants -fn new_validator_client() -> validator_client::Client { - let config = Config::new(vec![crate::VALIDATOR_API.to_string()], crate::CONTRACT); - validator_client::Client::new(config) -} diff --git a/explorer-api/src/http/mod.rs b/explorer-api/src/http/mod.rs index 6eae464f392..2a3171081da 100644 --- a/explorer-api/src/http/mod.rs +++ b/explorer-api/src/http/mod.rs @@ -1,28 +1,50 @@ -mod swagger; +use log::info; +use rocket::http::Method; +use rocket::Request; +use rocket_cors::{AllowedHeaders, AllowedOrigins}; +use rocket_okapi::swagger_ui::make_swagger_ui; -use crate::country_statistics::http::make_default_routes; +use crate::country_statistics::http::country_statistics_make_default_routes; use crate::http::swagger::get_docs; +use crate::mix_node::http::mix_node_make_default_routes; use crate::ping::http::ping_make_default_routes; use crate::state::ExplorerApiStateContext; -use log::info; -use rocket_okapi::swagger_ui::make_swagger_ui; + +mod swagger; pub(crate) fn start(state: ExplorerApiStateContext) { tokio::spawn(async move { info!("Starting up..."); - let config = rocket::config::Config::release_default(); + let allowed_origins = AllowedOrigins::all(); + + // You can also deserialize this + let cors = rocket_cors::CorsOptions { + allowed_origins, + allowed_methods: vec![Method::Get].into_iter().map(From::from).collect(), + allowed_headers: AllowedHeaders::some(&["*"]), + allow_credentials: true, + ..Default::default() + } + .to_cors() + .unwrap(); + let config = rocket::config::Config::release_default(); rocket::build() .configure(config) - .mount("/countries", make_default_routes()) + .mount("/countries", country_statistics_make_default_routes()) .mount("/ping", ping_make_default_routes()) + .mount("/mix-node", mix_node_make_default_routes()) .mount("/swagger", make_swagger_ui(&get_docs())) - // .register("/", catchers![not_found]) + .register("/", catchers![not_found]) .manage(state) - // .manage(descriptor) - // .manage(node_stats_pointer) + .attach(cors) .launch() .await }); } + +#[catch(404)] +pub(crate) fn not_found(req: &Request) -> String { + format!("I couldn't find '{}'. Try something else?", req.uri()) +} diff --git a/explorer-api/src/http/swagger.rs b/explorer-api/src/http/swagger.rs index 12bb19c5ed0..4f30b69e746 100644 --- a/explorer-api/src/http/swagger.rs +++ b/explorer-api/src/http/swagger.rs @@ -5,6 +5,7 @@ pub(crate) fn get_docs() -> SwaggerUIConfig { urls: vec![ UrlObject::new("Country statistics", "/countries/openapi.json"), UrlObject::new("Node ping", "/ping/openapi.json"), + UrlObject::new("Mix node", "/mix-node/openapi.json"), ], ..Default::default() } diff --git a/explorer-api/src/main.rs b/explorer-api/src/main.rs index 729c1b27d40..3271e87fc6f 100644 --- a/explorer-api/src/main.rs +++ b/explorer-api/src/main.rs @@ -7,6 +7,8 @@ use log::info; mod country_statistics; mod http; +mod mix_node; +mod mix_nodes; mod ping; mod state; diff --git a/explorer-api/src/mix_node/cache.rs b/explorer-api/src/mix_node/cache.rs new file mode 100644 index 00000000000..6b66ae2e2f8 --- /dev/null +++ b/explorer-api/src/mix_node/cache.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; + +#[derive(Clone)] +pub(crate) struct Cache { + inner: HashMap>, +} + +impl Cache { + pub(crate) fn new() -> Self { + Cache { + inner: HashMap::new(), + } + } + + pub(crate) fn get(&self, identity_key: &str) -> Option + where + T: Clone, + { + self.inner + .get(identity_key) + .filter(|cache_item| cache_item.valid_until > SystemTime::now()) + .map(|cache_item| cache_item.value.clone()) + } + + pub(crate) fn set(&mut self, identity_key: &str, value: T) { + self.inner.insert( + identity_key.to_string(), + CacheItem { + valid_until: SystemTime::now() + Duration::from_secs(60 * 30), + value, + }, + ); + } +} + +#[derive(Clone)] +pub(crate) struct CacheItem { + pub(crate) value: T, + pub(crate) valid_until: std::time::SystemTime, +} diff --git a/explorer-api/src/mix_node/http.rs b/explorer-api/src/mix_node/http.rs new file mode 100644 index 00000000000..228b886ae81 --- /dev/null +++ b/explorer-api/src/mix_node/http.rs @@ -0,0 +1,118 @@ +use reqwest::Error as ReqwestError; + +use rocket::serde::json::Json; +use rocket::{Route, State}; + +use crate::mix_node::models::{NodeDescription, NodeStats}; +use crate::state::ExplorerApiStateContext; + +pub fn mix_node_make_default_routes() -> Vec { + routes_with_openapi![get_description, get_stats] +} + +#[openapi(tag = "mix_node")] +#[get("//description")] +pub(crate) async fn get_description( + pubkey: &str, + state: &State, +) -> Option> { + match state + .inner + .mix_node_cache + .clone() + .get_description(pubkey) + .await + { + Some(cache_value) => { + trace!("Returning cached value for {}", pubkey); + Some(Json(cache_value)) + } + None => { + trace!("No valid cache value for {}", pubkey); + match state.inner.get_mix_node(pubkey).await { + Some(bond) => { + match get_mix_node_description( + &bond.mix_node.host, + &bond.mix_node.http_api_port, + ) + .await + { + Ok(response) => { + // cache the response and return as the HTTP response + state + .inner + .mix_node_cache + .set_description(pubkey, response.clone()) + .await; + Some(Json(response)) + } + Err(e) => { + error!( + "Unable to get description for {} on {}:{} -> {}", + pubkey, bond.mix_node.host, bond.mix_node.http_api_port, e + ); + Option::None + } + } + } + None => Option::None, + } + } + } +} + +#[openapi(tag = "mix_node")] +#[get("//stats")] +pub(crate) async fn get_stats( + pubkey: &str, + state: &State, +) -> Option> { + match state.inner.mix_node_cache.get_node_stats(pubkey).await { + Some(cache_value) => { + trace!("Returning cached value for {}", pubkey); + Some(Json(cache_value)) + } + None => { + trace!("No valid cache value for {}", pubkey); + match state.inner.get_mix_node(pubkey).await { + Some(bond) => { + match get_mix_node_stats(&bond.mix_node.host, &bond.mix_node.http_api_port) + .await + { + Ok(response) => { + // cache the response and return as the HTTP response + state + .inner + .mix_node_cache + .set_node_stats(pubkey, response.clone()) + .await; + Some(Json(response)) + } + Err(e) => { + error!( + "Unable to get description for {} on {}:{} -> {}", + pubkey, bond.mix_node.host, bond.mix_node.http_api_port, e + ); + Option::None + } + } + } + None => Option::None, + } + } + } +} + +async fn get_mix_node_description(host: &str, port: &u16) -> Result { + reqwest::get(format!("http://{}:{}/description", host, port)) + .await? + .json::() + .await +} + +async fn get_mix_node_stats(host: &str, port: &u16) -> Result { + reqwest::get(format!("http://{}:{}/stats", host, port)) + .await? + .json::() + .await +} diff --git a/explorer-api/src/mix_node/mod.rs b/explorer-api/src/mix_node/mod.rs new file mode 100644 index 00000000000..b57132ec2c5 --- /dev/null +++ b/explorer-api/src/mix_node/mod.rs @@ -0,0 +1,3 @@ +mod cache; +pub(crate) mod http; +pub(crate) mod models; diff --git a/explorer-api/src/mix_node/models.rs b/explorer-api/src/mix_node/models.rs new file mode 100644 index 00000000000..8aca0dc33ea --- /dev/null +++ b/explorer-api/src/mix_node/models.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; +use std::time::SystemTime; + +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +use crate::mix_node::cache::Cache; + +pub(crate) struct MixNodeCache { + pub(crate) descriptions: Cache, + pub(crate) node_stats: Cache, +} + +#[derive(Clone)] +pub(crate) struct ThreadsafeMixNodeCache { + inner: Arc>, +} + +impl ThreadsafeMixNodeCache { + pub(crate) fn new() -> Self { + ThreadsafeMixNodeCache { + inner: Arc::new(RwLock::new(MixNodeCache { + descriptions: Cache::new(), + node_stats: Cache::new(), + })), + } + } + + pub(crate) async fn get_description(&self, identity_key: &str) -> Option { + self.inner.read().await.descriptions.get(identity_key) + } + + pub(crate) async fn get_node_stats(&self, identity_key: &str) -> Option { + self.inner.read().await.node_stats.get(identity_key) + } + + pub(crate) async fn set_description(&self, identity_key: &str, description: NodeDescription) { + self.inner + .write() + .await + .descriptions + .set(identity_key, description); + } + + pub(crate) async fn set_node_stats(&self, identity_key: &str, node_stats: NodeStats) { + self.inner + .write() + .await + .node_stats + .set(identity_key, node_stats); + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] +pub(crate) struct NodeDescription { + pub(crate) name: String, + pub(crate) description: String, + pub(crate) link: String, + pub(crate) location: String, +} + +#[derive(Serialize, Clone, Deserialize, JsonSchema)] +pub(crate) struct NodeStats { + #[serde( + serialize_with = "humantime_serde::serialize", + deserialize_with = "humantime_serde::deserialize" + )] + update_time: SystemTime, + + #[serde( + serialize_with = "humantime_serde::serialize", + deserialize_with = "humantime_serde::deserialize" + )] + previous_update_time: SystemTime, + + packets_received_since_startup: u64, + packets_sent_since_startup: u64, + packets_explicitly_dropped_since_startup: u64, + packets_received_since_last_update: u64, + packets_sent_since_last_update: u64, + packets_explicitly_dropped_since_last_update: u64, +} diff --git a/explorer-api/src/mix_nodes/mod.rs b/explorer-api/src/mix_nodes/mod.rs new file mode 100644 index 00000000000..0a9293541ee --- /dev/null +++ b/explorer-api/src/mix_nodes/mod.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use rocket::tokio::sync::RwLock; + +use mixnet_contract::MixNodeBond; +use validator_client::Config; + +#[derive(Clone, Debug)] +pub(crate) struct MixNodesResult { + pub(crate) valid_until: SystemTime, + pub(crate) value: Vec, +} + +#[derive(Clone)] +pub(crate) struct ThreadsafeMixNodesResult { + inner: Arc>, +} + +impl ThreadsafeMixNodesResult { + pub(crate) fn new() -> Self { + ThreadsafeMixNodesResult { + inner: Arc::new(RwLock::new(MixNodesResult { + value: vec![], + valid_until: SystemTime::now() - Duration::from_secs(60), // in the past + })), + } + } + + pub(crate) async fn get(&self) -> MixNodesResult { + // check ttl + let valid_until = self.inner.clone().read().await.valid_until; + + if valid_until < SystemTime::now() { + // force reload + self.refresh().await; + } + + // return in-memory cache + self.inner.clone().read().await.clone() + } + + pub(crate) async fn refresh_and_get(&self) -> MixNodesResult { + self.refresh().await; + self.inner.read().await.clone() + } + + async fn refresh(&self) { + // get mixnodes and cache the new value + let value = retrieve_mixnodes().await; + self.inner.write().await.clone_from(&MixNodesResult { + value, + valid_until: SystemTime::now() + Duration::from_secs(60 * 10), // valid for 10 minutes + }); + } +} + +pub(crate) async fn retrieve_mixnodes() -> Vec { + let client = new_validator_client(); + + info!("About to retrieve mixnode bonds..."); + + let bonds: Vec = match client.get_cached_mix_nodes().await { + Ok(result) => result, + Err(e) => { + error!("Unable to retrieve mixnode bonds: {:?}", e); + vec![] + } + }; + info!("Fetched {} mixnode bonds", bonds.len()); + bonds +} + +// TODO: inject constants +fn new_validator_client() -> validator_client::Client { + let config = Config::new(vec![crate::VALIDATOR_API.to_string()], crate::CONTRACT); + validator_client::Client::new(config) +} diff --git a/explorer-api/src/ping/http.rs b/explorer-api/src/ping/http.rs index 06a7d20287f..962529d7799 100644 --- a/explorer-api/src/ping/http.rs +++ b/explorer-api/src/ping/http.rs @@ -1,19 +1,105 @@ +use std::collections::HashMap; +use std::net::ToSocketAddrs; +use std::time::Duration; + use rocket::serde::json::Json; -use rocket::Route; -use serde::Deserialize; -use serde::Serialize; +use rocket::{Route, State}; + +use crate::ping::models::PingResponse; +use crate::state::ExplorerApiStateContext; +use mixnet_contract::MixNodeBond; + +const CONNECTION_TIMEOUT_SECONDS: Duration = Duration::from_secs(10); pub fn ping_make_default_routes() -> Vec { routes_with_openapi![index] } -#[derive(Deserialize, Serialize, JsonSchema)] -pub(crate) struct PingResponse { - response_time: u32, +#[openapi(tag = "ping")] +#[get("/")] +pub(crate) async fn index( + pubkey: &str, + state: &State, +) -> Option> { + match state.inner.ping_cache.clone().get(pubkey).await { + Some(cache_value) => { + trace!("Returning cached value for {}", pubkey); + Some(Json(PingResponse { + ports: cache_value.ports, + })) + } + None => { + trace!("No cache value for {}", pubkey); + + match state.inner.get_mix_node(pubkey).await { + Some(bond) => { + let ports = port_check(&bond).await; + + trace!("Tested mix node {}: {:?}", pubkey, ports); + + let response = PingResponse { ports }; + + // cache for 1 min + trace!("Caching value for {}", pubkey); + state.inner.ping_cache.set(pubkey, response.clone()).await; + + // return response + Some(Json(response)) + } + None => None, + } + } + } +} + +async fn port_check(bond: &MixNodeBond) -> HashMap { + let mut ports: HashMap = HashMap::new(); + + let ports_to_test = vec![ + bond.mix_node.http_api_port, + bond.mix_node.mix_port, + bond.mix_node.verloc_port, + ]; + + trace!( + "Testing mix node {} on ports {:?}...", + bond.mix_node.identity_key, + ports_to_test + ); + + for port in ports_to_test { + ports.insert(port, do_port_check(&bond.mix_node.host, port).await); + } + + ports } -#[openapi(tag = "ping")] -#[get("/")] -pub(crate) async fn index() -> Json { - Json(PingResponse { response_time: 42 }) +async fn do_port_check(host: &str, port: u16) -> bool { + let addr = format!("{}:{}", host, port) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + match tokio::time::timeout( + CONNECTION_TIMEOUT_SECONDS, + tokio::net::TcpStream::connect(addr), + ) + .await + { + Ok(Ok(_stream)) => { + // didn't timeout and tcp stream is open + trace!("Successfully pinged {}", addr); + true + } + Ok(Err(_stream_err)) => { + error!("{} ping failed {:}", addr, _stream_err); + // didn't timeout but couldn't open tcp stream + false + } + Err(_timeout) => { + // timed out + error!("{} timed out {:}", addr, _timeout); + false + } + } } diff --git a/explorer-api/src/ping/mod.rs b/explorer-api/src/ping/mod.rs index 3883215fcb6..5df938f83ce 100644 --- a/explorer-api/src/ping/mod.rs +++ b/explorer-api/src/ping/mod.rs @@ -1 +1,2 @@ -pub mod http; +pub(crate) mod http; +pub(crate) mod models; diff --git a/explorer-api/src/ping/models.rs b/explorer-api/src/ping/models.rs new file mode 100644 index 00000000000..13164524484 --- /dev/null +++ b/explorer-api/src/ping/models.rs @@ -0,0 +1,55 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +pub(crate) type PingCache = HashMap; + +const CACHE_TTL: Duration = Duration::from_secs(60 * 60); // 1 hour + +#[derive(Clone)] +pub(crate) struct ThreadsafePingCache { + inner: Arc>, +} + +impl ThreadsafePingCache { + pub(crate) fn new() -> Self { + ThreadsafePingCache { + inner: Arc::new(RwLock::new(PingCache::new())), + } + } + + pub(crate) async fn get(&self, identity_key: &str) -> Option { + self.inner + .read() + .await + .get(identity_key) + .filter(|cache_item| cache_item.valid_until > SystemTime::now()) + .map(|cache_item| PingResponse { + ports: cache_item.ports.clone(), + }) + } + + pub(crate) async fn set(&self, identity_key: &str, item: PingResponse) { + self.inner.write().await.insert( + identity_key.to_string(), + PingCacheItem { + valid_until: SystemTime::now() + CACHE_TTL, + ports: item.ports, + }, + ); + } +} + +#[derive(Deserialize, Serialize, JsonSchema, Clone)] +pub(crate) struct PingResponse { + pub(crate) ports: HashMap, +} + +pub(crate) struct PingCacheItem { + pub(crate) ports: HashMap, + pub(crate) valid_until: std::time::SystemTime, +} diff --git a/explorer-api/src/state.rs b/explorer-api/src/state.rs index bf2cb1e2406..17655f31bf9 100644 --- a/explorer-api/src/state.rs +++ b/explorer-api/src/state.rs @@ -1,13 +1,17 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; use std::fs::File; use std::path::Path; +use chrono::{DateTime, Utc}; use log::info; +use serde::{Deserialize, Serialize}; use crate::country_statistics::country_nodes_distribution::{ ConcurrentCountryNodesDistribution, CountryNodesDistribution, }; +use crate::mix_node::models::ThreadsafeMixNodeCache; +use crate::mix_nodes::ThreadsafeMixNodesResult; +use crate::ping::models::ThreadsafePingCache; +use mixnet_contract::MixNodeBond; // TODO: change to an environment variable with a default value const STATE_FILE: &str = "explorer-api-state.json"; @@ -15,6 +19,21 @@ const STATE_FILE: &str = "explorer-api-state.json"; #[derive(Clone)] pub struct ExplorerApiState { pub(crate) country_node_distribution: ConcurrentCountryNodesDistribution, + pub(crate) mix_nodes: ThreadsafeMixNodesResult, + pub(crate) mix_node_cache: ThreadsafeMixNodeCache, + pub(crate) ping_cache: ThreadsafePingCache, +} + +impl ExplorerApiState { + pub(crate) async fn get_mix_node(&self, pubkey: &str) -> Option { + self.mix_nodes + .get() + .await + .value + .iter() + .find(|node| node.mix_node.identity_key == pubkey) + .cloned() + } } #[derive(Debug, Serialize, Deserialize)] @@ -50,6 +69,9 @@ impl ExplorerApiStateContext { country_node_distribution: ConcurrentCountryNodesDistribution::attach( state.country_node_distribution, ), + mix_nodes: ThreadsafeMixNodesResult::new(), + mix_node_cache: ThreadsafeMixNodeCache::new(), + ping_cache: ThreadsafePingCache::new(), } } Err(_e) => { @@ -59,6 +81,9 @@ impl ExplorerApiStateContext { ); ExplorerApiState { country_node_distribution: ConcurrentCountryNodesDistribution::new(), + mix_nodes: ThreadsafeMixNodesResult::new(), + mix_node_cache: ThreadsafeMixNodeCache::new(), + ping_cache: ThreadsafePingCache::new(), } } }