From 9de1afeeb68d4d5a8717ff8ecc0e9238e52905b6 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 6 Jun 2019 13:35:06 +0300 Subject: [PATCH] SecretStore: non-blocking wait of session completion (#10303) * make SS sessions return future * fix grumbles * do not create unused Condvar in production mode --- secret-store/src/key_server.rs | 384 +++++++++++++----- .../key_version_negotiation_session.rs | 42 +- .../servers_set_change_session.rs | 46 ++- .../admin_sessions/share_add_session.rs | 29 +- .../admin_sessions/share_change_session.rs | 2 +- .../client_sessions/decryption_session.rs | 48 ++- .../client_sessions/encryption_session.rs | 39 +- .../client_sessions/generation_session.rs | 50 ++- .../client_sessions/signing_session_ecdsa.rs | 36 +- .../signing_session_schnorr.rs | 48 ++- .../src/key_server_cluster/cluster.rs | 274 ++++++++++--- .../cluster_message_processor.rs | 24 +- .../key_server_cluster/cluster_sessions.rs | 118 +++++- .../cluster_sessions_creator.rs | 145 +++++-- .../connection_trigger_with_migration.rs | 7 +- secret-store/src/key_server_cluster/mod.rs | 2 +- secret-store/src/listener/http_listener.rs | 236 +++++------ secret-store/src/listener/mod.rs | 62 ++- .../src/listener/service_contract_listener.rs | 8 +- secret-store/src/traits.rs | 62 ++- 20 files changed, 1133 insertions(+), 529 deletions(-) diff --git a/secret-store/src/key_server.rs b/secret-store/src/key_server.rs index 5418ec00a3c..f93f92d5a43 100644 --- a/secret-store/src/key_server.rs +++ b/secret-store/src/key_server.rs @@ -16,14 +16,15 @@ use std::collections::BTreeSet; use std::sync::Arc; +use futures::{future::{err, result}, Future}; use parking_lot::Mutex; use crypto::DEFAULT_MAC; -use ethkey::crypto; +use ethkey::{crypto, public_to_address}; use parity_runtime::Executor; use super::acl_storage::AclStorage; use super::key_storage::KeyStorage; use super::key_server_set::KeyServerSet; -use key_server_cluster::{math, new_network_cluster}; +use key_server_cluster::{math, new_network_cluster, ClusterSession, WaitableSession}; use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer, NodeKeyPair}; use types::{Error, Public, RequestSignature, Requester, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, ClusterConfiguration, MessageHash, EncryptedMessageSignature, NodeId}; @@ -58,132 +59,212 @@ impl KeyServerImpl { impl KeyServer for KeyServerImpl {} impl AdminSessionsServer for KeyServerImpl { - fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet) -> Result<(), Error> { - let servers_set_change_session = self.data.lock().cluster - .new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature)?; - servers_set_change_session.as_servers_set_change() - .expect("new_servers_set_change_session creates servers_set_change_session; qed") - .wait().map_err(Into::into) + fn change_servers_set( + &self, + old_set_signature: RequestSignature, + new_set_signature: RequestSignature, + new_servers_set: BTreeSet, + ) -> Box + Send> { + return_session(self.data.lock().cluster + .new_servers_set_change_session(None, None, new_servers_set, old_set_signature, new_set_signature)) } } impl ServerKeyGenerator for KeyServerImpl { - fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result { - // recover requestor' public key from signature - let address = author.address(key_id).map_err(Error::InsufficientRequesterData)?; + fn generate_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send> { + // recover requestor' address key from signature + let address = author.address(&key_id).map_err(Error::InsufficientRequesterData); // generate server key - let generation_session = self.data.lock().cluster.new_generation_session(key_id.clone(), None, address, threshold)?; - generation_session.wait(None) - .expect("when wait is called without timeout it always returns Some; qed") - .map_err(Into::into) + return_session(address.and_then(|address| self.data.lock().cluster + .new_generation_session(key_id, None, address, threshold))) } - fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result { + fn restore_key_public( + &self, + key_id: ServerKeyId, + author: Requester, + ) -> Box + Send> { // recover requestor' public key from signature - let address = author.address(key_id).map_err(Error::InsufficientRequesterData)?; + let session_and_address = author + .address(&key_id) + .map_err(Error::InsufficientRequesterData) + .and_then(|address| self.data.lock().cluster.new_key_version_negotiation_session(key_id) + .map(|session| (session, address))); + let (session, address) = match session_and_address { + Ok((session, address)) => (session, address), + Err(error) => return Box::new(err(error)), + }; // negotiate key version && retrieve common key data - let negotiation_session = self.data.lock().cluster.new_key_version_negotiation_session(*key_id)?; - negotiation_session.wait() - .and_then(|_| negotiation_session.common_key_data()) - .and_then(|key_share| if key_share.author == address { + let core_session = session.session.clone(); + Box::new(session.into_wait_future() + .and_then(move |_| core_session.common_key_data() + .map(|key_share| (key_share, address))) + .and_then(|(key_share, address)| if key_share.author == address { Ok(key_share.public) } else { Err(Error::AccessDenied) - }) - .map_err(Into::into) + })) } } impl DocumentKeyServer for KeyServerImpl { - fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error> { + fn store_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + common_point: Public, + encrypted_document_key: Public, + ) -> Box + Send> { // store encrypted key - let encryption_session = self.data.lock().cluster.new_encryption_session(key_id.clone(), - author.clone(), common_point, encrypted_document_key)?; - encryption_session.wait(None).map_err(Into::into) + return_session(self.data.lock().cluster.new_encryption_session(key_id, + author.clone(), common_point, encrypted_document_key)) } - fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result { + fn generate_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send> { // recover requestor' public key from signature - let public = author.public(key_id).map_err(Error::InsufficientRequesterData)?; + let public = result(author.public(&key_id).map_err(Error::InsufficientRequesterData)); // generate server key - let server_key = self.generate_key(key_id, author, threshold)?; + let data = self.data.clone(); + let server_key = public.and_then(move |public| { + let data = data.lock(); + let session = data.cluster.new_generation_session(key_id, None, public_to_address(&public), threshold); + result(session.map(|session| (public, session))) + }) + .and_then(|(public, session)| session.into_wait_future().map(move |server_key| (public, server_key))); // generate random document key - let document_key = math::generate_random_point()?; - let encrypted_document_key = math::encrypt_secret(&document_key, &server_key)?; + let document_key = server_key.and_then(|(public, server_key)| + result(math::generate_random_point() + .and_then(|document_key| math::encrypt_secret(&document_key, &server_key) + .map(|encrypted_document_key| (public, document_key, encrypted_document_key)))) + ); // store document key in the storage - self.store_document_key(key_id, author, encrypted_document_key.common_point, encrypted_document_key.encrypted_point)?; + let data = self.data.clone(); + let stored_document_key = document_key.and_then(move |(public, document_key, encrypted_document_key)| { + let data = data.lock(); + let session = data.cluster.new_encryption_session(key_id, + author.clone(), encrypted_document_key.common_point, encrypted_document_key.encrypted_point); + result(session.map(|session| (public, document_key, session))) + }) + .and_then(|(public, document_key, session)| session.into_wait_future().map(move |_| (public, document_key))); // encrypt document key with requestor public key - let document_key = crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes()) - .map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))?; - Ok(document_key) + let encrypted_document_key = stored_document_key + .and_then(|(public, document_key)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes()) + .map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))); + + Box::new(encrypted_document_key) } - fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result { + fn restore_document_key( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send> { // recover requestor' public key from signature - let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?; + let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData)); // decrypt document key - let decryption_session = self.data.lock().cluster.new_decryption_session(key_id.clone(), - None, requester.clone(), None, false, false)?; - let document_key = decryption_session.wait(None) - .expect("when wait is called without timeout it always returns Some; qed")? - .decrypted_secret; + let data = self.data.clone(); + let stored_document_key = public.and_then(move |public| { + let data = data.lock(); + let session = data.cluster.new_decryption_session(key_id, None, requester.clone(), None, false, false); + result(session.map(|session| (public, session))) + }) + .and_then(|(public, session)| session.into_wait_future().map(move |document_key| (public, document_key))); // encrypt document key with requestor public key - let document_key = crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.as_bytes()) - .map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))?; - Ok(document_key) + let encrypted_document_key = stored_document_key + .and_then(|(public, document_key)| + crypto::ecies::encrypt(&public, &DEFAULT_MAC, document_key.decrypted_secret.as_bytes()) + .map_err(|err| Error::Internal(format!("Error encrypting document key: {}", err)))); + + Box::new(encrypted_document_key) } - fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result { - let decryption_session = self.data.lock().cluster.new_decryption_session(key_id.clone(), - None, requester.clone(), None, true, false)?; - decryption_session.wait(None) - .expect("when wait is called without timeout it always returns Some; qed") - .map_err(Into::into) + fn restore_document_key_shadow( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send> { + return_session(self.data.lock().cluster.new_decryption_session(key_id, + None, requester.clone(), None, true, false)) } } impl MessageSigner for KeyServerImpl { - fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result { + fn sign_message_schnorr( + &self, + key_id: ServerKeyId, + requester: Requester, + message: MessageHash, + ) -> Box + Send> { // recover requestor' public key from signature - let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?; + let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData)); // sign message - let signing_session = self.data.lock().cluster.new_schnorr_signing_session(key_id.clone(), - requester.clone().into(), None, message)?; - let message_signature = signing_session.wait()?; + let data = self.data.clone(); + let signature = public.and_then(move |public| { + let data = data.lock(); + let session = data.cluster.new_schnorr_signing_session(key_id, requester.clone().into(), None, message); + result(session.map(|session| (public, session))) + }) + .and_then(|(public, session)| session.into_wait_future().map(move |signature| (public, signature))); // compose two message signature components into single one - let mut combined_signature = [0; 64]; - combined_signature[..32].clone_from_slice(message_signature.0.as_bytes()); - combined_signature[32..].clone_from_slice(message_signature.1.as_bytes()); - - // encrypt combined signature with requestor public key - let message_signature = crypto::ecies::encrypt(&public, &DEFAULT_MAC, &combined_signature) - .map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))?; - Ok(message_signature) + let combined_signature = signature.map(|(public, signature)| { + let mut combined_signature = [0; 64]; + combined_signature[..32].clone_from_slice(signature.0.as_bytes()); + combined_signature[32..].clone_from_slice(signature.1.as_bytes()); + (public, combined_signature) + }); + + // encrypt signature with requestor public key + let encrypted_signature = combined_signature + .and_then(|(public, combined_signature)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, &combined_signature) + .map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))); + + Box::new(encrypted_signature) } - fn sign_message_ecdsa(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result { + fn sign_message_ecdsa( + &self, + key_id: ServerKeyId, + requester: Requester, + message: MessageHash, + ) -> Box + Send> { // recover requestor' public key from signature - let public = requester.public(key_id).map_err(Error::InsufficientRequesterData)?; + let public = result(requester.public(&key_id).map_err(Error::InsufficientRequesterData)); // sign message - let signing_session = self.data.lock().cluster.new_ecdsa_signing_session(key_id.clone(), - requester.clone().into(), None, message)?; - let message_signature = signing_session.wait()?; + let data = self.data.clone(); + let signature = public.and_then(move |public| { + let data = data.lock(); + let session = data.cluster.new_ecdsa_signing_session(key_id, requester.clone().into(), None, message); + result(session.map(|session| (public, session))) + }) + .and_then(|(public, session)| session.into_wait_future().map(move |signature| (public, signature))); // encrypt combined signature with requestor public key - let message_signature = crypto::ecies::encrypt(&public, &DEFAULT_MAC, &*message_signature) - .map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))?; - Ok(message_signature) + let encrypted_signature = signature + .and_then(|(public, signature)| crypto::ecies::encrypt(&public, &DEFAULT_MAC, &*signature) + .map_err(|err| Error::Internal(format!("Error encrypting message signature: {}", err)))); + + Box::new(encrypted_signature) } } @@ -215,6 +296,15 @@ impl KeyServerCore { } } +fn return_session( + session: Result, Error>, +) -> Box + Send> { + match session { + Ok(session) => Box::new(session.into_wait_future()), + Err(error) => Box::new(err(error)) + } +} + #[cfg(test)] pub mod tests { use std::collections::BTreeSet; @@ -222,6 +312,7 @@ pub mod tests { use std::sync::Arc; use std::net::SocketAddr; use std::collections::BTreeMap; + use futures::Future; use crypto::DEFAULT_MAC; use ethkey::{self, crypto, Secret, Random, Generator, verify_public}; use acl_storage::DummyAclStorage; @@ -244,45 +335,88 @@ pub mod tests { impl KeyServer for DummyKeyServer {} impl AdminSessionsServer for DummyKeyServer { - fn change_servers_set(&self, _old_set_signature: RequestSignature, _new_set_signature: RequestSignature, _new_servers_set: BTreeSet) -> Result<(), Error> { + fn change_servers_set( + &self, + _old_set_signature: RequestSignature, + _new_set_signature: RequestSignature, + _new_servers_set: BTreeSet, + ) -> Box + Send> { unimplemented!("test-only") } } impl ServerKeyGenerator for DummyKeyServer { - fn generate_key(&self, _key_id: &ServerKeyId, _author: &Requester, _threshold: usize) -> Result { + fn generate_key( + &self, + _key_id: ServerKeyId, + _author: Requester, + _threshold: usize, + ) -> Box + Send> { unimplemented!("test-only") } - fn restore_key_public(&self, _key_id: &ServerKeyId, _author: &Requester) -> Result { + fn restore_key_public( + &self, + _key_id: ServerKeyId, + _author: Requester, + ) -> Box + Send> { unimplemented!("test-only") } } impl DocumentKeyServer for DummyKeyServer { - fn store_document_key(&self, _key_id: &ServerKeyId, _author: &Requester, _common_point: Public, _encrypted_document_key: Public) -> Result<(), Error> { + fn store_document_key( + &self, + _key_id: ServerKeyId, + _author: Requester, + _common_point: Public, + _encrypted_document_key: Public, + ) -> Box + Send> { unimplemented!("test-only") } - fn generate_document_key(&self, _key_id: &ServerKeyId, _author: &Requester, _threshold: usize) -> Result { + fn generate_document_key( + &self, + _key_id: ServerKeyId, + _author: Requester, + _threshold: usize, + ) -> Box + Send> { unimplemented!("test-only") } - fn restore_document_key(&self, _key_id: &ServerKeyId, _requester: &Requester) -> Result { + fn restore_document_key( + &self, + _key_id: ServerKeyId, + _requester: Requester, + ) -> Box + Send> { unimplemented!("test-only") } - fn restore_document_key_shadow(&self, _key_id: &ServerKeyId, _requester: &Requester) -> Result { + fn restore_document_key_shadow( + &self, + _key_id: ServerKeyId, + _requester: Requester, + ) -> Box + Send> { unimplemented!("test-only") } } impl MessageSigner for DummyKeyServer { - fn sign_message_schnorr(&self, _key_id: &ServerKeyId, _requester: &Requester, _message: MessageHash) -> Result { + fn sign_message_schnorr( + &self, + _key_id: ServerKeyId, + _requester: Requester, + _message: MessageHash, + ) -> Box + Send> { unimplemented!("test-only") } - fn sign_message_ecdsa(&self, _key_id: &ServerKeyId, _requester: &Requester, _message: MessageHash) -> Result { + fn sign_message_ecdsa( + &self, + _key_id: ServerKeyId, + _requester: Requester, + _message: MessageHash, + ) -> Box + Send> { unimplemented!("test-only") } } @@ -355,13 +489,20 @@ pub mod tests { let threshold = 0; let document = Random.generate().unwrap().secret().clone(); let secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&secret, &document).unwrap(); - let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), threshold).unwrap(); + let signature: Requester = ethkey::sign(&secret, &document).unwrap().into(); + let generated_key = key_servers[0].generate_document_key( + *document, + signature.clone(), + threshold, + ).wait().unwrap(); let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap(); // now let's try to retrieve key back for key_server in key_servers.iter() { - let retrieved_key = key_server.restore_document_key(&document, &signature.clone().into()).unwrap(); + let retrieved_key = key_server.restore_document_key( + *document, + signature.clone(), + ).wait().unwrap(); let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap(); assert_eq!(retrieved_key, generated_key); } @@ -378,13 +519,20 @@ pub mod tests { // generate document key let document = Random.generate().unwrap().secret().clone(); let secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&secret, &document).unwrap(); - let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), *threshold).unwrap(); + let signature: Requester = ethkey::sign(&secret, &document).unwrap().into(); + let generated_key = key_servers[0].generate_document_key( + *document, + signature.clone(), + *threshold, + ).wait().unwrap(); let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap(); // now let's try to retrieve key back for (i, key_server) in key_servers.iter().enumerate() { - let retrieved_key = key_server.restore_document_key(&document, &signature.clone().into()).unwrap(); + let retrieved_key = key_server.restore_document_key( + *document, + signature.clone(), + ).wait().unwrap(); let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap(); assert_eq!(retrieved_key, generated_key); @@ -406,20 +554,24 @@ pub mod tests { // generate server key let server_key_id = Random.generate().unwrap().secret().clone(); let requestor_secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap(); - let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), *threshold).unwrap(); + let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into(); + let server_public = key_servers[0].generate_key( + *server_key_id, + signature.clone(), + *threshold, + ).wait().unwrap(); // generate document key (this is done by KS client so that document key is unknown to any KS) let generated_key = Random.generate().unwrap().public().clone(); let encrypted_document_key = math::encrypt_secret(&generated_key, &server_public).unwrap(); // store document key - key_servers[0].store_document_key(&server_key_id, &signature.clone().into(), - encrypted_document_key.common_point, encrypted_document_key.encrypted_point).unwrap(); + key_servers[0].store_document_key(*server_key_id, signature.clone(), + encrypted_document_key.common_point, encrypted_document_key.encrypted_point).wait().unwrap(); // now let's try to retrieve key back for key_server in key_servers.iter() { - let retrieved_key = key_server.restore_document_key(&server_key_id, &signature.clone().into()).unwrap(); + let retrieved_key = key_server.restore_document_key(*server_key_id, signature.clone()).wait().unwrap(); let retrieved_key = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &retrieved_key).unwrap(); let retrieved_key = Public::from_slice(&retrieved_key); assert_eq!(retrieved_key, generated_key); @@ -438,12 +590,20 @@ pub mod tests { // generate server key let server_key_id = Random.generate().unwrap().secret().clone(); let requestor_secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap(); - let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), *threshold).unwrap(); + let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into(); + let server_public = key_servers[0].generate_key( + *server_key_id, + signature.clone(), + *threshold, + ).wait().unwrap(); // sign message let message_hash = H256::from_low_u64_be(42); - let combined_signature = key_servers[0].sign_message_schnorr(&server_key_id, &signature.into(), message_hash.clone()).unwrap(); + let combined_signature = key_servers[0].sign_message_schnorr( + *server_key_id, + signature, + message_hash, + ).wait().unwrap(); let combined_signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &combined_signature).unwrap(); let signature_c = Secret::from_slice(&combined_signature[..32]).unwrap(); let signature_s = Secret::from_slice(&combined_signature[32..]).unwrap(); @@ -463,15 +623,19 @@ pub mod tests { let threshold = 0; let document = Random.generate().unwrap().secret().clone(); let secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&secret, &document).unwrap(); - let generated_key = key_servers[0].generate_document_key(&document, &signature.clone().into(), threshold).unwrap(); + let signature: Requester = ethkey::sign(&secret, &document).unwrap().into(); + let generated_key = key_servers[0].generate_document_key( + *document, + signature.clone(), + threshold, + ).wait().unwrap(); let generated_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &generated_key).unwrap(); // remove key from node0 key_storages[0].remove(&document).unwrap(); // now let's try to retrieve key back by requesting it from node0, so that session must be delegated - let retrieved_key = key_servers[0].restore_document_key(&document, &signature.into()).unwrap(); + let retrieved_key = key_servers[0].restore_document_key(*document, signature).wait().unwrap(); let retrieved_key = crypto::ecies::decrypt(&secret, &DEFAULT_MAC, &retrieved_key).unwrap(); assert_eq!(retrieved_key, generated_key); drop(runtime); @@ -486,15 +650,19 @@ pub mod tests { // generate server key let server_key_id = Random.generate().unwrap().secret().clone(); let requestor_secret = Random.generate().unwrap().secret().clone(); - let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap(); - let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), threshold).unwrap(); + let signature: Requester = ethkey::sign(&requestor_secret, &server_key_id).unwrap().into(); + let server_public = key_servers[0].generate_key(*server_key_id, signature.clone(), threshold).wait().unwrap(); // remove key from node0 key_storages[0].remove(&server_key_id).unwrap(); // sign message let message_hash = H256::from_low_u64_be(42); - let combined_signature = key_servers[0].sign_message_schnorr(&server_key_id, &signature.into(), message_hash.clone()).unwrap(); + let combined_signature = key_servers[0].sign_message_schnorr( + *server_key_id, + signature, + message_hash, + ).wait().unwrap(); let combined_signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &combined_signature).unwrap(); let signature_c = Secret::from_slice(&combined_signature[..32]).unwrap(); let signature_s = Secret::from_slice(&combined_signature[32..]).unwrap(); @@ -514,14 +682,22 @@ pub mod tests { let server_key_id = Random.generate().unwrap().secret().clone(); let requestor_secret = Random.generate().unwrap().secret().clone(); let signature = ethkey::sign(&requestor_secret, &server_key_id).unwrap(); - let server_public = key_servers[0].generate_key(&server_key_id, &signature.clone().into(), threshold).unwrap(); + let server_public = key_servers[0].generate_key( + *server_key_id, + signature.clone().into(), + threshold, + ).wait().unwrap(); // remove key from node0 key_storages[0].remove(&server_key_id).unwrap(); // sign message let message_hash = H256::random(); - let signature = key_servers[0].sign_message_ecdsa(&server_key_id, &signature.into(), message_hash.clone()).unwrap(); + let signature = key_servers[0].sign_message_ecdsa( + *server_key_id, + signature.clone().into(), + message_hash, + ).wait().unwrap(); let signature = crypto::ecies::decrypt(&requestor_secret, &DEFAULT_MAC, &signature).unwrap(); let signature = H520::from_slice(&signature[0..65]); diff --git a/secret-store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs b/secret-store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs index e2d115bcf23..8db04632647 100644 --- a/secret-store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs +++ b/secret-store/src/key_server_cluster/admin_sessions/key_version_negotiation_session.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use std::collections::{BTreeSet, BTreeMap}; use ethereum_types::{Address, H256}; use ethkey::Secret; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare}; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession}; +use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal}; use key_server_cluster::decryption_session::SessionImpl as DecryptionSession; use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession; use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession; @@ -87,8 +88,8 @@ struct SessionCore { pub transport: T, /// Session nonce. pub nonce: u64, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// Session completion signal. + pub completed: CompletionSignal>, } /// Mutable session data. @@ -166,8 +167,9 @@ pub struct LargestSupportResultComputer; impl SessionImpl where T: SessionTransport { /// Create new session. - pub fn new(params: SessionParams) -> Self { - SessionImpl { + pub fn new(params: SessionParams) -> (Self, Oneshot, Error>>) { + let (completed, oneshot) = CompletionSignal::new(); + (SessionImpl { core: SessionCore { meta: params.meta, sub_session: params.sub_session, @@ -175,7 +177,7 @@ impl SessionImpl where T: SessionTransport { result_computer: params.result_computer, transport: params.transport, nonce: params.nonce, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { state: SessionState::WaitingForInitialization, @@ -191,7 +193,7 @@ impl SessionImpl where T: SessionTransport { continue_with: None, failed_continue_with: None, }) - } + }, oneshot) } /// Return session meta. @@ -221,10 +223,9 @@ impl SessionImpl where T: SessionTransport { self.data.lock().failed_continue_with.take() } - /// Wait for session completion. - pub fn wait(&self) -> Result, Error> { - Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) - .expect("wait_session returns Some if called without timeout; qed") + /// Return session completion result (if available). + pub fn result(&self) -> Option, Error>> { + self.data.lock().result.clone() } /// Retrieve common key data (author, threshold, public), if available. @@ -344,7 +345,7 @@ impl SessionImpl where T: SessionTransport { // update state data.state = SessionState::Finished; data.result = Some(Ok(None)); - self.core.completed.notify_all(); + self.core.completed.send(Ok(None)); Ok(()) } @@ -450,15 +451,18 @@ impl SessionImpl where T: SessionTransport { } } + let result = result.map(Some); data.state = SessionState::Finished; - data.result = Some(result.map(Some)); - core.completed.notify_all(); + data.result = Some(result.clone()); + core.completed.send(result); } } } impl ClusterSession for SessionImpl where T: SessionTransport { type Id = SessionIdWithSubSession; + type CreationData = (); + type SuccessfulResult = Option<(H256, NodeId)>; fn type_name() -> &'static str { "version negotiation" @@ -482,7 +486,7 @@ impl ClusterSession for SessionImpl where T: SessionTransport { warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id); data.result = Some(Err(Error::ConsensusTemporaryUnreachable)); - self.core.completed.notify_all(); + self.core.completed.send(Err(Error::ConsensusTemporaryUnreachable)); } } } @@ -510,8 +514,8 @@ impl ClusterSession for SessionImpl where T: SessionTransport { self.core.meta.self_node_id, error, node); data.state = SessionState::Finished; - data.result = Some(Err(error)); - self.core.completed.notify_all(); + data.result = Some(Err(error.clone())); + self.core.completed.send(Err(error)); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { @@ -698,7 +702,7 @@ mod tests { cluster: cluster, }, nonce: 0, - }), + }).0, }) }).collect(), queue: VecDeque::new(), diff --git a/secret-store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs b/secret-store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs index a0d4acdc125..299d02121f1 100644 --- a/secret-store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs +++ b/secret-store/src/key_server_cluster/admin_sessions/servers_set_change_session.rs @@ -17,13 +17,14 @@ use std::sync::Arc; use std::collections::{BTreeSet, BTreeMap}; use std::collections::btree_map::Entry; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use ethereum_types::H256; use ethkey::{Public, Signature}; use key_server_cluster::{Error, NodeId, SessionId, KeyStorage}; use key_server_cluster::math; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::ClusterSession; +use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal}; use key_server_cluster::message::{Message, ServersSetChangeMessage, ConsensusMessageWithServersSet, InitializeConsensusSessionWithServersSet, ServersSetChangeConsensusMessage, ConfirmConsensusInitialization, UnknownSessionsRequest, UnknownSessions, @@ -93,8 +94,8 @@ struct SessionCore { pub admin_public: Public, /// Migration id (if this session is a part of auto-migration process). pub migration_id: Option, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// Session completion signal. + pub completed: CompletionSignal<()>, } /// Servers set change consensus session type. @@ -182,8 +183,9 @@ struct ServersSetChangeKeyVersionNegotiationTransport { impl SessionImpl { /// Create new servers set change session. - pub fn new(params: SessionParams) -> Result { - Ok(SessionImpl { + pub fn new(params: SessionParams) -> Result<(Self, Oneshot>), Error> { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { core: SessionCore { meta: params.meta, cluster: params.cluster, @@ -192,7 +194,7 @@ impl SessionImpl { all_nodes_set: params.all_nodes_set, admin_public: params.admin_public, migration_id: params.migration_id, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { state: SessionState::EstablishingConsensus, @@ -205,7 +207,7 @@ impl SessionImpl { active_key_sessions: BTreeMap::new(), result: None, }), - }) + }, oneshot)) } /// Get session id. @@ -218,10 +220,9 @@ impl SessionImpl { self.core.migration_id.as_ref() } - /// Wait for session completion. - pub fn wait(&self) -> Result<(), Error> { - Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) - .expect("wait_session returns Some if called without timeout; qed") + /// Return session completion result (if available). + pub fn result(&self) -> Option> { + self.data.lock().result.clone() } /// Initialize servers set change session on master node. @@ -423,7 +424,7 @@ impl SessionImpl { &KeyVersionNegotiationMessage::RequestKeyVersions(ref message) if sender == &self.core.meta.master_node_id => { let key_id = message.session.clone().into(); let key_share = self.core.key_storage.get(&key_id)?; - let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { + let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { meta: ShareChangeSessionMeta { id: key_id.clone(), self_node_id: self.core.meta.self_node_id.clone(), @@ -671,7 +672,7 @@ impl SessionImpl { } data.state = SessionState::Finished; - self.core.completed.notify_all(); + self.core.completed.send(Ok(())); Ok(()) } @@ -741,7 +742,7 @@ impl SessionImpl { }; let key_share = core.key_storage.get(&key_id)?; - let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { + let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { meta: ShareChangeSessionMeta { id: key_id, self_node_id: core.meta.self_node_id.clone(), @@ -797,7 +798,8 @@ impl SessionImpl { let negotiation_session = data.negotiation_sessions.remove(&key_id) .expect("share change session is only initialized when negotiation is completed; qed"); let (selected_version, selected_master) = negotiation_session - .wait()? + .result() + .expect("share change session is only initialized when negotiation is completed; qed")? .expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed"); let selected_version_holders = negotiation_session.version_holders(&selected_version)?; let selected_version_threshold = negotiation_session.common_key_data()?.threshold; @@ -882,7 +884,7 @@ impl SessionImpl { if data.result.is_some() && data.active_key_sessions.len() == 0 { data.state = SessionState::Finished; - core.completed.notify_all(); + core.completed.send(Ok(())); } Ok(()) @@ -907,7 +909,7 @@ impl SessionImpl { data.state = SessionState::Finished; data.result = Some(Ok(())); - core.completed.notify_all(); + core.completed.send(Ok(())); Ok(()) } @@ -915,6 +917,8 @@ impl SessionImpl { impl ClusterSession for SessionImpl { type Id = SessionId; + type CreationData = (); // never used directly + type SuccessfulResult = (); fn type_name() -> &'static str { "servers set change" @@ -954,8 +958,8 @@ impl ClusterSession for SessionImpl { self.core.meta.self_node_id, error, node); data.state = SessionState::Finished; - data.result = Some(Err(error)); - self.core.completed.notify_all(); + data.result = Some(Err(error.clone())); + self.core.completed.send(Err(error)); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { @@ -1109,7 +1113,7 @@ pub mod tests { nonce: 1, admin_public: admin_public, migration_id: None, - }).unwrap() + }).unwrap().0 } } diff --git a/secret-store/src/key_server_cluster/admin_sessions/share_add_session.rs b/secret-store/src/key_server_cluster/admin_sessions/share_add_session.rs index b5195a62939..ef7882d68c0 100644 --- a/secret-store/src/key_server_cluster/admin_sessions/share_add_session.rs +++ b/secret-store/src/key_server_cluster/admin_sessions/share_add_session.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use std::collections::{BTreeSet, BTreeMap}; use ethereum_types::{H256, Address}; use ethkey::{Public, Secret, Signature}; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare, DocumentKeyShareVersion, KeyStorage}; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::ClusterSession; +use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal}; use key_server_cluster::math; use key_server_cluster::message::{Message, ShareAddMessage, ShareAddConsensusMessage, ConsensusMessageOfShareAdd, InitializeConsensusSessionOfShareAdd, KeyShareCommon, NewKeysDissemination, ShareAddError, @@ -71,8 +72,8 @@ struct SessionCore { pub key_storage: Arc, /// Administrator public key. pub admin_public: Option, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// Session completion signal. + pub completed: CompletionSignal<()>, } /// Share add consensus session type. @@ -158,10 +159,10 @@ pub struct IsolatedSessionTransport { impl SessionImpl where T: SessionTransport { /// Create new share addition session. - pub fn new(params: SessionParams) -> Result { + pub fn new(params: SessionParams) -> Result<(Self, Oneshot>), Error> { let key_share = params.key_storage.get(¶ms.meta.id)?; - - Ok(SessionImpl { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { core: SessionCore { meta: params.meta, nonce: params.nonce, @@ -169,7 +170,7 @@ impl SessionImpl where T: SessionTransport { transport: params.transport, key_storage: params.key_storage, admin_public: params.admin_public, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { state: SessionState::ConsensusEstablishing, @@ -181,7 +182,7 @@ impl SessionImpl where T: SessionTransport { secret_subshares: None, result: None, }), - }) + }, oneshot)) } /// Set pre-established consensus data. @@ -752,7 +753,7 @@ impl SessionImpl where T: SessionTransport { // signal session completion data.state = SessionState::Finished; data.result = Some(Ok(())); - core.completed.notify_all(); + core.completed.send(Ok(())); Ok(()) } @@ -760,6 +761,8 @@ impl SessionImpl where T: SessionTransport { impl ClusterSession for SessionImpl where T: SessionTransport { type Id = SessionId; + type CreationData = (); // never used directly + type SuccessfulResult = (); fn type_name() -> &'static str { "share add" @@ -801,8 +804,8 @@ impl ClusterSession for SessionImpl where T: SessionTransport { self.core.meta.self_node_id, error, node); data.state = SessionState::Finished; - data.result = Some(Err(error)); - self.core.completed.notify_all(); + data.result = Some(Err(error.clone())); + self.core.completed.send(Err(error)); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { @@ -914,7 +917,7 @@ pub mod tests { key_storage, admin_public: Some(admin_public), nonce: 1, - }).unwrap() + }).unwrap().0 } } diff --git a/secret-store/src/key_server_cluster/admin_sessions/share_change_session.rs b/secret-store/src/key_server_cluster/admin_sessions/share_change_session.rs index bd2bed2d61d..d6af236d4b2 100644 --- a/secret-store/src/key_server_cluster/admin_sessions/share_change_session.rs +++ b/secret-store/src/key_server_cluster/admin_sessions/share_change_session.rs @@ -166,7 +166,7 @@ impl ShareChangeSession { let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?; let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?; let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?; - let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams { + let (share_add_session, _) = ShareAddSessionImpl::new(ShareAddSessionParams { meta: self.meta.clone(), nonce: self.nonce, transport: ShareChangeTransport::new(self.session_id, self.nonce, self.cluster.clone()), diff --git a/secret-store/src/key_server_cluster/client_sessions/decryption_session.rs b/secret-store/src/key_server_cluster/client_sessions/decryption_session.rs index b47c81b98ce..1bda0bc3305 100644 --- a/secret-store/src/key_server_cluster/client_sessions/decryption_session.rs +++ b/secret-store/src/key_server_cluster/client_sessions/decryption_session.rs @@ -16,14 +16,14 @@ use std::collections::{BTreeSet, BTreeMap}; use std::sync::Arc; -use std::time; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use ethereum_types::{Address, H256}; use ethkey::Secret; use key_server_cluster::{Error, AclStorage, DocumentKeyShare, NodeId, SessionId, Requester, EncryptedDocumentKeyShadow, SessionMeta}; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession}; +use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal}; use key_server_cluster::message::{Message, DecryptionMessage, DecryptionConsensusMessage, RequestPartialDecryption, PartialDecryption, DecryptionSessionError, DecryptionSessionCompleted, ConsensusMessage, InitializeConsensusSession, ConfirmConsensusInitialization, DecryptionSessionDelegation, DecryptionSessionDelegationCompleted}; @@ -59,8 +59,8 @@ struct SessionCore { pub cluster: Arc, /// Session-level nonce. pub nonce: u64, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// Session completion signal. + pub completed: CompletionSignal, } /// Decryption consensus session type. @@ -147,7 +147,10 @@ enum DelegationStatus { impl SessionImpl { /// Create new decryption session. - pub fn new(params: SessionParams, requester: Option) -> Result { + pub fn new( + params: SessionParams, + requester: Option, + ) -> Result<(Self, Oneshot>), Error> { debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default()); // check that common_point and encrypted_point are already set @@ -175,14 +178,15 @@ impl SessionImpl { consensus_transport: consensus_transport, })?; - Ok(SessionImpl { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { core: SessionCore { meta: params.meta, access_key: params.access_key, key_share: params.key_share, cluster: params.cluster, nonce: params.nonce, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { version: None, @@ -194,7 +198,7 @@ impl SessionImpl { delegation_status: None, result: None, }), - }) + }, oneshot)) } /// Get this node id. @@ -209,7 +213,7 @@ impl SessionImpl { &self.core.access_key } - /// Get session state. + /// Get session state (tests only). #[cfg(test)] pub fn state(&self) -> ConsensusSessionState { self.data.lock().consensus_session.state() @@ -231,9 +235,9 @@ impl SessionImpl { self.data.lock().origin.clone() } - /// Wait for session completion. - pub fn wait(&self, timeout: Option) -> Option> { - Self::wait_session(&self.core.completed, &self.data, timeout, |data| data.result.clone()) + /// Get session completion result (if available). + pub fn result(&self) -> Option> { + self.data.lock().result.clone() } /// Get broadcasted shadows. @@ -667,13 +671,15 @@ impl SessionImpl { }; } - data.result = Some(result); - core.completed.notify_all(); + data.result = Some(result.clone()); + core.completed.send(result); } } impl ClusterSession for SessionImpl { type Id = SessionIdWithSubSession; + type CreationData = Requester; + type SuccessfulResult = EncryptedDocumentKeyShadow; fn type_name() -> &'static str { "decryption" @@ -832,7 +838,7 @@ pub fn create_default_decryption_session() -> Arc { acl_storage: Arc::new(DummyAclStorage::default()), cluster: Arc::new(DummyCluster::new(Default::default())), nonce: 0, - }, Some(Requester::Public(H512::from_low_u64_be(2)))).unwrap()) + }, Some(Requester::Public(H512::from_low_u64_be(2)))).unwrap().0) } #[cfg(test)] @@ -915,7 +921,7 @@ mod tests { acl_storage: acl_storages[i].clone(), cluster: clusters[i].clone(), nonce: 0, - }, if i == 0 { signature.clone().map(Into::into) } else { None }).unwrap()).collect(); + }, if i == 0 { signature.clone().map(Into::into) } else { None }).unwrap().0).collect(); (requester, clusters, acl_storages, sessions) } @@ -1014,7 +1020,9 @@ mod tests { acl_storage: Arc::new(DummyAclStorage::default()), cluster: Arc::new(DummyCluster::new(self_node_id.clone())), nonce: 0, - }, Some(Requester::Signature(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()))).unwrap(); + }, Some(Requester::Signature( + ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap() + ))).unwrap().0; assert_eq!(session.initialize(Default::default(), Default::default(), false, false), Err(Error::InvalidMessage)); } @@ -1049,7 +1057,9 @@ mod tests { acl_storage: Arc::new(DummyAclStorage::default()), cluster: Arc::new(DummyCluster::new(self_node_id.clone())), nonce: 0, - }, Some(Requester::Signature(ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap()))).unwrap(); + }, Some(Requester::Signature( + ethkey::sign(Random.generate().unwrap().secret(), &SessionId::default()).unwrap() + ))).unwrap().0; assert_eq!(session.initialize(Default::default(), Default::default(), false, false), Err(Error::ConsensusUnreachable)); } diff --git a/secret-store/src/key_server_cluster/client_sessions/encryption_session.rs b/secret-store/src/key_server_cluster/client_sessions/encryption_session.rs index a3eabc35c61..bdead62e58c 100644 --- a/secret-store/src/key_server_cluster/client_sessions/encryption_session.rs +++ b/secret-store/src/key_server_cluster/client_sessions/encryption_session.rs @@ -16,15 +16,15 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter, Error as FmtError}; -use std::time; use std::sync::Arc; -use parking_lot::{Condvar, Mutex}; +use futures::Oneshot; +use parking_lot::Mutex; use ethereum_types::Address; use ethkey::Public; use key_server_cluster::{Error, NodeId, SessionId, Requester, KeyStorage, DocumentKeyShare, ServerKeyId}; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::ClusterSession; +use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal}; use key_server_cluster::message::{Message, EncryptionMessage, InitializeEncryptionSession, ConfirmEncryptionInitialization, EncryptionSessionError}; @@ -49,8 +49,8 @@ pub struct SessionImpl { cluster: Arc, /// Session nonce. nonce: u64, - /// SessionImpl completion condvar. - completed: Condvar, + /// Session completion signal. + completed: CompletionSignal<()>, /// Mutable session data. data: Mutex, } @@ -108,23 +108,24 @@ pub enum SessionState { impl SessionImpl { /// Create new encryption session. - pub fn new(params: SessionParams) -> Result { + pub fn new(params: SessionParams) -> Result<(Self, Oneshot>), Error> { check_encrypted_data(params.encrypted_data.as_ref())?; - Ok(SessionImpl { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { id: params.id, self_node_id: params.self_node_id, encrypted_data: params.encrypted_data, key_storage: params.key_storage, cluster: params.cluster, nonce: params.nonce, - completed: Condvar::new(), + completed, data: Mutex::new(SessionData { state: SessionState::WaitingForInitialization, nodes: BTreeMap::new(), result: None, }), - }) + }, oneshot)) } /// Get this node Id. @@ -132,12 +133,6 @@ impl SessionImpl { &self.self_node_id } - /// Wait for session completion. - pub fn wait(&self, timeout: Option) -> Result<(), Error> { - Self::wait_session(&self.completed, &self.data, timeout, |data| data.result.clone()) - .expect("wait_session returns Some if called without timeout; qed") - } - /// Start new session initialization. This must be called on master node. pub fn initialize(&self, requester: Requester, common_point: Public, encrypted_point: Public) -> Result<(), Error> { let mut data = self.data.lock(); @@ -175,7 +170,7 @@ impl SessionImpl { } else { data.state = SessionState::Finished; data.result = Some(Ok(())); - self.completed.notify_all(); + self.completed.send(Ok(())); Ok(()) } @@ -230,7 +225,7 @@ impl SessionImpl { // update state data.state = SessionState::Finished; data.result = Some(Ok(())); - self.completed.notify_all(); + self.completed.send(Ok(())); Ok(()) } @@ -238,6 +233,8 @@ impl SessionImpl { impl ClusterSession for SessionImpl { type Id = SessionId; + type CreationData = (); + type SuccessfulResult = (); fn type_name() -> &'static str { "encryption" @@ -260,7 +257,7 @@ impl ClusterSession for SessionImpl { data.state = SessionState::Failed; data.result = Some(Err(Error::NodeDisconnected)); - self.completed.notify_all(); + self.completed.send(Err(Error::NodeDisconnected)); } fn on_session_timeout(&self) { @@ -270,7 +267,7 @@ impl ClusterSession for SessionImpl { data.state = SessionState::Failed; data.result = Some(Err(Error::NodeDisconnected)); - self.completed.notify_all(); + self.completed.send(Err(Error::NodeDisconnected)); } fn on_session_error(&self, node: &NodeId, error: Error) { @@ -290,8 +287,8 @@ impl ClusterSession for SessionImpl { warn!("{}: encryption session failed with error: {} from {}", self.node(), error, node); data.state = SessionState::Failed; - data.result = Some(Err(error)); - self.completed.notify_all(); + data.result = Some(Err(error.clone())); + self.completed.send(Err(error)); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { diff --git a/secret-store/src/key_server_cluster/client_sessions/generation_session.rs b/secret-store/src/key_server_cluster/client_sessions/generation_session.rs index 0fa805f5718..02d33f5dbee 100644 --- a/secret-store/src/key_server_cluster/client_sessions/generation_session.rs +++ b/secret-store/src/key_server_cluster/client_sessions/generation_session.rs @@ -16,15 +16,15 @@ use std::collections::{BTreeSet, BTreeMap, VecDeque}; use std::fmt::{Debug, Formatter, Error as FmtError}; -use std::time::Duration; use std::sync::Arc; -use parking_lot::{Condvar, Mutex}; +use futures::Oneshot; +use parking_lot::Mutex; use ethereum_types::Address; use ethkey::{Public, Secret}; use key_server_cluster::{Error, NodeId, SessionId, KeyStorage, DocumentKeyShare, DocumentKeyShareVersion}; use key_server_cluster::math; use key_server_cluster::cluster::Cluster; -use key_server_cluster::cluster_sessions::ClusterSession; +use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal}; use key_server_cluster::message::{Message, GenerationMessage, InitializeSession, ConfirmInitialization, CompleteInitialization, KeysDissemination, PublicKeyShare, SessionError, SessionCompleted}; @@ -47,10 +47,10 @@ pub struct SessionImpl { cluster: Arc, /// Session-level nonce. nonce: u64, - /// SessionImpl completion condvar. - completed: Condvar, /// Mutable session data. data: Mutex, + /// Session completion signal. + completed: CompletionSignal, } /// SessionImpl creation parameters @@ -204,8 +204,9 @@ impl From> for InitializationNodes { impl SessionImpl { /// Create new generation session. - pub fn new(params: SessionParams) -> Self { - SessionImpl { + pub fn new(params: SessionParams) -> (Self, Oneshot>) { + let (completed, oneshot) = CompletionSignal::new(); + (SessionImpl { id: params.id, self_node_id: params.self_node_id, key_storage: params.key_storage, @@ -213,7 +214,7 @@ impl SessionImpl { // when nonce.is_nonce(), generation session is wrapped // => nonce is checked somewhere else && we can pass any value nonce: params.nonce.unwrap_or_default(), - completed: Condvar::new(), + completed, data: Mutex::new(SessionData { state: SessionState::WaitingForInitialization, simulate_faulty_behaviour: false, @@ -230,7 +231,7 @@ impl SessionImpl { key_share: None, joint_public_and_secret: None, }), - } + }, oneshot) } /// Get this node Id. @@ -259,10 +260,10 @@ impl SessionImpl { self.data.lock().origin.clone() } - /// Wait for session completion. - pub fn wait(&self, timeout: Option) -> Option> { - Self::wait_session(&self.completed, &self.data, timeout, |data| data.joint_public_and_secret.clone() - .map(|r| r.map(|r| r.0.clone()))) + /// Get session completion result (if available). + pub fn result(&self) -> Option> { + self.data.lock().joint_public_and_secret.clone() + .map(|r| r.map(|r| r.0.clone())) } /// Get generated public and secret (if any). @@ -328,8 +329,12 @@ impl SessionImpl { self.verify_keys()?; self.complete_generation()?; - self.data.lock().state = SessionState::Finished; - self.completed.notify_all(); + let mut data = self.data.lock(); + let result = data.joint_public_and_secret.clone() + .expect("session is instantly completed on a single node; qed") + .map(|(p, _, _)| p); + data.state = SessionState::Finished; + self.completed.send(result); Ok(()) } @@ -619,8 +624,11 @@ impl SessionImpl { } // we have received enough confirmations => complete session + let result = data.joint_public_and_secret.clone() + .expect("we're on master node; we have received last completion confirmation; qed") + .map(|(p, _, _)| p); data.state = SessionState::Finished; - self.completed.notify_all(); + self.completed.send(result); Ok(()) } @@ -813,6 +821,8 @@ impl SessionImpl { impl ClusterSession for SessionImpl { type Id = SessionId; + type CreationData = (); + type SuccessfulResult = Public; fn type_name() -> &'static str { "generation" @@ -838,7 +848,7 @@ impl ClusterSession for SessionImpl { data.state = SessionState::Failed; data.key_share = Some(Err(Error::NodeDisconnected)); data.joint_public_and_secret = Some(Err(Error::NodeDisconnected)); - self.completed.notify_all(); + self.completed.send(Err(Error::NodeDisconnected)); } fn on_session_timeout(&self) { @@ -849,7 +859,7 @@ impl ClusterSession for SessionImpl { data.state = SessionState::Failed; data.key_share = Some(Err(Error::NodeDisconnected)); data.joint_public_and_secret = Some(Err(Error::NodeDisconnected)); - self.completed.notify_all(); + self.completed.send(Err(Error::NodeDisconnected)); } fn on_session_error(&self, node: &NodeId, error: Error) { @@ -867,8 +877,8 @@ impl ClusterSession for SessionImpl { let mut data = self.data.lock(); data.state = SessionState::Failed; data.key_share = Some(Err(error.clone())); - data.joint_public_and_secret = Some(Err(error)); - self.completed.notify_all(); + data.joint_public_and_secret = Some(Err(error.clone())); + self.completed.send(Err(error)); } fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> { diff --git a/secret-store/src/key_server_cluster/client_sessions/signing_session_ecdsa.rs b/secret-store/src/key_server_cluster/client_sessions/signing_session_ecdsa.rs index fe3bd4f1143..d3c801af6e4 100644 --- a/secret-store/src/key_server_cluster/client_sessions/signing_session_ecdsa.rs +++ b/secret-store/src/key_server_cluster/client_sessions/signing_session_ecdsa.rs @@ -17,12 +17,13 @@ use std::collections::{BTreeSet, BTreeMap}; use std::collections::btree_map::Entry; use std::sync::Arc; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use ethkey::{Public, Secret, Signature, sign}; use ethereum_types::H256; use key_server_cluster::{Error, NodeId, SessionId, SessionMeta, AclStorage, DocumentKeyShare, Requester}; use key_server_cluster::cluster::{Cluster}; -use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession}; +use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal}; use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionParams as GenerationSessionParams, SessionState as GenerationSessionState}; use key_server_cluster::math; @@ -58,8 +59,8 @@ struct SessionCore { pub cluster: Arc, /// Session-level nonce. pub nonce: u64, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// Session completion signal. + pub completed: CompletionSignal, } /// Signing consensus session type. @@ -170,7 +171,10 @@ enum DelegationStatus { impl SessionImpl { /// Create new signing session. - pub fn new(params: SessionParams, requester: Option) -> Result { + pub fn new( + params: SessionParams, + requester: Option, + ) -> Result<(Self, Oneshot>), Error> { debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default()); let consensus_transport = SigningConsensusTransport { @@ -197,14 +201,15 @@ impl SessionImpl { consensus_transport: consensus_transport, })?; - Ok(SessionImpl { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { core: SessionCore { meta: params.meta, access_key: params.access_key, key_share: params.key_share, cluster: params.cluster, nonce: params.nonce, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { state: SessionState::ConsensusEstablishing, @@ -218,10 +223,11 @@ impl SessionImpl { delegation_status: None, result: None, }), - }) + }, oneshot)) } /// Wait for session completion. + #[cfg(test)] pub fn wait(&self) -> Result { Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) .expect("wait_session returns Some if called without timeout; qed") @@ -251,7 +257,6 @@ impl SessionImpl { })))?; data.delegation_status = Some(DelegationStatus::DelegatedTo(master)); Ok(()) - } /// Initialize signing session on master node. @@ -284,8 +289,9 @@ impl SessionImpl { // consensus established => threshold is 0 => we can generate signature on this node if data.consensus_session.state() == ConsensusSessionState::ConsensusEstablished { - data.result = Some(sign(&key_version.secret_share, &message_hash).map_err(Into::into)); - self.core.completed.notify_all(); + let result = sign(&key_version.secret_share, &message_hash).map_err(Into::into); + data.result = Some(result.clone()); + self.core.completed.send(result); } Ok(()) @@ -797,7 +803,7 @@ impl SessionImpl { map: map_message, }), nonce: None, - }) + }).0 } /// Set signing session result. @@ -820,8 +826,8 @@ impl SessionImpl { }; } - data.result = Some(result); - core.completed.notify_all(); + data.result = Some(result.clone()); + core.completed.send(result); } /// Check if all nonces are generated. @@ -883,6 +889,8 @@ impl SessionImpl { impl ClusterSession for SessionImpl { type Id = SessionIdWithSubSession; + type CreationData = Requester; + type SuccessfulResult = Signature; fn type_name() -> &'static str { "ecdsa_signing" diff --git a/secret-store/src/key_server_cluster/client_sessions/signing_session_schnorr.rs b/secret-store/src/key_server_cluster/client_sessions/signing_session_schnorr.rs index b0881dfff07..ff901fc1581 100644 --- a/secret-store/src/key_server_cluster/client_sessions/signing_session_schnorr.rs +++ b/secret-store/src/key_server_cluster/client_sessions/signing_session_schnorr.rs @@ -16,12 +16,13 @@ use std::collections::BTreeSet; use std::sync::Arc; -use parking_lot::{Mutex, Condvar}; +use futures::Oneshot; +use parking_lot::Mutex; use ethkey::{Public, Secret}; use ethereum_types::H256; use key_server_cluster::{Error, NodeId, SessionId, Requester, SessionMeta, AclStorage, DocumentKeyShare}; use key_server_cluster::cluster::{Cluster}; -use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession}; +use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal}; use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionParams as GenerationSessionParams, SessionState as GenerationSessionState}; use key_server_cluster::message::{Message, SchnorrSigningMessage, SchnorrSigningConsensusMessage, SchnorrSigningGenerationMessage, @@ -59,8 +60,8 @@ struct SessionCore { pub cluster: Arc, /// Session-level nonce. pub nonce: u64, - /// SessionImpl completion condvar. - pub completed: Condvar, + /// SessionImpl completion signal. + pub completed: CompletionSignal<(Secret, Secret)>, } /// Signing consensus session type. @@ -160,7 +161,10 @@ enum DelegationStatus { impl SessionImpl { /// Create new signing session. - pub fn new(params: SessionParams, requester: Option) -> Result { + pub fn new( + params: SessionParams, + requester: Option, + ) -> Result<(Self, Oneshot>), Error> { debug_assert_eq!(params.meta.threshold, params.key_share.as_ref().map(|ks| ks.threshold).unwrap_or_default()); let consensus_transport = SigningConsensusTransport { @@ -179,14 +183,15 @@ impl SessionImpl { consensus_transport: consensus_transport, })?; - Ok(SessionImpl { + let (completed, oneshot) = CompletionSignal::new(); + Ok((SessionImpl { core: SessionCore { meta: params.meta, access_key: params.access_key, key_share: params.key_share, cluster: params.cluster, nonce: params.nonce, - completed: Condvar::new(), + completed, }, data: Mutex::new(SessionData { state: SessionState::ConsensusEstablishing, @@ -197,21 +202,22 @@ impl SessionImpl { delegation_status: None, result: None, }), - }) - } - - /// Get session state. - #[cfg(test)] - pub fn state(&self) -> SessionState { - self.data.lock().state + }, oneshot)) } /// Wait for session completion. + #[cfg(test)] pub fn wait(&self) -> Result<(Secret, Secret), Error> { Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone()) .expect("wait_session returns Some if called without timeout; qed") } + /// Get session state (tests only). + #[cfg(test)] + pub fn state(&self) -> SessionState { + self.data.lock().state + } + /// Delegate session to other node. pub fn delegate(&self, master: NodeId, version: H256, message_hash: H256) -> Result<(), Error> { if self.core.meta.master_node_id != self.core.meta.self_node_id { @@ -277,7 +283,7 @@ impl SessionImpl { other_nodes_ids: BTreeSet::new() }), nonce: None, - }); + }).0; generation_session.initialize(Default::default(), Default::default(), false, 0, vec![self.core.meta.self_node_id.clone()].into_iter().collect::>().into())?; debug_assert_eq!(generation_session.state(), GenerationSessionState::Finished); @@ -405,7 +411,7 @@ impl SessionImpl { other_nodes_ids: other_consensus_group_nodes, }), nonce: None, - }); + }).0; generation_session.initialize(Default::default(), Default::default(), false, key_share.threshold, consensus_group.into())?; data.generation_session = Some(generation_session); @@ -445,7 +451,7 @@ impl SessionImpl { other_nodes_ids: other_consensus_group_nodes }), nonce: None, - }); + }).0; data.generation_session = Some(generation_session); data.state = SessionState::SessionKeyGeneration; } @@ -617,13 +623,15 @@ impl SessionImpl { }; } - data.result = Some(result); - core.completed.notify_all(); + data.result = Some(result.clone()); + core.completed.send(result); } } impl ClusterSession for SessionImpl { type Id = SessionIdWithSubSession; + type CreationData = Requester; + type SuccessfulResult = (Secret, Secret); fn type_name() -> &'static str { "signing" @@ -850,7 +858,7 @@ mod tests { acl_storage: Arc::new(DummyAclStorage::default()), cluster: self.0.cluster(0).view().unwrap(), nonce: 0, - }, requester).unwrap() + }, requester).unwrap().0 } pub fn init_with_version(self, key_version: Option) -> Result<(Self, Public, H256), Error> { diff --git a/secret-store/src/key_server_cluster/cluster.rs b/secret-store/src/key_server_cluster/cluster.rs index a8416a8f700..c6ffd446ee0 100644 --- a/secret-store/src/key_server_cluster/cluster.rs +++ b/secret-store/src/key_server_cluster/cluster.rs @@ -21,8 +21,8 @@ use ethkey::{Public, Signature, Random, Generator}; use ethereum_types::{Address, H256}; use parity_runtime::Executor; use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, KeyServerSet, NodeKeyPair}; -use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessions, SessionIdWithSubSession, - ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, +use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, AdminSession, ClusterSessions, + SessionIdWithSubSession, ClusterSessionsContainer, SERVERS_SET_CHANGE_SESSION_ID, create_cluster_view, AdminSessionCreationData, ClusterSessionsListener}; use key_server_cluster::cluster_sessions_creator::ClusterSessionCreator; use key_server_cluster::cluster_connections::{ConnectionProvider, ConnectionManager}; @@ -47,19 +47,61 @@ use key_server_cluster::cluster_connections::tests::{MessagesQueue, TestConnecti /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { /// Start new generation session. - fn new_generation_session(&self, session_id: SessionId, origin: Option
, author: Address, threshold: usize) -> Result, Error>; + fn new_generation_session( + &self, + session_id: SessionId, + origin: Option
, + author: Address, + threshold: usize, + ) -> Result, Error>; /// Start new encryption session. - fn new_encryption_session(&self, session_id: SessionId, author: Requester, common_point: Public, encrypted_point: Public) -> Result, Error>; + fn new_encryption_session( + &self, + session_id: SessionId, + author: Requester, + common_point: Public, + encrypted_point: Public, + ) -> Result, Error>; /// Start new decryption session. - fn new_decryption_session(&self, session_id: SessionId, origin: Option
, requester: Requester, version: Option, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result, Error>; + fn new_decryption_session( + &self, + session_id: SessionId, + origin: Option
, + requester: Requester, + version: Option, + is_shadow_decryption: bool, + is_broadcast_decryption: bool, + ) -> Result, Error>; /// Start new Schnorr signing session. - fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error>; + fn new_schnorr_signing_session( + &self, + session_id: SessionId, + requester: Requester, + version: Option, + message_hash: H256, + ) -> Result, Error>; /// Start new ECDSA session. - fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error>; + fn new_ecdsa_signing_session( + &self, + session_id: SessionId, + requester: Requester, + version: Option, + message_hash: H256, + ) -> Result, Error>; /// Start new key version negotiation session. - fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error>; + fn new_key_version_negotiation_session( + &self, + session_id: SessionId, + ) -> Result>, Error>; /// Start new servers set change session. - fn new_servers_set_change_session(&self, session_id: Option, migration_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error>; + fn new_servers_set_change_session( + &self, + session_id: Option, + migration_id: Option, + new_nodes_set: BTreeSet, + old_set_signature: Signature, + new_set_signature: Signature, + ) -> Result, Error>; /// Listen for new generation sessions. fn add_generation_listener(&self, listener: Arc>); @@ -324,7 +366,10 @@ impl ClusterClientImpl { } } - fn create_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { + fn create_key_version_negotiation_session( + &self, + session_id: SessionId, + ) -> Result>, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -332,10 +377,10 @@ impl ClusterClientImpl { let session_id = SessionIdWithSubSession::new(session_id, access_key); let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), false)?; let session = self.data.sessions.negotiation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, None)?; - match session.initialize(connected_nodes) { + match session.session.initialize(connected_nodes) { Ok(()) => Ok(session), Err(error) => { - self.data.sessions.negotiation_sessions.remove(&session.id()); + self.data.sessions.negotiation_sessions.remove(&session.session.id()); Err(error) } } @@ -343,29 +388,49 @@ impl ClusterClientImpl { } impl ClusterClient for ClusterClientImpl { - fn new_generation_session(&self, session_id: SessionId, origin: Option
, author: Address, threshold: usize) -> Result, Error> { + fn new_generation_session( + &self, + session_id: SessionId, + origin: Option
, + author: Address, + threshold: usize, + ) -> Result, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), true)?; let session = self.data.sessions.generation_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?; process_initialization_result( - session.initialize(origin, author, false, threshold, connected_nodes.into()), + session.session.initialize(origin, author, false, threshold, connected_nodes.into()), session, &self.data.sessions.generation_sessions) } - fn new_encryption_session(&self, session_id: SessionId, requester: Requester, common_point: Public, encrypted_point: Public) -> Result, Error> { + fn new_encryption_session( + &self, + session_id: SessionId, + requester: Requester, + common_point: Public, + encrypted_point: Public, + ) -> Result, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); let cluster = create_cluster_view(self.data.self_key_pair.clone(), self.data.connections.provider(), true)?; let session = self.data.sessions.encryption_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id, None, false, None)?; process_initialization_result( - session.initialize(requester, common_point, encrypted_point), + session.session.initialize(requester, common_point, encrypted_point), session, &self.data.sessions.encryption_sessions) } - fn new_decryption_session(&self, session_id: SessionId, origin: Option
, requester: Requester, version: Option, is_shadow_decryption: bool, is_broadcast_decryption: bool) -> Result, Error> { + fn new_decryption_session( + &self, + session_id: SessionId, + origin: Option
, + requester: Requester, + version: Option, + is_shadow_decryption: bool, + is_broadcast_decryption: bool, + ) -> Result, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -376,12 +441,18 @@ impl ClusterClient for ClusterClientImpl { session_id.clone(), None, false, Some(requester))?; let initialization_result = match version { - Some(version) => session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption), + Some(version) => session.session.initialize(origin, version, is_shadow_decryption, is_broadcast_decryption), None => { self.create_key_version_negotiation_session(session_id.id.clone()) .map(|version_session| { - version_session.set_continue_action(ContinueAction::Decrypt(session.clone(), origin, is_shadow_decryption, is_broadcast_decryption)); - self.data.message_processor.try_continue_session(Some(version_session)); + let continue_action = ContinueAction::Decrypt( + session.session.clone(), + origin, + is_shadow_decryption, + is_broadcast_decryption, + ); + version_session.session.set_continue_action(continue_action); + self.data.message_processor.try_continue_session(Some(version_session.session)); }) }, }; @@ -391,7 +462,13 @@ impl ClusterClient for ClusterClientImpl { session, &self.data.sessions.decryption_sessions) } - fn new_schnorr_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error> { + fn new_schnorr_signing_session( + &self, + session_id: SessionId, + requester: Requester, + version: Option, + message_hash: H256, + ) -> Result, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -401,12 +478,13 @@ impl ClusterClient for ClusterClientImpl { let session = self.data.sessions.schnorr_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?; let initialization_result = match version { - Some(version) => session.initialize(version, message_hash), + Some(version) => session.session.initialize(version, message_hash), None => { self.create_key_version_negotiation_session(session_id.id.clone()) .map(|version_session| { - version_session.set_continue_action(ContinueAction::SchnorrSign(session.clone(), message_hash)); - self.data.message_processor.try_continue_session(Some(version_session)); + let continue_action = ContinueAction::SchnorrSign(session.session.clone(), message_hash); + version_session.session.set_continue_action(continue_action); + self.data.message_processor.try_continue_session(Some(version_session.session)); }) }, }; @@ -416,7 +494,13 @@ impl ClusterClient for ClusterClientImpl { session, &self.data.sessions.schnorr_signing_sessions) } - fn new_ecdsa_signing_session(&self, session_id: SessionId, requester: Requester, version: Option, message_hash: H256) -> Result, Error> { + fn new_ecdsa_signing_session( + &self, + session_id: SessionId, + requester: Requester, + version: Option, + message_hash: H256, + ) -> Result, Error> { let mut connected_nodes = self.data.connections.provider().connected_nodes()?; connected_nodes.insert(self.data.self_key_pair.public().clone()); @@ -426,12 +510,13 @@ impl ClusterClient for ClusterClientImpl { let session = self.data.sessions.ecdsa_signing_sessions.insert(cluster, self.data.self_key_pair.public().clone(), session_id.clone(), None, false, Some(requester))?; let initialization_result = match version { - Some(version) => session.initialize(version, message_hash), + Some(version) => session.session.initialize(version, message_hash), None => { self.create_key_version_negotiation_session(session_id.id.clone()) .map(|version_session| { - version_session.set_continue_action(ContinueAction::EcdsaSign(session.clone(), message_hash)); - self.data.message_processor.try_continue_session(Some(version_session)); + let continue_action = ContinueAction::EcdsaSign(session.session.clone(), message_hash); + version_session.session.set_continue_action(continue_action); + self.data.message_processor.try_continue_session(Some(version_session.session)); }) }, }; @@ -441,12 +526,21 @@ impl ClusterClient for ClusterClientImpl { session, &self.data.sessions.ecdsa_signing_sessions) } - fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { - let session = self.create_key_version_negotiation_session(session_id)?; - Ok(session) + fn new_key_version_negotiation_session( + &self, + session_id: SessionId, + ) -> Result>, Error> { + self.create_key_version_negotiation_session(session_id) } - fn new_servers_set_change_session(&self, session_id: Option, migration_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { + fn new_servers_set_change_session( + &self, + session_id: Option, + migration_id: Option, + new_nodes_set: BTreeSet, + old_set_signature: Signature, + new_set_signature: Signature, + ) -> Result, Error> { new_servers_set_change_session( self.data.self_key_pair.clone(), &self.data.sessions, @@ -508,7 +602,7 @@ pub fn new_servers_set_change_session( connections: Arc, servers_set_change_creator_connector: Arc, params: ServersSetChangeParams, -) -> Result, Error> { +) -> Result, Error> { let session_id = match params.session_id { Some(session_id) if session_id == *SERVERS_SET_CHANGE_SESSION_ID => session_id, Some(_) => return Err(Error::InvalidMessage), @@ -519,11 +613,11 @@ pub fn new_servers_set_change_session( let creation_data = AdminSessionCreationData::ServersSetChange(params.migration_id, params.new_nodes_set.clone()); let session = sessions.admin_sessions .insert(cluster, *self_key_pair.public(), session_id, None, true, Some(creation_data))?; - let initialization_result = session.as_servers_set_change().expect("servers set change session is created; qed") + let initialization_result = session.session.as_servers_set_change().expect("servers set change session is created; qed") .initialize(params.new_nodes_set, params.old_set_signature, params.new_set_signature); if initialization_result.is_ok() { - servers_set_change_creator_connector.set_key_servers_set_change_session(session.clone()); + servers_set_change_creator_connector.set_key_servers_set_change_session(session.session.clone()); } process_initialization_result( @@ -531,23 +625,23 @@ pub fn new_servers_set_change_session( session, &sessions.admin_sessions) } -fn process_initialization_result( +fn process_initialization_result( result: Result<(), Error>, - session: Arc, - sessions: &ClusterSessionsContainer -) -> Result, Error> + session: WaitableSession, + sessions: &ClusterSessionsContainer +) -> Result, Error> where S: ClusterSession, - SC: ClusterSessionCreator + SC: ClusterSessionCreator { match result { - Ok(()) if session.is_finished() => { - sessions.remove(&session.id()); + Ok(()) if session.session.is_finished() => { + sessions.remove(&session.session.id()); Ok(session) }, Ok(()) => Ok(session), Err(error) => { - sessions.remove(&session.id()); + sessions.remove(&session.session.id()); Err(error) }, } @@ -558,6 +652,7 @@ pub mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::collections::{BTreeMap, BTreeSet, VecDeque}; + use futures::Future; use parking_lot::{Mutex, RwLock}; use ethereum_types::{Address, H256}; use ethkey::{Random, Generator, Public, Signature, sign}; @@ -567,7 +662,8 @@ pub mod tests { use key_server_cluster::cluster::{new_test_cluster, Cluster, ClusterCore, ClusterConfiguration, ClusterClient}; use key_server_cluster::cluster_connections::ConnectionManager; use key_server_cluster::cluster_connections::tests::{MessagesQueue, TestConnections}; - use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, AdminSession, ClusterSessionsListener}; + use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, ClusterSessions, AdminSession, + ClusterSessionsListener}; use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionState as GenerationSessionState}; use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession}; @@ -595,17 +691,71 @@ pub mod tests { } impl ClusterClient for DummyClusterClient { - fn new_generation_session(&self, _session_id: SessionId, _origin: Option
, _author: Address, _threshold: usize) -> Result, Error> { + fn new_generation_session( + &self, + _session_id: SessionId, + _origin: Option
, + _author: Address, + _threshold: usize, + ) -> Result, Error> { self.generation_requests_count.fetch_add(1, Ordering::Relaxed); Err(Error::Internal("test-error".into())) } - fn new_encryption_session(&self, _session_id: SessionId, _requester: Requester, _common_point: Public, _encrypted_point: Public) -> Result, Error> { unimplemented!("test-only") } - fn new_decryption_session(&self, _session_id: SessionId, _origin: Option
, _requester: Requester, _version: Option, _is_shadow_decryption: bool, _is_broadcast_session: bool) -> Result, Error> { unimplemented!("test-only") } - fn new_schnorr_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option, _message_hash: H256) -> Result, Error> { unimplemented!("test-only") } - fn new_ecdsa_signing_session(&self, _session_id: SessionId, _requester: Requester, _version: Option, _message_hash: H256) -> Result, Error> { unimplemented!("test-only") } + fn new_encryption_session( + &self, + _session_id: SessionId, + _requester: Requester, + _common_point: Public, + _encrypted_point: Public, + ) -> Result, Error> { + unimplemented!("test-only") + } + fn new_decryption_session( + &self, + _session_id: SessionId, + _origin: Option
, + _requester: Requester, + _version: Option, + _is_shadow_decryption: bool, + _is_broadcast_session: bool, + ) -> Result, Error> { + unimplemented!("test-only") + } + fn new_schnorr_signing_session( + &self, + _session_id: SessionId, + _requester: Requester, + _version: Option, + _message_hash: H256, + ) -> Result, Error> { + unimplemented!("test-only") + } + fn new_ecdsa_signing_session( + &self, + _session_id: SessionId, + _requester: Requester, + _version: Option, + _message_hash: H256, + ) -> Result, Error> { + unimplemented!("test-only") + } - fn new_key_version_negotiation_session(&self, _session_id: SessionId) -> Result>, Error> { unimplemented!("test-only") } - fn new_servers_set_change_session(&self, _session_id: Option, _migration_id: Option, _new_nodes_set: BTreeSet, _old_set_signature: Signature, _new_set_signature: Signature) -> Result, Error> { unimplemented!("test-only") } + fn new_key_version_negotiation_session( + &self, + _session_id: SessionId, + ) -> Result>, Error> { + unimplemented!("test-only") + } + fn new_servers_set_change_session( + &self, + _session_id: Option, + _migration_id: Option, + _new_nodes_set: BTreeSet, + _old_set_signature: Signature, + _new_set_signature: Signature, + ) -> Result, Error> { + unimplemented!("test-only") + } fn add_generation_listener(&self, _listener: Arc>) {} fn add_decryption_listener(&self, _listener: Arc>) {} @@ -897,7 +1047,7 @@ pub mod tests { // start && wait for generation session to fail let session = ml.cluster(0).client() - .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); + .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session; ml.loop_until(|| session.joint_public_and_secret().is_some() && ml.cluster(0).client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); @@ -924,7 +1074,7 @@ pub mod tests { // start && wait for generation session to fail let session = ml.cluster(0).client() - .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); + .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session; ml.loop_until(|| session.joint_public_and_secret().is_some() && ml.cluster(0).client().generation_session(&SessionId::default()).is_none()); assert!(session.joint_public_and_secret().unwrap().is_err()); @@ -949,7 +1099,7 @@ pub mod tests { // start && wait for generation session to complete let session = ml.cluster(0).client() - .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); + .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session; ml.loop_until(|| (session.state() == GenerationSessionState::Finished || session.state() == GenerationSessionState::Failed) && ml.cluster(0).client().generation_session(&SessionId::default()).is_none()); @@ -1017,7 +1167,7 @@ pub mod tests { // start && wait for generation session to complete let session = ml.cluster(0).client(). - new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); + new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session; ml.loop_until(|| (session.state() == GenerationSessionState::Finished || session.state() == GenerationSessionState::Failed) && ml.cluster(0).client().generation_session(&SessionId::default()).is_none()); @@ -1035,7 +1185,7 @@ pub mod tests { ml.loop_until(|| session.is_finished() && (0..3).all(|i| ml.cluster(i).data.sessions.schnorr_signing_sessions.is_empty())); - session0.wait().unwrap(); + session0.into_wait_future().wait().unwrap(); // and try to sign message with generated key using node that has no key share let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap(); @@ -1045,7 +1195,7 @@ pub mod tests { ml.loop_until(|| session.is_finished() && (0..3).all(|i| ml.cluster(i).data.sessions.schnorr_signing_sessions.is_empty())); - session2.wait().unwrap(); + session2.into_wait_future().wait().unwrap(); // now remove share from node1 ml.cluster(1).data.config.key_storage.remove(&Default::default()).unwrap(); @@ -1057,7 +1207,7 @@ pub mod tests { let session = ml.cluster(0).data.sessions.schnorr_signing_sessions.first().unwrap(); ml.loop_until(|| session.is_finished()); - session1.wait().unwrap_err(); + session1.into_wait_future().wait().unwrap_err(); } #[test] @@ -1067,7 +1217,7 @@ pub mod tests { // start && wait for generation session to complete let session = ml.cluster(0).client() - .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap(); + .new_generation_session(SessionId::default(), Default::default(), Default::default(), 1).unwrap().session; ml.loop_until(|| (session.state() == GenerationSessionState::Finished || session.state() == GenerationSessionState::Failed) && ml.cluster(0).client().generation_session(&SessionId::default()).is_none()); @@ -1085,7 +1235,7 @@ pub mod tests { ml.loop_until(|| session.is_finished() && (0..3).all(|i| ml.cluster(i).data.sessions.ecdsa_signing_sessions.is_empty())); - session0.wait().unwrap(); + session0.into_wait_future().wait().unwrap(); // and try to sign message with generated key using node that has no key share let signature = sign(Random.generate().unwrap().secret(), &Default::default()).unwrap(); @@ -1094,7 +1244,7 @@ pub mod tests { let session = ml.cluster(2).data.sessions.ecdsa_signing_sessions.first().unwrap(); ml.loop_until(|| session.is_finished() && (0..3).all(|i| ml.cluster(i).data.sessions.ecdsa_signing_sessions.is_empty())); - session2.wait().unwrap(); + session2.into_wait_future().wait().unwrap(); // now remove share from node1 ml.cluster(1).data.config.key_storage.remove(&Default::default()).unwrap(); @@ -1105,6 +1255,6 @@ pub mod tests { .new_ecdsa_signing_session(Default::default(), signature.into(), None, H256::random()).unwrap(); let session = ml.cluster(0).data.sessions.ecdsa_signing_sessions.first().unwrap(); ml.loop_until(|| session.is_finished()); - session1.wait().unwrap_err(); + session1.into_wait_future().wait().unwrap_err(); } } diff --git a/secret-store/src/key_server_cluster/cluster_message_processor.rs b/secret-store/src/key_server_cluster/cluster_message_processor.rs index b4ba5ef03b2..0624d50b13b 100644 --- a/secret-store/src/key_server_cluster/cluster_message_processor.rs +++ b/secret-store/src/key_server_cluster/cluster_message_processor.rs @@ -72,9 +72,9 @@ impl SessionsMessageProcessor { } /// Process single session message from connection. - fn process_message, D>( + fn process_message>( &self, - sessions: &ClusterSessionsContainer, + sessions: &ClusterSessionsContainer, connection: Arc, mut message: Message, ) -> Option> @@ -151,9 +151,9 @@ impl SessionsMessageProcessor { } /// Get or insert new session. - fn prepare_session, D>( + fn prepare_session>( &self, - sessions: &ClusterSessionsContainer, + sessions: &ClusterSessionsContainer, sender: &NodeId, message: &Message ) -> Result, Error> @@ -192,7 +192,7 @@ impl SessionsMessageProcessor { let nonce = Some(message.session_nonce().ok_or(Error::InvalidMessage)?); let exclusive = message.is_exclusive_session_message(); - sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data) + sessions.insert(cluster, master, session_id, nonce, exclusive, creation_data).map(|s| s.session) }, } } @@ -273,8 +273,8 @@ impl MessageProcessor for SessionsMessageProcessor { let is_master_node = meta.self_node_id == meta.master_node_id; if is_master_node && session.is_finished() { self.sessions.negotiation_sessions.remove(&session.id()); - match session.wait() { - Ok(Some((version, master))) => match session.take_continue_action() { + match session.result() { + Some(Ok(Some((version, master)))) => match session.take_continue_action() { Some(ContinueAction::Decrypt( session, origin, is_shadow_decryption, is_broadcast_decryption )) => { @@ -317,10 +317,7 @@ impl MessageProcessor for SessionsMessageProcessor { }, None => (), }, - Ok(None) => unreachable!("is_master_node; session is finished; - negotiation version always finished with result on master; - qed"), - Err(error) => match session.take_continue_action() { + Some(Err(error)) => match session.take_continue_action() { Some(ContinueAction::Decrypt(session, _, _, _)) => { session.on_session_error(&meta.self_node_id, error); self.sessions.decryption_sessions.remove(&session.id()); @@ -335,6 +332,9 @@ impl MessageProcessor for SessionsMessageProcessor { }, None => (), }, + None | Some(Ok(None)) => unreachable!("is_master_node; session is finished; + negotiation version always finished with result on master; + qed"), } } } @@ -352,6 +352,6 @@ impl MessageProcessor for SessionsMessageProcessor { self.connections.clone(), self.servers_set_change_creator_connector.clone(), params, - ) + ).map(|s| s.session) } } diff --git a/secret-store/src/key_server_cluster/cluster_sessions.rs b/secret-store/src/key_server_cluster/cluster_sessions.rs index 53eec133467..888499202db 100644 --- a/secret-store/src/key_server_cluster/cluster_sessions.rs +++ b/secret-store/src/key_server_cluster/cluster_sessions.rs @@ -18,10 +18,11 @@ use std::time::{Duration, Instant}; use std::sync::{Arc, Weak}; use std::sync::atomic::AtomicBool; use std::collections::{VecDeque, BTreeMap, BTreeSet}; +use futures::{oneshot, Oneshot, Complete, Future}; use parking_lot::{Mutex, RwLock, Condvar}; use ethereum_types::H256; use ethkey::Secret; -use key_server_cluster::{Error, NodeId, SessionId, Requester, NodeKeyPair}; +use key_server_cluster::{Error, NodeId, SessionId, NodeKeyPair}; use key_server_cluster::cluster::{Cluster, ClusterConfiguration, ClusterView}; use key_server_cluster::cluster_connections::ConnectionProvider; use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector; @@ -68,6 +69,10 @@ pub struct SessionIdWithSubSession { pub trait ClusterSession { /// Session identifier type. type Id: ::std::fmt::Debug + Ord + Clone; + /// Session creation data type. + type CreationData; + /// Session (successful) result type. + type SuccessfulResult: Send + 'static; /// Session type name. fn type_name() -> &'static str; @@ -85,15 +90,22 @@ pub trait ClusterSession { fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error>; /// 'Wait for session completion' helper. - fn wait_session Option>>(completion_event: &Condvar, session_data: &Mutex, timeout: Option, result_reader: F) -> Option> { + #[cfg(test)] + fn wait_session Option>>( + completion: &CompletionSignal, + session_data: &Mutex, + timeout: Option, + result_reader: F + ) -> Option> { let mut locked_data = session_data.lock(); match result_reader(&locked_data) { Some(result) => Some(result), None => { + let completion_condvar = completion.completion_condvar.as_ref().expect("created in test mode"); match timeout { - None => completion_event.wait(&mut locked_data), + None => completion_condvar.wait(&mut locked_data), Some(timeout) => { - completion_event.wait_for(&mut locked_data, timeout); + completion_condvar.wait_for(&mut locked_data, timeout); }, } @@ -103,6 +115,23 @@ pub trait ClusterSession { } } +/// Waitable cluster session. +pub struct WaitableSession { + /// Session handle. + pub session: Arc, + /// Session result oneshot. + pub oneshot: Oneshot>, +} + +/// Session completion signal. +pub struct CompletionSignal { + /// Completion future. + pub completion_future: Mutex>>>, + + /// Completion condvar. + pub completion_condvar: Option, +} + /// Administrative session. pub enum AdminSession { /// Share add session. @@ -122,19 +151,22 @@ pub enum AdminSessionCreationData { /// Active sessions on this cluster. pub struct ClusterSessions { /// Key generation sessions. - pub generation_sessions: ClusterSessionsContainer, + pub generation_sessions: ClusterSessionsContainer, /// Encryption sessions. - pub encryption_sessions: ClusterSessionsContainer, + pub encryption_sessions: ClusterSessionsContainer, /// Decryption sessions. - pub decryption_sessions: ClusterSessionsContainer, + pub decryption_sessions: ClusterSessionsContainer, /// Schnorr signing sessions. - pub schnorr_signing_sessions: ClusterSessionsContainer, + pub schnorr_signing_sessions: ClusterSessionsContainer, /// ECDSA signing sessions. - pub ecdsa_signing_sessions: ClusterSessionsContainer, + pub ecdsa_signing_sessions: ClusterSessionsContainer, /// Key version negotiation sessions. - pub negotiation_sessions: ClusterSessionsContainer, KeyVersionNegotiationSessionCreator, ()>, + pub negotiation_sessions: ClusterSessionsContainer< + KeyVersionNegotiationSessionImpl, + KeyVersionNegotiationSessionCreator + >, /// Administrative sessions. - pub admin_sessions: ClusterSessionsContainer, + pub admin_sessions: ClusterSessionsContainer, /// Self node id. self_node_id: NodeId, /// Creator core. @@ -150,7 +182,7 @@ pub trait ClusterSessionsListener: Send + Sync { } /// Active sessions container. -pub struct ClusterSessionsContainer, D> { +pub struct ClusterSessionsContainer> { /// Sessions creator. pub creator: SC, /// Active sessions. @@ -161,8 +193,6 @@ pub struct ClusterSessionsContainer>, /// Do not actually remove sessions. preserve_sessions: bool, - /// Phantom data. - _pd: ::std::marker::PhantomData, } /// Session and its message queue. @@ -279,7 +309,7 @@ impl ClusterSessions { } } -impl ClusterSessionsContainer where S: ClusterSession, SC: ClusterSessionCreator { +impl ClusterSessionsContainer where S: ClusterSession, SC: ClusterSessionCreator { pub fn new(creator: SC, container_state: Arc>) -> Self { ClusterSessionsContainer { creator: creator, @@ -287,7 +317,6 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C listeners: Mutex::new(Vec::new()), container_state: container_state, preserve_sessions: false, - _pd: Default::default(), } } @@ -316,7 +345,15 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C self.sessions.read().values().nth(0).map(|s| s.session.clone()) } - pub fn insert(&self, cluster: Arc, master: NodeId, session_id: S::Id, session_nonce: Option, is_exclusive_session: bool, creation_data: Option) -> Result, Error> { + pub fn insert( + &self, + cluster: Arc, + master: NodeId, + session_id: S::Id, + session_nonce: Option, + is_exclusive_session: bool, + creation_data: Option, + ) -> Result, Error> { let mut sessions = self.sessions.write(); if sessions.contains_key(&session_id) { return Err(Error::DuplicateSessionId); @@ -335,11 +372,11 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C cluster_view: cluster, last_keep_alive_time: Instant::now(), last_message_time: Instant::now(), - session: session.clone(), + session: session.session.clone(), queue: VecDeque::new(), }; sessions.insert(session_id, queued_session); - self.notify_listeners(|l| l.on_session_inserted(session.clone())); + self.notify_listeners(|l| l.on_session_inserted(session.session.clone())); Ok(session) } @@ -419,7 +456,12 @@ impl ClusterSessionsContainer where S: ClusterSession, SC: C } } -impl ClusterSessionsContainer where S: ClusterSession, SC: ClusterSessionCreator, SessionId: From { +impl ClusterSessionsContainer + where + S: ClusterSession, + SC: ClusterSessionCreator, + SessionId: From, +{ pub fn send_keep_alive(&self, session_id: &S::Id, self_node_id: &NodeId) { if let Some(session) = self.sessions.write().get_mut(session_id) { let now = Instant::now(); @@ -521,6 +563,8 @@ impl AdminSession { impl ClusterSession for AdminSession { type Id = SessionId; + type CreationData = AdminSessionCreationData; + type SuccessfulResult = (); fn type_name() -> &'static str { "admin" @@ -569,6 +613,40 @@ impl ClusterSession for AdminSession { } } +impl WaitableSession { + pub fn new(session: S, oneshot: Oneshot>) -> Self { + WaitableSession { + session: Arc::new(session), + oneshot, + } + } + + pub fn into_wait_future(self) -> Box + Send> { + Box::new(self.oneshot + .map_err(|e| Error::Internal(e.to_string())) + .and_then(|res| res)) + } +} + +impl CompletionSignal { + pub fn new() -> (Self, Oneshot>) { + let (complete, oneshot) = oneshot(); + let completion_condvar = if cfg!(test) { Some(Condvar::new()) } else { None }; + (CompletionSignal { + completion_future: Mutex::new(Some(complete)), + completion_condvar, + }, oneshot) + } + + pub fn send(&self, result: Result) { + let completion_future = ::std::mem::replace(&mut *self.completion_future.lock(), None); + completion_future.map(|c| c.send(result)); + if let Some(ref completion_condvar) = self.completion_condvar { + completion_condvar.notify_all(); + } + } +} + pub fn create_cluster_view(self_key_pair: Arc, connections: Arc, requires_all_connections: bool) -> Result, Error> { let mut connected_nodes = connections.connected_nodes()?; let disconnected_nodes = connections.disconnected_nodes(); diff --git a/secret-store/src/key_server_cluster/cluster_sessions_creator.rs b/secret-store/src/key_server_cluster/cluster_sessions_creator.rs index 23a3657c42b..20d007d6f4b 100644 --- a/secret-store/src/key_server_cluster/cluster_sessions_creator.rs +++ b/secret-store/src/key_server_cluster/cluster_sessions_creator.rs @@ -22,7 +22,8 @@ use ethkey::Public; use key_server_cluster::{Error, NodeId, SessionId, Requester, AclStorage, KeyStorage, DocumentKeyShare, SessionMeta}; use key_server_cluster::cluster::{Cluster, ClusterConfiguration}; use key_server_cluster::connection_trigger::ServersSetChangeSessionCreatorConnector; -use key_server_cluster::cluster_sessions::{ClusterSession, SessionIdWithSubSession, AdminSession, AdminSessionCreationData}; +use key_server_cluster::cluster_sessions::{WaitableSession, ClusterSession, SessionIdWithSubSession, + AdminSession, AdminSessionCreationData}; use key_server_cluster::message::{self, Message, DecryptionMessage, SchnorrSigningMessage, ConsensusMessageOfShareAdd, ShareAddMessage, ServersSetChangeMessage, ConsensusMessage, ConsensusMessageWithServersSet, EcdsaSigningMessage}; use key_server_cluster::generation_session::{SessionImpl as GenerationSessionImpl, SessionParams as GenerationSessionParams}; @@ -43,9 +44,9 @@ use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVers use key_server_cluster::admin_sessions::ShareChangeSessionMeta; /// Generic cluster session creator. -pub trait ClusterSessionCreator { +pub trait ClusterSessionCreator { /// Get creation data from message. - fn creation_data_from_message(_message: &Message) -> Result, Error> { + fn creation_data_from_message(_message: &Message) -> Result, Error> { Ok(None) } @@ -53,7 +54,14 @@ pub trait ClusterSessionCreator { fn make_error_message(sid: S::Id, nonce: u64, err: Error) -> Message; /// Create cluster session. - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: S::Id, creation_data: Option) -> Result, Error>; + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: S::Id, + creation_data: Option, + ) -> Result, Error>; } /// Message with session id. @@ -134,7 +142,7 @@ impl GenerationSessionCreator { } } -impl ClusterSessionCreator for GenerationSessionCreator { +impl ClusterSessionCreator for GenerationSessionCreator { fn make_error_message(sid: SessionId, nonce: u64, err: Error) -> Message { message::Message::Generation(message::GenerationMessage::SessionError(message::SessionError { session: sid.into(), @@ -143,27 +151,33 @@ impl ClusterSessionCreator for GenerationSessionCreat })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionId, _creation_data: Option<()>) -> Result, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionId, + _creation_data: Option<()>, + ) -> Result, Error> { // check that there's no finished encryption session with the same id if self.core.key_storage.contains(&id) { return Err(Error::ServerKeyAlreadyGenerated); } let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(GenerationSessionImpl::new(GenerationSessionParams { + let (session, oneshot) = GenerationSessionImpl::new(GenerationSessionParams { id: id.clone(), self_node_id: self.core.self_node_id.clone(), key_storage: Some(self.core.key_storage.clone()), cluster: cluster, nonce: Some(nonce), - })) - .map(|session| { - if self.make_faulty_generation_sessions.load(Ordering::Relaxed) { - session.simulate_faulty_behaviour(); - } - session - }) - .map(Arc::new) + }); + + if self.make_faulty_generation_sessions.load(Ordering::Relaxed) { + session.simulate_faulty_behaviour(); + } + + Ok(WaitableSession::new(session, oneshot)) } } @@ -173,7 +187,7 @@ pub struct EncryptionSessionCreator { pub core: Arc, } -impl ClusterSessionCreator for EncryptionSessionCreator { +impl ClusterSessionCreator for EncryptionSessionCreator { fn make_error_message(sid: SessionId, nonce: u64, err: Error) -> Message { message::Message::Encryption(message::EncryptionMessage::EncryptionSessionError(message::EncryptionSessionError { session: sid.into(), @@ -182,17 +196,26 @@ impl ClusterSessionCreator for EncryptionSessionCreat })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionId, _creation_data: Option<()>) -> Result, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionId, + _creation_data: Option<()>, + ) -> Result, Error> { let encrypted_data = self.core.read_key_share(&id)?; let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(Arc::new(EncryptionSessionImpl::new(EncryptionSessionParams { + let (session, oneshot) = EncryptionSessionImpl::new(EncryptionSessionParams { id: id, self_node_id: self.core.self_node_id.clone(), encrypted_data: encrypted_data, key_storage: self.core.key_storage.clone(), cluster: cluster, nonce: nonce, - })?)) + })?; + + Ok(WaitableSession::new(session, oneshot)) } } @@ -202,7 +225,7 @@ pub struct DecryptionSessionCreator { pub core: Arc, } -impl ClusterSessionCreator for DecryptionSessionCreator { +impl ClusterSessionCreator for DecryptionSessionCreator { fn creation_data_from_message(message: &Message) -> Result, Error> { match *message { Message::Decryption(DecryptionMessage::DecryptionConsensusMessage(ref message)) => match &message.message { @@ -223,10 +246,17 @@ impl ClusterSessionCreator for DecryptionSessi })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionIdWithSubSession, requester: Option) -> Result, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionIdWithSubSession, + requester: Option, + ) -> Result, Error> { let encrypted_data = self.core.read_key_share(&id.id)?; let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(Arc::new(DecryptionSessionImpl::new(DecryptionSessionParams { + let (session, oneshot) = DecryptionSessionImpl::new(DecryptionSessionParams { meta: SessionMeta { id: id.id, self_node_id: self.core.self_node_id.clone(), @@ -240,7 +270,9 @@ impl ClusterSessionCreator for DecryptionSessi acl_storage: self.core.acl_storage.clone(), cluster: cluster, nonce: nonce, - }, requester)?)) + }, requester)?; + + Ok(WaitableSession::new(session, oneshot)) } } @@ -250,7 +282,7 @@ pub struct SchnorrSigningSessionCreator { pub core: Arc, } -impl ClusterSessionCreator for SchnorrSigningSessionCreator { +impl ClusterSessionCreator for SchnorrSigningSessionCreator { fn creation_data_from_message(message: &Message) -> Result, Error> { match *message { Message::SchnorrSigning(SchnorrSigningMessage::SchnorrSigningConsensusMessage(ref message)) => match &message.message { @@ -271,10 +303,17 @@ impl ClusterSessionCreator for SchnorrSign })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionIdWithSubSession, requester: Option) -> Result, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionIdWithSubSession, + requester: Option, + ) -> Result, Error> { let encrypted_data = self.core.read_key_share(&id.id)?; let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(Arc::new(SchnorrSigningSessionImpl::new(SchnorrSigningSessionParams { + let (session, oneshot) = SchnorrSigningSessionImpl::new(SchnorrSigningSessionParams { meta: SessionMeta { id: id.id, self_node_id: self.core.self_node_id.clone(), @@ -288,7 +327,8 @@ impl ClusterSessionCreator for SchnorrSign acl_storage: self.core.acl_storage.clone(), cluster: cluster, nonce: nonce, - }, requester)?)) + }, requester)?; + Ok(WaitableSession::new(session, oneshot)) } } @@ -298,7 +338,7 @@ pub struct EcdsaSigningSessionCreator { pub core: Arc, } -impl ClusterSessionCreator for EcdsaSigningSessionCreator { +impl ClusterSessionCreator for EcdsaSigningSessionCreator { fn creation_data_from_message(message: &Message) -> Result, Error> { match *message { Message::EcdsaSigning(EcdsaSigningMessage::EcdsaSigningConsensusMessage(ref message)) => match &message.message { @@ -319,10 +359,10 @@ impl ClusterSessionCreator for EcdsaSigningS })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionIdWithSubSession, requester: Option) -> Result, Error> { + fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionIdWithSubSession, requester: Option) -> Result, Error> { let encrypted_data = self.core.read_key_share(&id.id)?; let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(Arc::new(EcdsaSigningSessionImpl::new(EcdsaSigningSessionParams { + let (session, oneshot) = EcdsaSigningSessionImpl::new(EcdsaSigningSessionParams { meta: SessionMeta { id: id.id, self_node_id: self.core.self_node_id.clone(), @@ -336,7 +376,9 @@ impl ClusterSessionCreator for EcdsaSigningS acl_storage: self.core.acl_storage.clone(), cluster: cluster, nonce: nonce, - }, requester)?)) + }, requester)?; + + Ok(WaitableSession::new(session, oneshot)) } } @@ -346,7 +388,7 @@ pub struct KeyVersionNegotiationSessionCreator { pub core: Arc, } -impl ClusterSessionCreator, ()> for KeyVersionNegotiationSessionCreator { +impl ClusterSessionCreator> for KeyVersionNegotiationSessionCreator { fn make_error_message(sid: SessionIdWithSubSession, nonce: u64, err: Error) -> Message { message::Message::KeyVersionNegotiation(message::KeyVersionNegotiationMessage::KeyVersionsError(message::KeyVersionsError { session: sid.id.into(), @@ -359,14 +401,21 @@ impl ClusterSessionCreator, master: NodeId, nonce: Option, id: SessionIdWithSubSession, _creation_data: Option<()>) -> Result>, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionIdWithSubSession, + _creation_data: Option<()>, + ) -> Result>, Error> { let configured_nodes_count = cluster.configured_nodes_count(); let connected_nodes_count = cluster.connected_nodes_count(); let encrypted_data = self.core.read_key_share(&id.id)?; let nonce = self.core.check_session_nonce(&master, nonce)?; let computer = Arc::new(FastestResultKeyVersionsResultComputer::new(self.core.self_node_id.clone(), encrypted_data.as_ref(), configured_nodes_count, configured_nodes_count)); - Ok(Arc::new(KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { + let (session, oneshot) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams { meta: ShareChangeSessionMeta { id: id.id.clone(), self_node_id: self.core.self_node_id.clone(), @@ -384,7 +433,8 @@ impl ClusterSessionCreator, } -impl ClusterSessionCreator for AdminSessionCreator { +impl ClusterSessionCreator for AdminSessionCreator { fn creation_data_from_message(message: &Message) -> Result, Error> { match *message { Message::ServersSetChange(ServersSetChangeMessage::ServersSetChangeConsensusMessage(ref message)) => match &message.message { @@ -424,11 +474,18 @@ impl ClusterSessionCreator for AdminSess })) } - fn create(&self, cluster: Arc, master: NodeId, nonce: Option, id: SessionId, creation_data: Option) -> Result, Error> { + fn create( + &self, + cluster: Arc, + master: NodeId, + nonce: Option, + id: SessionId, + creation_data: Option, + ) -> Result, Error> { let nonce = self.core.check_session_nonce(&master, nonce)?; - Ok(Arc::new(match creation_data { + match creation_data { Some(AdminSessionCreationData::ShareAdd(version)) => { - AdminSession::ShareAdd(ShareAddSessionImpl::new(ShareAddSessionParams { + let (session, oneshot) = ShareAddSessionImpl::new(ShareAddSessionParams { meta: ShareChangeSessionMeta { id: id.clone(), self_node_id: self.core.self_node_id.clone(), @@ -440,13 +497,14 @@ impl ClusterSessionCreator for AdminSess key_storage: self.core.key_storage.clone(), nonce: nonce, admin_public: Some(self.admin_public.clone().ok_or(Error::AccessDenied)?), - })?) + })?; + Ok(WaitableSession::new(AdminSession::ShareAdd(session), oneshot)) }, Some(AdminSessionCreationData::ServersSetChange(migration_id, new_nodes_set)) => { let admin_public = self.servers_set_change_session_creator_connector.admin_public(migration_id.as_ref(), new_nodes_set) .map_err(|_| Error::AccessDenied)?; - AdminSession::ServersSetChange(ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams { + let (session, oneshot) = ServersSetChangeSessionImpl::new(ServersSetChangeSessionParams { meta: ShareChangeSessionMeta { id: id.clone(), self_node_id: self.core.self_node_id.clone(), @@ -460,10 +518,11 @@ impl ClusterSessionCreator for AdminSess all_nodes_set: cluster.nodes(), admin_public: admin_public, migration_id: migration_id, - })?) + })?; + Ok(WaitableSession::new(AdminSession::ServersSetChange(session), oneshot)) }, None => unreachable!("expected to call with non-empty creation data; qed"), - })) + } } } diff --git a/secret-store/src/key_server_cluster/connection_trigger_with_migration.rs b/secret-store/src/key_server_cluster/connection_trigger_with_migration.rs index ba11ea60c85..b4dcfad634c 100644 --- a/secret-store/src/key_server_cluster/connection_trigger_with_migration.rs +++ b/secret-store/src/key_server_cluster/connection_trigger_with_migration.rs @@ -324,9 +324,10 @@ fn session_state(session: Option>) -> SessionState { session .and_then(|s| match s.as_servers_set_change() { Some(s) if !s.is_finished() => Some(SessionState::Active(s.migration_id().cloned())), - Some(s) => match s.wait() { - Ok(_) => Some(SessionState::Finished(s.migration_id().cloned())), - Err(_) => Some(SessionState::Failed(s.migration_id().cloned())), + Some(s) => match s.result() { + Some(Ok(_)) => Some(SessionState::Finished(s.migration_id().cloned())), + Some(Err(_)) => Some(SessionState::Failed(s.migration_id().cloned())), + None => unreachable!("s.is_finished() == true; when session is finished, result is available; qed"), }, None => None, }) diff --git a/secret-store/src/key_server_cluster/mod.rs b/secret-store/src/key_server_cluster/mod.rs index fc46e10318d..c1c91ef8a53 100644 --- a/secret-store/src/key_server_cluster/mod.rs +++ b/secret-store/src/key_server_cluster/mod.rs @@ -25,7 +25,7 @@ pub use super::serialization::{SerializableSignature, SerializableH256, Serializ SerializableRequester, SerializableMessageHash, SerializableAddress}; pub use self::cluster::{new_network_cluster, ClusterCore, ClusterConfiguration, ClusterClient}; pub use self::cluster_connections_net::NetConnectionsManagerConfig; -pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener}; +pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener, WaitableSession}; #[cfg(test)] pub use self::cluster::tests::DummyClusterClient; diff --git a/secret-store/src/listener/http_listener.rs b/secret-store/src/listener/http_listener.rs index 7c5113204c4..5b037add7f9 100644 --- a/secret-store/src/listener/http_listener.rs +++ b/secret-store/src/listener/http_listener.rs @@ -16,6 +16,7 @@ use std::collections::BTreeSet; use std::sync::{Arc, Weak}; +use futures::future::{ok, result}; use hyper::{self, Uri, Request as HttpRequest, Response as HttpResponse, Method as HttpMethod, StatusCode as HttpStatusCode, Body, header::{self, HeaderValue}, @@ -129,95 +130,86 @@ impl KeyServerHttpListener { } impl KeyServerHttpHandler { - fn process(self, req_method: HttpMethod, req_uri: Uri, path: &str, req_body: &[u8], cors: AllowCors) -> HttpResponse { + fn key_server(&self) -> Result, Error> { + self.handler.key_server.upgrade() + .ok_or_else(|| Error::Internal("KeyServer is already destroyed".into())) + } + + fn process( + self, + req_method: HttpMethod, + req_uri: Uri, + path: &str, + req_body: &[u8], + cors: AllowCors, + ) -> Box, Error=hyper::Error> + Send> { match parse_request(&req_method, &path, &req_body) { - Request::GenerateServerKey(document, signature, threshold) => { - return_server_public_key(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.generate_key(&document, &signature.into(), threshold)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "GenerateServerKey request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::StoreDocumentKey(document, signature, common_point, encrypted_document_key) => { - return_empty(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.store_document_key(&document, &signature.into(), common_point, encrypted_document_key)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "StoreDocumentKey request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::GenerateDocumentKey(document, signature, threshold) => { - return_document_key(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.generate_document_key(&document, &signature.into(), threshold)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "GenerateDocumentKey request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::GetServerKey(document, signature) => { - return_server_public_key(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.restore_key_public(&document, &signature.into())) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "GetServerKey request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::GetDocumentKey(document, signature) => { - return_document_key(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.restore_document_key(&document, &signature.into())) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "GetDocumentKey request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::GetDocumentKeyShadow(document, signature) => { - return_document_key_shadow(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.restore_document_key_shadow(&document, &signature.into())) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "GetDocumentKeyShadow request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::SchnorrSignMessage(document, signature, message_hash) => { - return_message_signature(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.sign_message_schnorr(&document, &signature.into(), message_hash)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "SchnorrSignMessage request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::EcdsaSignMessage(document, signature, message_hash) => { - return_message_signature(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.sign_message_ecdsa(&document, &signature.into(), message_hash)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "EcdsaSignMessage request {} has failed with: {}", req_uri, err); - err - })) - }, - Request::ChangeServersSet(old_set_signature, new_set_signature, new_servers_set) => { - return_empty(&req_uri, cors, self.handler.key_server.upgrade() - .map(|key_server| key_server.change_servers_set(old_set_signature, new_set_signature, new_servers_set)) - .unwrap_or(Err(Error::Internal("KeyServer is already destroyed".into()))) - .map_err(|err| { - warn!(target: "secretstore", "ChangeServersSet request {} has failed with: {}", req_uri, err); - err - })) - }, + Request::GenerateServerKey(document, signature, threshold) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.generate_key(document, signature.into(), threshold)) + .then(move |result| ok(return_server_public_key("GenerateServerKey", &req_uri, cors, result)))), + Request::StoreDocumentKey(document, signature, common_point, encrypted_document_key) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.store_document_key( + document, + signature.into(), + common_point, + encrypted_document_key, + )) + .then(move |result| ok(return_empty("StoreDocumentKey", &req_uri, cors, result)))), + Request::GenerateDocumentKey(document, signature, threshold) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.generate_document_key( + document, + signature.into(), + threshold, + )) + .then(move |result| ok(return_document_key("GenerateDocumentKey", &req_uri, cors, result)))), + Request::GetServerKey(document, signature) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.restore_key_public( + document, + signature.into(), + )) + .then(move |result| ok(return_server_public_key("GetServerKey", &req_uri, cors, result)))), + Request::GetDocumentKey(document, signature) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.restore_document_key(document, signature.into())) + .then(move |result| ok(return_document_key("GetDocumentKey", &req_uri, cors, result)))), + Request::GetDocumentKeyShadow(document, signature) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.restore_document_key_shadow(document, signature.into())) + .then(move |result| ok(return_document_key_shadow("GetDocumentKeyShadow", &req_uri, cors, result)))), + Request::SchnorrSignMessage(document, signature, message_hash) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.sign_message_schnorr( + document, + signature.into(), + message_hash, + )) + .then(move |result| ok(return_message_signature("SchnorrSignMessage", &req_uri, cors, result)))), + Request::EcdsaSignMessage(document, signature, message_hash) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.sign_message_ecdsa( + document, + signature.into(), + message_hash, + )) + .then(move |result| ok(return_message_signature("EcdsaSignMessage", &req_uri, cors, result)))), + Request::ChangeServersSet(old_set_signature, new_set_signature, new_servers_set) => + Box::new(result(self.key_server()) + .and_then(move |key_server| key_server.change_servers_set( + old_set_signature, + new_set_signature, + new_servers_set, + )) + .then(move |result| ok(return_empty("ChangeServersSet", &req_uri, cors, result)))), Request::Invalid => { warn!(target: "secretstore", "Ignoring invalid {}-request {}", req_method, req_uri); - HttpResponse::builder() + Box::new(ok(HttpResponse::builder() .status(HttpStatusCode::BAD_REQUEST) .body(Body::empty()) - .expect("Nothing to parse, cannot fail; qed") + .expect("Nothing to parse, cannot fail; qed"))) }, } } @@ -239,61 +231,74 @@ impl Service for KeyServerHttpHandler { AllowCors::Invalid => { warn!(target: "secretstore", "Ignoring {}-request {} with unauthorized Origin header", req.method(), req.uri()); Box::new(future::ok(HttpResponse::builder() - .status(HttpStatusCode::NOT_FOUND) - .body(Body::empty()) - .expect("Nothing to parse, cannot fail; qed") - )) + .status(HttpStatusCode::NOT_FOUND) + .body(Body::empty()) + .expect("Nothing to parse, cannot fail; qed"))) }, _ => { let req_method = req.method().clone(); let req_uri = req.uri().clone(); + let path = req_uri.path().to_string(); // We cannot consume Self because of the Service trait requirement. let this = self.clone(); - Box::new(req.into_body().concat2().map(move |body| { - let path = req_uri.path().to_string(); - if path.starts_with("/") { - this.process(req_method, req_uri, &path, &body, cors) - } else { - warn!(target: "secretstore", "Ignoring invalid {}-request {}", req_method, req_uri); - HttpResponse::builder() - .status(HttpStatusCode::NOT_FOUND) - .body(Body::empty()) - .expect("Nothing to parse, cannot fail; qed") - } - })) + Box::new(req.into_body().concat2() + .and_then(move |body| this.process(req_method, req_uri, &path, &body, cors))) } } } } -fn return_empty(req_uri: &Uri, cors: AllowCors, empty: Result<(), Error>) -> HttpResponse { - return_bytes::(req_uri, cors, empty.map(|_| None)) +fn return_empty(req_type: &str, req_uri: &Uri, cors: AllowCors, empty: Result<(), Error>) -> HttpResponse { + return_bytes::(req_type, req_uri, cors, empty.map(|_| None)) } -fn return_server_public_key(req_uri: &Uri, cors: AllowCors, server_public: Result) -> HttpResponse { - return_bytes(req_uri, cors, server_public.map(|k| Some(SerializablePublic(k)))) +fn return_server_public_key( + req_type: &str, + req_uri: &Uri, + cors: AllowCors, + server_public: Result, +) -> HttpResponse { + return_bytes(req_type, req_uri, cors, server_public.map(|k| Some(SerializablePublic(k)))) } -fn return_message_signature(req_uri: &Uri, cors: AllowCors, signature: Result) -> HttpResponse { - return_bytes(req_uri, cors, signature.map(|s| Some(SerializableBytes(s)))) +fn return_message_signature( + req_type: &str, + req_uri: &Uri, + cors: AllowCors, + signature: Result, +) -> HttpResponse { + return_bytes(req_type, req_uri, cors, signature.map(|s| Some(SerializableBytes(s)))) } -fn return_document_key(req_uri: &Uri, cors: AllowCors, document_key: Result) -> HttpResponse { - return_bytes(req_uri, cors, document_key.map(|k| Some(SerializableBytes(k)))) +fn return_document_key( + req_type: &str, + req_uri: &Uri, + cors: AllowCors, + document_key: Result, +) -> HttpResponse { + return_bytes(req_type, req_uri, cors, document_key.map(|k| Some(SerializableBytes(k)))) } -fn return_document_key_shadow(req_uri: &Uri, cors: AllowCors, document_key_shadow: Result) - -> HttpResponse -{ - return_bytes(req_uri, cors, document_key_shadow.map(|k| Some(SerializableEncryptedDocumentKeyShadow { +fn return_document_key_shadow( + req_type: &str, + req_uri: &Uri, + cors: AllowCors, + document_key_shadow: Result, +) -> HttpResponse { + return_bytes(req_type, req_uri, cors, document_key_shadow.map(|k| Some(SerializableEncryptedDocumentKeyShadow { decrypted_secret: k.decrypted_secret.into(), common_point: k.common_point.expect("always filled when requesting document_key_shadow; qed").into(), decrypt_shadows: k.decrypt_shadows.expect("always filled when requesting document_key_shadow; qed").into_iter().map(Into::into).collect() }))) } -fn return_bytes(req_uri: &Uri, cors: AllowCors, result: Result, Error>) -> HttpResponse { +fn return_bytes( + req_type: &str, + req_uri: &Uri, + cors: AllowCors, + result: Result, Error>, +) -> HttpResponse { match result { Ok(Some(result)) => match serde_json::to_vec(&result) { Ok(result) => { @@ -321,7 +326,10 @@ fn return_bytes(req_uri: &Uri, cors: AllowCors return_error(err), + Err(err) => { + warn!(target: "secretstore", "{} request {} has failed with: {}", req_type, req_uri, err); + return_error(err) + }, } } diff --git a/secret-store/src/listener/mod.rs b/secret-store/src/listener/mod.rs index 0fde173c88e..f260c28ed80 100644 --- a/secret-store/src/listener/mod.rs +++ b/secret-store/src/listener/mod.rs @@ -22,6 +22,7 @@ mod tasks_queue; use std::collections::BTreeSet; use std::sync::Arc; +use futures::Future; use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, AdminSessionsServer, KeyServer}; use types::{Error, Public, MessageHash, EncryptedMessageSignature, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, NodeId, Requester}; @@ -72,45 +73,88 @@ impl Listener { impl KeyServer for Listener {} impl ServerKeyGenerator for Listener { - fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result { + fn generate_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send> { self.key_server.generate_key(key_id, author, threshold) } - fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result { + fn restore_key_public( + &self, + key_id: ServerKeyId, + author: Requester, + ) -> Box + Send> { self.key_server.restore_key_public(key_id, author) } } impl DocumentKeyServer for Listener { - fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error> { + fn store_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + common_point: Public, + encrypted_document_key: Public, + ) -> Box + Send> { self.key_server.store_document_key(key_id, author, common_point, encrypted_document_key) } - fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result { + fn generate_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send> { self.key_server.generate_document_key(key_id, author, threshold) } - fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result { + fn restore_document_key( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send> { self.key_server.restore_document_key(key_id, requester) } - fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result { + fn restore_document_key_shadow( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send> { self.key_server.restore_document_key_shadow(key_id, requester) } } impl MessageSigner for Listener { - fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result { + fn sign_message_schnorr( + &self, + key_id: ServerKeyId, + requester: Requester, + message: MessageHash, + ) -> Box + Send> { self.key_server.sign_message_schnorr(key_id, requester, message) } - fn sign_message_ecdsa(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result { + fn sign_message_ecdsa( + &self, + key_id: ServerKeyId, + requester: Requester, + message: MessageHash, + ) -> Box + Send> { self.key_server.sign_message_ecdsa(key_id, requester, message) } } impl AdminSessionsServer for Listener { - fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet) -> Result<(), Error> { + fn change_servers_set( + &self, + old_set_signature: RequestSignature, + new_set_signature: RequestSignature, + new_servers_set: BTreeSet, + ) -> Box + Send> { self.key_server.change_servers_set(old_set_signature, new_set_signature, new_servers_set) } } diff --git a/secret-store/src/listener/service_contract_listener.rs b/secret-store/src/listener/service_contract_listener.rs index db4253e25a9..1b6a6afdfef 100644 --- a/secret-store/src/listener/service_contract_listener.rs +++ b/secret-store/src/listener/service_contract_listener.rs @@ -467,7 +467,7 @@ impl ClusterSessionsListener for ServiceContractListener { // ignore result - the only thing that we can do is to log the error let server_key_id = session.id(); if let Some(origin) = session.origin() { - if let Some(generation_result) = session.wait(Some(Default::default())) { + if let Some(generation_result) = session.result() { let generation_result = generation_result.map(Some).map_err(Into::into); let _ = Self::process_server_key_generation_result(&self.data, origin, &server_key_id, generation_result); } @@ -484,7 +484,7 @@ impl ClusterSessionsListener for ServiceContractListener { let session_id = session.id(); let server_key_id = session_id.id; if let (Some(requester), Some(origin)) = (session.requester().and_then(|r| r.address(&server_key_id).ok()), session.origin()) { - if let Some(retrieval_result) = session.wait(Some(Default::default())) { + if let Some(retrieval_result) = session.result() { let retrieval_result = retrieval_result.map(|key_shadow| session.broadcast_shadows() .and_then(|broadcast_shadows| @@ -509,8 +509,8 @@ impl ClusterSessionsListener error.clone(), + let error = match session.result() { + Some(Err(ref error)) if !error.is_non_fatal() => error.clone(), _ => return, }; diff --git a/secret-store/src/traits.rs b/secret-store/src/traits.rs index e12c75e5ddd..ed44e2503dc 100644 --- a/secret-store/src/traits.rs +++ b/secret-store/src/traits.rs @@ -15,6 +15,7 @@ // along with Parity Ethereum. If not, see . use std::collections::BTreeSet; +use futures::Future; use ethkey::{KeyPair, Signature, Error as EthKeyError}; use ethereum_types::{H256, Address}; use types::{Error, Public, ServerKeyId, MessageHash, EncryptedMessageSignature, RequestSignature, Requester, @@ -39,11 +40,20 @@ pub trait ServerKeyGenerator { /// `author` is the author of key entry. /// `threshold + 1` is the minimal number of nodes, required to restore private key. /// Result is a public portion of SK. - fn generate_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result; + fn generate_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send>; /// Retrieve public portion of previously generated SK. /// `key_id` is identifier of previously generated SK. /// `author` is the same author, that has created the server key. - fn restore_key_public(&self, key_id: &ServerKeyId, author: &Requester) -> Result; + fn restore_key_public( + &self, + key_id: ServerKeyId, + author: Requester, + ) -> Box + Send>; } /// Document key (DK) server. @@ -54,20 +64,35 @@ pub trait DocumentKeyServer: ServerKeyGenerator { /// `common_point` is a result of `k * T` expression, where `T` is generation point and `k` is random scalar in EC field. /// `encrypted_document_key` is a result of `M + k * y` expression, where `M` is unencrypted document key (point on EC), /// `k` is the same scalar used in `common_point` calculation and `y` is previously generated public part of SK. - fn store_document_key(&self, key_id: &ServerKeyId, author: &Requester, common_point: Public, encrypted_document_key: Public) -> Result<(), Error>; + fn store_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + common_point: Public, + encrypted_document_key: Public, + ) -> Box + Send>; /// Generate and store both SK and DK. This is a shortcut for consequent calls of `generate_key` and `store_document_key`. /// The only difference is that DK is generated by DocumentKeyServer (which might be considered unsafe). /// `key_id` is the caller-provided identifier of generated SK. /// `author` is the author of server && document key entry. /// `threshold + 1` is the minimal number of nodes, required to restore private key. /// Result is a DK, encrypted with caller public key. - fn generate_document_key(&self, key_id: &ServerKeyId, author: &Requester, threshold: usize) -> Result; + fn generate_document_key( + &self, + key_id: ServerKeyId, + author: Requester, + threshold: usize, + ) -> Box + Send>; /// Restore previously stored DK. /// DK is decrypted on the key server (which might be considered unsafe), and then encrypted with caller public key. /// `key_id` is identifier of previously generated SK. /// `requester` is the one who requests access to document key. Caller must be on ACL for this function to succeed. /// Result is a DK, encrypted with caller public key. - fn restore_document_key(&self, key_id: &ServerKeyId, requester: &Requester) -> Result; + fn restore_document_key( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send>; /// Restore previously stored DK. /// To decrypt DK on client: /// 1) use requestor secret key to decrypt secret coefficients from result.decrypt_shadows @@ -75,7 +100,11 @@ pub trait DocumentKeyServer: ServerKeyGenerator { /// 3) calculate decrypt_shadow_point: decrypt_shadows_sum * result.common_point /// 4) calculate decrypted_secret: result.decrypted_secret + decrypt_shadow_point /// Result is a DK shadow. - fn restore_document_key_shadow(&self, key_id: &ServerKeyId, requester: &Requester) -> Result; + fn restore_document_key_shadow( + &self, + key_id: ServerKeyId, + requester: Requester, + ) -> Box + Send>; } /// Message signer. @@ -85,14 +114,24 @@ pub trait MessageSigner: ServerKeyGenerator { /// `requester` is the one who requests access to server key private. /// `message` is the message to be signed. /// Result is a signed message, encrypted with caller public key. - fn sign_message_schnorr(&self, key_id: &ServerKeyId, requester: &Requester, message: MessageHash) -> Result; + fn sign_message_schnorr( + &self, + key_id: ServerKeyId, + requester: Requester, + message: MessageHash, + ) -> Box + Send>; /// Generate ECDSA signature for message with previously generated SK. /// WARNING: only possible when SK was generated using t <= 2 * N. /// `key_id` is the caller-provided identifier of generated SK. /// `signature` is `key_id`, signed with caller public key. /// `message` is the message to be signed. /// Result is a signed message, encrypted with caller public key. - fn sign_message_ecdsa(&self, key_id: &ServerKeyId, signature: &Requester, message: MessageHash) -> Result; + fn sign_message_ecdsa( + &self, + key_id: ServerKeyId, + signature: Requester, + message: MessageHash, + ) -> Box + Send>; } /// Administrative sessions server. @@ -101,7 +140,12 @@ pub trait AdminSessionsServer { /// And old nodes (i.e. cluster nodes except new_servers_set) have clear databases. /// WARNING: newly generated keys will be distributed among all cluster nodes. So this session /// must be followed with cluster nodes change (either via contract, or config files). - fn change_servers_set(&self, old_set_signature: RequestSignature, new_set_signature: RequestSignature, new_servers_set: BTreeSet) -> Result<(), Error>; + fn change_servers_set( + &self, + old_set_signature: RequestSignature, + new_set_signature: RequestSignature, + new_servers_set: BTreeSet, + ) -> Box + Send>; } /// Key server.