Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate mesh and contract state update out #935

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::mesh::Mesh;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
Expand Down Expand Up @@ -237,11 +238,12 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {

tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (mesh, mesh_state) = Mesh::init(mesh_options);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
mpc_contract_id.clone(),
account_id,
rpc_client,
rpc_client.clone(),
signer,
receiver,
sign_queue,
Expand All @@ -255,20 +257,32 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
sign_sk,
},
}),
mesh_options,
mesh_state.clone(),
message_options,
);

let (contract_updater, contract_state) =
crate::contract_updater::ContractStateUpdater::init(rpc_client, mpc_contract_id);

rt.block_on(async {
tracing::info!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
let contract_state_clone = contract_state.clone();
let contract_handle =
tokio::spawn(async move { contract_updater.run(contract_state_clone).await });
let contract_state_clone = contract_state.clone();
let mesh_handle =
tokio::spawn(async move { mesh.run(contract_state_clone, mesh_state).await });
let protocol_handle =
tokio::spawn(async move { protocol.run(contract_state).await });
tracing::info!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state, indexer).await
});
tracing::info!("protocol http server spawned");

contract_handle.await??;
mesh_handle.await??;
protocol_handle.await??;
web_handle.await??;
tracing::info!("spinning down");
Expand Down
42 changes: 42 additions & 0 deletions chain-signatures/node/src/contract_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::protocol::ProtocolState;
use crate::rpc_client;
use near_account_id::AccountId;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

pub struct ContractStateUpdater {
rpc_client: near_fetch::Client,
mpc_contract_id: AccountId,
}

impl ContractStateUpdater {
pub fn init(
rpc_client: near_fetch::Client,
mpc_contract_id: AccountId,
) -> (Self, Arc<RwLock<Option<ProtocolState>>>) {
let updater = Self {
rpc_client,
mpc_contract_id: mpc_contract_id.clone(),
};
let contract_state = Arc::new(RwLock::new(None));
(updater, contract_state)
}

pub async fn run(
&self,
contract_state: Arc<RwLock<Option<ProtocolState>>>,
) -> anyhow::Result<()> {
let mut last_update = Instant::now();
loop {
if last_update.elapsed() > Duration::from_millis(1000) {
let mut contract_state = contract_state.write().await;
*contract_state =
rpc_client::fetch_mpc_contract_state(&self.rpc_client, &self.mpc_contract_id)
.await
.ok();
last_update = Instant::now();
}
}
}
}
1 change: 1 addition & 0 deletions chain-signatures/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod cli;
pub mod config;
pub mod contract_updater;
pub mod gcp;
pub mod http_client;
pub mod indexer;
Expand Down
64 changes: 55 additions & 9 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::protocol::contract::primitives::Participants;
use crate::protocol::ProtocolState;
use std::sync::Arc;
use tokio::sync::RwLock;

pub mod connection;

Expand Down Expand Up @@ -29,28 +31,49 @@ impl Options {
}
}

#[derive(Clone)]
pub struct MeshState {
/// Participants that are active at the beginning of each protocol loop.
pub active_participants: Participants,

/// Potential participants that are active at the beginning of each protocol loop. This
/// includes participants belonging to the next epoch.
pub active_potential_participants: Participants,

pub potential_participants: Participants,

pub stable_participants: Participants,
}

pub struct Mesh {
/// Pool of connections to participants. Used to check who is alive in the network.
pub connections: connection::Pool,
connections: connection::Pool,

/// Participants that are active at the beginning of each protocol loop.
pub active_participants: Participants,
active_participants: Participants,

/// Potential participants that are active at the beginning of each protocol loop. This
/// includes participants belonging to the next epoch.
pub active_potential_participants: Participants,
active_potential_participants: Participants,
}

impl Mesh {
pub fn new(options: Options) -> Self {
Self {
pub fn init(options: Options) -> (Self, Arc<RwLock<MeshState>>) {
let mesh = Self {
connections: connection::Pool::new(
Duration::from_millis(options.fetch_participant_timeout),
Duration::from_millis(options.refresh_active_timeout),
),
active_participants: Participants::default(),
active_potential_participants: Participants::default(),
}
};
let mesh_state = Arc::new(RwLock::new(MeshState {
active_participants: Participants::default(),
active_potential_participants: Participants::default(),
potential_participants: Participants::default(),
stable_participants: Participants::default(),
}));
(mesh, mesh_state)
}

/// Participants that are active at the beginning of each protocol loop.
Expand Down Expand Up @@ -96,7 +119,7 @@ impl Mesh {
stable
}

pub async fn establish_participants(&mut self, contract_state: &ProtocolState) {
async fn establish_participants(&mut self, contract_state: &ProtocolState) {
self.connections
.establish_participants(contract_state)
.await;
Expand All @@ -110,8 +133,31 @@ impl Mesh {
}

/// Ping the active participants such that we can see who is alive.
pub async fn ping(&mut self) {
async fn ping(&mut self) {
self.active_participants = self.connections.ping().await;
self.active_potential_participants = self.connections.ping_potential().await;
}

pub async fn run(
mut self,
contract_state: Arc<RwLock<Option<ProtocolState>>>,
mesh_state: Arc<RwLock<MeshState>>,
) -> anyhow::Result<()> {
let mut last_pinged = Instant::now();
loop {
if last_pinged.elapsed() > Duration::from_millis(300) {
if let Some(state) = contract_state.read().await.clone() {
self.establish_participants(&state).await;
let mut mesh_state = mesh_state.write().await;
*mesh_state = MeshState {
active_participants: self.active_participants().clone(),
active_potential_participants: self.active_potential_participants().clone(),
potential_participants: self.potential_participants().await.clone(),
stable_participants: self.potential_participants().await.clone(),
};
last_pinged = Instant::now();
}
}
}
}
}
8 changes: 4 additions & 4 deletions chain-signatures/node/src/protocol/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashSet, str::FromStr};

use self::primitives::{Candidates, Participants, PkVotes, Votes};

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitializingContractState {
pub candidates: Candidates,
pub threshold: usize,
Expand All @@ -26,7 +26,7 @@ impl From<mpc_contract::InitializingContractState> for InitializingContractState
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RunningContractState {
pub epoch: u64,
pub participants: Participants,
Expand All @@ -51,7 +51,7 @@ impl From<mpc_contract::RunningContractState> for RunningContractState {
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ResharingContractState {
pub old_epoch: u64,
pub old_participants: Participants,
Expand All @@ -78,7 +78,7 @@ impl From<mpc_contract::ResharingContractState> for ResharingContractState {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum ProtocolState {
Initializing(InitializingContractState),
Running(RunningContractState),
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/protocol/contract/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl From<mpc_contract::primitives::PkVotes> for PkVotes {
}
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Votes {
pub votes: BTreeMap<AccountId, HashSet<AccountId>>,
}
Expand Down
Loading
Loading