Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update syncing logic to fix duplicate block requests #3410

Open
wants to merge 4 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,18 @@ impl<N: Network> Sync<N> {
// Try to advance the ledger *to tip* without updating the BFT.
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the ledger to block {}...", block.height());
self.sync_ledger_with_block_without_bft(block).await?;
// Update the current height.
current_height += 1;
// Sync the ledger with the block without BFT.
match self.sync_ledger_with_block_without_bft(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Mark the current height as processed in block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
// Sync the storage with the ledger if we should transition to the BFT sync.
if current_height > max_gc_height {
Expand All @@ -332,9 +341,17 @@ impl<N: Network> Sync<N> {
while let Some(block) = self.block_sync.process_next_block(current_height) {
info!("Syncing the BFT to block {}...", block.height());
// Sync the storage with the block.
self.sync_storage_with_block(block).await?;
// Update the current height.
current_height += 1;
match self.sync_storage_with_block(block).await {
Ok(_) => {
// Update the current height if sync succeeds.
current_height += 1;
}
Err(e) => {
// Mark the current height as processed in block_sync.
self.block_sync.remove_block_response(current_height);
return Err(e);
}
}
}
Ok(())
}
Expand All @@ -355,6 +372,8 @@ impl<N: Network> Sync<N> {
self_.storage.sync_height_with_block(block.height());
// Sync the round with the block.
self_.storage.sync_round_with_block(block.round());
// Mark the block height as processed in block_sync.
self_.block_sync.remove_block_response(block.height());

Ok(())
})
Expand Down Expand Up @@ -502,6 +521,8 @@ impl<N: Network> Sync<N> {
.await??;
// Remove the block height from the latest block responses.
latest_block_responses.remove(&block_height);
// Mark the block height as processed in block_sync.
self.block_sync.remove_block_response(block_height);
}
} else {
debug!(
Expand Down
65 changes: 42 additions & 23 deletions node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,21 @@ impl<N: Network> BlockSync<N> {
/// Returns the next block to process, if one is ready.
#[inline]
pub fn process_next_block(&self, next_height: u32) -> Option<Block<N>> {
// Try to advance the ledger with a block from the sync pool.
self.remove_block_response(next_height)
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let requests = self.requests.read();

// Determine if the request is complete.
let is_request_complete =
requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}

self.responses.read().get(&next_height).cloned()
}

/// Attempts to advance with blocks from the sync pool.
Expand All @@ -351,20 +364,33 @@ impl<N: Network> BlockSync<N> {

/// Handles the block responses from the sync pool.
fn try_advancing_with_block_responses(&self, mut current_height: u32) {
while let Some(block) = self.remove_block_response(current_height + 1) {
while let Some(block) = self.process_next_block(current_height + 1) {
// Ensure the block height matches.
if block.height() != current_height + 1 {
warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
break;
}
// Check the next block.
if let Err(error) = self.canon.check_next_block(&block) {
warn!("The next block ({}) is invalid - {error}", block.height());
break;
}
// Attempt to advance to the next block.
if let Err(error) = self.canon.advance_to_next_block(&block) {
warn!("{error}");

// Try to check the next block and advance to it.
let advanced = match self.canon.check_next_block(&block) {
Ok(_) => match self.canon.advance_to_next_block(&block) {
Ok(_) => true,
Err(err) => {
warn!("Failed to advance to next block ({}): {err}", block.height());
false
}
},
Err(err) => {
warn!("The next block ({}) is invalid - {err}", block.height());
false
}
};

// Remove the block response.
self.remove_block_response(current_height + 1);

// If advancing failed, exit the loop.
if !advanced {
break;
}
// Update the latest height.
Expand Down Expand Up @@ -609,32 +635,25 @@ impl<N: Network> BlockSync<N> {
self.request_timestamps.write().remove(&height);
}

/// Removes and returns the block response for the given height, if the request is complete.
fn remove_block_response(&self, height: u32) -> Option<Block<N>> {
/// Removes the block response for the given height
/// This may only be called after `process_next_block`, which checked if the request for the given height was complete.
pub fn remove_block_response(&self, height: u32) {
// Acquire the requests write lock.
// Note: This lock must be held across the entire scope, due to asynchronous block responses
// from multiple peers that may be received concurrently.
let mut requests = self.requests.write();

// Determine if the request is complete.
let is_request_complete = requests.get(&height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true);

// If the request is not complete, return early.
if !is_request_complete {
return None;
}
// Remove the request entry for the given height.
requests.remove(&height);
// Remove the request timestamp entry for the given height.
self.request_timestamps.write().remove(&height);
// Remove the response entry for the given height.
self.responses.write().remove(&height)
self.responses.write().remove(&height);
}

/// Removes the block request for the given peer IP, if it exists.
#[allow(dead_code)]
fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) {
let mut can_revoke = self.responses.read().get(&height).is_none();
let mut can_revoke = self.process_next_block(height).is_none();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is remove_block_request_to_peer now calling process_next_block instead of just removing it from the responses?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I answered my own question. You are now nesting the response removal inside process_next_block and adding some additional checks.

Copy link
Collaborator

@ljedrz ljedrz Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I also believe it to be fine, an adjustment to the function name or extending the doc comment could be beneficial here


// Remove the peer IP from the request entry. If the request entry is now empty,
// and the response entry for this height is also empty, then remove the request entry altogether.
Expand Down