Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
SecretStore: non-blocking wait of session completion (#10303)
Browse files Browse the repository at this point in the history
* make SS sessions return future

* fix grumbles

* do not create unused Condvar in production mode
  • Loading branch information
svyatonik authored Jun 6, 2019
1 parent eed630a commit 9de1afe
Show file tree
Hide file tree
Showing 20 changed files with 1,133 additions and 529 deletions.
384 changes: 280 additions & 104 deletions secret-store/src/key_server.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{Address, H256};
use ethkey::Secret;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession};
use key_server_cluster::cluster_sessions::{SessionIdWithSubSession, ClusterSession, CompletionSignal};
use key_server_cluster::decryption_session::SessionImpl as DecryptionSession;
use key_server_cluster::signing_session_ecdsa::SessionImpl as EcdsaSigningSession;
use key_server_cluster::signing_session_schnorr::SessionImpl as SchnorrSigningSession;
Expand Down Expand Up @@ -87,8 +88,8 @@ struct SessionCore<T: SessionTransport> {
pub transport: T,
/// Session nonce.
pub nonce: u64,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<Option<(H256, NodeId)>>,
}

/// Mutable session data.
Expand Down Expand Up @@ -166,16 +167,17 @@ pub struct LargestSupportResultComputer;

impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new session.
pub fn new(params: SessionParams<T>) -> Self {
SessionImpl {
pub fn new(params: SessionParams<T>) -> (Self, Oneshot<Result<Option<(H256, NodeId)>, Error>>) {
let (completed, oneshot) = CompletionSignal::new();
(SessionImpl {
core: SessionCore {
meta: params.meta,
sub_session: params.sub_session,
key_share: params.key_share.clone(),
result_computer: params.result_computer,
transport: params.transport,
nonce: params.nonce,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::WaitingForInitialization,
Expand All @@ -191,7 +193,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
continue_with: None,
failed_continue_with: None,
})
}
}, oneshot)
}

/// Return session meta.
Expand Down Expand Up @@ -221,10 +223,9 @@ impl<T> SessionImpl<T> where T: SessionTransport {
self.data.lock().failed_continue_with.take()
}

/// Wait for session completion.
pub fn wait(&self) -> Result<Option<(H256, NodeId)>, Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<Option<(H256, NodeId)>, Error>> {
self.data.lock().result.clone()
}

/// Retrieve common key data (author, threshold, public), if available.
Expand Down Expand Up @@ -344,7 +345,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// update state
data.state = SessionState::Finished;
data.result = Some(Ok(None));
self.core.completed.notify_all();
self.core.completed.send(Ok(None));

Ok(())
}
Expand Down Expand Up @@ -450,15 +451,18 @@ impl<T> SessionImpl<T> where T: SessionTransport {
}
}

let result = result.map(Some);
data.state = SessionState::Finished;
data.result = Some(result.map(Some));
core.completed.notify_all();
data.result = Some(result.clone());
core.completed.send(result);
}
}
}

impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionIdWithSubSession;
type CreationData = ();
type SuccessfulResult = Option<(H256, NodeId)>;

fn type_name() -> &'static str {
"version negotiation"
Expand All @@ -482,7 +486,7 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
warn!(target: "secretstore_net", "{}: key version negotiation session failed with timeout", self.core.meta.self_node_id);

