Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/persistent ledger #187

Merged
merged 11 commits into from
Apr 17, 2020
103 changes: 61 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sfw-provider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ tokio = { version = "0.2.4", features = ["full"] }
sha2 = "0.8.0"
serde = { version = "1.0.104", features = ["derive"] }
serde_json = "1.0.44"
sled = "0.31"
hmac = "0.7.1"

## internal
2 changes: 1 addition & 1 deletion sfw-provider/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -459,7 +459,7 @@ impl ClientsEndpoint {
}

fn default_ledger_path(id: &str) -> PathBuf {
Config::default_data_directory(Some(id)).join("client_ledger.dat")
Config::default_data_directory(Some(id)).join("client_ledger.sled")
}
}

1 change: 1 addition & 0 deletions sfw-provider/src/main.rs
Original file line number Diff line number Diff line change
@@ -79,5 +79,6 @@ fn setup_logging() {
.filter_module("reqwest", log::LevelFilter::Warn)
.filter_module("mio", log::LevelFilter::Warn)
.filter_module("want", log::LevelFilter::Warn)
.filter_module("sled", log::LevelFilter::Warn)
.init();
}
157 changes: 116 additions & 41 deletions sfw-provider/src/provider/client_handling/ledger.rs
Original file line number Diff line number Diff line change
@@ -13,76 +13,151 @@
// limitations under the License.

use directory_client::presence::providers::MixProviderClient;
use futures::lock::Mutex;
use sfw_provider_requests::auth_token::AuthToken;
use log::*;
use sfw_provider_requests::auth_token::{AuthToken, AUTH_TOKEN_SIZE};
use sphinx::constants::DESTINATION_ADDRESS_LENGTH;
use sphinx::route::DestinationAddressBytes;
use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) enum ClientLedgerError {
DbReadError(sled::Error),
DbWriteError(sled::Error),
DbOpenError(sled::Error),
}

