Skip to content

Commit

Permalink
fix: added naive intersection check for bypassing threshold (#504)
Browse files Browse the repository at this point in the history
* Added max_concurrent_generation for limiting triple timeouts

* Added concurrent introduction param to limit each node generating

* Added naive intersection check for threshold

* Make stockpile test use 8 nodes (#506)

* Do not take in triple generation on reaching max capacity

* Try 22.04 default for 4vcpus

* number of triples adjusted

* Back to original GH instance

* Ignore test for now

* added back 8 and 24 concurrent generation

* Added more logging

* Made signature generation ignore messages where sig already generated

---------

Co-authored-by: Serhii Volovyk <sergeyvolovyk@gmail.com>
  • Loading branch information
ChaoticTempest and volovyks authored Mar 23, 2024
1 parent d3fc7f7 commit 58935e4
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 31 deletions.
4 changes: 2 additions & 2 deletions integration-tests/src/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ impl Default for MultichainConfig {
nodes: 3,
threshold: 2,
triple_cfg: TripleConfig {
min_triples: 2,
max_triples: 10,
min_triples: 8,
max_triples: 80,
max_concurrent_introduction: 8,
max_concurrent_generation: 24,
},
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/multichain/actions/wait_for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn has_at_least_triples<'a>(
let mut state_views = Vec::new();
for id in 0..ctx.nodes.len() {
let state_view = is_enough_triples(id)
.retry(&ExponentialBuilder::default().with_max_times(10))
.retry(&ExponentialBuilder::default().with_max_times(15))
.await
.with_context(|| format!("mpc node '{id}' failed to generate '{expected_triple_count}' triples before deadline"))?;
state_views.push(state_view);
Expand Down
9 changes: 5 additions & 4 deletions integration-tests/tests/multichain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async fn test_signature_offline_node() -> anyhow::Result<()> {
}

#[test(tokio::test)]
#[ignore = "This test is too slow to run in CI"]
async fn test_signature_large_stockpile() -> anyhow::Result<()> {
const SIGNATURE_AMOUNT: usize = 10;
const NODES: usize = 3;
Expand All @@ -140,7 +141,7 @@ async fn test_signature_large_stockpile() -> anyhow::Result<()> {
// This is the total amount of triples that will be generated by all nodes.
max_triples: MAX_TRIPLES,
// This is the amount each node can introduce a triple generation protocol into the system.
max_concurrent_introduction: 8,
max_concurrent_introduction: 4,
// This is the maximum amount of triples that can be generated concurrently by the whole system.
max_concurrent_generation: 24,
};
Expand Down Expand Up @@ -172,10 +173,10 @@ async fn test_key_derivation() -> anyhow::Result<()> {
Box::pin(async move {
let state_0 = wait_for::running_mpc(&ctx, 0).await?;
assert_eq!(state_0.participants.len(), 3);
wait_for::has_at_least_triples(&ctx, 2).await?;
wait_for::has_at_least_presignatures(&ctx, 2).await?;
wait_for::has_at_least_triples(&ctx, 6).await?;
wait_for::has_at_least_presignatures(&ctx, 3).await?;

for _ in 0..5 {
for _ in 0..3 {
let mpc_pk: k256::AffinePoint = state_0.public_key.clone().into_affine_point();
let (_, payload_hashed, account, tx_hash) = actions::request_sign(&ctx).await?;
let payload_hashed_rev = {
Expand Down
3 changes: 3 additions & 0 deletions node/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct SignPayload {
#[derive(LakeContext)]
struct Context {
mpc_contract_id: AccountId,
node_account_id: AccountId,
gcp_service: GcpService,
queue: Arc<RwLock<SignQueue>>,
latest_block_height: Arc<RwLock<LatestBlockHeight>>,
Expand Down Expand Up @@ -111,6 +112,7 @@ async fn handle_block(
tracing::info!(
receipt_id = %receipt_id,
caller_id = receipt.predecessor_id().to_string(),
our_account = ctx.node_account_id.to_string(),
payload = hex::encode(sign_payload.payload),
key_version = sign_payload.key_version,
entropy = hex::encode(entropy),
Expand Down Expand Up @@ -201,6 +203,7 @@ pub fn run(
})?;
let context = Context {
mpc_contract_id,
node_account_id,
gcp_service,
queue,
latest_block_height: Arc::new(RwLock::new(latest_block_height)),
Expand Down
8 changes: 8 additions & 0 deletions node/src/protocol/contract/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ impl IntoIterator for Participants {
}

impl Participants {
pub fn len(&self) -> usize {
self.participants.len()
}

pub fn is_empty(&self) -> bool {
self.participants.is_empty()
}

pub fn insert(&mut self, id: &Participant, info: ParticipantInfo) {
self.participants.insert(*id, info);
}
Expand Down
56 changes: 49 additions & 7 deletions node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,14 @@ impl CryptographicProtocol for RunningState {
ctx: C,
) -> Result<NodeState, CryptographicError> {
let active = ctx.mesh().active_participants();
if active.len() < self.threshold {
tracing::info!(
active = ?active.keys_vec(),
"running: not enough participants to progress"
);
return Ok(NodeState::Running(self));
}

let mut messages = self.messages.write().await;
let mut triple_manager = self.triple_manager.write().await;
triple_manager.stockpile(active)?;
Expand All @@ -352,13 +360,25 @@ impl CryptographicProtocol for RunningState {
if let Some((triple0, triple1)) = triple_manager.take_two_mine().await {
let presig_participants = active
.intersection(&[&triple0.public.participants, &triple1.public.participants]);
presignature_manager.generate(
&presig_participants,
triple0,
triple1,
&self.public_key,
&self.private_share,
)?;
if presig_participants.len() < self.threshold {
tracing::debug!(
participants = ?presig_participants.keys_vec(),
"running(pre): we don't have enough participants to generate a presignature"
);

// Insert back the triples to be used later since this active set of
// participants were not able to make use of these triples.
triple_manager.insert_mine(triple0).await;
triple_manager.insert_mine(triple1).await;
} else {
presignature_manager.generate(
&presig_participants,
triple0,
triple1,
&self.public_key,
&self.private_share,
)?;
}
} else {
tracing::debug!(
"running(pre): we don't have enough triples to generate a presignature"
Expand All @@ -375,12 +395,22 @@ impl CryptographicProtocol for RunningState {
let mut signature_manager = self.signature_manager.write().await;
sign_queue.organize(self.threshold, active, ctx.me().await);
let my_requests = sign_queue.my_requests(ctx.me().await);
let mut failed_presigs = Vec::new();
while presignature_manager.my_len() > 0 {
if signature_manager.failed_len() > 0 {
let Some(presignature) = presignature_manager.take_mine() else {
break;
};
let sig_participants = active.intersection(&[&presignature.participants]);
if sig_participants.len() < self.threshold {
tracing::debug!(
participants = ?sig_participants.keys_vec(),
"running: we don't have enough participants to generate a failed signature"
);
failed_presigs.push(presignature);
continue;
}

signature_manager.retry_failed_generation(presignature, &sig_participants);
break;
}
Expand All @@ -395,6 +425,15 @@ impl CryptographicProtocol for RunningState {

let receipt_id = *receipt_id;
let sig_participants = active.intersection(&[&presignature.participants]);
if sig_participants.len() < self.threshold {
tracing::debug!(
participants = ?sig_participants.keys_vec(),
"running: we don't have enough participants to generate a signature"
);
failed_presigs.push(presignature);
continue;
}

let my_request = my_requests.remove(&receipt_id).unwrap();
signature_manager.generate(
&sig_participants,
Expand All @@ -408,6 +447,9 @@ impl CryptographicProtocol for RunningState {
)?;
}
drop(sign_queue);
for presignature in failed_presigs {
presignature_manager.insert_mine(presignature);
}
drop(presignature_manager);
for (p, msg) in signature_manager.poke() {
let info = self.participants.get(&p).unwrap();
Expand Down
9 changes: 9 additions & 0 deletions node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ impl MessageHandler for RunningState {
presignature_id = message.presignature_id,
"new signature message"
);

// TODO: make consistent with presignature manager AlreadyGenerated.
if signature_manager.has_completed(&message.presignature_id) {
tracing::info!(
presignature_id = message.presignature_id,
"signature already generated, nothing left to do"
);
continue;
}
// if !self
// .sign_queue
// .read()
Expand Down
30 changes: 22 additions & 8 deletions node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,22 @@ impl PresignatureManager {
match self.generators.entry(id) {
Entry::Vacant(entry) => {
tracing::info!(id, "joining protocol to generate a new presignature");
let (triple0, triple1) =
match triple_manager.take_two(triple0, triple1, false).await {
Ok(result) => result,
Err(error) => {
tracing::warn!(?error, triple0, triple1,);
return Err(error);
}
};
let (triple0, triple1) = match triple_manager
.take_two(triple0, triple1, false)
.await
{
Ok(result) => result,
Err(error) => {
tracing::warn!(
?error,
id,
triple0,
triple1,
"could not initiate non-introduced presignature: triple might not have completed for this node yet"
);
return Err(error);
}
};
let generator = Self::generate_internal(
participants,
self.me,
Expand Down Expand Up @@ -246,6 +254,11 @@ impl PresignatureManager {
self.presignatures.remove(&id)
}

pub fn insert_mine(&mut self, presig: Presignature) {
self.mine.push_back(presig.id);
self.presignatures.insert(presig.id, presig);
}

/// Pokes all of the ongoing generation protocols and returns a vector of
/// messages to be sent to the respective participant.
///
Expand Down Expand Up @@ -297,6 +310,7 @@ impl PresignatureManager {
Action::Return(output) => {
tracing::info!(
id,
me = ?self.me,
big_r = ?output.big_r.to_base58(),
"completed presignature generation"
);
Expand Down
34 changes: 30 additions & 4 deletions node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use rand::seq::{IteratorRandom, SliceRandom};
use rand::SeedableRng;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::time::Instant;
use std::time::{Duration, Instant};

/// Duration for which completed signatures are retained.
pub const COMPLETION_EXISTENCE_TIMEOUT: Duration = Duration::from_secs(120 * 60);

pub struct SignRequest {
pub receipt_id: CryptoHash,
Expand Down Expand Up @@ -57,6 +60,7 @@ impl SignQueue {
if subset.contains(&&me) {
tracing::info!(
receipt_id = %request.receipt_id,
?me,
?subset,
?proposer,
"saving sign request: node is in the signer subset"
Expand All @@ -66,6 +70,7 @@ impl SignQueue {
} else {
tracing::info!(
receipt_id = %request.receipt_id,
?me,
?subset,
?proposer,
"skipping sign request: node is NOT in the signer subset"
Expand Down Expand Up @@ -151,6 +156,8 @@ pub struct SignatureManager {
generators: HashMap<CryptoHash, SignatureGenerator>,
/// Failed signatures awaiting to be retried.
failed_generators: VecDeque<(CryptoHash, FailedGenerator)>,
/// Set of completed signatures
completed: HashMap<PresignatureId, Instant>,
/// Generated signatures assigned to the current node that are yet to be published.
/// Vec<(receipt_id, msg_hash, timestamp, output)>
signatures: Vec<(CryptoHash, [u8; 32], Instant, FullSignature<Secp256k1>)>,
Expand All @@ -164,6 +171,7 @@ impl SignatureManager {
Self {
generators: HashMap::new(),
failed_generators: VecDeque::new(),
completed: HashMap::new(),
signatures: Vec::new(),
me,
public_key,
Expand Down Expand Up @@ -250,7 +258,13 @@ impl SignatureManager {
delta: Scalar,
sign_request_timestamp: Instant,
) -> Result<(), InitializationError> {
tracing::info!(%receipt_id, participants = ?participants.keys().collect::<Vec<_>>(), "starting protocol to generate a new signature");
tracing::info!(
%receipt_id,
me = ?self.me,
presignature_id = presignature.id,
participants = ?participants.keys_vec(),
"starting protocol to generate a new signature",
);
let generator = Self::generate_internal(
participants,
self.me,
Expand Down Expand Up @@ -286,11 +300,12 @@ impl SignatureManager {
) -> Result<Option<&mut SignatureProtocol>, InitializationError> {
match self.generators.entry(receipt_id) {
Entry::Vacant(entry) => {
tracing::info!(%receipt_id, "joining protocol to generate a new signature");
tracing::info!(%receipt_id, me = ?self.me, presignature_id, "joining protocol to generate a new signature");
let Some(presignature) = presignature_manager.take(presignature_id) else {
tracing::warn!(presignature_id, "presignature is missing, can't join");
tracing::warn!(me = ?self.me, presignature_id, "presignature is missing, can't join");
return Ok(None);
};
tracing::info!(me = ?self.me, presignature_id, "found presignature: ready to start signature generation");
let generator = Self::generate_internal(
participants,
self.me,
Expand Down Expand Up @@ -375,10 +390,13 @@ impl SignatureManager {
Action::Return(output) => {
tracing::info!(
?receipt_id,
me = ?self.me,
presignature_id = generator.presignature_id,
big_r = ?output.big_r.to_base58(),
s = ?output.s,
"completed signature generation"
);
self.completed.insert(generator.presignature_id, Instant::now());
if generator.proposer == self.me {
self.signatures
.push((*receipt_id, generator.msg_hash, generator.sign_request_timestamp, output));
Expand Down Expand Up @@ -436,4 +454,12 @@ impl SignatureManager {
}
Ok(())
}

/// Check whether or not the signature has been completed with this presignature_id.
pub fn has_completed(&mut self, presignature_id: &PresignatureId) -> bool {
self.completed
.retain(|_, timestamp| timestamp.elapsed() < COMPLETION_EXISTENCE_TIMEOUT);

self.completed.contains_key(presignature_id)
}
}
Loading

0 comments on commit 58935e4

Please sign in to comment.