diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 5e9aadc8f37f..6e3619f57e2e 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -127,7 +127,7 @@ impl EN { ) .await .wrap("Store::new()")?; - s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) }); // Run the temporary fetcher until the certificates are backfilled. // Temporary fetcher should be removed once json RPC syncing is fully deprecated. @@ -146,14 +146,25 @@ impl EN { let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; - s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) }); let attestation = Arc::new(attestation::Controller::new(attester)); - s.spawn_bg(self.run_attestation_controller( - ctx, - global_config.clone(), - attestation.clone(), - )); + s.spawn_bg({ + let global_config = global_config.clone(); + let attestation = attestation.clone(); + async { + let res = self + .run_attestation_controller(ctx, global_config, attestation) + .await + .wrap("run_attestation_controller()"); + // Attestation currently is not critical for the node to function. + // If it fails, we just log the error and continue. + if let Err(err) = res { + tracing::error!("attestation controller failed: {err:#}"); + } + Ok(()) + } + }); let executor = executor::Executor { config: config::executor(&cfg, &secrets, &global_config, build_version)?, diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 2a280b2f1616..a392acfbe5f0 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -30,7 +30,7 @@ pub async fn run_main_node( tracing::debug!(is_attester = attester.is_some(), "main node attester mode"); - scope::run!(&ctx, |ctx, s| async { + let res: ctx::Result<()> = scope::run!(&ctx, |ctx, s| async { if let Some(spec) = &cfg.genesis_spec { let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; @@ -46,7 +46,7 @@ pub async fn run_main_node( let (store, runner) = Store::new(ctx, pool.clone(), None, None) .await .wrap("Store::new()")?; - s.spawn_bg(runner.run(ctx)); + s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) }); let global_config = pool .connection(ctx) @@ -56,25 +56,36 @@ pub async fn run_main_node( .await .wrap("global_config()")? .context("global_config() disappeared")?; - anyhow::ensure!( - global_config.genesis.leader_selection - == validator::LeaderSelectionMode::Sticky(validator_key.public()), - "unsupported leader selection mode - main node has to be the leader" - ); + if global_config.genesis.leader_selection + != validator::LeaderSelectionMode::Sticky(validator_key.public()) + { + return Err(anyhow::format_err!( + "unsupported leader selection mode - main node has to be the leader" + ) + .into()); + } let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; - s.spawn_bg(runner.run(ctx)); + s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) }); let attestation = Arc::new(attestation::Controller::new(attester)); - s.spawn_bg(run_attestation_controller( - ctx, - &pool, - global_config.clone(), - attestation.clone(), - )); - + s.spawn_bg({ + let global_config = global_config.clone(); + let attestation = attestation.clone(); + async { + let res = run_attestation_controller(ctx, &pool, global_config, attestation) + .await + .wrap("run_attestation_controller()"); + // Attestation currently is not critical for the node to function. + // If it fails, we just log the error and continue. + if let Err(err) = res { + tracing::error!("attestation controller failed: {err:#}"); + } + Ok(()) + } + }); let executor = executor::Executor { config: config::executor(&cfg, &secrets, &global_config, None)?, block_store, @@ -87,9 +98,14 @@ pub async fn run_main_node( }; tracing::info!("running the main node executor"); - executor.run(ctx).await + executor.run(ctx).await.context("executor")?; + Ok(()) }) - .await + .await; + match res { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } } /// Manages attestation state by configuring the @@ -100,91 +116,84 @@ async fn run_attestation_controller( pool: &ConnectionPool, cfg: consensus_dal::GlobalConfig, attestation: Arc, -) -> anyhow::Result<()> { +) -> ctx::Result<()> { const POLL_INTERVAL: time::Duration = time::Duration::seconds(5); let registry = registry::Registry::new(cfg.genesis, pool.clone()).await; let registry_addr = cfg.registry_address.map(registry::Address::new); let mut next = attester::BatchNumber(0); - let res = async { - loop { - // After regenesis it might happen that the batch number for the first block - // is not immediately known (the first block was not produced yet), - // therefore we need to wait for it. - let status = loop { - match pool - .connection(ctx) - .await - .wrap("connection()")? - .attestation_status(ctx) - .await - .wrap("attestation_status()")? - { - Some(status) if status.next_batch_to_attest >= next => break status, - _ => {} - } - ctx.sleep(POLL_INTERVAL).await?; - }; - next = status.next_batch_to_attest.next(); - tracing::info!( - "waiting for hash of batch {:?}", - status.next_batch_to_attest - ); - let info = pool - .wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL) - .await?; - let hash = consensus_dal::batch_hash(&info); - let Some(committee) = registry - .attester_committee_for(ctx, registry_addr, status.next_batch_to_attest) - .await - .wrap("attester_committee_for()")? - else { - tracing::info!("attestation not required"); - continue; - }; - let committee = Arc::new(committee); - // Persist the derived committee. - pool.connection(ctx) - .await - .wrap("connection")? - .upsert_attester_committee(ctx, status.next_batch_to_attest, &committee) - .await - .wrap("upsert_attester_committee()")?; - tracing::info!( - "attesting batch {:?} with hash {hash:?}", - status.next_batch_to_attest - ); - attestation - .start_attestation(Arc::new(attestation::Info { - batch_to_attest: attester::Batch { - hash, - number: status.next_batch_to_attest, - genesis: status.genesis, - }, - committee, - })) - .await - .context("start_attestation()")?; - // Main node is the only node which can update the global AttestationStatus, - // therefore we can synchronously wait for the certificate. - let qc = attestation - .wait_for_cert(ctx, status.next_batch_to_attest) - .await? - .context("attestation config has changed unexpectedly")?; - tracing::info!( - "collected certificate for batch {:?}", - status.next_batch_to_attest - ); - pool.connection(ctx) + loop { + // After regenesis it might happen that the batch number for the first block + // is not immediately known (the first block was not produced yet), + // therefore we need to wait for it. + let status = loop { + match pool + .connection(ctx) .await .wrap("connection()")? - .insert_batch_certificate(ctx, &qc) + .attestation_status(ctx) .await - .wrap("insert_batch_certificate()")?; - } - } - .await; - match res { - Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), + .wrap("attestation_status()")? + { + Some(status) if status.next_batch_to_attest >= next => break status, + _ => {} + } + ctx.sleep(POLL_INTERVAL).await?; + }; + next = status.next_batch_to_attest.next(); + tracing::info!( + "waiting for hash of batch {:?}", + status.next_batch_to_attest + ); + let info = pool + .wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL) + .await?; + let hash = consensus_dal::batch_hash(&info); + let Some(committee) = registry + .attester_committee_for(ctx, registry_addr, status.next_batch_to_attest) + .await + .wrap("attester_committee_for()")? + else { + tracing::info!("attestation not required"); + continue; + }; + let committee = Arc::new(committee); + // Persist the derived committee. + pool.connection(ctx) + .await + .wrap("connection")? + .upsert_attester_committee(ctx, status.next_batch_to_attest, &committee) + .await + .wrap("upsert_attester_committee()")?; + tracing::info!( + "attesting batch {:?} with hash {hash:?}", + status.next_batch_to_attest + ); + attestation + .start_attestation(Arc::new(attestation::Info { + batch_to_attest: attester::Batch { + hash, + number: status.next_batch_to_attest, + genesis: status.genesis, + }, + committee, + })) + .await + .context("start_attestation()")?; + // Main node is the only node which can update the global AttestationStatus, + // therefore we can synchronously wait for the certificate. + let qc = attestation + .wait_for_cert(ctx, status.next_batch_to_attest) + .await? + .context("attestation config has changed unexpectedly")?; + tracing::info!( + "collected certificate for batch {:?}", + status.next_batch_to_attest + ); + pool.connection(ctx) + .await + .wrap("connection()")? + .insert_batch_certificate(ctx, &qc) + .await + .wrap("insert_batch_certificate()")?; } }