Skip to content

Commit

Permalink
separate mesh and contract state update out
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiangyi Zheng committed Nov 9, 2024
1 parent 144fae5 commit 04125bd
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 78 deletions.
22 changes: 18 additions & 4 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::mesh::Mesh;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
Expand Down Expand Up @@ -237,11 +238,12 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {

tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (mesh, mesh_state) = Mesh::init(mesh_options);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
mpc_contract_id.clone(),
account_id,
rpc_client,
rpc_client.clone(),
signer,
receiver,
sign_queue,
Expand All @@ -255,20 +257,32 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
sign_sk,
},
}),
mesh_options,
mesh_state.clone(),
message_options,
);

let (contract_updater, contract_state) =
crate::contract_updater::ContractStateUpdater::init(rpc_client, mpc_contract_id);

rt.block_on(async {
tracing::info!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
let contract_state_clone = contract_state.clone();
let contract_handle =
tokio::spawn(async move { contract_updater.run(contract_state_clone).await });
let contract_state_clone = contract_state.clone();
let mesh_handle =
tokio::spawn(async move { mesh.run(contract_state_clone, mesh_state).await });
let protocol_handle =
tokio::spawn(async move { protocol.run(contract_state).await });
tracing::info!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state, indexer).await
});
tracing::info!("protocol http server spawned");

contract_handle.await??;
mesh_handle.await??;
protocol_handle.await??;
web_handle.await??;
tracing::info!("spinning down");
Expand Down
42 changes: 42 additions & 0 deletions chain-signatures/node/src/contract_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::protocol::ProtocolState;
use crate::rpc_client;
use near_account_id::AccountId;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

pub struct ContractStateUpdater {
rpc_client: near_fetch::Client,
mpc_contract_id: AccountId,
}

impl ContractStateUpdater {
pub fn init(
rpc_client: near_fetch::Client,
mpc_contract_id: AccountId,
) -> (Self, Arc<RwLock<Option<ProtocolState>>>) {
let updater = Self {
rpc_client,
mpc_contract_id: mpc_contract_id.clone(),
};
let contract_state = Arc::new(RwLock::new(None));
(updater, contract_state)
}

pub async fn run(
&self,
contract_state: Arc<RwLock<Option<ProtocolState>>>,
) -> anyhow::Result<()> {
let mut last_update = Instant::now();
loop {
if last_update.elapsed() > Duration::from_millis(1000) {
let mut contract_state = contract_state.write().await;
*contract_state =
rpc_client::fetch_mpc_contract_state(&self.rpc_client, &self.mpc_contract_id)
.await
.ok();
last_update = Instant::now();
}
}
}
}
1 change: 1 addition & 0 deletions chain-signatures/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod cli;
pub mod config;
pub mod contract_updater;
pub mod gcp;
pub mod http_client;
pub mod indexer;
Expand Down
64 changes: 55 additions & 9 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ProtocolState;
use std::sync::Arc;
use tokio::sync::RwLock;

pub mod connection;

Expand Down Expand Up @@ -29,28 +31,49 @@ impl Options {
}
}

#[derive(Clone)]
pub struct MeshState {
/// Participants that are active at the beginning of each protocol loop.
pub active_participants: Participants,

/// Potential participants that are active at the beginning of each protocol loop. This
/// includes participants belonging to the next epoch.
pub active_potential_participants: Participants,

pub potential_participants: Participants,

pub stable_participants: Participants,
}

pub struct Mesh {
/// Pool of connections to participants. Used to check who is alive in the network.
pub connections: connection::Pool,
connections: connection::Pool,

/// Participants that are active at the beginning of each protocol loop.
pub active_participants: Participants,
active_participants: Participants,

/// Potential participants that are active at the beginning of each protocol loop. This
/// includes participants belonging to the next epoch.
pub active_potential_participants: Participants,
active_potential_participants: Participants,
}

impl Mesh {
pub fn new(options: Options) -> Self {
Self {
pub fn init(options: Options) -> (Self, Arc<RwLock<MeshState>>) {
let mesh = Self {
connections: connection::Pool::new(
Duration::from_millis(options.fetch_participant_timeout),
Duration::from_millis(options.refresh_active_timeout),
),
active_participants: Participants::default(),
active_potential_participants: Participants::default(),
}
};
let mesh_state = Arc::new(RwLock::new(MeshState {
active_participants: Participants::default(),
active_potential_participants: Participants::default(),
potential_participants: Participants::default(),
stable_participants: Participants::default(),
}));
(mesh, mesh_state)
}

/// Participants that are active at the beginning of each protocol loop.
Expand Down Expand Up @@ -96,7 +119,7 @@ impl Mesh {
stable
}

pub async fn establish_participants(&mut self, contract_state: &ProtocolState) {
async fn establish_participants(&mut self, contract_state: &ProtocolState) {
self.connections
.establish_participants(contract_state)
.await;
Expand All @@ -110,8 +133,31 @@ impl Mesh {
}

/// Ping the active participants such that we can see who is alive.
pub async fn ping(&mut self) {
async fn ping(&mut self) {
self.active_participants = self.connections.ping().await;
self.active_potential_participants = self.connections.ping_potential().await;
}

pub async fn run(
mut self,
contract_state: Arc<RwLock<Option<ProtocolState>>>,
mesh_state: Arc<RwLock<MeshState>>,
) -> anyhow::Result<()> {
let mut last_pinged = Instant::now();
loop {
if last_pinged.elapsed() > Duration::from_millis(300) {
if let Some(state) = contract_state.read().await.clone() {
self.establish_participants(&state).await;
let mut mesh_state = mesh_state.write().await;
*mesh_state = MeshState {
active_participants: self.active_participants().clone(),
active_potential_participants: self.active_potential_participants().clone(),
potential_participants: self.potential_participants().await.clone(),
stable_participants: self.potential_participants().await.clone(),
};
last_pinged = Instant::now();
}
}
}
}
}
8 changes: 4 additions & 4 deletions chain-signatures/node/src/protocol/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashSet, str::FromStr};

use self::primitives::{Candidates, Participants, PkVotes, Votes};

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitializingContractState {
pub candidates: Candidates,
pub threshold: usize,
Expand All @@ -26,7 +26,7 @@ impl From<mpc_contract::InitializingContractState> for InitializingContractState
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RunningContractState {
pub epoch: u64,
pub participants: Participants,
Expand All @@ -51,7 +51,7 @@ impl From<mpc_contract::RunningContractState> for RunningContractState {
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ResharingContractState {
pub old_epoch: u64,
pub old_participants: Participants,
Expand All @@ -78,7 +78,7 @@ impl From<mpc_contract::ResharingContractState> for ResharingContractState {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum ProtocolState {
Initializing(InitializingContractState),
Running(RunningContractState),
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/protocol/contract/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl From<mpc_contract::primitives::PkVotes> for PkVotes {
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Votes {
pub votes: BTreeMap<AccountId, HashSet<AccountId>>,
}
Expand Down
Loading

0 comments on commit 04125bd

Please sign in to comment.