Skip to content

Commit

Permalink
feat: add latest_block_height and is_stable to /state; only stable no…
Browse files Browse the repository at this point in the history
…des can partake in signature generation (#710)

* Include latest block height in /state

* Added is_behind to indexer

* Added unstable for /state

* Added stable participants

* Make Unstable a flag

* Rename active to stable for signature functions
  • Loading branch information
ChaoticTempest authored Jul 23, 2024
1 parent ce157c4 commit 044c05f
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 186 deletions.
210 changes: 83 additions & 127 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ impl Cli {

/// This will whether this code is being ran on top of GCP or not.
fn is_running_on_gcp() -> bool {
// Check if running in Google Cloud Run: https://cloud.google.com/run/docs/container-contract#services-env-vars
if std::env::var("K_SERVICE").is_ok() {
return true;
}

let resp = reqwest::blocking::Client::new()
.get("http://metadata.google.internal/computeMetadata/v1/instance/id")
.header("Metadata-Flavor", "Google")
Expand All @@ -174,61 +179,12 @@ fn is_running_on_gcp() -> bool {
}
}

fn spinup_indexer(
options: &indexer::Options,
mpc_contract_id: &AccountId,
account_id: &AccountId,
sign_queue: &Arc<RwLock<SignQueue>>,
gcp: &GcpService,
) -> std::thread::JoinHandle<()> {
let options = options.clone();
let mpc_contract_id = mpc_contract_id.clone();
let account_id = account_id.clone();
let sign_queue = sign_queue.clone();
let gcp = gcp.clone();
std::thread::spawn(move || {
// If indexer fails for whatever reason, let's spin it back up:
let mut i = 0;
loop {
if i > 0 {
tracing::info!("restarting indexer after failure: restart count={i}");
}

let options = options.clone();
let mpc_contract_id = mpc_contract_id.clone();
let account_id = account_id.clone();
let sign_queue = sign_queue.clone();
let gcp = gcp.clone();

// TODO/NOTE: currently indexer does not have any interrupt handlers and will never yield back
// as successful. We can add interrupt handlers in the future but this is not important right
// now since we managing nodes through integration tests that can kill it or through docker.
let Err(err) = indexer::run(options, mpc_contract_id, account_id, sign_queue, gcp)
else {
break;
};
tracing::error!(%err, "indexer failed");

let delay = if i <= 7 {
2u64.pow(i)
} else {
// Max out at 2 minutes
120
};

std::thread::sleep(std::time::Duration::from_secs(delay));
i += 1;
}
})
}

pub fn run(cmd: Cli) -> anyhow::Result<()> {
// Install global collector configured based on RUST_LOG env var.
let mut subscriber = tracing_subscriber::fmt()
.with_thread_ids(true)
.with_env_filter(EnvFilter::from_default_env());
// Check if running in Google Cloud Run: https://cloud.google.com/run/docs/container-contract#services-env-vars
if std::env::var("K_SERVICE").is_ok() || is_running_on_gcp() {
if is_running_on_gcp() {
// Disable colored logging as it messes up GCP's log formatting
subscriber = subscriber.with_ansi(false);
}
Expand Down Expand Up @@ -256,88 +212,88 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
max_presignatures,
} => {
let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
tokio::runtime::Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let (sender, receiver) = mpsc::channel(16384);
let gcp_service = GcpService::init(&account_id, &storage_options).await?;
let indexer_handle = spinup_indexer(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
);
.build()?;
let gcp_service =
rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?;
let (indexer_handle, indexer) = indexer::run(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
&rt,
)?;

let key_storage =
storage::secret_storage::init(Some(&gcp_service), &storage_options, &account_id);
let triple_storage: LockTripleNodeStorageBox = Arc::new(RwLock::new(
storage::triple_storage::init(Some(&gcp_service), &account_id),
));

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
.map(|mut addr| {
addr.set_port(Some(web_port)).unwrap();
addr
})
.unwrap_or_else(|| {
let my_ip = local_ip().unwrap();
Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap()
});

let key_storage = storage::secret_storage::init(
Some(&gcp_service),
&storage_options,
&account_id,
);
let triple_storage: LockTripleNodeStorageBox = Arc::new(RwLock::new(
storage::triple_storage::init(Some(&gcp_service), &account_id),
));
let (sender, receiver) = mpsc::channel(16384);

tracing::info!(%my_address, "address detected");
let rpc_client = near_fetch::Client::new(&near_rpc);
tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
account_id,
rpc_client,
signer,
receiver,
sign_queue,
key_storage,
triple_storage,
Config {
triple_cfg: TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
},
presig_cfg: PresignatureConfig {
min_presignatures,
max_presignatures,
},
network_cfg: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?,
sign_sk,
},
},
);

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
.map(|mut addr| {
addr.set_port(Some(web_port)).unwrap();
addr
})
.unwrap_or_else(|| {
let my_ip = local_ip().unwrap();
Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap()
});
tracing::info!(%my_address, "address detected");
let rpc_client = near_fetch::Client::new(&near_rpc);
tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
account_id,
rpc_client,
signer,
receiver,
sign_queue,
key_storage,
triple_storage,
Config {
triple_cfg: TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
},
presig_cfg: PresignatureConfig {
min_presignatures,
max_presignatures,
},
network_cfg: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(
cipher_pk,
)?)?,
sign_sk,
},
},
);
tracing::debug!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
tracing::debug!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state).await
});
tracing::debug!("protocol http server spawned");
rt.block_on(async {
tracing::debug!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
tracing::debug!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state, indexer).await
});
tracing::debug!("protocol http server spawned");

protocol_handle.await??;
web_handle.await??;
tracing::debug!("spinning down");
protocol_handle.await??;
web_handle.await??;
tracing::debug!("spinning down");

indexer_handle.join().unwrap();
anyhow::Ok(())
})?;
indexer_handle.join().unwrap()?;
anyhow::Ok(())
})?;
}
}

Expand Down
Loading

0 comments on commit 044c05f

Please sign in to comment.