Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add latest_block_height and is_stable to /state; only stable nodes can partake in signature generation #710

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 83 additions & 127 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ impl Cli {

/// This will whether this code is being ran on top of GCP or not.
fn is_running_on_gcp() -> bool {
// Check if running in Google Cloud Run: https://cloud.google.com/run/docs/container-contract#services-env-vars
if std::env::var("K_SERVICE").is_ok() {
return true;
}

let resp = reqwest::blocking::Client::new()
.get("http://metadata.google.internal/computeMetadata/v1/instance/id")
.header("Metadata-Flavor", "Google")
Expand All @@ -174,61 +179,12 @@ fn is_running_on_gcp() -> bool {
}
}

fn spinup_indexer(
options: &indexer::Options,
mpc_contract_id: &AccountId,
account_id: &AccountId,
sign_queue: &Arc<RwLock<SignQueue>>,
gcp: &GcpService,
) -> std::thread::JoinHandle<()> {
let options = options.clone();
let mpc_contract_id = mpc_contract_id.clone();
let account_id = account_id.clone();
let sign_queue = sign_queue.clone();
let gcp = gcp.clone();
std::thread::spawn(move || {
// If indexer fails for whatever reason, let's spin it back up:
let mut i = 0;
loop {
if i > 0 {
tracing::info!("restarting indexer after failure: restart count={i}");
}

let options = options.clone();
let mpc_contract_id = mpc_contract_id.clone();
let account_id = account_id.clone();
let sign_queue = sign_queue.clone();
let gcp = gcp.clone();

// TODO/NOTE: currently indexer does not have any interrupt handlers and will never yield back
// as successful. We can add interrupt handlers in the future but this is not important right
// now since we managing nodes through integration tests that can kill it or through docker.
let Err(err) = indexer::run(options, mpc_contract_id, account_id, sign_queue, gcp)
else {
break;
};
tracing::error!(%err, "indexer failed");

let delay = if i <= 7 {
2u64.pow(i)
} else {
// Max out at 2 minutes
120
};

std::thread::sleep(std::time::Duration::from_secs(delay));
i += 1;
}
})
}

pub fn run(cmd: Cli) -> anyhow::Result<()> {
// Install global collector configured based on RUST_LOG env var.
let mut subscriber = tracing_subscriber::fmt()
.with_thread_ids(true)
.with_env_filter(EnvFilter::from_default_env());
// Check if running in Google Cloud Run: https://cloud.google.com/run/docs/container-contract#services-env-vars
if std::env::var("K_SERVICE").is_ok() || is_running_on_gcp() {
if is_running_on_gcp() {
// Disable colored logging as it messes up GCP's log formatting
subscriber = subscriber.with_ansi(false);
}
Expand Down Expand Up @@ -256,88 +212,88 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
max_presignatures,
} => {
let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
tokio::runtime::Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let (sender, receiver) = mpsc::channel(16384);
let gcp_service = GcpService::init(&account_id, &storage_options).await?;
let indexer_handle = spinup_indexer(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
);
.build()?;
let gcp_service =
rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?;
let (indexer_handle, indexer) = indexer::run(
&indexer_options,
&mpc_contract_id,
&account_id,
&sign_queue,
&gcp_service,
&rt,
)?;

let key_storage =
storage::secret_storage::init(Some(&gcp_service), &storage_options, &account_id);
let triple_storage: LockTripleNodeStorageBox = Arc::new(RwLock::new(
storage::triple_storage::init(Some(&gcp_service), &account_id),
));

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
.map(|mut addr| {
addr.set_port(Some(web_port)).unwrap();
addr
})
.unwrap_or_else(|| {
let my_ip = local_ip().unwrap();
Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap()
});

let key_storage = storage::secret_storage::init(
Some(&gcp_service),
&storage_options,
&account_id,
);
let triple_storage: LockTripleNodeStorageBox = Arc::new(RwLock::new(
storage::triple_storage::init(Some(&gcp_service), &account_id),
));
let (sender, receiver) = mpsc::channel(16384);

tracing::info!(%my_address, "address detected");
let rpc_client = near_fetch::Client::new(&near_rpc);
tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
account_id,
rpc_client,
signer,
receiver,
sign_queue,
key_storage,
triple_storage,
Config {
triple_cfg: TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
},
presig_cfg: PresignatureConfig {
min_presignatures,
max_presignatures,
},
network_cfg: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?,
sign_sk,
},
},
);

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
let my_address = my_address
.map(|mut addr| {
addr.set_port(Some(web_port)).unwrap();
addr
})
.unwrap_or_else(|| {
let my_ip = local_ip().unwrap();
Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap()
});
tracing::info!(%my_address, "address detected");
let rpc_client = near_fetch::Client::new(&near_rpc);
tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
account_id,
rpc_client,
signer,
receiver,
sign_queue,
key_storage,
triple_storage,
Config {
triple_cfg: TripleConfig {
min_triples,
max_triples,
max_concurrent_introduction,
max_concurrent_generation,
},
presig_cfg: PresignatureConfig {
min_presignatures,
max_presignatures,
},
network_cfg: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(
cipher_pk,
)?)?,
sign_sk,
},
},
);
tracing::debug!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
tracing::debug!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state).await
});
tracing::debug!("protocol http server spawned");
rt.block_on(async {
tracing::debug!("protocol initialized");
let protocol_handle = tokio::spawn(async move { protocol.run().await });
tracing::debug!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state, indexer).await
});
tracing::debug!("protocol http server spawned");

protocol_handle.await??;
web_handle.await??;
tracing::debug!("spinning down");
protocol_handle.await??;
web_handle.await??;
tracing::debug!("spinning down");

indexer_handle.join().unwrap();
anyhow::Ok(())
})?;
indexer_handle.join().unwrap()?;
anyhow::Ok(())
})?;
}
}

Expand Down
Loading
Loading