Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Coupling block sync to DAG state #3268

Closed
wants to merge 10 commits into from
13 changes: 13 additions & 0 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ impl<N: Network> BFT<N> {
mut rx_primary_certificate,
mut rx_sync_bft_dag_at_bootup,
mut rx_sync_bft,
mut rx_is_recently_committed,
} = bft_receiver;

// Process the current round from the primary.
Expand Down Expand Up @@ -819,6 +820,18 @@ impl<N: Network> BFT<N> {
callback.send(result).ok();
}
});

// Process the request to check if the batch certificate was recently committed.
// Each request carries a `(round, certificate_id)` pair plus a one-shot callback for the answer.
let self_ = self.clone();
self.spawn(async move {
// Serve requests until `recv()` yields `None`.
while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await {
// Check if the certificate was recently committed.
// Note: This is a read-only lookup through `dag.read()`; this handler does not update the DAG.
let is_committed = self_.dag.read().is_recently_committed(round, certificate_id);
// Send the lookup result back to the requester.
// Note: A failed send is deliberately ignored via `.ok()` (presumably the requester stopped waiting).
callback.send(is_committed).ok();
}
});
}

/// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
Expand Down
33 changes: 29 additions & 4 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use snarkvm::{
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
puzzle::{Solution, SolutionID},
},
prelude::Result,
prelude::{Field, Result},
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -65,6 +65,7 @@ pub struct BFTSender<N: Network> {
pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub tx_is_recently_committed: mpsc::Sender<((u64, Field<N>), oneshot::Sender<bool>)>,
}

impl<N: Network> BFTSender<N> {
Expand Down Expand Up @@ -97,6 +98,16 @@ impl<N: Network> BFTSender<N> {
// Await the callback to continue.
callback_receiver.await?
}

/// Asks the BFT whether the certificate identified by `round` and `certificate_id` was recently committed.
///
/// The request is sent over `tx_is_recently_committed` together with a one-shot reply channel,
/// and this method then waits for the reply.
///
/// Returns the boolean received on the reply channel. Returns an error if the request
/// cannot be sent, or if awaiting the reply fails.
pub async fn send_sync_is_recently_committed(&self, round: u64, certificate_id: Field<N>) -> Result<bool> {
    // Create the one-shot channel on which the answer will be delivered.
    let (reply_tx, reply_rx) = oneshot::channel();
    // Bundle the lookup key with the reply handle and send it to the BFT.
    let request = ((round, certificate_id), reply_tx);
    self.tx_is_recently_committed.send(request).await?;
    // Wait for the answer before continuing.
    let is_recently_committed = reply_rx.await?;
    Ok(is_recently_committed)
}
}

#[derive(Debug)]
Expand All @@ -105,6 +116,7 @@ pub struct BFTReceiver<N: Network> {
pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub rx_is_recently_committed: mpsc::Receiver<((u64, Field<N>), oneshot::Sender<bool>)>,
}

/// Initializes the BFT channels.
Expand All @@ -113,9 +125,22 @@ pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft };
let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft };
let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender = BFTSender {
tx_primary_round,
tx_primary_certificate,
tx_sync_bft_dag_at_bootup,
tx_sync_bft,
tx_is_recently_committed,
};
let receiver = BFTReceiver {
rx_primary_round,
rx_primary_certificate,
rx_sync_bft_dag_at_bootup,
rx_sync_bft,
rx_is_recently_committed,
};

(sender, receiver)
}
Expand Down
101 changes: 78 additions & 23 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ use snarkvm::{
use anyhow::{bail, Result};
use parking_lot::Mutex;
use rayon::prelude::*;
use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
future::Future,
net::SocketAddr,
sync::Arc,
time::Duration,
};
use tokio::{
sync::{oneshot, Mutex as TMutex, OnceCell},
task::JoinHandle,
Expand Down Expand Up @@ -380,29 +386,17 @@ impl<N: Network> Sync<N> {
})
.collect::<HashMap<_, _>>();

