Skip to content

Commit

Permalink
restore connections to other musig2 signers (#2894)
Browse files Browse the repository at this point in the history
* restore connections to other musig2 signers

* adjust to new code

* review suggestions
  • Loading branch information
kziemianek authored Aug 17, 2024
1 parent 986d954 commit 88f047f
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 71 deletions.
1 change: 1 addition & 0 deletions bitacross-worker/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 3 additions & 11 deletions bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub trait EnclaveRegistryUpdater {
pub trait EnclaveRegistryLookup {
fn contains_key(&self, account: &Address32) -> bool;
fn get_all(&self) -> Vec<(Address32, String)>;
fn get_worker_url(&self, account: &Address32) -> Option<String>;
}

impl EnclaveRegistrySealer for EnclaveRegistry {
Expand Down Expand Up @@ -226,29 +227,20 @@ impl EnclaveRegistryUpdater for EnclaveRegistry {
}

impl EnclaveRegistryLookup for EnclaveRegistry {
#[cfg(feature = "std")]
fn contains_key(&self, account: &Address32) -> bool {
let registry = self.registry.read().unwrap();
registry.contains_key(account)
}

#[cfg(feature = "std")]
fn get_all(&self) -> Vec<(Address32, String)> {
let registry = self.registry.read().unwrap();
registry.iter().map(|(k, v)| (*k, v.clone())).collect()
}

#[cfg(feature = "sgx")]
fn contains_key(&self, account: &Address32) -> bool {
// Using unwrap becaused poisoned locks are unrecoverable errors
let registry = self.registry.read().unwrap();
registry.contains_key(account)
}

#[cfg(feature = "sgx")]
fn get_all(&self) -> Vec<(Address32, String)> {
fn get_worker_url(&self, account: &Address32) -> Option<String> {
// Using unwrap becaused poisoned locks are unrecoverable errors
let registry = self.registry.read().unwrap();
registry.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
registry.get(account).cloned()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum CeremonyCommand {
KillCeremony,
}

// commands are created by ceremony and executed by runner
// events are created by ceremony and executed by runner
#[derive(Debug, Eq, PartialEq)]
pub enum CeremonyEvent {
FirstRoundStarted(Signers, CeremonyId, PubNonce),
Expand Down
3 changes: 3 additions & 0 deletions bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ threadpool = { version = "1.8.0", optional = true }
sgx_tstd = { git = "https://github.com/apache/teaclave-sgx-sdk.git", branch = "master", optional = true, features = ["net", "thread"] }
threadpool_sgx = { git = "https://github.com/mesalock-linux/rust-threadpool-sgx", package = "threadpool", tag = "sgx_1.1.3", optional = true }

bc-enclave-registry = { path = "../bc-enclave-registry", default-features = false }
bc-musig2-ceremony = { path = "../bc-musig2-ceremony", default-features = false }
itc-direct-rpc-client = { path = "../../../core/direct-rpc-client", default-features = false }
itc-direct-rpc-server = { path = "../../../core/direct-rpc-server", default-features = false }
Expand Down Expand Up @@ -45,6 +46,7 @@ std = [
"litentry-primitives/std",
"itp-rpc/std",
"bc-musig2-ceremony/std",
"bc-enclave-registry/std",
"lc-direct-call/std",
"itp-sgx-crypto/std",
"rand",
Expand All @@ -57,6 +59,7 @@ sgx = [
"litentry-primitives/sgx",
"itp-rpc/sgx",
"bc-musig2-ceremony/sgx",
"bc-enclave-registry/sgx",
"lc-direct-call/sgx",
"itp-sgx-crypto/sgx",
"sgx_rand",
Expand Down
158 changes: 111 additions & 47 deletions bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ extern crate sgx_tstd as std;
#[cfg(all(feature = "std", feature = "sgx"))]
compile_error!("feature \"std\" and feature \"sgx\" cannot be enabled at the same time");

use core::time::Duration;
#[cfg(feature = "std")]
use threadpool::ThreadPool;

Expand All @@ -35,34 +36,45 @@ use std::sync::Mutex;
#[cfg(feature = "sgx")]
use std::sync::SgxMutex as Mutex;

use bc_musig2_ceremony::{CeremonyEvent, CeremonyId};
#[cfg(feature = "std")]
use std::sync::RwLock;

#[cfg(feature = "sgx")]
use std::sync::SgxRwLock as RwLock;

use bc_enclave_registry::EnclaveRegistryLookup;
use bc_musig2_ceremony::{CeremonyEvent, CeremonyId, CeremonyRegistry, SignerId};
use codec::Encode;
use itc_direct_rpc_client::{DirectRpcClient, RpcClient};
use itc_direct_rpc_client::{DirectRpcClient, DirectRpcClientFactory, RpcClient, RpcClientFactory};
use itc_direct_rpc_server::SendRpcResponse;
use itp_ocall_api::EnclaveAttestationOCallApi;
use itp_rpc::{Id, RpcRequest};
use itp_sgx_crypto::key_repository::AccessKey;
use itp_sgx_crypto::{key_repository::AccessKey, schnorr::Pair as SchnorrPair};
pub use itp_types::{DirectRequestStatus, Hash};
use itp_utils::hex::ToHexPrefixed;
use lc_direct_call::CeremonyRoundCall;
use litentry_primitives::{Address32, Identity, PlainRequest, ShardIdentifier};
use log::*;
use sp_core::{blake2_256, ed25519, Pair as SpCorePair, H256};
use std::{collections::HashMap, string::ToString, sync::Arc, vec};
use std::{collections::HashMap, string::ToString, sync::Arc, thread::sleep, vec};

#[allow(clippy::too_many_arguments)]
pub fn process_event<OCallApi, SIGNINGAK, Responder>(
pub fn process_event<OCallApi, SIGNINGAK, Responder, ECL, BKR>(
signing_key_access: Arc<SIGNINGAK>,
ocall_api: Arc<OCallApi>,
responder: Arc<Responder>,
enclave_registry_lookup: Arc<ECL>,
event: CeremonyEvent,
ceremony_id: CeremonyId,
event_threads_pool: ThreadPool,
peers_map: Arc<Mutex<HashMap<[u8; 32], DirectRpcClient>>>,
ceremony_registry: Arc<RwLock<CeremonyRegistry<BKR>>>,
) where
OCallApi: EnclaveAttestationOCallApi + 'static,
SIGNINGAK: AccessKey<KeyType = ed25519::Pair> + Send + Sync + 'static,
Responder: SendRpcResponse<Hash = H256> + 'static,
ECL: EnclaveRegistryLookup + Send + Sync + 'static,
BKR: AccessKey<KeyType = SchnorrPair> + Send + Sync + 'static,
{
let my_identity: Address32 = signing_key_access.retrieve_key().unwrap().public().0.into();
let identity = Identity::Substrate(my_identity);
Expand All @@ -80,20 +92,21 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
);

let signer_id = *signer_id;
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
if let Some(mut client) = client {
let request = request.clone();
event_threads_pool.execute(move || {
if let Err(e) = client.send(&request) {
error!(
"Could not send request to signer: {:?}, reason: {:?}",
signer_id, e
)
}
});
} else {
error!("Fail to share nonce, unknown signer: {:?}", signer_id);
}
let peers_map_clone = peers_map.clone();
let request = request.clone();
let enclave_lookup_cloned = enclave_registry_lookup.clone();
let ceremony_registry_cloned = ceremony_registry.clone();
let ceremony_id_cloned = ceremony_id.clone();
event_threads_pool.execute(move || {
send_request(
signer_id,
&ceremony_id_cloned,
request,
peers_map_clone,
enclave_lookup_cloned,
ceremony_registry_cloned,
);
});
});
},
CeremonyEvent::SecondRoundStarted(signers, message, signature) => {
Expand All @@ -108,20 +121,21 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
);

let signer_id = *signer_id;
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
if let Some(mut client) = client {
let request = request.clone();
event_threads_pool.execute(move || {
if let Err(e) = client.send(&request) {
error!(
"Could not send request to signer: {:?}, reason: {:?}",
signer_id, e
)
}
});
} else {
error!("Fail to share partial signature, unknown signer: {:?}", signer_id);
}
let peers_map_clone = peers_map.clone();
let request = request.clone();
let enclave_lookup_cloned = enclave_registry_lookup.clone();
let ceremony_registry_cloned = ceremony_registry.clone();
let ceremony_id_cloned = ceremony_id.clone();
event_threads_pool.execute(move || {
send_request(
signer_id,
&ceremony_id_cloned,
request,
peers_map_clone,
enclave_lookup_cloned,
ceremony_registry_cloned,
);
});
});
},
CeremonyEvent::CeremonyEnded(signature, is_check_run, verification_result) => {
Expand Down Expand Up @@ -168,25 +182,75 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
);

let signer_id = *signer_id;
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
if let Some(mut client) = client {
let request = request.clone();
event_threads_pool.execute(move || {
if let Err(e) = client.send(&request) {
error!(
"Could not send request to signer: {:?}, reason: {:?}",
signer_id, e
)
}
});
} else {
error!("Fail to share killing info, unknown signer: {:?}", signer_id);
}
let peers_map_clone = peers_map.clone();
let request = request.clone();
let enclave_lookup_cloned = enclave_registry_lookup.clone();
let ceremony_registry_cloned = ceremony_registry.clone();
let ceremony_id_cloned = ceremony_id.clone();
event_threads_pool.execute(move || {
send_request(
signer_id,
&ceremony_id_cloned,
request,
peers_map_clone,
enclave_lookup_cloned,
ceremony_registry_cloned,
);
});
});
},
}
}

