Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): made attestation controller non-critical #3180

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl EN {
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) });

// Run the temporary fetcher until the certificates are backfilled.
// Temporary fetcher should be removed once json RPC syncing is fully deprecated.
Expand All @@ -146,14 +146,25 @@ impl EN {
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) });

let attestation = Arc::new(attestation::Controller::new(attester));
s.spawn_bg(self.run_attestation_controller(
ctx,
global_config.clone(),
attestation.clone(),
));
s.spawn_bg({
let global_config = global_config.clone();
let attestation = attestation.clone();
async {
let res = self
.run_attestation_controller(ctx, global_config, attestation)
.await
.wrap("run_attestation_controller()");
// Attestation currently is not critical for the node to function.
// If it fails, we just log the error and continue.
if let Err(err) = res {
tracing::error!("attestation controller failed: {err:#}");
}
Ok(())
}
});

let executor = executor::Executor {
config: config::executor(&cfg, &secrets, &global_config, build_version)?,
Expand Down
201 changes: 105 additions & 96 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn run_main_node(

tracing::debug!(is_attester = attester.is_some(), "main node attester mode");

scope::run!(&ctx, |ctx, s| async {
let res: ctx::Result<()> = scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;

Expand All @@ -46,7 +46,7 @@ pub async fn run_main_node(
let (store, runner) = Store::new(ctx, pool.clone(), None, None)
.await
.wrap("Store::new()")?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) });

let global_config = pool
.connection(ctx)
Expand All @@ -56,25 +56,36 @@ pub async fn run_main_node(
.await
.wrap("global_config()")?
.context("global_config() disappeared")?;
anyhow::ensure!(
global_config.genesis.leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);
if global_config.genesis.leader_selection
!= validator::LeaderSelectionMode::Sticky(validator_key.public())
{
return Err(anyhow::format_err!(
"unsupported leader selection mode - main node has to be the leader"
)
.into());
}

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) });

let attestation = Arc::new(attestation::Controller::new(attester));
s.spawn_bg(run_attestation_controller(
ctx,
&pool,
global_config.clone(),
attestation.clone(),
));

s.spawn_bg({
let global_config = global_config.clone();
let attestation = attestation.clone();
async {
let res = run_attestation_controller(ctx, &pool, global_config, attestation)
.await
.wrap("run_attestation_controller()");
// Attestation currently is not critical for the node to function.
// If it fails, we just log the error and continue.
if let Err(err) = res {
tracing::error!("attestation controller failed: {err:#}");
}
Ok(())
}
});
let executor = executor::Executor {
config: config::executor(&cfg, &secrets, &global_config, None)?,
block_store,
Expand All @@ -87,9 +98,14 @@ pub async fn run_main_node(
};

tracing::info!("running the main node executor");
executor.run(ctx).await
executor.run(ctx).await.context("executor")?;
Ok(())
})
.await
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}

/// Manages attestation state by configuring the
Expand All @@ -100,91 +116,84 @@ async fn run_attestation_controller(
pool: &ConnectionPool,
cfg: consensus_dal::GlobalConfig,
attestation: Arc<attestation::Controller>,
) -> anyhow::Result<()> {
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);
let registry = registry::Registry::new(cfg.genesis, pool.clone()).await;
let registry_addr = cfg.registry_address.map(registry::Address::new);
let mut next = attester::BatchNumber(0);
let res = async {
loop {
// After regenesis it might happen that the batch number for the first block
// is not immediately known (the first block was not produced yet),
// therefore we need to wait for it.
let status = loop {
match pool
.connection(ctx)
.await
.wrap("connection()")?
.attestation_status(ctx)
.await
.wrap("attestation_status()")?
{
Some(status) if status.next_batch_to_attest >= next => break status,
_ => {}
}
ctx.sleep(POLL_INTERVAL).await?;
};
next = status.next_batch_to_attest.next();
tracing::info!(
"waiting for hash of batch {:?}",
status.next_batch_to_attest
);
let info = pool
.wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL)
.await?;
let hash = consensus_dal::batch_hash(&info);
let Some(committee) = registry
.attester_committee_for(ctx, registry_addr, status.next_batch_to_attest)
.await
.wrap("attester_committee_for()")?
else {
tracing::info!("attestation not required");
continue;
};
let committee = Arc::new(committee);
// Persist the derived committee.
pool.connection(ctx)
.await
.wrap("connection")?
.upsert_attester_committee(ctx, status.next_batch_to_attest, &committee)
.await
.wrap("upsert_attester_committee()")?;
tracing::info!(
"attesting batch {:?} with hash {hash:?}",
status.next_batch_to_attest
);
attestation
.start_attestation(Arc::new(attestation::Info {
batch_to_attest: attester::Batch {
hash,
number: status.next_batch_to_attest,
genesis: status.genesis,
},
committee,
}))
.await
.context("start_attestation()")?;
// Main node is the only node which can update the global AttestationStatus,
// therefore we can synchronously wait for the certificate.
let qc = attestation
.wait_for_cert(ctx, status.next_batch_to_attest)
.await?
.context("attestation config has changed unexpectedly")?;
tracing::info!(
"collected certificate for batch {:?}",
status.next_batch_to_attest
);
pool.connection(ctx)
loop {
// After regenesis it might happen that the batch number for the first block
// is not immediately known (the first block was not produced yet),
// therefore we need to wait for it.
let status = loop {
match pool
.connection(ctx)
.await
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.attestation_status(ctx)
.await
.wrap("insert_batch_certificate()")?;
}
}
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
.wrap("attestation_status()")?
{
Some(status) if status.next_batch_to_attest >= next => break status,
_ => {}
}
ctx.sleep(POLL_INTERVAL).await?;
};
next = status.next_batch_to_attest.next();
tracing::info!(
"waiting for hash of batch {:?}",
status.next_batch_to_attest
);
let info = pool
.wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL)
.await?;
let hash = consensus_dal::batch_hash(&info);
let Some(committee) = registry
.attester_committee_for(ctx, registry_addr, status.next_batch_to_attest)
.await
.wrap("attester_committee_for()")?
else {
tracing::info!("attestation not required");
continue;
};
let committee = Arc::new(committee);
// Persist the derived committee.
pool.connection(ctx)
.await
.wrap("connection")?
.upsert_attester_committee(ctx, status.next_batch_to_attest, &committee)
.await
.wrap("upsert_attester_committee()")?;
tracing::info!(
"attesting batch {:?} with hash {hash:?}",
status.next_batch_to_attest
);
attestation
.start_attestation(Arc::new(attestation::Info {
batch_to_attest: attester::Batch {
hash,
number: status.next_batch_to_attest,
genesis: status.genesis,
},
committee,
}))
.await
.context("start_attestation()")?;
// Main node is the only node which can update the global AttestationStatus,
// therefore we can synchronously wait for the certificate.
let qc = attestation
.wait_for_cert(ctx, status.next_batch_to_attest)
.await?
.context("attestation config has changed unexpectedly")?;
tracing::info!(
"collected certificate for batch {:?}",
status.next_batch_to_attest
);
pool.connection(ctx)
.await
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.await
.wrap("insert_batch_certificate()")?;
}
}
Loading