// Iterate over the certificates.
// Sync the storage with the certificates.
for certificates in subdag.values().cloned() {
cfg_into_iter!(certificates.clone()).for_each(|certificate| {
cfg_into_iter!(certificates).for_each(|certificate| {
// Sync the batch certificate with the block.
self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
self.storage.sync_certificate_with_block(&block, certificate, &unconfirmed_transactions);
});

// Sync the BFT DAG with the certificates.
for certificate in certificates {
// If a BFT sender was provided, send the certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
// Await the callback to continue.
if let Err(e) = bft_sender.send_sync_bft(certificate).await {
bail!("Sync - {e}");
};
}
}
}
}

// Fetch the latest block height.
let latest_block_height = self.ledger.latest_block_height();

// Insert the latest block response.
latest_block_responses.insert(block.height(), block);
// Clear the latest block responses of older blocks.
Expand Down Expand Up @@ -436,18 +430,18 @@ impl<N: Network> Sync<N> {
let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
// Retrieve all of the certificates for the **certificate** round.
let certificates = self.storage.get_certificates_for_round(certificate_round);
// Construct a set over the authors who included the leader's certificate in the certificate round.
let authors = certificates
// Construct a set over the certificates that included the leader's certificate in the certificate round.
let (election_authors, election_certificates): (HashSet<_>, HashSet<_>) = certificates
.iter()
.filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
true => Some(c.author()),
true => Some((c.author(), c)),
false => None,
})
.collect();
.unzip();

debug!("Validating sync block {next_block_height} at round {commit_round}...");
// Check if the leader is ready to be committed.
if committee_lookback.is_availability_threshold_reached(&authors) {
if committee_lookback.is_availability_threshold_reached(&election_authors) {
// Initialize the current certificate.
let mut current_certificate = leader_certificate;
// Check if there are any linked blocks that need to be added.
Expand All @@ -474,15 +468,76 @@ impl<N: Network> Sync<N> {
}
}

// Add the blocks to the ledger.
for block in blocks_to_add {
// Sync the BFT DAG with the blocks.
// Note: Subdags are committed by the linking rule. So, it is essential to check recent commits
// only after the root subdag, which has reached the availability threshold, has been committed in the BFT.
for block in blocks_to_add.iter() {
// Check that the blocks are sequential and can be added to the ledger.
let block_height = block.height();
if block_height != self.ledger.latest_block_height().saturating_add(1) {
warn!("Skipping block {block_height} from the latest block responses - not sequential.");
continue;
}
if let Authority::Quorum(subdag) = block.authority() {
// Iterate over the certificates.
for certificates in subdag.values().cloned() {
// Sync the BFT DAG with the certificates.
for certificate in certificates {
// If a BFT sender was provided, send the certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
// Await the callback to continue.
if let Err(e) = bft_sender.send_sync_bft(certificate).await {
bail!("Sync - {e}");
};
}
}
}
}
}

// Sync the election certificates with the BFT DAG. This ensures that the root subdag is committed.
for election_certificate in election_certificates {
// If a BFT sender was provided, send the certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
// Await the callback to continue.
if let Err(e) = bft_sender.send_sync_bft(election_certificate.clone()).await {
bail!("Sync - {e}");
};
}
}

// Add the blocks to the ledger.
for block in blocks_to_add {
// Retrieve the block height.
let block_height = block.height();
if let Authority::Quorum(subdag) = block.authority() {
// Retrieve the leader certificate of the subdag.
let leader_certificate = subdag.leader_certificate();
let leader_round = leader_certificate.round();
let leader_author = leader_certificate.author();
let leader_id = leader_certificate.id();
if let Some(bft_sender) = self.bft_sender.get() {
// Check if the leader certificate of the block has recently been committed in the replicated DAG state above.
// This ensures consistency between block sync and the BFT DAG state.
match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await {
Ok(is_recently_committed) => {
if !is_recently_committed {
bail!(
"Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.",
);
}
debug!(
"Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.",
);
}
Err(e) => {
bail!("Sync - Failed to check if leader certificate was recently committed - {e}");
}
};
}
}
// Add the block to the ledger.
info!("Proceeding to advance to sync block at height {}. ", block_height);
let self_ = self.clone();
tokio::task::spawn_blocking(move || {
// Check the next block.
Expand Down