diff --git a/Cargo.lock b/Cargo.lock index 2e9e7c19f74..e3f12730bdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,6 +747,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi 0.3.8", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -877,20 +887,12 @@ dependencies = [ ] [[package]] -name = "gateway" -version = "0.1.0" +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "dotenv", - "futures 0.3.4", - "futures-channel", - "futures-util", - "log 0.4.8", - "multi-tcp-client", - "nymsphinx", - "pretty_env_logger", - "tokio 0.2.16", - "tokio-tungstenite", - "tungstenite", + "byteorder", ] [[package]] @@ -1308,9 +1310,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b2de95ecb4691949fea4716ca53cdbcfccb2c612e19644a8bad05edcf9f47b" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" dependencies = [ "scopeguard", ] @@ -1671,6 +1673,7 @@ dependencies = [ "serde_json", "sfw-provider-requests", "sha2", + "sled", "sphinx", "tempfile", "tokio 0.2.16", @@ -1760,10 +1763,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.6.2", "rustc_version", ] +[[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api", + "parking_lot_core 0.7.1", +] + [[package]] name = "parking_lot_core" version = "0.6.2" @@ -1779,6 +1792,20 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "parking_lot_core" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e136c1904604defe99ce5fd71a28d473fa60a12255d511aa78a9ddf11237aeb" +dependencies = [ + "cfg-if", + "cloudabi", + "libc", + "redox_syscall", + "smallvec 1.3.0", + "winapi 0.3.8", +] + [[package]] name = "pem" version = "0.7.0" @@ -1975,8 +2002,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" dependencies = [ "proc-macro-error-attr", - "proc-macro2 1.0.10", - "quote 1.0.3", + "proc-macro2", + "quote", "syn", "version_check 0.9.1", ] @@ -1987,8 +2014,8 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" dependencies = [ - "proc-macro2 1.0.10", - "quote 1.0.3", + "proc-macro2", + "quote", "syn", "syn-mid", "version_check 0.9.1", @@ -2006,15 +2033,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" -[[package]] -name = "proc-macro2" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" -dependencies = [ - "unicode-xid 0.1.0", -] - [[package]] name = "proc-macro2" version = "1.0.10" @@ -2080,15 +2098,6 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quote" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1" -dependencies = [ - "proc-macro2 0.4.30", -] - [[package]] name = "quote" version = "1.0.3" @@ -2559,10 +2568,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] -name = "slice_as_array" -version = "1.1.0" +name = "sled" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c963ee59ddedb5ab95dc2cd97c48b4a292572a52c5636fbbabdb9985bfe4c3" +checksum = "8fb6824dde66ad33bf20c6e8476f5b82b871bc8bc3c129a10ea2f7dae5060fa3" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log 0.4.8", + "parking_lot 0.10.2", +] [[package]] name = "smallvec" @@ -2872,7 +2891,7 @@ dependencies = [ "log 0.4.8", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.9.0", "slab", "tokio-executor", "tokio-io", diff --git a/sfw-provider/Cargo.toml b/sfw-provider/Cargo.toml index e4181a5a8d8..c1e514fd936 100644 --- a/sfw-provider/Cargo.toml +++ b/sfw-provider/Cargo.toml @@ -21,6 +21,7 @@ tokio = { version = "0.2.4", features = ["full"] } sha2 = "0.8.0" serde = { version = "1.0.104", features = ["derive"] } serde_json = "1.0.44" +sled = "0.31" hmac = "0.7.1" ## internal diff --git a/sfw-provider/src/config/mod.rs b/sfw-provider/src/config/mod.rs index 3ceaef14ff6..3dafee8ed7a 100644 --- a/sfw-provider/src/config/mod.rs +++ b/sfw-provider/src/config/mod.rs @@ -459,7 +459,7 @@ impl ClientsEndpoint { } fn default_ledger_path(id: &str) -> PathBuf { - Config::default_data_directory(Some(id)).join("client_ledger.dat") + Config::default_data_directory(Some(id)).join("client_ledger.sled") } } diff --git a/sfw-provider/src/main.rs b/sfw-provider/src/main.rs index 93c31867f6c..7c46b7550af 100644 --- a/sfw-provider/src/main.rs +++ b/sfw-provider/src/main.rs @@ -79,5 +79,6 @@ fn setup_logging() { .filter_module("reqwest", log::LevelFilter::Warn) .filter_module("mio", log::LevelFilter::Warn) .filter_module("want", log::LevelFilter::Warn) + .filter_module("sled", log::LevelFilter::Warn) .init(); } diff --git a/sfw-provider/src/provider/client_handling/ledger.rs b/sfw-provider/src/provider/client_handling/ledger.rs index d46a5420cc2..374026ba06b 100644 --- a/sfw-provider/src/provider/client_handling/ledger.rs +++ b/sfw-provider/src/provider/client_handling/ledger.rs @@ -13,76 +13,151 @@ // limitations under the License. use directory_client::presence::providers::MixProviderClient; -use futures::lock::Mutex; -use sfw_provider_requests::auth_token::AuthToken; +use log::*; +use sfw_provider_requests::auth_token::{AuthToken, AUTH_TOKEN_SIZE}; +use sphinx::constants::DESTINATION_ADDRESS_LENGTH; use sphinx::route::DestinationAddressBytes; -use std::collections::HashMap; -use std::io; use std::path::PathBuf; -use std::sync::Arc; + +#[derive(Debug)] +pub(crate) enum ClientLedgerError { + DbReadError(sled::Error), + DbWriteError(sled::Error), + DbOpenError(sled::Error), +} #[derive(Debug, Clone)] // Note: you should NEVER create more than a single instance of this using 'new()'. // You should always use .clone() to create additional instances -pub struct ClientLedger { - inner: Arc>, +pub(crate) struct ClientLedger { + db: sled::Db, } impl ClientLedger { - pub(crate) fn new() -> Self { - ClientLedger { - inner: Arc::new(Mutex::new(ClientLedgerInner(HashMap::new()))), + pub(crate) fn load(file: PathBuf) -> Result { + let db = match sled::open(file) { + Err(e) => return Err(ClientLedgerError::DbOpenError(e)), + Ok(db) => db, + }; + + let ledger = ClientLedger { db }; + + ledger.db.iter().keys().for_each(|key| { + println!( + "key: {:?}", + ledger + .read_destination_address_bytes(key.unwrap()) + .to_base58_string() + ); + }); + + Ok(ledger) + } + + fn read_auth_token(&self, raw_token: sled::IVec) -> AuthToken { + let token_bytes_ref = raw_token.as_ref(); + // if this fails it means we have some database corruption and we + // absolutely can't continue + if token_bytes_ref.len() != AUTH_TOKEN_SIZE { + error!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH"); + panic!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH"); + } + + let mut token_bytes = [0u8; AUTH_TOKEN_SIZE]; + token_bytes.copy_from_slice(token_bytes_ref); + AuthToken::from_bytes(token_bytes) + } + + fn read_destination_address_bytes( + &self, + raw_destination: sled::IVec, + ) -> DestinationAddressBytes { + let destination_ref = raw_destination.as_ref(); + // if this fails it means we have some database corruption and we + // absolutely can't continue + if destination_ref.len() != DESTINATION_ADDRESS_LENGTH { + error!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH"); + panic!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH"); } + + let mut destination_bytes = [0u8; DESTINATION_ADDRESS_LENGTH]; + destination_bytes.copy_from_slice(destination_ref); + DestinationAddressBytes::from_bytes(destination_bytes) } - pub(crate) async fn verify_token( + pub(crate) fn verify_token( &self, auth_token: &AuthToken, client_address: &DestinationAddressBytes, - ) -> bool { - match self.inner.lock().await.0.get(client_address) { - None => false, - Some(expected_token) => expected_token == auth_token, + ) -> Result { + match self.db.get(&client_address.to_bytes()) { + Err(e) => Err(ClientLedgerError::DbReadError(e)), + Ok(token) => match token { + Some(token_ivec) => Ok(&self.read_auth_token(token_ivec) == auth_token), + None => Ok(false), + }, } } - pub(crate) async fn insert_token( + pub(crate) fn insert_token( &mut self, auth_token: AuthToken, client_address: DestinationAddressBytes, - ) -> Option { - self.inner.lock().await.0.insert(client_address, auth_token) + ) -> Result, ClientLedgerError> { + let insertion_result = match self + .db + .insert(&client_address.to_bytes(), &auth_token.to_bytes()) + { + Err(e) => Err(ClientLedgerError::DbWriteError(e)), + Ok(existing_token) => { + Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token))) + } + }; + + // registration doesn't happen that often so might as well flush it to the disk to be sure + self.db.flush().unwrap(); + insertion_result } - pub(crate) async fn remove_token( + pub(crate) fn remove_token( &mut self, client_address: &DestinationAddressBytes, - ) -> Option { - self.inner.lock().await.0.remove(client_address) - } + ) -> Result, ClientLedgerError> { + let removal_result = match self.db.remove(&client_address.to_bytes()) { + Err(e) => Err(ClientLedgerError::DbWriteError(e)), + Ok(existing_token) => { + Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token))) + } + }; - pub(crate) async fn current_clients(&self) -> Vec { - self.inner - .lock() - .await - .0 - .keys() - .map(|client_address| client_address.to_base58_string()) - .map(|pub_key| MixProviderClient { pub_key }) - .collect() + // removing of tokens happens extremely rarely, so flush is also fine here + self.db.flush().unwrap(); + removal_result } - #[allow(dead_code)] - pub(crate) fn load(_file: PathBuf) -> Self { - // TODO: actual loading, - // temporarily just create a new one - Self::new() + pub(crate) fn current_clients(&self) -> Result, ClientLedgerError> { + let clients = self.db.iter().keys(); + + let mut client_vec = Vec::new(); + for client in clients { + match client { + Err(e) => return Err(ClientLedgerError::DbWriteError(e)), + Ok(client_entry) => client_vec.push(MixProviderClient { + pub_key: self + .read_destination_address_bytes(client_entry) + .to_base58_string(), + }), + } + } + + Ok(client_vec) } - #[allow(dead_code)] - pub(crate) async fn save(&self, _file: PathBuf) -> io::Result<()> { - unimplemented!() + #[cfg(test)] + pub(crate) fn create_temporary() -> Self { + let cfg = sled::Config::new().temporary(true); + ClientLedger { + db: cfg.open().unwrap(), + } } } - -struct ClientLedgerInner(HashMap); diff --git a/sfw-provider/src/provider/client_handling/request_processing.rs b/sfw-provider/src/provider/client_handling/request_processing.rs index 57861afbc0c..c2c2452edc4 100644 --- a/sfw-provider/src/provider/client_handling/request_processing.rs +++ b/sfw-provider/src/provider/client_handling/request_processing.rs @@ -105,7 +105,7 @@ impl RequestProcessor { if self .client_ledger .insert_token(auth_token.clone(), req.destination_address.clone()) - .await + .unwrap() .is_some() { info!( @@ -121,7 +121,7 @@ impl RequestProcessor { // we must revert our changes if this operation failed self.client_ledger .remove_token(&req.destination_address) - .await; + .unwrap(); } Ok(ClientProcessingResult::RegisterResponse(auth_token)) @@ -143,10 +143,14 @@ impl RequestProcessor { &self, req: PullRequest, ) -> Result { + debug!( + "Processing a pull request from {:?}", + req.destination_address.to_base58_string() + ); if self .client_ledger .verify_token(&req.auth_token, &req.destination_address) - .await + .unwrap() { let retrieved_messages = self .client_storage @@ -176,7 +180,7 @@ mod generating_new_auth_token { let request_processor = RequestProcessor { secret_key: Arc::new(key), client_storage: ClientStorage::new(3, 16, Default::default()), - client_ledger: ClientLedger::new(), + client_ledger: ClientLedger::create_temporary(), max_request_size: 42, }; @@ -195,14 +199,14 @@ mod generating_new_auth_token { let request_processor1 = RequestProcessor { secret_key: Arc::new(key1), client_storage: ClientStorage::new(3, 16, Default::default()), - client_ledger: ClientLedger::new(), + client_ledger: ClientLedger::create_temporary(), max_request_size: 42, }; let request_processor2 = RequestProcessor { secret_key: Arc::new(key2), client_storage: ClientStorage::new(3, 16, Default::default()), - client_ledger: ClientLedger::new(), + client_ledger: ClientLedger::create_temporary(), max_request_size: 42, }; diff --git a/sfw-provider/src/provider/mod.rs b/sfw-provider/src/provider/mod.rs index ec17c977659..a5dcdc43ecb 100644 --- a/sfw-provider/src/provider/mod.rs +++ b/sfw-provider/src/provider/mod.rs @@ -49,7 +49,10 @@ impl ServiceProvider { pub fn new(config: Config) -> Self { let sphinx_keypair = Self::load_sphinx_keys(&config); - let registered_clients_ledger = ClientLedger::load(config.get_clients_ledger_path()); + let registered_clients_ledger = match ClientLedger::load(config.get_clients_ledger_path()) { + Err(e) => panic!(format!("Failed to load the ledger - {:?}", e)), + Ok(ledger) => ledger, + }; ServiceProvider { runtime: Runtime::new().unwrap(), config, diff --git a/sfw-provider/src/provider/presence.rs b/sfw-provider/src/provider/presence.rs index 78251b7efbc..06bd6f8dfb1 100644 --- a/sfw-provider/src/provider/presence.rs +++ b/sfw-provider/src/provider/presence.rs @@ -62,7 +62,7 @@ pub struct Notifier { } impl Notifier { - pub fn new(config: NotifierConfig, client_ledger: ClientLedger) -> Notifier { + pub(crate) fn new(config: NotifierConfig, client_ledger: ClientLedger) -> Notifier { let directory_client_cfg = directory_client::Config { base_url: config.directory_server, }; @@ -85,7 +85,7 @@ impl Notifier { client_listener: self.client_listener.clone(), mixnet_listener: self.mixnet_listener.clone(), pub_key: self.pub_key_string.clone(), - registered_clients: self.client_ledger.current_clients().await, + registered_clients: self.client_ledger.current_clients().unwrap(), last_seen: 0, version: built_info::PKG_VERSION.to_string(), }