diff --git a/bitacross-worker/Cargo.lock b/bitacross-worker/Cargo.lock index d214983c6c..c970f480a6 100644 --- a/bitacross-worker/Cargo.lock +++ b/bitacross-worker/Cargo.lock @@ -376,6 +376,7 @@ dependencies = [ name = "bc-musig2-event" version = "0.1.0" dependencies = [ + "bc-enclave-registry", "bc-musig2-ceremony", "itc-direct-rpc-client", "itc-direct-rpc-server", diff --git a/bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs b/bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs index f1f4c929b8..b08c7b65c4 100644 --- a/bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs +++ b/bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs @@ -137,6 +137,7 @@ pub trait EnclaveRegistryUpdater { pub trait EnclaveRegistryLookup { fn contains_key(&self, account: &Address32) -> bool; fn get_all(&self) -> Vec<(Address32, String)>; + fn get_worker_url(&self, account: &Address32) -> Option; } impl EnclaveRegistrySealer for EnclaveRegistry { @@ -226,29 +227,20 @@ impl EnclaveRegistryUpdater for EnclaveRegistry { } impl EnclaveRegistryLookup for EnclaveRegistry { - #[cfg(feature = "std")] - fn contains_key(&self, account: &Address32) -> bool { - let registry = self.registry.read().unwrap(); - registry.contains_key(account) - } - - #[cfg(feature = "std")] fn get_all(&self) -> Vec<(Address32, String)> { let registry = self.registry.read().unwrap(); registry.iter().map(|(k, v)| (*k, v.clone())).collect() } - #[cfg(feature = "sgx")] fn contains_key(&self, account: &Address32) -> bool { // Using unwrap becaused poisoned locks are unrecoverable errors let registry = self.registry.read().unwrap(); registry.contains_key(account) } - #[cfg(feature = "sgx")] - fn get_all(&self) -> Vec<(Address32, String)> { + fn get_worker_url(&self, account: &Address32) -> Option { // Using unwrap becaused poisoned locks are unrecoverable errors let registry = self.registry.read().unwrap(); - registry.iter().map(|(k, v)| (k.clone(), v.clone())).collect() + registry.get(account).cloned() } } diff --git a/bitacross-worker/bitacross/core/bc-musig2-ceremony/src/lib.rs b/bitacross-worker/bitacross/core/bc-musig2-ceremony/src/lib.rs index 1f0d054d80..f30563f526 100644 --- a/bitacross-worker/bitacross/core/bc-musig2-ceremony/src/lib.rs +++ b/bitacross-worker/bitacross/core/bc-musig2-ceremony/src/lib.rs @@ -86,7 +86,7 @@ pub enum CeremonyCommand { KillCeremony, } -// commands are created by ceremony and executed by runner +// events are created by ceremony and executed by runner #[derive(Debug, Eq, PartialEq)] pub enum CeremonyEvent { FirstRoundStarted(Signers, CeremonyId, PubNonce), diff --git a/bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml b/bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml index b8f3e76c3e..60ce17a4cd 100644 --- a/bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml +++ b/bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml @@ -14,6 +14,7 @@ threadpool = { version = "1.8.0", optional = true } sgx_tstd = { git = "https://github.com/apache/teaclave-sgx-sdk.git", branch = "master", optional = true, features = ["net", "thread"] } threadpool_sgx = { git = "https://github.com/mesalock-linux/rust-threadpool-sgx", package = "threadpool", tag = "sgx_1.1.3", optional = true } +bc-enclave-registry = { path = "../bc-enclave-registry", default-features = false } bc-musig2-ceremony = { path = "../bc-musig2-ceremony", default-features = false } itc-direct-rpc-client = { path = "../../../core/direct-rpc-client", default-features = false } itc-direct-rpc-server = { path = "../../../core/direct-rpc-server", default-features = false } @@ -45,6 +46,7 @@ std = [ "litentry-primitives/std", "itp-rpc/std", "bc-musig2-ceremony/std", + "bc-enclave-registry/std", "lc-direct-call/std", "itp-sgx-crypto/std", "rand", @@ -57,6 +59,7 @@ sgx = [ "litentry-primitives/sgx", "itp-rpc/sgx", "bc-musig2-ceremony/sgx", + "bc-enclave-registry/sgx", "lc-direct-call/sgx", "itp-sgx-crypto/sgx", "sgx_rand", diff --git a/bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs b/bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs index 41fdd1e411..af9ca183d4 100644 --- a/bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs +++ b/bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs @@ -23,6 +23,7 @@ extern crate sgx_tstd as std; #[cfg(all(feature = "std", feature = "sgx"))] compile_error!("feature \"std\" and feature \"sgx\" cannot be enabled at the same time"); +use core::time::Duration; #[cfg(feature = "std")] use threadpool::ThreadPool; @@ -35,34 +36,45 @@ use std::sync::Mutex; #[cfg(feature = "sgx")] use std::sync::SgxMutex as Mutex; -use bc_musig2_ceremony::{CeremonyEvent, CeremonyId}; +#[cfg(feature = "std")] +use std::sync::RwLock; + +#[cfg(feature = "sgx")] +use std::sync::SgxRwLock as RwLock; + +use bc_enclave_registry::EnclaveRegistryLookup; +use bc_musig2_ceremony::{CeremonyEvent, CeremonyId, CeremonyRegistry, SignerId}; use codec::Encode; -use itc_direct_rpc_client::{DirectRpcClient, RpcClient}; +use itc_direct_rpc_client::{DirectRpcClient, DirectRpcClientFactory, RpcClient, RpcClientFactory}; use itc_direct_rpc_server::SendRpcResponse; use itp_ocall_api::EnclaveAttestationOCallApi; use itp_rpc::{Id, RpcRequest}; -use itp_sgx_crypto::key_repository::AccessKey; +use itp_sgx_crypto::{key_repository::AccessKey, schnorr::Pair as SchnorrPair}; pub use itp_types::{DirectRequestStatus, Hash}; use itp_utils::hex::ToHexPrefixed; use lc_direct_call::CeremonyRoundCall; use litentry_primitives::{Address32, Identity, PlainRequest, ShardIdentifier}; use log::*; use sp_core::{blake2_256, ed25519, Pair as SpCorePair, H256}; -use std::{collections::HashMap, string::ToString, sync::Arc, vec}; +use std::{collections::HashMap, string::ToString, sync::Arc, thread::sleep, vec}; #[allow(clippy::too_many_arguments)] -pub fn process_event( +pub fn process_event( signing_key_access: Arc, ocall_api: Arc, responder: Arc, + enclave_registry_lookup: Arc, event: CeremonyEvent, ceremony_id: CeremonyId, event_threads_pool: ThreadPool, peers_map: Arc>>, + ceremony_registry: Arc>>, ) where OCallApi: EnclaveAttestationOCallApi + 'static, SIGNINGAK: AccessKey + Send + Sync + 'static, Responder: SendRpcResponse + 'static, + ECL: EnclaveRegistryLookup + Send + Sync + 'static, + BKR: AccessKey + Send + Sync + 'static, { let my_identity: Address32 = signing_key_access.retrieve_key().unwrap().public().0.into(); let identity = Identity::Substrate(my_identity); @@ -80,20 +92,21 @@ pub fn process_event( ); let signer_id = *signer_id; - let client = peers_map.lock().unwrap().get(&signer_id).cloned(); - if let Some(mut client) = client { - let request = request.clone(); - event_threads_pool.execute(move || { - if let Err(e) = client.send(&request) { - error!( - "Could not send request to signer: {:?}, reason: {:?}", - signer_id, e - ) - } - }); - } else { - error!("Fail to share nonce, unknown signer: {:?}", signer_id); - } + let peers_map_clone = peers_map.clone(); + let request = request.clone(); + let enclave_lookup_cloned = enclave_registry_lookup.clone(); + let ceremony_registry_cloned = ceremony_registry.clone(); + let ceremony_id_cloned = ceremony_id.clone(); + event_threads_pool.execute(move || { + send_request( + signer_id, + &ceremony_id_cloned, + request, + peers_map_clone, + enclave_lookup_cloned, + ceremony_registry_cloned, + ); + }); }); }, CeremonyEvent::SecondRoundStarted(signers, message, signature) => { @@ -108,20 +121,21 @@ pub fn process_event( ); let signer_id = *signer_id; - let client = peers_map.lock().unwrap().get(&signer_id).cloned(); - if let Some(mut client) = client { - let request = request.clone(); - event_threads_pool.execute(move || { - if let Err(e) = client.send(&request) { - error!( - "Could not send request to signer: {:?}, reason: {:?}", - signer_id, e - ) - } - }); - } else { - error!("Fail to share partial signature, unknown signer: {:?}", signer_id); - } + let peers_map_clone = peers_map.clone(); + let request = request.clone(); + let enclave_lookup_cloned = enclave_registry_lookup.clone(); + let ceremony_registry_cloned = ceremony_registry.clone(); + let ceremony_id_cloned = ceremony_id.clone(); + event_threads_pool.execute(move || { + send_request( + signer_id, + &ceremony_id_cloned, + request, + peers_map_clone, + enclave_lookup_cloned, + ceremony_registry_cloned, + ); + }); }); }, CeremonyEvent::CeremonyEnded(signature, is_check_run, verification_result) => { @@ -168,25 +182,75 @@ pub fn process_event( ); let signer_id = *signer_id; - let client = peers_map.lock().unwrap().get(&signer_id).cloned(); - if let Some(mut client) = client { - let request = request.clone(); - event_threads_pool.execute(move || { - if let Err(e) = client.send(&request) { - error!( - "Could not send request to signer: {:?}, reason: {:?}", - signer_id, e - ) - } - }); - } else { - error!("Fail to share killing info, unknown signer: {:?}", signer_id); - } + let peers_map_clone = peers_map.clone(); + let request = request.clone(); + let enclave_lookup_cloned = enclave_registry_lookup.clone(); + let ceremony_registry_cloned = ceremony_registry.clone(); + let ceremony_id_cloned = ceremony_id.clone(); + event_threads_pool.execute(move || { + send_request( + signer_id, + &ceremony_id_cloned, + request, + peers_map_clone, + enclave_lookup_cloned, + ceremony_registry_cloned, + ); + }); }); }, } } +// it will try to send request until it succeeds, the peer is removed from registry or ceremony is removed +fn send_request( + signer_id: SignerId, + ceremony_id: &CeremonyId, + request: RpcRequest, + peers_map: Arc>>, + enclave_registry_lookup: Arc, + ceremony_registry: Arc>>, +) where + ECL: EnclaveRegistryLookup, + BKR: AccessKey, +{ + loop { + let client = peers_map.lock().unwrap().get(&signer_id).cloned(); + if let Some(mut client) = client { + if let Err(e) = client.send(&request) { + error!("Could not send request to signer: {:?}, reason: {:?}", signer_id, e); + sleep(Duration::from_secs(5)); + let mut peers_lock = peers_map.lock().unwrap(); + peers_lock.remove(&signer_id); + } else { + // finish if request was sent + break + } + } else { + // check if ceremony still exists, if not stop + if !ceremony_registry.read().unwrap().contains_key(ceremony_id) { + break + } + + if let Some(url) = enclave_registry_lookup.get_worker_url(&Address32::from(signer_id)) { + match (DirectRpcClientFactory {}).create(&url) { + Ok(new_client) => { + peers_map.lock().unwrap().insert(signer_id, new_client.clone()); + }, + Err(e) => { + error!("Could not connect to peer {}, reason: {:?}", url, e); + sleep(Duration::from_secs(5)); + }, + } + } else { + error!("Could not find {:?} in registry", signer_id.to_hex()); + // stop if peer is not found in registry + break + } + } + } +} + fn prepare_request( signing_key_access: &SIGNINGAK, mr_enclave: [u8; 32], diff --git a/bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs b/bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs index f2c8e4b7e7..0278723f6c 100644 --- a/bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs +++ b/bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs @@ -417,10 +417,12 @@ fn handle_ceremony_command, + request_sink: Sender<(String, Sender)>, } impl DirectRpcClient { @@ -133,20 +133,21 @@ impl DirectRpcClient { client_tls_with_config(server_url.as_str(), stream, None, Some(connector)) .map_err(|e| format!("Could not open websocket connection: {:?}", e))?; - let (request_sender, request_receiver) = channel(); + let (request_sender, request_receiver) = channel::<(String, Sender)>(); //it fails to perform handshake in non_blocking mode so we are setting it up after the handshake is performed Self::switch_to_non_blocking(&mut socket); std::thread::spawn(move || { - loop { - // let's flush all pending requests first - while let Ok(request) = request_receiver.try_recv() { - if let Err(e) = socket.write_message(Message::Text(request)) { - error!("Could not write message to socket, reason: {:?}", e) - } + while let Ok((request, result_sender)) = request_receiver.recv() { + let mut result = true; + if let Err(e) = socket.write_message(Message::Text(request)) { + error!("Could not write message to socket, reason: {:?}", e); + result = false; + } + if let Err(e) = result_sender.send(result) { + log::error!("Could not send rpc result back, reason: {:?}", e); } - std::thread::sleep(Duration::from_millis(1)) } }); debug!("Connected to peer: {}", url); @@ -183,8 +184,15 @@ impl RpcClient for DirectRpcClient { fn send(&mut self, request: &RpcRequest) -> Result<(), Box> { let request = serde_json::to_string(request) .map_err(|e| format!("Could not parse RpcRequest {:?}", e))?; + let (sender, receiver) = channel(); self.request_sink - .send(request) - .map_err(|e| format!("Could not write message, reason: {:?}", e).into()) + .send((request, sender)) + .map_err(|e| format!("Could not parse RpcRequest {:?}", e))?; + + if receiver.recv()? { + Ok(()) + } else { + Err("Could not send request".into()) + } } } diff --git a/bitacross-worker/enclave-runtime/Cargo.lock b/bitacross-worker/enclave-runtime/Cargo.lock index 777327428f..29aa66c721 100644 --- a/bitacross-worker/enclave-runtime/Cargo.lock +++ b/bitacross-worker/enclave-runtime/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ name = "bc-musig2-event" version = "0.1.0" dependencies = [ + "bc-enclave-registry", "bc-musig2-ceremony", "itc-direct-rpc-client", "itc-direct-rpc-server",