Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainHead: Report unique hashes for pruned blocks #3667

Merged
merged 19 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d995348
chainHead: Report unique pruned hashes
lexnv Mar 12, 2024
00fd806
chainHead: Test unique pruned blocks
lexnv Mar 12, 2024
494a1ea
chainHead: Generate best block event even for unpruned forks
lexnv Mar 12, 2024
10b143a
chainHead/tests: Add one more block to trigger pruning
lexnv Mar 12, 2024
c840c8b
chainHead/tests: An extra block to validate pruned blocks are reported
lexnv Mar 12, 2024
3f3d876
chainHead: Use LRU for caching pruned blocks
lexnv Mar 12, 2024
936434c
chainHead/tests: Remove debug logs and add comment about test
lexnv Mar 12, 2024
a33ed67
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Mar 15, 2024
2c05bc5
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Mar 15, 2024
384d22c
chainHead: Rename best_block_cache to current_best_block
lexnv Mar 15, 2024
0b7ae73
chainHead: Use num pinned blocks for maximum LRU cache size
lexnv Mar 15, 2024
e644f0a
chainHead: Simplify new block generation logic
lexnv Mar 19, 2024
43c13db
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni…
lexnv Apr 3, 2024
99bf9c9
chainHead/tests: Add max sub config for tests
lexnv Apr 9, 2024
1eeb5e7
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni…
lexnv Apr 9, 2024
c52d42d
Fix clippy
lexnv Apr 9, 2024
0acd861
chainHead/follow: Simplify generate_init_events by storing the pruned
lexnv Apr 15, 2024
c952a0b
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-uni…
lexnv Apr 16, 2024
e4ccc03
chainHead/tests: Util code to reduce testing scenarios
lexnv Apr 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ array-bytes = "6.1"
log = { workspace = true, default-features = true }
futures-util = { version = "0.3.30", default-features = false }
rand = "0.8.5"
schnellru = "0.2.1"

[dev-dependencies]
serde_json = { workspace = true, default-features = true }
Expand Down
119 changes: 93 additions & 26 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
Expand All @@ -50,6 +51,11 @@ use std::{
/// The maximum number of finalized blocks provided by the
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;
/// The size of the LRU cache for pruned blocks.
///
/// This is the exact value of the total number of pinned blocks, and ensures
/// that all active pruned block hashes (if any) are kept in memory.
const LRU_CACHE_SIZE: u32 = 512;
lexnv marked this conversation as resolved.
Show resolved Hide resolved

use super::subscription::InsertedSubscriptionData;

Expand All @@ -67,6 +73,8 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
Expand All @@ -78,7 +86,15 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
with_runtime: bool,
sub_id: String,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
Self {
client,
backend,
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
pruned_blocks: LruMap::new(ByLength::new(LRU_CACHE_SIZE)),
}
}
}

Expand Down Expand Up @@ -434,10 +450,9 @@ where
///
/// The result does not include hashes from `to_ignore`.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
fn get_pruned_hashes(
&self,
&mut self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
to_ignore: &mut HashSet<Block::Hash>,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
Expand All @@ -447,11 +462,13 @@ where

// Collect only blocks that are not part of the canonical chain.
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if !to_ignore.remove(&block.hash) {
Some(block.hash)
} else {
None
if self.pruned_blocks.get(&block.hash).is_some() {
// The block was already reported as pruned.
return None
}

self.pruned_blocks.insert(block.hash, ());
Some(block.hash)
}))
}

Expand All @@ -465,7 +482,6 @@ where
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
to_ignore: &mut HashSet<Block::Hash>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
Expand All @@ -486,7 +502,7 @@ where
// Report all pruned blocks from the notification that are not
// part of the fork we need to ignore.
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
Expand All @@ -495,15 +511,64 @@ where

match self.best_block_cache {
Some(block_cache) => {
// If the best block wasn't pruned, we are done here.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
// We need to generate a `NewBlock` event for the finalized block when:
lexnv marked this conversation as resolved.
Show resolved Hide resolved
lexnv marked this conversation as resolved.
Show resolved Hide resolved
// - (i) the last reported best block was pruned
// - (ii) the last reported best block is on a fork that will be pruned in the
// future.
// Note: pruning happens on level n - 1.

// Best block already generated.
if block_cache == last_finalized {
events.push(finalized_event);
return Ok(events);
}

// Checking if the block was pruned is faster than computing the route tree.
let was_pruned = pruned_block_hashes.iter().any(|hash| *hash == block_cache);
if was_pruned {
// We need to generate a best block event.
let best_block_hash = self.client.info().best_hash;

// Defensive check against state missmatch.
if best_block_hash == block_cache {
// The client doest not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
// step of the finalization and it should be up to date.
// If the info is outdated, there is nothing the RPC can do for now.
error!(
target: LOG_TARGET,
"[follow][id={:?}] Client does not contain different best block",
self.sub_id,
);
events.push(finalized_event);
return Ok(events);
}

// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
return Ok(events)
}

// The best block was not pruned, however it might be on a fork that will be pruned.
let tree_route = sp_blockchain::tree_route(
self.backend.blockchain(),
last_finalized,
block_cache,
)?;

// The best block is a descendent of the finalized block.
if tree_route.retracted().is_empty() {
events.push(finalized_event);
return Ok(events)
}

// The best block is reported as pruned. Therefore, we need to signal a new
// best block event before submitting the finalized event.
// The best block is on a fork that will be pruned.
let best_block_hash = self.client.info().best_hash;
// Defensive check against state missmatch.
if best_block_hash == block_cache {
// The client doest not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
Expand All @@ -515,16 +580,16 @@ where
self.sub_id,
);
events.push(finalized_event);
Ok(events)
} else {
// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
return Ok(events);
}

// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
},
None => {
events.push(finalized_event);
Expand All @@ -539,7 +604,6 @@ where
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
Expand All @@ -556,7 +620,7 @@ where
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

Expand Down Expand Up @@ -642,7 +706,10 @@ where
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await;
// These are the pruned blocks that we should not report again.
for pruned in pruned_forks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure I understand this correctly. When we generate the initial events we consider all blocks that are part of a fork that started below the finalized one as pruned, even if they are technically not pruned yet. So these will never be reported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I can't remember exactly, this was introduced quite a while ago to handle the case where the Finalized event would contain a pruned block not reported by a new block event.

This abuses the purpose of self.pruned_blocks, which now holds:

  • (previously stored in to_ignore) forks that we did not report previously by the Initialized event
  • (the intended purpose of this PR) pruned blocks already reported once

self.pruned_blocks.insert(pruned, ());
}
self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await;
}
}
Loading
Loading