Skip to content

Commit

Permalink
chainHead: Report unique hashes for pruned blocks (#3667)
Browse files Browse the repository at this point in the history
This PR ensures that the reported pruned blocks are unique.

While at it, ensure that the best block event is properly generated when
the last best block is a fork that will be pruned in the future.

To achieve this, the chainHead keeps a LRU set of reported pruned blocks
to ensure the following are not reported twice:

```bash
	 finalized -> block 1 -> block 2 -> block 3
	
	                      -> block 2 -> block 4 -> block 5
	
	           -> block 1 -> block 2_f -> block 6 -> block 7 -> block 8
```

When block 7 is finalized the branch [block 2; block 3] is reported as
pruned.
When block 8 is finalized the branch [block 2; block 4; block 5] should
be reported as pruned, however block 2 was already reported as pruned at
the previous step.

This is a side-effect of the pruned blocks being reported at level N -
1. For example, if all pruned forks would be reported with the first
encounter (when block 6 is finalized we know that block 3 and block 5
are stale), we would not need the LRU cache.

cc @paritytech/subxt-team  

Closes #3658

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
lexnv and skunert authored Apr 17, 2024
1 parent ca7c01c commit bfbf7f5
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ array-bytes = "6.1"
log = { workspace = true, default-features = true }
futures-util = { version = "0.3.30", default-features = false }
rand = "0.8.5"
schnellru = "0.2.1"

[dev-dependencies]
jsonrpsee = { version = "0.22", features = ["server", "ws-client"] }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct ChainHeadConfig {
/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;
pub(crate) const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
Expand Down
108 changes: 57 additions & 51 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! Implementation of the `chainHead_follow` method.
use crate::chain_head::{
chain_head::LOG_TARGET,
chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
event::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
Expand All @@ -37,6 +37,7 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
Expand Down Expand Up @@ -68,7 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Subscription ID.
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
current_best_block: Option<Block::Hash>,
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
Expand All @@ -90,7 +93,10 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle,
with_runtime,
sub_id,
best_block_cache: None,
current_best_block: None,
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
max_lagging_distance,
}
}
Expand Down Expand Up @@ -303,18 +309,20 @@ where

/// Generate the initial events reported by the RPC `follow` method.
///
/// Returns the initial events that should be reported directly, together with pruned
/// block hashes that should be ignored by the `Finalized` event.
/// Returns the initial events that should be reported directly.
fn generate_init_events(
&mut self,
startup_point: &StartupPoint<Block>,
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
{
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;

// The initialized event is the first one sent.
let initial_blocks = init.finalized_block_descendants;
let finalized_block_hashes = init.finalized_block_hashes;
// These are the pruned blocks that we should not report again.
for pruned in init.pruned_forks {
self.pruned_blocks.insert(pruned, ());
}

let finalized_block_hash = startup_point.finalized_hash;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
Expand Down Expand Up @@ -345,11 +353,11 @@ where
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.best_block_cache = Some(best_block_hash);
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
};

Ok((finalized_block_descendants, init.pruned_forks))
Ok(finalized_block_descendants)
}

/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the
Expand Down Expand Up @@ -377,19 +385,19 @@ where
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });

match self.best_block_cache {
match self.current_best_block {
Some(block_cache) => {
// The RPC layer has not reported this block as best before.
// Note: This handles the race with the finalized branch.
if block_cache != block_hash {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
} else {
vec![new_block]
}
},
None => {
self.best_block_cache = Some(block_hash);
self.current_best_block = Some(block_hash);
vec![new_block, best_block_event]
},
}
Expand Down Expand Up @@ -458,7 +466,7 @@ where
// When the node falls out of sync and then syncs up to the tip of the chain, it can
// happen that we skip notifications. Then it is better to terminate the connection
// instead of trying to send notifications for all missed blocks.
if let Some(best_block_hash) = self.best_block_cache {
if let Some(best_block_hash) = self.current_best_block {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
*hash,
Expand All @@ -481,13 +489,10 @@ where
}

/// Get all pruned block hashes from the provided stale heads.
///
/// The result does not include hashes from `to_ignore`.
fn get_pruned_hashes(
&self,
&mut self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
to_ignore: &mut HashSet<Block::Hash>,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
Expand All @@ -497,11 +502,13 @@ where

// Collect only blocks that are not part of the canonical chain.
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if !to_ignore.remove(&block.hash) {
Some(block.hash)
} else {
None
if self.pruned_blocks.get(&block.hash).is_some() {
// The block was already reported as pruned.
return None
}

self.pruned_blocks.insert(block.hash, ());
Some(block.hash)
}))
}

Expand All @@ -515,7 +522,6 @@ where
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
to_ignore: &mut HashSet<Block::Hash>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
Expand All @@ -536,25 +542,32 @@ where
// Report all pruned blocks from the notification that are not
// part of the fork we need to ignore.
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});

match self.best_block_cache {
Some(block_cache) => {
// If the best block wasn't pruned, we are done here.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
events.push(finalized_event);
return Ok(events)
}

// The best block is reported as pruned. Therefore, we need to signal a new
// best block event before submitting the finalized event.
if let Some(current_best_block) = self.current_best_block {
// The best reported block is in the pruned list. Report a new best block.
let is_in_pruned_list =
pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
// The block is not the last finalized block.
//
// It can be either:
// - a descendant of the last finalized block
// - a block on a fork that will be pruned in the future.
//
// In those cases, we emit a new best block.
let is_not_last_finalized = current_best_block != last_finalized;

if is_in_pruned_list || is_not_last_finalized {
// We need to generate a best block event.
let best_block_hash = self.client.info().best_hash;
if best_block_hash == block_cache {

// Defensive check against state missmatch.
if best_block_hash == current_best_block {
// The client doest not have any new information about the best block.
// The information from `.info()` is updated from the DB as the last
// step of the finalization and it should be up to date.
Expand All @@ -564,23 +577,18 @@ where
"[follow][id={:?}] Client does not contain different best block",
self.sub_id,
);
events.push(finalized_event);
Ok(events)
} else {
// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
self.current_best_block = Some(best_block_hash);
events
.push(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }));
}
},
None => {
events.push(finalized_event);
Ok(events)
},
}
}

events.push(finalized_event);
Ok(events)
}

/// Submit the events from the provided stream to the RPC client
Expand All @@ -589,7 +597,6 @@ where
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
Expand All @@ -612,7 +619,7 @@ where
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

Expand Down Expand Up @@ -682,7 +689,7 @@ where
.map(|response| NotificationType::MethodResponse(response));

let startup_point = StartupPoint::from(self.client.info());
let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
let initial_events = match self.generate_init_events(&startup_point) {
Ok(blocks) => blocks,
Err(err) => {
debug!(
Expand All @@ -702,7 +709,6 @@ where
let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
let stream = stream::once(futures::future::ready(initial)).chain(merged);

self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
.await
self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
}
}
Loading

0 comments on commit bfbf7f5

Please sign in to comment.