data.result = Some(Err(Error::ConsensusTemporaryUnreachable));
self.core.completed.notify_all();
self.core.completed.send(Err(Error::ConsensusTemporaryUnreachable));
}
}
}
Expand Down Expand Up @@ -510,8 +514,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -698,7 +702,7 @@ mod tests {
cluster: cluster,
},
nonce: 0,
}),
}).0,
})
}).collect(),
queue: VecDeque::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use std::collections::btree_map::Entry;
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use ethereum_types::H256;
use ethkey::{Public, Signature};
use key_server_cluster::{Error, NodeId, SessionId, KeyStorage};
use key_server_cluster::math;
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::message::{Message, ServersSetChangeMessage,
ConsensusMessageWithServersSet, InitializeConsensusSessionWithServersSet,
ServersSetChangeConsensusMessage, ConfirmConsensusInitialization, UnknownSessionsRequest, UnknownSessions,
Expand Down Expand Up @@ -93,8 +94,8 @@ struct SessionCore {
pub admin_public: Public,
/// Migration id (if this session is a part of auto-migration process).
pub migration_id: Option<H256>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}

/// Servers set change consensus session type.
Expand Down Expand Up @@ -182,8 +183,9 @@ struct ServersSetChangeKeyVersionNegotiationTransport {

impl SessionImpl {
/// Create new servers set change session.
pub fn new(params: SessionParams) -> Result<Self, Error> {
Ok(SessionImpl {
pub fn new(params: SessionParams) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
cluster: params.cluster,
Expand All @@ -192,7 +194,7 @@ impl SessionImpl {
all_nodes_set: params.all_nodes_set,
admin_public: params.admin_public,
migration_id: params.migration_id,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::EstablishingConsensus,
Expand All @@ -205,7 +207,7 @@ impl SessionImpl {
active_key_sessions: BTreeMap::new(),
result: None,
}),
})
}, oneshot))
}

/// Get session id.
Expand All @@ -218,10 +220,9 @@ impl SessionImpl {
self.core.migration_id.as_ref()
}