// it will try to send request until it succeeds, the peer is removed from registry or ceremony is removed
fn send_request<ECL, BKR>(
signer_id: SignerId,
ceremony_id: &CeremonyId,
request: RpcRequest,
peers_map: Arc<Mutex<HashMap<[u8; 32], DirectRpcClient>>>,
enclave_registry_lookup: Arc<ECL>,
ceremony_registry: Arc<RwLock<CeremonyRegistry<BKR>>>,
) where
ECL: EnclaveRegistryLookup,
BKR: AccessKey<KeyType = SchnorrPair>,
{
loop {
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
if let Some(mut client) = client {
if let Err(e) = client.send(&request) {
error!("Could not send request to signer: {:?}, reason: {:?}", signer_id, e);
sleep(Duration::from_secs(5));
let mut peers_lock = peers_map.lock().unwrap();
peers_lock.remove(&signer_id);
} else {
// finish if request was sent
break
}
} else {
// check if ceremony still exists, if not stop
if !ceremony_registry.read().unwrap().contains_key(ceremony_id) {
break
}

if let Some(url) = enclave_registry_lookup.get_worker_url(&Address32::from(signer_id)) {
match (DirectRpcClientFactory {}).create(&url) {
Ok(new_client) => {
peers_map.lock().unwrap().insert(signer_id, new_client.clone());
},
Err(e) => {
error!("Could not connect to peer {}, reason: {:?}", url, e);
sleep(Duration::from_secs(5));
},
}
} else {
error!("Could not find {:?} in registry", signer_id.to_hex());
// stop if peer is not found in registry
break
}
}
}
}

