chainHead: Report unique hashes for pruned blocks #3667

Merged
19 commits merged on Apr 17, 2024
Changes from 11 commits

Commits (19)
d995348  chainHead: Report unique pruned hashes (lexnv, Mar 12, 2024)
00fd806  chainHead: Test unique pruned blocks (lexnv, Mar 12, 2024)
494a1ea  chainHead: Generate best block event even for unpruned forks (lexnv, Mar 12, 2024)
10b143a  chainHead/tests: Add one more block to trigger pruning (lexnv, Mar 12, 2024)
c840c8b  chainHead/tests: An extra block to validate pruned blocks are reported (lexnv, Mar 12, 2024)
3f3d876  chainHead: Use LRU for caching pruned blocks (lexnv, Mar 12, 2024)
936434c  chainHead/tests: Remove debug logs and add comment about test (lexnv, Mar 12, 2024)
a33ed67  Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs (lexnv, Mar 15, 2024)
2c05bc5  Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs (lexnv, Mar 15, 2024)
384d22c  chainHead: Rename best_block_cache to current_best_block (lexnv, Mar 15, 2024)
0b7ae73  chainHead: Use num pinned blocks for maximum LRU cache size (lexnv, Mar 15, 2024)
e644f0a  chainHead: Simplify new block generation logic (lexnv, Mar 19, 2024)
43c13db  Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni… (lexnv, Apr 3, 2024)
99bf9c9  chainHead/tests: Add max sub config for tests (lexnv, Apr 9, 2024)
1eeb5e7  Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni… (lexnv, Apr 9, 2024)
c52d42d  Fix clippy (lexnv, Apr 9, 2024)
0acd861  chainHead/follow: Simplify generate_init_events by storing the pruned (lexnv, Apr 15, 2024)
c952a0b  Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni… (lexnv, Apr 16, 2024)
e4ccc03  chainHead/tests: Util code to reduce testing scenarios (lexnv, Apr 16, 2024)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
@@ -42,6 +42,7 @@ array-bytes = "6.1"
log = { workspace = true, default-features = true }
futures-util = { version = "0.3.30", default-features = false }
rand = "0.8.5"
schnellru = "0.2.1"

[dev-dependencies]
serde_json = { workspace = true, default-features = true }
2 changes: 1 addition & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -70,7 +70,7 @@ pub struct ChainHeadConfig {
/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
134 changes: 98 additions & 36 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -19,7 +19,7 @@
//! Implementation of the `chainHead_follow` method.

use crate::chain_head::{
chain_head::LOG_TARGET,
chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
event::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
@@ -37,6 +37,7 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
@@ -66,7 +67,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Subscription ID.
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
current_best_block: Option<Block::Hash>,
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
@@ -78,7 +81,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
with_runtime: bool,
sub_id: String,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
current_best_block: None,
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
}
}
}
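As background (not part of this diff), a minimal sketch of the schnellru API used here: ByLength caps the number of entries, and the least recently used entry is evicted once the cap is exceeded, so the pruned-block cache can never outgrow the pinning budget. The tiny capacity and u64 keys below are illustrative stand-ins for MAX_PINNED_BLOCKS and Block::Hash.

use schnellru::{ByLength, LruMap};

fn main() {
    // A tiny limit just to show the eviction behaviour; the real cache uses
    // `MAX_PINNED_BLOCKS` (512) converted to `u32`, falling back to `u32::MAX`.
    let mut cache: LruMap<u64, ()> = LruMap::new(ByLength::new(2));

    cache.insert(1, ());
    cache.insert(2, ());
    cache.insert(3, ()); // exceeds the length limit, so the least recently used key (1) is dropped

    assert!(cache.get(&1).is_none());
    assert!(cache.get(&2).is_some() && cache.get(&3).is_some());
}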

@@ -295,7 +308,7 @@ where
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.best_block_cache = Some(best_block_hash);
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
};

@@ -327,19 +340,19 @@ where
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });

match self.best_block_cache {
match self.current_best_block {
Some(block_cache) => {
// The RPC layer has not reported this block as best before.
// Note: This handles the race with the finalized branch.
if block_cache != block_hash {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
} else {
vec![new_block]
}
},
None => {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
},
}
@@ -408,7 +421,7 @@ where
// When the node falls out of sync and then syncs up to the tip of the chain, it can
// happen that we skip notifications. Then it is better to terminate the connection
// instead of trying to send notifications for all missed blocks.
if let Some(best_block_hash) = self.best_block_cache {
if let Some(best_block_hash) = self.current_best_block {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
*hash,
@@ -431,13 +444,13 @@
}

/// Get all pruned block hashes from the provided stale heads.
///
/// The result does not include hashes from `to_ignore`.
fn get_pruned_hashes(
&self,
&mut self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
to_ignore: &mut HashSet<Block::Hash>,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
@@ -447,11 +457,13 @@

// Collect only blocks that are not part of the canonical chain.
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if !to_ignore.remove(&block.hash) {
Some(block.hash)
} else {
None
if self.pruned_blocks.get(&block.hash).is_some() {
// The block was already reported as pruned.
return None
}

self.pruned_blocks.insert(block.hash, ());
Some(block.hash)
}))
}
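A simplified, self-contained sketch of the dedup step above; the report_pruned_once helper name is hypothetical, and the real code first walks sp_blockchain::tree_route from each stale head to the last finalized block before filtering.

use schnellru::{ByLength, LruMap};

type Hash = u64; // stand-in for `Block::Hash`

// Keep only the hashes that have not been reported as pruned before,
// remembering everything we return so it is never reported again.
fn report_pruned_once(cache: &mut LruMap<Hash, ()>, candidates: &[Hash]) -> Vec<Hash> {
    candidates
        .iter()
        .filter_map(|hash| {
            if cache.get(hash).is_some() {
                // Already reported in an earlier `Finalized` event.
                return None;
            }
            cache.insert(*hash, ());
            Some(*hash)
        })
        .collect()
}

fn main() {
    let mut cache = LruMap::new(ByLength::new(512));
    assert_eq!(report_pruned_once(&mut cache, &[1, 2, 3]), vec![1, 2, 3]);
    // A later notification covering an overlapping stale fork reports only the new hash.
    assert_eq!(report_pruned_once(&mut cache, &[2, 3, 4]), vec![4]);
}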

@@ -465,7 +477,6 @@
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
to_ignore: &mut HashSet<Block::Hash>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
@@ -486,24 +497,73 @@
// Report all pruned blocks from the notification that are not
// part of the fork we need to ignore.
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});

match self.best_block_cache {
match self.current_best_block {
Some(block_cache) => {
// If the best block wasn't pruned, we are done here.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
// We need to generate a `BestBlock` event for the finalized block when:
// - (i) the last reported best block was pruned
// - (ii) the last reported best block is on a fork that will be pruned in the
// future.
// Note: pruning happens on level n - 1.

// Best block already generated.
if block_cache == last_finalized {
events.push(finalized_event);
return Ok(events);
}
jsdw (Contributor) commented on Mar 15, 2024:
To check my understanding, this is all about addressing this bit of the spec:

The current best block, in other words the last block reported through a bestBlockChanged event, is guaranteed to either be the last item in finalizedBlockHashes, or to not be present in either finalizedBlockHashes or prunedBlockHashes.

Here it looks like when a new last finalized block is seen, we:

  • Return happily if the best block was finalized.
  • Else, return a new BestBlockChanged event if the current best block has been pruned.
  • Else, return a new BestBlockChanged event if the current best block will be pruned (ie is not a descendant of the new finalized chain)

To satisfy the spec, could the whole thing be simplified to pseudocode like this?:

if let Some(current_best_block) = self.best_block_cache {
    // current best block is in pruned list, so need to emit new BestBlock
    let is_in_pruned_list = pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
    // current best block is not the last one we finalized
    let is_not_last_finalized = current_best_block != last_finalized;

    if is_in_pruned_list || is_not_last_finalized {
        let new_best_block = self.client.info().best_hash;
        events.push(BestBlockChanged { best_block_hash: new_best_block })
    }
}

events.push(finalized_event);
Ok(events)

In any case, I wonder whether we could avoid some of the early returns and such, and separate it so that we first write the logic to decide whether we need to push a new BestBlockChanged event and then do the push, with a single return at the end to hand back the events?

lexnv (Contributor, Author) replied:

Yep, that makes sense!

I think I was more worried about cases where we've already generated a BestBlockChanged event for a descendant of the finalized block.

However, I think it is safe to regenerate a BestBlockChanged with an older block number, as long as it is the last reported finalized block, per this spec statement:

The current best block, in other words the last block reported through a bestBlockChanged event, is guaranteed to either be the last item in finalizedBlockHashes

The suggestion simplifies the code quite a bit, thanks! 🙏
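
For illustration only, the rule the thread converges on could be factored into a pure helper. The function name, its bool-returning shape, and the descends_from_finalized flag (which the PR derives from tree_route(last_finalized, best).retracted().is_empty()) are assumptions made for this sketch, not the code that landed.

// Hypothetical helper: decide whether a `bestBlockChanged` event must be sent
// before the `finalized` event, per the spec rule that the current best block
// is either the last item in `finalizedBlockHashes` or absent from both
// `finalizedBlockHashes` and `prunedBlockHashes`.
fn needs_new_best_block<H: PartialEq>(
    current_best: Option<&H>,
    last_finalized: &H,
    pruned: &[H],
    descends_from_finalized: bool,
) -> bool {
    match current_best {
        // Nothing reported as best yet, so nothing to correct.
        None => false,
        // Already the last finalized hash: the spec is satisfied.
        Some(best) if best == last_finalized => false,
        // The best block was just pruned: re-announce before finalizing.
        Some(best) if pruned.iter().any(|h| h == best) => true,
        // Otherwise re-announce only if the best block sits on a stale fork.
        Some(_) => !descends_from_finalized,
    }
}

fn main() {
    // Best block pruned by this finalization.
    assert!(needs_new_best_block(Some(&7u64), &10, &[7, 8], false));
    // Best block is the newly finalized block itself.
    assert!(!needs_new_best_block(Some(&10u64), &10, &[7, 8], true));
    // Best block is on a fork below the finalized block (pruned later).
    assert!(needs_new_best_block(Some(&9u64), &10, &[7, 8], false));
    // Best block is ahead of the finalized block on the canonical chain.
    assert!(!needs_new_best_block(Some(&11u64), &10, &[7, 8], true));
}

The hunk below, from this revision of the PR, still handles the same three cases inline with early returns.
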


// Checking if the block was pruned is faster than computing the tree route.
let was_pruned = pruned_block_hashes.iter().any(|hash| *hash == block_cache);
if was_pruned {
// We need to generate a best block event.
let best_block_hash = self.client.info().best_hash;

// Defensive check against state mismatch.
if best_block_hash == block_cache {
// The client does not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
// step of the finalization and it should be up to date.
// If the info is outdated, there is nothing the RPC can do for now.
error!(
target: LOG_TARGET,
"[follow][id={:?}] Client does not contain different best block",
self.sub_id,
);
events.push(finalized_event);
return Ok(events);
}

// The RPC needs to also submit a new best block changed before the
// finalized event.
self.current_best_block = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
return Ok(events)
}

// The best block is reported as pruned. Therefore, we need to signal a new
// best block event before submitting the finalized event.
// The best block was not pruned; however, it might be on a fork that will be pruned.
let tree_route = sp_blockchain::tree_route(
self.backend.blockchain(),
last_finalized,
block_cache,
)?;

// The best block is a descendant of the finalized block.
if tree_route.retracted().is_empty() {
events.push(finalized_event);
return Ok(events)
}

// The best block is on a fork that will be pruned.
let best_block_hash = self.client.info().best_hash;
// Defensive check against state mismatch.
if best_block_hash == block_cache {
// The client does not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
@@ -515,16 +575,16 @@
self.sub_id,
);
events.push(finalized_event);
Ok(events)
} else {
// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
return Ok(events);
}

// The RPC needs to also submit a new best block changed before the
// finalized event.
self.current_best_block = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
},
None => {
events.push(finalized_event);
@@ -539,7 +599,6 @@
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
@@ -556,7 +615,7 @@
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

@@ -642,7 +701,10 @@ where
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
// These are the pruned blocks that we should not report again.
for pruned in pruned_forks {
A reviewer (Contributor) commented:

Just to be sure I understand this correctly: when we generate the initial events, we consider all blocks that are part of a fork that started below the finalized one as pruned, even if they are technically not pruned yet. So these will never be reported?

lexnv (Contributor, Author) replied:

Yep. I can't remember exactly; this was introduced quite a while ago to handle the case where the Finalized event would contain a pruned block not reported by a new block event.

This abuses the purpose of self.pruned_blocks, which now holds:

  • (previously stored in to_ignore) forks that we did not previously report via the Initialized event
  • (the intended purpose of this PR) pruned blocks already reported once

self.pruned_blocks.insert(pruned, ());
}
self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await;
}
}
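Tying the two roles from the comment above together, a small hypothetical scenario (u64 keys standing in for Block::Hash, names illustrative): forks seeded at subscription start are never reported, while blocks pruned later are reported at most once.

use schnellru::{ByLength, LruMap};

fn main() {
    // Stand-in for the subscription's `pruned_blocks` cache.
    let mut pruned_blocks: LruMap<u64, ()> = LruMap::new(ByLength::new(512));

    // Forks already pruned when `generate_init_events` runs are remembered up
    // front, so they are never reported to this subscription.
    let pruned_forks = [1u64, 2];
    for pruned in pruned_forks {
        pruned_blocks.insert(pruned, ());
    }

    // A later finality notification marks blocks 2 and 3 as pruned; only 3 is new.
    let newly_reported: Vec<u64> = [2u64, 3]
        .into_iter()
        .filter(|hash| {
            if pruned_blocks.get(hash).is_some() {
                return false;
            }
            pruned_blocks.insert(*hash, ());
            true
        })
        .collect();

    assert_eq!(newly_reported, vec![3]);
}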