From 7e7555df31e2b603c2af0b442aeef6b56b9119f7 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Fri, 18 Oct 2024 17:44:27 +0900 Subject: [PATCH 1/5] Prevent messages from taking too long to be sent --- chain-signatures/node/src/http_client.rs | 68 ++++++++++++------- chain-signatures/node/src/metrics.rs | 19 ------ .../node/src/protocol/consensus.rs | 11 ++- chain-signatures/node/src/protocol/mod.rs | 7 +- 4 files changed, 57 insertions(+), 48 deletions(-) diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 40a80a5a..5fa70079 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -11,6 +11,8 @@ use std::time::{Duration, Instant}; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; +use near_account_id::AccountId; + #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("http request was unsuccessful: {0}")] @@ -33,7 +35,7 @@ pub enum SendError { async fn send_encrypted( from: Participant, - client: &Client, + client: Client, url: U, message: Vec, ) -> Result<(), SendError> { @@ -75,13 +77,21 @@ async fn send_encrypted( // TODO: add in retry logic either in struct or at call site. // TODO: add check for participant list to see if the messages to be sent are still valid. -#[derive(Default)] pub struct MessageQueue { deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>, seen_counts: HashSet, + account_id: AccountId, } impl MessageQueue { + pub fn new(id: &AccountId) -> Self { + Self { + deque: VecDeque::new(), + seen_counts: HashSet::new(), + account_id: id.clone(), + } + } + pub fn len(&self) -> usize { self.deque.len() } @@ -135,47 +145,57 @@ impl MessageQueue { encrypted.push((encrypted_msg, (info, msg, instant))); } - let mut compacted = 0; + let mut tasks = tokio::task::JoinSet::new(); for (id, encrypted) in encrypted { for partition in partition_ciphered_256kb(encrypted) { - let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); + let (encrypted_partition, _msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); // guaranteed to unwrap due to our previous loop check: - let info = participants.get(&Participant::from(id)).unwrap(); + let id = Participant::from(id); + let info = participants.get(&id).unwrap(); let account_id = &info.account_id; - let start = Instant::now(); crate::metrics::NUM_SEND_ENCRYPTED_TOTAL .with_label_values(&[account_id.as_str()]) .inc(); - if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await - { - crate::metrics::NUM_SEND_ENCRYPTED_FAILURE - .with_label_values(&[account_id.as_str()]) - .inc(); - crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); - // since we failed, put back all the messages related to this - failed.extend(msgs); + tasks.spawn(send_encrypted( + from, + client.clone(), + info.url.clone(), + encrypted_partition, + )); + } + } + + let mut compacted = 0; + while let Some(result) = tasks.join_next().await { + match result { + Ok(result) => { + let Err(err) = result else { + compacted += 1; + continue; + }; errors.push(err); - } else { - compacted += msgs.len(); - crate::metrics::SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); + } + Err(err) => { + tracing::error!(?err, "message queue task failure"); } } } - if uncompacted > 0 { + let elapsed = outer.elapsed(); + if elapsed > Duration::from_millis(100) && uncompacted > 0 { tracing::info!( uncompacted, compacted, - "{from:?} sent messages in {:?};", - outer.elapsed() + "{from:?} sent messages in {:?}", + elapsed, ); } + crate::metrics::SEND_ENCRYPTED_LATENCY + .with_label_values(&[self.account_id.as_str()]) + .observe(elapsed.as_millis() as f64); + // only add the participant count if it hasn't been seen before. let counts = format!("{participant_counter:?}"); if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) { diff --git a/chain-signatures/node/src/metrics.rs b/chain-signatures/node/src/metrics.rs index d03fae28..7b4b2ae6 100644 --- a/chain-signatures/node/src/metrics.rs +++ b/chain-signatures/node/src/metrics.rs @@ -307,15 +307,6 @@ pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy = Lazy::new( .unwrap() }); -pub(crate) static NUM_SEND_ENCRYPTED_FAILURE: Lazy = Lazy::new(|| { - try_create_counter_vec( - "multichain_send_encrypted_failure", - "number of successful send encrypted", - &["node_account_id"], - ) - .unwrap() -}); - pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy = Lazy::new(|| { try_create_counter_vec( "multichain_send_encrypted_total", @@ -325,16 +316,6 @@ pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy = Lazy::new(|| { .unwrap() }); -pub(crate) static FAILED_SEND_ENCRYPTED_LATENCY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "multichain_failed_send_encrypted_ms", - "Latency of failed send encrypted.", - &["node_account_id"], - Some(exponential_buckets(0.5, 1.5, 20).unwrap()), - ) - .unwrap() -}); - pub(crate) static NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS: Lazy = Lazy::new(|| { try_create_counter_vec( "multichain_num_total_historical_signature_generators", diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 6d3917af..0d913485 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -6,6 +6,7 @@ use super::state::{ use super::{Config, SignQueue}; use crate::gcp::error::DatastoreStorageError; use crate::gcp::error::SecretStorageError; +use crate::http_client::MessageQueue; use crate::protocol::contract::primitives::Participants; use crate::protocol::monitor::StuckMonitor; use crate::protocol::presignature::PresignatureManager; @@ -170,7 +171,9 @@ impl ConsensusProtocol for StartedState { ctx.my_account_id(), ), )), - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.my_account_id(), + ))), })) } None => Ok(NodeState::Joining(JoiningState { @@ -224,7 +227,9 @@ impl ConsensusProtocol for StartedState { participants, threshold: contract_state.threshold, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.my_account_id(), + ))), })) } None => { @@ -760,6 +765,6 @@ async fn start_resharing( threshold: contract_state.threshold, public_key: contract_state.public_key, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new(ctx.my_account_id()))), })) } diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index e8f3b00d..d17a14e3 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -239,15 +239,17 @@ impl MpcSignProtocol { crate::metrics::PROTOCOL_ITER_CNT .with_label_values(&[my_account_id.as_str()]) .inc(); + + let msg_time = Instant::now(); + let mut msg_count = 0; loop { let msg_result = self.receiver.try_recv(); match msg_result { Ok(msg) => { - tracing::debug!("received a new message"); + msg_count += 1; queue.push(msg); } Err(TryRecvError::Empty) => { - tracing::debug!("no new messages received"); break; } Err(TryRecvError::Disconnected) => { @@ -256,6 +258,7 @@ impl MpcSignProtocol { } } } + tracing::debug!("received {msg_count} messages in {:?}", msg_time.elapsed()); let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) { let contract_state = match rpc_client::fetch_mpc_contract_state( From d8d894b42c4ec294d6ed84968545e56adf91a10a Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Sat, 19 Oct 2024 16:42:52 +0900 Subject: [PATCH 2/5] comments --- chain-signatures/node/src/http_client.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index ee4438a1..76d59d17 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -206,12 +206,11 @@ impl MessageQueue { } let elapsed = outer.elapsed(); - if elapsed > Duration::from_millis(100) && uncompacted > 0 { + if uncompacted > 0 { tracing::info!( uncompacted, compacted, - "{from:?} sent messages in {:?}", - elapsed, + "{from:?} sent messages in {elapsed:?}", ); } crate::metrics::SEND_ENCRYPTED_LATENCY From 83ab09fef8c5509e786a8016f7931fa356ed8925 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Tue, 29 Oct 2024 19:57:49 +0900 Subject: [PATCH 3/5] Fix test --- chain-signatures/node/src/mesh/connection.rs | 2 +- integration-tests/chain-signatures/tests/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 310f4363..c446ddcc 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -206,7 +206,7 @@ impl Pool { let empty_msg: Vec = Vec::new(); crate::http_client::send_encrypted( *participant, - &self.http, + self.http.clone(), participant_info.url.clone(), empty_msg, self.fetch_participant_timeout, diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index 74fab552..d234926d 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&self.cfg, &node_account).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; // T number of participants should vote let participants = self.participant_accounts().await?; From 9f05f0cb74a63f585bd21432ea0fd2a3d187311a Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Tue, 29 Oct 2024 23:46:39 +0900 Subject: [PATCH 4/5] unwrap results to get actual error --- .../chain-signatures/tests/cases/mod.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index a166d7d2..27665c53 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -28,7 +28,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add participant 3"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -39,14 +39,12 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { state.participants.keys().nth(2).unwrap().clone().as_ref(), ) .unwrap(); - assert!(ctx.remove_participant(Some(&account_2)).await.is_ok()); + ctx.remove_participant(Some(&account_2)).await.unwrap(); let account_0 = near_workspaces::types::AccountId::from_str( state.participants.keys().next().unwrap().clone().as_ref(), ) .unwrap(); - let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await; - assert!(node_cfg_0.is_ok()); - let node_cfg_0 = node_cfg_0.unwrap(); + let node_cfg_0 = ctx.remove_participant(Some(&account_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -56,14 +54,14 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { assert!(ctx.remove_participant(None).await.is_err()); tracing::info!("!!! Add participant 5"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add back participant 0"); - assert!(ctx.add_participant(Some(node_cfg_0)).await.is_ok()); + ctx.add_participant(Some(node_cfg_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -331,19 +329,19 @@ async fn test_multichain_reshare_with_lake_congestion() -> anyhow::Result<()> { add_latency(&ctx.nodes.proxy_name_for_node(1), true, 1.0, 1_000, 100).await?; // remove node2, node0 and node1 should still reach concensus // this fails if the latency above is too long (10s) - assert!(ctx.remove_participant(None).await.is_ok()); + ctx.remove_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); // Going below T should error out assert!(ctx.remove_participant(None).await.is_err()); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); // add latency to node2->rpc add_latency(&ctx.nodes.proxy_name_for_node(2), true, 1.0, 1_000, 100).await?; let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 3); - assert!(ctx.remove_participant(None).await.is_ok()); + ctx.remove_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, Some(0)).await?; assert!(state.participants.len() == 2); // make sure signing works after reshare From 9102575e20e868e71b761e3cb6c7b0446a5b0049 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 30 Oct 2024 13:01:29 +0900 Subject: [PATCH 5/5] Increase wait time for running_mpc check --- integration-tests/chain-signatures/tests/actions/wait_for.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/chain-signatures/tests/actions/wait_for.rs b/integration-tests/chain-signatures/tests/actions/wait_for.rs index ac2e0069..3c774cdc 100644 --- a/integration-tests/chain-signatures/tests/actions/wait_for.rs +++ b/integration-tests/chain-signatures/tests/actions/wait_for.rs @@ -51,7 +51,7 @@ pub async fn running_mpc<'a>( } ); is_running - .retry(&ExponentialBuilder::default().with_max_times(6)) + .retry(&ExponentialBuilder::default().with_max_times(7)) .await .with_context(|| err_msg) }