fn prepare_request<SIGNINGAK>(
signing_key_access: &SIGNINGAK,
mr_enclave: [u8; 32],
Expand Down
2 changes: 2 additions & 0 deletions bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,12 @@ fn handle_ceremony_command<SKR, SIGNINGAK, EKR, BKR, S, H, O, RRL, ERL, SRL, Res
context.signing_key_access.clone(),
context.ocall_api.clone(),
context.responder.clone(),
context.enclave_registry_lookup.clone(),
event,
ceremony_id.clone(),
event_threads_pool.clone(),
peers_map.clone(),
context.ceremony_registry.clone(),
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion bitacross-worker/core-primitives/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub struct RpcResponse {
pub id: Id,
}

#[derive(Clone, Encode, Decode, Serialize, Deserialize)]
#[derive(Clone, Encode, Decode, Serialize, Deserialize, Debug)]
pub struct RpcRequest {
pub jsonrpc: String,
pub method: String,
Expand Down
30 changes: 19 additions & 11 deletions bitacross-worker/core/direct-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub trait RpcClient {

#[derive(Clone)]
pub struct DirectRpcClient {
request_sink: Sender<String>,
request_sink: Sender<(String, Sender<bool>)>,
}

impl DirectRpcClient {
Expand All @@ -133,20 +133,21 @@ impl DirectRpcClient {
client_tls_with_config(server_url.as_str(), stream, None, Some(connector))
.map_err(|e| format!("Could not open websocket connection: {:?}", e))?;

let (request_sender, request_receiver) = channel();
let (request_sender, request_receiver) = channel::<(String, Sender<bool>)>();

//it fails to perform handshake in non_blocking mode so we are setting it up after the handshake is performed
Self::switch_to_non_blocking(&mut socket);

std::thread::spawn(move || {
loop {
// let's flush all pending requests first
while let Ok(request) = request_receiver.try_recv() {
if let Err(e) = socket.write_message(Message::Text(request)) {
error!("Could not write message to socket, reason: {:?}", e)
}
while let Ok((request, result_sender)) = request_receiver.recv() {
let mut result = true;
if let Err(e) = socket.write_message(Message::Text(request)) {
error!("Could not write message to socket, reason: {:?}", e);
result = false;
}
if let Err(e) = result_sender.send(result) {
log::error!("Could not send rpc result back, reason: {:?}", e);
}
std::thread::sleep(Duration::from_millis(1))
}
});
debug!("Connected to peer: {}", url);
Expand Down Expand Up @@ -183,8 +184,15 @@ impl RpcClient for DirectRpcClient {
fn send(&mut self, request: &RpcRequest) -> Result<(), Box<dyn Error>> {
let request = serde_json::to_string(request)
.map_err(|e| format!("Could not parse RpcRequest {:?}", e))?;
let (sender, receiver) = channel();
self.request_sink
.send(request)
.map_err(|e| format!("Could not write message, reason: {:?}", e).into())
.send((request, sender))
.map_err(|e| format!("Could not parse RpcRequest {:?}", e))?;

if receiver.recv()? {
Ok(())
} else {
Err("Could not send request".into())
}
}
}
1 change: 1 addition & 0 deletions bitacross-worker/enclave-runtime/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 88f047f

Please sign in to comment.