diff --git a/.github/workflows/multichain-integration.yml b/.github/workflows/multichain-integration.yml index b5b3703ad..6722bae5d 100644 --- a/.github/workflows/multichain-integration.yml +++ b/.github/workflows/multichain-integration.yml @@ -40,6 +40,7 @@ jobs: run: | docker pull ghcr.io/near/os-relayer:12ba6e35690df3979fce0b36a41d0ca0db9c0ab4 docker pull ghcr.io/near/near-lake-indexer:e6519c922435f3d18b5f2ddac5d1ec171ef4dd6b + docker pull localstack/localstack:latest - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 diff --git a/integration-tests/src/multichain/mod.rs b/integration-tests/src/multichain/mod.rs index fa0b7020f..9226907c2 100644 --- a/integration-tests/src/multichain/mod.rs +++ b/integration-tests/src/multichain/mod.rs @@ -23,6 +23,17 @@ pub enum Nodes<'a> { } impl Nodes<'_> { + pub fn len(&self) -> usize { + match self { + Nodes::Local { nodes, .. } => nodes.len(), + Nodes::Docker { nodes, .. } => nodes.len(), + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn ctx(&self) -> &Context { match self { Nodes::Local { ctx, .. } => ctx, diff --git a/integration-tests/tests/lib.rs b/integration-tests/tests/lib.rs index 501465554..8ae6381ed 100644 --- a/integration-tests/tests/lib.rs +++ b/integration-tests/tests/lib.rs @@ -243,6 +243,35 @@ mod wait_for { .retry(&ExponentialBuilder::default().with_max_times(6)) .await } + + pub async fn has_at_least_presignatures<'a>( + ctx: &MultichainTestContext<'a>, + id: usize, + expected_presignature_count: usize, + ) -> anyhow::Result { + let is_enough_presignatures = || async { + let state_view: StateView = ctx + .http_client + .get(format!("{}/state", ctx.nodes.url(id))) + .send() + .await? + .json() + .await?; + + match state_view { + StateView::Running { + presignature_count, .. + } if presignature_count >= expected_presignature_count => Ok(state_view), + StateView::Running { .. } => { + anyhow::bail!("node does not have enough presignatures yet") + } + StateView::NotRunning => anyhow::bail!("node is not running"), + } + }; + is_enough_presignatures + .retry(&ExponentialBuilder::default().with_max_times(6)) + .await + } } trait MpcCheck { diff --git a/integration-tests/tests/multichain/mod.rs b/integration-tests/tests/multichain/mod.rs index 38788724d..e7fd71c47 100644 --- a/integration-tests/tests/multichain/mod.rs +++ b/integration-tests/tests/multichain/mod.rs @@ -30,10 +30,19 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { } #[test(tokio::test)] -async fn test_triples() -> anyhow::Result<()> { +async fn test_triples_and_presignatures() -> anyhow::Result<()> { with_multichain_nodes(3, |ctx| { Box::pin(async move { - wait_for::has_at_least_triples(&ctx, 0, 2).await?; + // Wait for network to complete key generation + let state_0 = wait_for::running_mpc(&ctx, 0).await?; + assert_eq!(state_0.participants.len(), 3); + + for i in 0..ctx.nodes.len() { + wait_for::has_at_least_triples(&ctx, i, 2).await?; + } + for i in 0..ctx.nodes.len() { + wait_for::has_at_least_presignatures(&ctx, i, 2).await?; + } Ok(()) }) diff --git a/node/src/protocol/consensus.rs b/node/src/protocol/consensus.rs index 6cab58e1d..ad8372983 100644 --- a/node/src/protocol/consensus.rs +++ b/node/src/protocol/consensus.rs @@ -3,6 +3,7 @@ use super::state::{ JoiningState, NodeState, PersistentNodeData, RunningState, StartedState, WaitingForConsensusState, }; +use crate::protocol::presignature::PresignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; use crate::types::PrivateKeyShare; @@ -86,7 +87,7 @@ impl ConsensusProtocol for StartedState { tracing::info!( "contract state is running and we are already a participant" ); - let participants_vec = + let participants_vec: Vec = contract_state.participants.keys().cloned().collect(); Ok(NodeState::Running(RunningState { epoch, @@ -95,6 +96,12 @@ impl ConsensusProtocol for StartedState { private_share, public_key, triple_manager: TripleManager::new( + participants_vec.clone(), + ctx.me(), + contract_state.threshold, + epoch, + ), + presignature_manager: PresignatureManager::new( participants_vec, ctx.me(), contract_state.threshold, @@ -261,7 +268,8 @@ impl ConsensusProtocol for WaitingForConsensusState { if contract_state.public_key != self.public_key { return Err(ConsensusError::MismatchedPublicKey); } - let participants_vec = self.participants.keys().cloned().collect(); + let participants_vec: Vec = + self.participants.keys().cloned().collect(); Ok(NodeState::Running(RunningState { epoch: self.epoch, participants: self.participants, @@ -269,6 +277,12 @@ impl ConsensusProtocol for WaitingForConsensusState { private_share: self.private_share, public_key: self.public_key, triple_manager: TripleManager::new( + participants_vec.clone(), + ctx.me(), + self.threshold, + self.epoch, + ), + presignature_manager: PresignatureManager::new( participants_vec, ctx.me(), self.threshold, diff --git a/node/src/protocol/cryptography.rs b/node/src/protocol/cryptography.rs index 8f4af1415..0aed2fd36 100644 --- a/node/src/protocol/cryptography.rs +++ b/node/src/protocol/cryptography.rs @@ -109,7 +109,7 @@ impl CryptographicProtocol for ResharingState { ) -> Result { tracing::info!("progressing key reshare"); loop { - let action = self.protocol.poke().unwrap(); + let action = self.protocol.poke()?; match action { Action::Wait => { tracing::debug!("waiting"); @@ -173,13 +173,39 @@ impl CryptographicProtocol for RunningState { mut self, ctx: C, ) -> Result { - if self.triple_manager.potential_len() < 2 { + if self.triple_manager.my_len() < 2 { self.triple_manager.generate()?; } for (p, msg) in self.triple_manager.poke()? { let url = self.participants.get(&p).unwrap(); http_client::message(ctx.http_client(), url.clone(), MpcMessage::Triple(msg)).await?; } + + if self.presignature_manager.potential_len() < 2 { + // To ensure there is no contention between different nodes we are only using triples + // that we proposed. This way in a non-BFT environment we are guaranteed to never try + // to use the same triple as any other node. + if let Some((triple0, triple1)) = self.triple_manager.take_mine_twice() { + self.presignature_manager.generate( + triple0, + triple1, + &self.public_key, + &self.private_share, + )?; + } else { + tracing::debug!("we don't have enough triples to generate a presignature"); + } + } + for (p, msg) in self.presignature_manager.poke()? { + let url = self.participants.get(&p).unwrap(); + http_client::message( + ctx.http_client(), + url.clone(), + MpcMessage::Presignature(msg), + ) + .await?; + } + Ok(NodeState::Running(self)) } } diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs index 0ef2724a3..7c64e2f85 100644 --- a/node/src/protocol/message.rs +++ b/node/src/protocol/message.rs @@ -29,11 +29,22 @@ pub struct TripleMessage { pub data: MessageData, } +#[derive(Serialize, Deserialize, Debug)] +pub struct PresignatureMessage { + pub id: u64, + pub triple0: u64, + pub triple1: u64, + pub epoch: u64, + pub from: Participant, + pub data: MessageData, +} + #[derive(Serialize, Deserialize, Debug)] pub enum MpcMessage { Generating(GeneratingMessage), Resharing(ResharingMessage), Triple(TripleMessage), + Presignature(PresignatureMessage), } #[derive(Default)] @@ -41,6 +52,7 @@ pub struct MpcMessageQueue { generating: VecDeque, resharing_bins: HashMap>, triple_bins: HashMap>>, + presignature_bins: HashMap>>, } impl MpcMessageQueue { @@ -59,6 +71,13 @@ impl MpcMessageQueue { .entry(message.id) .or_default() .push_back(message), + MpcMessage::Presignature(message) => self + .presignature_bins + .entry(message.epoch) + .or_default() + .entry(message.id) + .or_default() + .push_back(message), } } } @@ -121,6 +140,20 @@ impl MessageHandler for RunningState { } } } + for (id, queue) in queue.presignature_bins.entry(self.epoch).or_default() { + while let Some(message) = queue.pop_front() { + if let Some(protocol) = self.presignature_manager.get_or_generate( + *id, + message.triple0, + message.triple1, + &mut self.triple_manager, + &self.public_key, + &self.private_share, + )? { + protocol.message(message.from, message.data); + } + } + } Ok(()) } } diff --git a/node/src/protocol/mod.rs b/node/src/protocol/mod.rs index 1c155efd9..aaccc1132 100644 --- a/node/src/protocol/mod.rs +++ b/node/src/protocol/mod.rs @@ -2,6 +2,7 @@ mod consensus; mod contract; mod cryptography; mod message; +mod presignature; mod state; mod triple; diff --git a/node/src/protocol/presignature.rs b/node/src/protocol/presignature.rs new file mode 100644 index 000000000..9dde4bcff --- /dev/null +++ b/node/src/protocol/presignature.rs @@ -0,0 +1,236 @@ +use super::message::PresignatureMessage; +use super::triple::{Triple, TripleId, TripleManager}; +use crate::types::{PresignatureProtocol, PrivateKeyShare, PublicKey}; +use crate::util::AffinePointExt; +use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; +use cait_sith::{KeygenOutput, PresignArguments, PresignOutput}; +use k256::Secp256k1; +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +/// Unique number used to identify a specific ongoing presignature generation protocol. +/// Without `PresignatureId` it would be unclear where to route incoming cait-sith presignature +/// generation messages. +pub type PresignatureId = u64; + +/// A completed presignature. +pub struct Presignature { + pub id: PresignatureId, + pub output: PresignOutput, +} + +/// An ongoing presignature generator. +pub struct PresignatureGenerator { + pub protocol: PresignatureProtocol, + pub triple0: TripleId, + pub triple1: TripleId, +} + +/// Abstracts how triples are generated by providing a way to request a new triple that will be +/// complete some time in the future and a way to take an already generated triple. +pub struct PresignatureManager { + /// Completed unspent presignatures. + presignatures: HashMap, + /// Ongoing triple generation protocols. + generators: HashMap, + + participants: Vec, + me: Participant, + threshold: usize, + epoch: u64, +} + +impl PresignatureManager { + pub fn new( + participants: Vec, + me: Participant, + threshold: usize, + epoch: u64, + ) -> Self { + Self { + presignatures: HashMap::new(), + generators: HashMap::new(), + participants, + me, + threshold, + epoch, + } + } + + /// Returns the number of unspent presignatures available in the manager. + pub fn len(&self) -> usize { + self.presignatures.len() + } + + /// Returns the number of unspent presignatures we will have in the manager once + /// all ongoing generation protocols complete. + pub fn potential_len(&self) -> usize { + self.presignatures.len() + self.generators.len() + } + + fn generate_internal( + participants: &[Participant], + me: Participant, + threshold: usize, + triple0: Triple, + triple1: Triple, + public_key: &PublicKey, + private_share: &PrivateKeyShare, + ) -> Result { + let protocol = Box::new(cait_sith::presign( + participants, + me, + PresignArguments { + triple0: (triple0.share, triple0.public), + triple1: (triple1.share, triple1.public), + keygen_out: KeygenOutput { + private_share: *private_share, + public_key: *public_key, + }, + threshold, + }, + )?); + Ok(PresignatureGenerator { + protocol, + triple0: triple0.id, + triple1: triple1.id, + }) + } + + /// Starts a new presignature generation protocol. + pub fn generate( + &mut self, + triple0: Triple, + triple1: Triple, + public_key: &PublicKey, + private_share: &PrivateKeyShare, + ) -> Result<(), InitializationError> { + let id = rand::random(); + tracing::info!(id, "starting protocol to generate a new presignature"); + let generator = Self::generate_internal( + &self.participants, + self.me, + self.threshold, + triple0, + triple1, + public_key, + private_share, + )?; + self.generators.insert(id, generator); + Ok(()) + } + + /// Ensures that the presignature with the given id is either: + /// 1) Already generated in which case returns `None`, or + /// 2) Is currently being generated by `protocol` in which case returns `Some(protocol)`, or + /// 3) Has never been seen by the manager in which case start a new protocol and returns `Some(protocol)`, or + /// 4) Depends on triples (`triple0`/`triple1`) that are unknown to the node + // TODO: What if the presignature completed generation and is already spent? + pub fn get_or_generate( + &mut self, + id: PresignatureId, + triple0: TripleId, + triple1: TripleId, + triple_manager: &mut TripleManager, + public_key: &PublicKey, + private_share: &PrivateKeyShare, + ) -> Result, InitializationError> { + if self.presignatures.contains_key(&id) { + Ok(None) + } else { + match self.generators.entry(id) { + Entry::Vacant(entry) => { + tracing::info!(id, "joining protocol to generate a new presignature"); + let Some(triple0) = triple_manager.take(triple0) else { + tracing::warn!(triple_id = triple0, "triple0 is missing, can't join"); + return Ok(None); + }; + let triple1 = match triple_manager.take(triple1) { + Some(triple1) => triple1, + None => { + tracing::warn!(triple_id = triple1, "triple1 is missing, can't join"); + return Ok(None); + } + }; + let generator = Self::generate_internal( + &self.participants, + self.me, + self.threshold, + triple0, + triple1, + public_key, + private_share, + )?; + let generator = entry.insert(generator); + Ok(Some(&mut generator.protocol)) + } + Entry::Occupied(entry) => Ok(Some(&mut entry.into_mut().protocol)), + } + } + } + + /// Pokes all of the ongoing generation protocols and returns a vector of + /// messages to be sent to the respective participant. + /// + /// An empty vector means we cannot progress until we receive a new message. + pub fn poke(&mut self) -> Result, ProtocolError> { + let mut messages = Vec::new(); + let mut result = Ok(()); + self.generators.retain(|id, generator| { + loop { + let action = match generator.protocol.poke() { + Ok(action) => action, + Err(e) => { + result = Err(e); + break false; + } + }; + match action { + Action::Wait => { + tracing::debug!("waiting"); + // Retain protocol until we are finished + return true; + } + Action::SendMany(data) => { + for p in &self.participants { + messages.push(( + *p, + PresignatureMessage { + id: *id, + triple0: generator.triple0, + triple1: generator.triple1, + epoch: self.epoch, + from: self.me, + data: data.clone(), + }, + )) + } + } + Action::SendPrivate(p, data) => messages.push(( + p, + PresignatureMessage { + id: *id, + triple0: generator.triple0, + triple1: generator.triple1, + epoch: self.epoch, + from: self.me, + data: data.clone(), + }, + )), + Action::Return(output) => { + tracing::info!( + id, + big_r = ?output.big_r.to_base58(), + "completed presignature generation" + ); + self.presignatures + .insert(*id, Presignature { id: *id, output }); + // Do not retain the protocol + return false; + } + } + } + }); + result.map(|_| messages) + } +} diff --git a/node/src/protocol/state.rs b/node/src/protocol/state.rs index 0816b3dd9..8c1d19b88 100644 --- a/node/src/protocol/state.rs +++ b/node/src/protocol/state.rs @@ -1,3 +1,4 @@ +use super::presignature::PresignatureManager; use super::triple::TripleManager; use crate::types::{KeygenProtocol, PrivateKeyShare, PublicKey, ReshareProtocol}; use cait_sith::protocol::Participant; @@ -33,6 +34,7 @@ pub struct RunningState { pub private_share: PrivateKeyShare, pub public_key: PublicKey, pub triple_manager: TripleManager, + pub presignature_manager: PresignatureManager, } pub struct ResharingState { @@ -49,6 +51,7 @@ pub struct JoiningState { } #[derive(Default)] +#[allow(clippy::large_enum_variant)] pub enum NodeState { #[default] Starting, diff --git a/node/src/protocol/triple.rs b/node/src/protocol/triple.rs index 7c1bd7dc1..ca5983d96 100644 --- a/node/src/protocol/triple.rs +++ b/node/src/protocol/triple.rs @@ -2,25 +2,40 @@ use super::message::TripleMessage; use crate::types::TripleProtocol; use crate::util::AffinePointExt; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; -use cait_sith::triples::TripleGenerationOutput; +use cait_sith::triples::{TriplePub, TripleShare}; use k256::Secp256k1; -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; /// Unique number used to identify a specific ongoing triple generation protocol. /// Without `TripleId` it would be unclear where to route incoming cait-sith triple generation /// messages. pub type TripleId = u64; +/// A completed triple. +pub struct Triple { + pub id: TripleId, + pub share: TripleShare, + pub public: TriplePub, +} + +/// An ongoing triple generator. +pub struct TripleGenerator { + /// Ongoing cait-sith triple generation protocol. + pub protocol: TripleProtocol, + /// Whether this triple generation was initiated by the current node. + pub mine: bool, +} + /// Abstracts how triples are generated by providing a way to request a new triple that will be /// complete some time in the future and a way to take an already generated triple. pub struct TripleManager { /// Completed unspent triples - // TODO: I put these into `BTreeMap` so we can potentially use the ordering for choosing which - // triple to use for consensus, but it might not be necessary. - triples: BTreeMap>, + triples: HashMap, /// Ongoing triple generation protocols - generators: BTreeMap, + generators: HashMap, + /// List of triple ids generation of which was initiated by the current node. + mine: VecDeque, participants: Vec, me: Participant, @@ -36,8 +51,9 @@ impl TripleManager { epoch: u64, ) -> Self { Self { - triples: BTreeMap::new(), - generators: BTreeMap::new(), + triples: HashMap::new(), + generators: HashMap::new(), + mine: VecDeque::new(), participants, me, threshold, @@ -50,6 +66,11 @@ impl TripleManager { self.triples.len() } + /// Returns the number of unspent triples assigned to this node. + pub fn my_len(&self) -> usize { + self.mine.len() + } + /// Returns the number of unspent triples we will have in the manager once /// all ongoing generation protocols complete. pub fn potential_len(&self) -> usize { @@ -65,17 +86,45 @@ impl TripleManager { self.me, self.threshold, )?); - self.generators.insert(id, protocol); + self.generators.insert( + id, + TripleGenerator { + protocol, + mine: true, + }, + ); Ok(()) } /// Take an unspent triple by its id with no way to return it. /// It is very important to NOT reuse the same triple twice for two different /// protocols. - pub fn take(&mut self, id: TripleId) -> Option> { + pub fn take(&mut self, id: TripleId) -> Option { self.triples.remove(&id) } + /// Take two random unspent triple generated by this node. Either takes both or none. + /// It is very important to NOT reuse the same triple twice for two different + /// protocols. + pub fn take_mine_twice(&mut self) -> Option<(Triple, Triple)> { + tracing::info!(mine = ?self.mine, "my triples"); + if self.mine.len() < 2 { + return None; + } + let id0 = self.mine.pop_front()?; + let id1 = self.mine.pop_front()?; + tracing::info!(id0, id1, "trying to take two triples"); + if self.triples.contains_key(&id0) && self.triples.contains_key(&id1) { + Some(( + self.triples.remove(&id0).unwrap(), + self.triples.remove(&id1).unwrap(), + )) + } else { + tracing::warn!(id0, id1, "my triples are gone"); + None + } + } + /// Ensures that the triple with the given id is either: /// 1) Already generated in which case returns `None`, or /// 2) Is currently being generated by `protocol` in which case returns `Some(protocol)`, or @@ -91,14 +140,18 @@ impl TripleManager { match self.generators.entry(id) { Entry::Vacant(e) => { tracing::info!(id, "joining protocol to generate a new triple"); - let protocol = cait_sith::triples::generate_triple( + let protocol = Box::new(cait_sith::triples::generate_triple( &self.participants, self.me, self.threshold, - )?; - Ok(Some(e.insert(Box::new(protocol)))) + )?); + let generator = e.insert(TripleGenerator { + protocol, + mine: false, + }); + Ok(Some(&mut generator.protocol)) } - Entry::Occupied(e) => Ok(Some(e.into_mut())), + Entry::Occupied(e) => Ok(Some(&mut e.into_mut().protocol)), } } } @@ -110,9 +163,9 @@ impl TripleManager { pub fn poke(&mut self) -> Result, ProtocolError> { let mut messages = Vec::new(); let mut result = Ok(()); - self.generators.retain(|id, protocol| { + self.generators.retain(|id, generator| { loop { - let action = match protocol.poke() { + let action = match generator.protocol.poke() { Ok(action) => action, Err(e) => { result = Err(e); @@ -155,7 +208,17 @@ impl TripleManager { big_c = ?output.1.big_c.to_base58(), "completed triple generation" ); - self.triples.insert(*id, output); + self.triples.insert( + *id, + Triple { + id: *id, + share: output.0, + public: output.1, + }, + ); + if generator.mine { + self.mine.push_back(*id); + } // Do not retain the protocol break false; } diff --git a/node/src/types.rs b/node/src/types.rs index 6285320d4..34b03c830 100644 --- a/node/src/types.rs +++ b/node/src/types.rs @@ -1,4 +1,5 @@ use cait_sith::triples::TripleGenerationOutput; +use cait_sith::PresignOutput; use cait_sith::{protocol::Protocol, KeygenOutput}; use k256::{elliptic_curve::CurveArithmetic, Secp256k1}; @@ -8,3 +9,4 @@ pub type KeygenProtocol = Box> + S pub type ReshareProtocol = Box + Send + Sync>; pub type TripleProtocol = Box> + Send + Sync>; +pub type PresignatureProtocol = Box> + Send + Sync>; diff --git a/node/src/web/mod.rs b/node/src/web/mod.rs index 2e63c67ba..1cb31d637 100644 --- a/node/src/web/mod.rs +++ b/node/src/web/mod.rs @@ -133,6 +133,7 @@ pub enum StateView { Running { participants: Vec, triple_count: usize, + presignature_count: usize, }, NotRunning, } @@ -149,6 +150,7 @@ async fn state(Extension(state): Extension>) -> (StatusCode, Json Json(StateView::Running { participants: state.participants.keys().cloned().collect(), triple_count: state.triple_manager.len(), + presignature_count: state.presignature_manager.len(), }), ) }