diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 15122674f..25bbf7449 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -2,7 +2,7 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; use crate::protocol::{MpcSignProtocol, SignQueue}; use crate::storage::triple_storage::LockTripleNodeStorageBox; -use crate::{indexer, storage, web}; +use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; use local_ip_address::local_ip; use near_account_id::AccountId; @@ -63,6 +63,10 @@ pub enum Cli { /// referer header for mainnet whitelist #[arg(long, env("MPC_CLIENT_HEADER_REFERER"), default_value(None))] client_header_referer: Option, + #[clap(flatten)] + mesh_options: mesh::Options, + #[clap(flatten)] + message_options: http_client::Options, }, } @@ -83,6 +87,8 @@ impl Cli { storage_options, override_config, client_header_referer, + mesh_options, + message_options, } => { let mut args = vec![ "start".to_string(), @@ -120,6 +126,8 @@ impl Cli { args.extend(indexer_options.into_str_args()); args.extend(storage_options.into_str_args()); + args.extend(mesh_options.into_str_args()); + args.extend(message_options.into_str_args()); args } } @@ -176,6 +184,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { storage_options, override_config, client_header_referer, + mesh_options, + message_options, } => { let sign_queue = Arc::new(RwLock::new(SignQueue::new())); let rt = tokio::runtime::Builder::new_multi_thread() @@ -237,6 +247,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { sign_sk, }, }), + mesh_options, + message_options, ); rt.block_on(async { diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 40a80a5a9..eaf5c37c2 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -11,6 +11,19 @@ use std::time::{Duration, Instant}; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; +#[derive(Debug, Clone, clap::Parser)] +#[group(id = "message_options")] +pub struct Options { + #[clap(long, env("MPC_MESSAGE_TIMEOUT"), default_value = "1000")] + pub timeout: u64, +} + +impl Options { + pub fn into_str_args(self) -> Vec { + vec!["--timeout".to_string(), self.timeout.to_string()] + } +} + #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("http request was unsuccessful: {0}")] @@ -36,19 +49,25 @@ async fn send_encrypted( client: &Client, url: U, message: Vec, + request_timeout: Duration, ) -> Result<(), SendError> { let _span = tracing::info_span!("message_request"); let mut url = url.into_url()?; url.set_path("msg"); tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); let action = || async { - let response = client - .post(url.clone()) - .header("content-type", "application/json") - .json(&message) - .send() - .await - .map_err(SendError::ReqwestClientError)?; + let response = tokio::time::timeout( + request_timeout, + client + .post(url.clone()) + .header("content-type", "application/json") + .json(&message) + .send(), + ) + .await + .map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))? + .map_err(SendError::ReqwestClientError)?; + let status = response.status(); let response_bytes = response .bytes() @@ -75,13 +94,21 @@ async fn send_encrypted( // TODO: add in retry logic either in struct or at call site. // TODO: add check for participant list to see if the messages to be sent are still valid. -#[derive(Default)] pub struct MessageQueue { deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>, seen_counts: HashSet, + message_options: Options, } impl MessageQueue { + pub fn new(options: Options) -> Self { + Self { + deque: VecDeque::default(), + seen_counts: HashSet::default(), + message_options: options, + } + } + pub fn len(&self) -> usize { self.deque.len() } @@ -147,7 +174,14 @@ impl MessageQueue { crate::metrics::NUM_SEND_ENCRYPTED_TOTAL .with_label_values(&[account_id.as_str()]) .inc(); - if let Err(err) = send_encrypted(from, client, &info.url, encrypted_partition).await + if let Err(err) = send_encrypted( + from, + client, + &info.url, + encrypted_partition, + Duration::from_millis(self.message_options.timeout), + ) + .await { crate::metrics::NUM_SEND_ENCRYPTED_FAILURE .with_label_values(&[account_id.as_str()]) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index ef6029ac3..18041fe1e 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -6,11 +6,10 @@ use tokio::sync::RwLock; use url::Url; use crate::protocol::contract::primitives::Participants; +use crate::protocol::ParticipantInfo; use crate::protocol::ProtocolState; use crate::web::StateView; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1); - // TODO: this is a basic connection pool and does not do most of the work yet. This is // mostly here just to facilitate offline node handling for now. // TODO/NOTE: we can use libp2p to facilitate most the of low level TCP connection work. @@ -25,12 +24,43 @@ pub struct Pool { current_active: RwLock>, // Potentially active participants that we can use to establish a connection in the next epoch. potential_active: RwLock>, + fetch_participant_timeout: Duration, + refresh_active_timeout: Duration, +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum FetchParticipantError { + #[error("request timed out")] + Timeout, + #[error("Response cannot be converted to JSON")] + JsonConversion, + #[error("Invalid URL")] + InvalidUrl, + #[error("Network error: {0}")] + NetworkError(String), } impl Pool { + pub fn new(fetch_participant_timeout: Duration, refresh_active_timeout: Duration) -> Self { + tracing::info!( + ?fetch_participant_timeout, + ?refresh_active_timeout, + "creating a new pool" + ); + Self { + http: reqwest::Client::new(), + connections: RwLock::new(Participants::default()), + potential_connections: RwLock::new(Participants::default()), + status: RwLock::new(HashMap::default()), + current_active: RwLock::new(Option::default()), + potential_active: RwLock::new(Option::default()), + fetch_participant_timeout, + refresh_active_timeout, + } + } pub async fn ping(&self) -> Participants { if let Some((ref active, timestamp)) = *self.current_active.read().await { - if timestamp.elapsed() < DEFAULT_TIMEOUT { + if timestamp.elapsed() < self.refresh_active_timeout { return active.clone(); } } @@ -40,35 +70,15 @@ impl Pool { let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { - let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { - tracing::error!( - "Pool.ping url is invalid participant {:?} url {} /state", - participant, - info.url - ); - continue; - }; - - let Ok(resp) = self.http.get(url.clone()).send().await else { - tracing::warn!( - "Pool.ping resp err participant {:?} url {}", - participant, - url - ); - continue; - }; - - let Ok(state): Result = resp.json().await else { - tracing::warn!( - "Pool.ping state view err participant {:?} url {}", - participant, - url - ); - continue; - }; - - status.insert(*participant, state); - participants.insert(participant, info.clone()); + match self.fetch_participant_state(info).await { + Ok(state) => { + status.insert(*participant, state); + participants.insert(participant, info.clone()); + } + Err(e) => { + tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url); + } + } } drop(status); @@ -79,7 +89,7 @@ impl Pool { pub async fn ping_potential(&self) -> Participants { if let Some((ref active, timestamp)) = *self.potential_active.read().await { - if timestamp.elapsed() < DEFAULT_TIMEOUT { + if timestamp.elapsed() < self.refresh_active_timeout { return active.clone(); } } @@ -89,20 +99,15 @@ impl Pool { let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { - let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { - continue; - }; - - let Ok(resp) = self.http.get(url).send().await else { - continue; - }; - - let Ok(state): Result = resp.json().await else { - continue; - }; - - status.insert(*participant, state); - participants.insert(participant, info.clone()); + match self.fetch_participant_state(info).await { + Ok(state) => { + status.insert(*participant, state); + participants.insert(participant, info.clone()); + } + Err(e) => { + tracing::warn!("Fetch state for participant {participant:?} with url {} has failed with error {e}.", info.url); + } + } } drop(status); @@ -159,4 +164,26 @@ impl Pool { _ => false, }) } + + async fn fetch_participant_state( + &self, + participant_info: &ParticipantInfo, + ) -> Result { + let Ok(Ok(url)) = Url::parse(&participant_info.url).map(|url| url.join("/state")) else { + return Err(FetchParticipantError::InvalidUrl); + }; + match tokio::time::timeout( + self.fetch_participant_timeout, + self.http.get(url.clone()).send(), + ) + .await + { + Ok(Ok(resp)) => match resp.json::().await { + Ok(state) => Ok(state), + Err(_) => Err(FetchParticipantError::JsonConversion), + }, + Ok(Err(e)) => Err(FetchParticipantError::NetworkError(e.to_string())), + Err(_) => Err(FetchParticipantError::Timeout), + } + } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 56b7b9ace..7dcedbcc6 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,9 +1,34 @@ +use std::time::Duration; + use crate::protocol::contract::primitives::Participants; use crate::protocol::ProtocolState; pub mod connection; -#[derive(Default)] +#[derive(Debug, Clone, clap::Parser)] +#[group(id = "mesh_options")] +pub struct Options { + #[clap( + long, + env("MPC_MESH_FETCH_PARTICIPANT_TIMEOUT"), + default_value = "1000" + )] + pub fetch_participant_timeout: u64, + #[clap(long, env("MPC_MESH_REFRESH_ACTIVE_TIMEOUT"), default_value = "1000")] + pub refresh_active_timeout: u64, +} + +impl Options { + pub fn into_str_args(self) -> Vec { + vec![ + "--fetch-participant-timeout".to_string(), + self.fetch_participant_timeout.to_string(), + "--refresh-active-timeout".to_string(), + self.refresh_active_timeout.to_string(), + ] + } +} + pub struct Mesh { /// Pool of connections to participants. Used to check who is alive in the network. pub connections: connection::Pool, @@ -17,6 +42,17 @@ pub struct Mesh { } impl Mesh { + pub fn new(options: Options) -> Self { + Self { + connections: connection::Pool::new( + Duration::from_millis(options.fetch_participant_timeout), + Duration::from_millis(options.refresh_active_timeout), + ), + active_participants: Participants::default(), + active_potential_participants: Participants::default(), + } + } + /// Participants that are active at the beginning of each protocol loop. pub fn active_participants(&self) -> &Participants { &self.active_participants diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 6d3917af4..af03d18a5 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -6,18 +6,19 @@ use super::state::{ use super::{Config, SignQueue}; use crate::gcp::error::DatastoreStorageError; use crate::gcp::error::SecretStorageError; +use crate::http_client::MessageQueue; use crate::protocol::contract::primitives::Participants; use crate::protocol::monitor::StuckMonitor; use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; -use crate::rpc_client; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::LockTripleNodeStorageBox; use crate::storage::triple_storage::TripleData; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use crate::util::AffinePointExt; +use crate::{http_client, rpc_client}; use std::cmp::Ordering; use std::sync::Arc; @@ -42,6 +43,7 @@ pub trait ConsensusCtx { fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> LockTripleNodeStorageBox; fn cfg(&self) -> &Config; + fn message_options(&self) -> http_client::Options; } #[derive(thiserror::Error, Debug)] @@ -170,7 +172,9 @@ impl ConsensusProtocol for StartedState { ctx.my_account_id(), ), )), - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } None => Ok(NodeState::Joining(JoiningState { @@ -224,7 +228,9 @@ impl ConsensusProtocol for StartedState { participants, threshold: contract_state.threshold, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } None => { @@ -760,6 +766,8 @@ async fn start_resharing( threshold: contract_state.threshold, public_key: contract_state.public_key, protocol, - messages: Default::default(), + messages: Arc::new(RwLock::new(MessageQueue::new( + ctx.message_options().clone(), + ))), })) } diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index e8f3b00da..7c87e9b4f 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -23,6 +23,8 @@ use self::consensus::ConsensusCtx; use self::cryptography::CryptographicCtx; use self::message::MessageCtx; use crate::config::Config; +use crate::http_client; +use crate::mesh; use crate::mesh::Mesh; use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; @@ -54,6 +56,7 @@ struct Ctx { triple_storage: LockTripleNodeStorageBox, cfg: Config, mesh: Mesh, + message_options: http_client::Options, } impl ConsensusCtx for &mut MpcSignProtocol { @@ -96,6 +99,10 @@ impl ConsensusCtx for &mut MpcSignProtocol { fn triple_storage(&self) -> LockTripleNodeStorageBox { self.ctx.triple_storage.clone() } + + fn message_options(&self) -> http_client::Options { + self.ctx.message_options.clone() + } } #[async_trait::async_trait] @@ -167,6 +174,8 @@ impl MpcSignProtocol { secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, cfg: Config, + mesh_options: mesh::Options, + message_options: http_client::Options, ) -> (Self, Arc>) { let my_address = my_address.into_url().unwrap(); let rpc_url = rpc_client.rpc_addr(); @@ -192,7 +201,8 @@ impl MpcSignProtocol { secret_storage, triple_storage, cfg, - mesh: Mesh::default(), + mesh: Mesh::new(mesh_options), + message_options, }; let protocol = MpcSignProtocol { ctx, diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index 97eec7fd7..b6b69d173 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -117,6 +117,8 @@ impl<'a> Node<'a> { config.cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), } .into_str_args(); let image: GenericImage = GenericImage::new("near/mpc-node", "latest") diff --git a/integration-tests/chain-signatures/src/lib.rs b/integration-tests/chain-signatures/src/lib.rs index dbdba4d35..0771a0ca0 100644 --- a/integration-tests/chain-signatures/src/lib.rs +++ b/integration-tests/chain-signatures/src/lib.rs @@ -15,6 +15,8 @@ use futures::StreamExt; use mpc_contract::config::{PresignatureConfig, ProtocolConfig, TripleConfig}; use mpc_contract::primitives::CandidateInfo; use mpc_node::gcp::GcpService; +use mpc_node::http_client; +use mpc_node::mesh; use mpc_node::storage; use mpc_node::storage::triple_storage::TripleNodeStorageBox; use near_crypto::KeyFile; @@ -206,6 +208,8 @@ pub struct Context<'a> { pub mpc_contract: Contract, pub datastore: crate::containers::Datastore<'a>, pub storage_options: storage::Options, + pub mesh_options: mesh::Options, + pub message_options: http_client::Options, } pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> { @@ -240,6 +244,14 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> gcp_datastore_url: Some(datastore.local_address.clone()), sk_share_local_path: Some(sk_share_local_path), }; + + let mesh_options = mpc_node::mesh::Options { + fetch_participant_timeout: 1000, + refresh_active_timeout: 1000, + }; + + let message_options = http_client::Options { timeout: 1000 }; + Ok(Context { docker_client, docker_network: docker_network.to_string(), @@ -250,6 +262,8 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> mpc_contract, datastore, storage_options, + mesh_options, + message_options, }) } diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index a3051b6b4..923ccb155 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -74,6 +74,8 @@ impl Node { cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), }; let cmd = executable(ctx.release, crate::execute::PACKAGE_MULTICHAIN) @@ -165,6 +167,8 @@ impl Node { config.cfg.protocol.clone(), )?)), client_header_referer: None, + mesh_options: ctx.mesh_options.clone(), + message_options: ctx.message_options.clone(), }; let mpc_node_id = format!("multichain/{}", config.account.id());