#[derive(Debug, Clone)]
// Note: you should NEVER create more than a single instance of this using 'new()'.
// You should always use .clone() to create additional instances
pub struct ClientLedger {
inner: Arc<Mutex<ClientLedgerInner>>,
pub(crate) struct ClientLedger {
db: sled::Db,
}

impl ClientLedger {
pub(crate) fn new() -> Self {
ClientLedger {
inner: Arc::new(Mutex::new(ClientLedgerInner(HashMap::new()))),
pub(crate) fn load(file: PathBuf) -> Result<Self, ClientLedgerError> {
let db = match sled::open(file) {
Err(e) => return Err(ClientLedgerError::DbOpenError(e)),
Ok(db) => db,
};

let ledger = ClientLedger { db };

ledger.db.iter().keys().for_each(|key| {
println!(
"key: {:?}",
ledger
.read_destination_address_bytes(key.unwrap())
.to_base58_string()
);
});

Ok(ledger)
}

fn read_auth_token(&self, raw_token: sled::IVec) -> AuthToken {
let token_bytes_ref = raw_token.as_ref();
// if this fails it means we have some database corruption and we
// absolutely can't continue
if token_bytes_ref.len() != AUTH_TOKEN_SIZE {
error!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH");
panic!("CLIENT LEDGER DATA CORRUPTION - TOKEN HAS INVALID LENGTH");
}

let mut token_bytes = [0u8; AUTH_TOKEN_SIZE];
token_bytes.copy_from_slice(token_bytes_ref);
AuthToken::from_bytes(token_bytes)
}

fn read_destination_address_bytes(
&self,
raw_destination: sled::IVec,
) -> DestinationAddressBytes {
let destination_ref = raw_destination.as_ref();
// if this fails it means we have some database corruption and we
// absolutely can't continue
if destination_ref.len() != DESTINATION_ADDRESS_LENGTH {
error!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH");
panic!("CLIENT LEDGER DATA CORRUPTION - CLIENT ADDRESS HAS INVALID LENGTH");
}

let mut destination_bytes = [0u8; DESTINATION_ADDRESS_LENGTH];
destination_bytes.copy_from_slice(destination_ref);
DestinationAddressBytes::from_bytes(destination_bytes)
}

pub(crate) async fn verify_token(
pub(crate) fn verify_token(
&self,
auth_token: &AuthToken,
client_address: &DestinationAddressBytes,
) -> bool {
match self.inner.lock().await.0.get(client_address) {
None => false,
Some(expected_token) => expected_token == auth_token,
) -> Result<bool, ClientLedgerError> {
match self.db.get(&client_address.to_bytes()) {
Err(e) => Err(ClientLedgerError::DbReadError(e)),
Ok(token) => match token {
Some(token_ivec) => Ok(&self.read_auth_token(token_ivec) == auth_token),
None => Ok(false),
},
}
}

pub(crate) async fn insert_token(
pub(crate) fn insert_token(
&mut self,
auth_token: AuthToken,
client_address: DestinationAddressBytes,
) -> Option<AuthToken> {
self.inner.lock().await.0.insert(client_address, auth_token)
) -> Result<Option<AuthToken>, ClientLedgerError> {
let insertion_result = match self
.db
.insert(&client_address.to_bytes(), &auth_token.to_bytes())
{
Err(e) => Err(ClientLedgerError::DbWriteError(e)),
Ok(existing_token) => {
Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token)))
}
};

// registration doesn't happen that often so might as well flush it to the disk to be sure
self.db.flush().unwrap();
insertion_result
}

pub(crate) async fn remove_token(
pub(crate) fn remove_token(
&mut self,
client_address: &DestinationAddressBytes,
) -> Option<AuthToken> {
self.inner.lock().await.0.remove(client_address)
}
) -> Result<Option<AuthToken>, ClientLedgerError> {
let removal_result = match self.db.remove(&client_address.to_bytes()) {
Err(e) => Err(ClientLedgerError::DbWriteError(e)),
Ok(existing_token) => {
Ok(existing_token.map(|existing_token| self.read_auth_token(existing_token)))
}
};

pub(crate) async fn current_clients(&self) -> Vec<MixProviderClient> {
self.inner
.lock()
.await
.0
.keys()
.map(|client_address| client_address.to_base58_string())
.map(|pub_key| MixProviderClient { pub_key })
.collect()
// removing of tokens happens extremely rarely, so flush is also fine here
self.db.flush().unwrap();
removal_result
}

#[allow(dead_code)]
pub(crate) fn load(_file: PathBuf) -> Self {
// TODO: actual loading,
// temporarily just create a new one
Self::new()
pub(crate) fn current_clients(&self) -> Result<Vec<MixProviderClient>, ClientLedgerError> {
let clients = self.db.iter().keys();

let mut client_vec = Vec::new();
for client in clients {
match client {
Err(e) => return Err(ClientLedgerError::DbWriteError(e)),
Ok(client_entry) => client_vec.push(MixProviderClient {
pub_key: self
.read_destination_address_bytes(client_entry)
.to_base58_string(),
}),
}
}

Ok(client_vec)
}

#[allow(dead_code)]
pub(crate) async fn save(&self, _file: PathBuf) -> io::Result<()> {
unimplemented!()
#[cfg(test)]
pub(crate) fn create_temporary() -> Self {
let cfg = sled::Config::new().temporary(true);
ClientLedger {
db: cfg.open().unwrap(),
}
}
}

struct ClientLedgerInner(HashMap<DestinationAddressBytes, AuthToken>);
16 changes: 10 additions & 6 deletions sfw-provider/src/provider/client_handling/request_processing.rs
Original file line number Diff line number Diff line change
@@ -105,7 +105,7 @@ impl RequestProcessor {
if self
.client_ledger
.insert_token(auth_token.clone(), req.destination_address.clone())
.await
.unwrap()
.is_some()
{
info!(
@@ -121,7 +121,7 @@ impl RequestProcessor {
// we must revert our changes if this operation failed
self.client_ledger
.remove_token(&req.destination_address)
.await;
.unwrap();
}

Ok(ClientProcessingResult::RegisterResponse(auth_token))
@@ -143,10 +143,14 @@ impl RequestProcessor {
&self,
req: PullRequest,
) -> Result<ClientProcessingResult, ClientProcessingError> {
debug!(
"Processing a pull request from {:?}",
req.destination_address.to_base58_string()
);
if self
.client_ledger
.verify_token(&req.auth_token, &req.destination_address)
.await
.unwrap()
{
let retrieved_messages = self
.client_storage
@@ -176,7 +180,7 @@ mod generating_new_auth_token {
let request_processor = RequestProcessor {
secret_key: Arc::new(key),
client_storage: ClientStorage::new(3, 16, Default::default()),
client_ledger: ClientLedger::new(),
client_ledger: ClientLedger::create_temporary(),
max_request_size: 42,
};

@@ -195,14 +199,14 @@ mod generating_new_auth_token {
let request_processor1 = RequestProcessor {
secret_key: Arc::new(key1),
client_storage: ClientStorage::new(3, 16, Default::default()),
client_ledger: ClientLedger::new(),
client_ledger: ClientLedger::create_temporary(),
max_request_size: 42,
};

let request_processor2 = RequestProcessor {
secret_key: Arc::new(key2),
client_storage: ClientStorage::new(3, 16, Default::default()),
client_ledger: ClientLedger::new(),
client_ledger: ClientLedger::create_temporary(),
max_request_size: 42,
};

5 changes: 4 additions & 1 deletion sfw-provider/src/provider/mod.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,10 @@ impl ServiceProvider {

pub fn new(config: Config) -> Self {
let sphinx_keypair = Self::load_sphinx_keys(&config);
let registered_clients_ledger = ClientLedger::load(config.get_clients_ledger_path());
let registered_clients_ledger = match ClientLedger::load(config.get_clients_ledger_path()) {
Err(e) => panic!(format!("Failed to load the ledger - {:?}", e)),
Ok(ledger) => ledger,
};
ServiceProvider {
runtime: Runtime::new().unwrap(),
config,
4 changes: 2 additions & 2 deletions sfw-provider/src/provider/presence.rs
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ pub struct Notifier {
}

impl Notifier {
pub fn new(config: NotifierConfig, client_ledger: ClientLedger) -> Notifier {
pub(crate) fn new(config: NotifierConfig, client_ledger: ClientLedger) -> Notifier {
let directory_client_cfg = directory_client::Config {
base_url: config.directory_server,
};
@@ -85,7 +85,7 @@ impl Notifier {
client_listener: self.client_listener.clone(),
mixnet_listener: self.mixnet_listener.clone(),
pub_key: self.pub_key_string.clone(),
registered_clients: self.client_ledger.current_clients().await,
registered_clients: self.client_ledger.current_clients().unwrap(),
last_seen: 0,
version: built_info::PKG_VERSION.to_string(),
}