/// Wait for session completion.
pub fn wait(&self) -> Result<(), Error> {
Self::wait_session(&self.core.completed, &self.data, None, |data| data.result.clone())
.expect("wait_session returns Some if called without timeout; qed")
/// Return session completion result (if available).
pub fn result(&self) -> Option<Result<(), Error>> {
self.data.lock().result.clone()
}

/// Initialize servers set change session on master node.
Expand Down Expand Up @@ -423,7 +424,7 @@ impl SessionImpl {
&KeyVersionNegotiationMessage::RequestKeyVersions(ref message) if sender == &self.core.meta.master_node_id => {
let key_id = message.session.clone().into();
let key_share = self.core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id.clone(),
self_node_id: self.core.meta.self_node_id.clone(),
Expand Down Expand Up @@ -671,7 +672,7 @@ impl SessionImpl {
}

data.state = SessionState::Finished;
self.core.completed.notify_all();
self.core.completed.send(Ok(()));

Ok(())
}
Expand Down Expand Up @@ -741,7 +742,7 @@ impl SessionImpl {
};

let key_share = core.key_storage.get(&key_id)?;
let negotiation_session = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
let (negotiation_session, _) = KeyVersionNegotiationSessionImpl::new(KeyVersionNegotiationSessionParams {
meta: ShareChangeSessionMeta {
id: key_id,
self_node_id: core.meta.self_node_id.clone(),
Expand Down Expand Up @@ -797,7 +798,8 @@ impl SessionImpl {
let negotiation_session = data.negotiation_sessions.remove(&key_id)
.expect("share change session is only initialized when negotiation is completed; qed");
let (selected_version, selected_master) = negotiation_session
.wait()?
.result()
.expect("share change session is only initialized when negotiation is completed; qed")?
.expect("initialize_share_change_session is only called on share change master; negotiation session completes with some on master; qed");
let selected_version_holders = negotiation_session.version_holders(&selected_version)?;
let selected_version_threshold = negotiation_session.common_key_data()?.threshold;
Expand Down Expand Up @@ -882,7 +884,7 @@ impl SessionImpl {

if data.result.is_some() && data.active_key_sessions.len() == 0 {
data.state = SessionState::Finished;
core.completed.notify_all();
core.completed.send(Ok(()));
}

Ok(())
Expand All @@ -907,14 +909,16 @@ impl SessionImpl {

data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));

Ok(())
}
}

impl ClusterSession for SessionImpl {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();

fn type_name() -> &'static str {
"servers set change"
Expand Down Expand Up @@ -954,8 +958,8 @@ impl ClusterSession for SessionImpl {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -1109,7 +1113,7 @@ pub mod tests {
nonce: 1,
admin_public: admin_public,
migration_id: None,
}).unwrap()
}).unwrap().0
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap};
use ethereum_types::{H256, Address};
use ethkey::{Public, Secret, Signature};
use parking_lot::{Mutex, Condvar};
use futures::Oneshot;
use parking_lot::Mutex;
use key_server_cluster::{Error, SessionId, NodeId, DocumentKeyShare, DocumentKeyShareVersion, KeyStorage};
use key_server_cluster::cluster::Cluster;
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::cluster_sessions::{ClusterSession, CompletionSignal};
use key_server_cluster::math;
use key_server_cluster::message::{Message, ShareAddMessage, ShareAddConsensusMessage, ConsensusMessageOfShareAdd,
InitializeConsensusSessionOfShareAdd, KeyShareCommon, NewKeysDissemination, ShareAddError,
Expand Down Expand Up @@ -71,8 +72,8 @@ struct SessionCore<T: SessionTransport> {
pub key_storage: Arc<KeyStorage>,
/// Administrator public key.
pub admin_public: Option<Public>,
/// SessionImpl completion condvar.
pub completed: Condvar,
/// Session completion signal.
pub completed: CompletionSignal<()>,
}

/// Share add consensus session type.
Expand Down Expand Up @@ -158,18 +159,18 @@ pub struct IsolatedSessionTransport {

impl<T> SessionImpl<T> where T: SessionTransport {
/// Create new share addition session.
pub fn new(params: SessionParams<T>) -> Result<Self, Error> {
pub fn new(params: SessionParams<T>) -> Result<(Self, Oneshot<Result<(), Error>>), Error> {
let key_share = params.key_storage.get(&params.meta.id)?;

Ok(SessionImpl {
let (completed, oneshot) = CompletionSignal::new();
Ok((SessionImpl {
core: SessionCore {
meta: params.meta,
nonce: params.nonce,
key_share: key_share,
transport: params.transport,
key_storage: params.key_storage,
admin_public: params.admin_public,
completed: Condvar::new(),
completed,
},
data: Mutex::new(SessionData {
state: SessionState::ConsensusEstablishing,
Expand All @@ -181,7 +182,7 @@ impl<T> SessionImpl<T> where T: SessionTransport {
secret_subshares: None,
result: None,
}),
})
}, oneshot))
}

/// Set pre-established consensus data.
Expand Down Expand Up @@ -752,14 +753,16 @@ impl<T> SessionImpl<T> where T: SessionTransport {
// signal session completion
data.state = SessionState::Finished;
data.result = Some(Ok(()));
core.completed.notify_all();
core.completed.send(Ok(()));

Ok(())
}
}

impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
type Id = SessionId;
type CreationData = (); // never used directly
type SuccessfulResult = ();

fn type_name() -> &'static str {
"share add"
Expand Down Expand Up @@ -801,8 +804,8 @@ impl<T> ClusterSession for SessionImpl<T> where T: SessionTransport {
self.core.meta.self_node_id, error, node);

data.state = SessionState::Finished;
data.result = Some(Err(error));
self.core.completed.notify_all();
data.result = Some(Err(error.clone()));
self.core.completed.send(Err(error));
}

fn on_message(&self, sender: &NodeId, message: &Message) -> Result<(), Error> {
Expand Down Expand Up @@ -914,7 +917,7 @@ pub mod tests {
key_storage,
admin_public: Some(admin_public),
nonce: 1,
}).unwrap()
}).unwrap().0
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl ShareChangeSession {
let consensus_group = self.consensus_group.take().ok_or(Error::InvalidStateForRequest)?;
let version_holders = self.version_holders.take().ok_or(Error::InvalidStateForRequest)?;
let new_nodes_map = self.new_nodes_map.take().ok_or(Error::InvalidStateForRequest)?;
let share_add_session = ShareAddSessionImpl::new(ShareAddSessionParams {
let (share_add_session, _) = ShareAddSessionImpl::new(ShareAddSessionParams {
meta: self.meta.clone(),
nonce: self.nonce,
transport: ShareChangeTransport::new(self.session_id, self.nonce, self.cluster.clone()),
Expand Down
Loading

0 comments on commit 9de1afe

Please sign in to comment.