Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainHead: Track reported blocks to capture notification gaps #5856

Merged
merged 20 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c42a4d8
chainHead: Introduce state for block tracking
lexnv Sep 27, 2024
d277b87
chainHead: Fix typo in distace_within_reason
lexnv Sep 27, 2024
00e8eab
chainHead: Keep track of blocks from the initial event
lexnv Sep 27, 2024
6e55454
chainHead: Track reported blocks from the NewBlock import path
lexnv Sep 27, 2024
678e9bf
chainHead: Track reported blocks from the Finalized import path
lexnv Sep 27, 2024
35b6573
chainHead: Track announced blocks as finalized
lexnv Sep 27, 2024
5e03600
chainHead/tests: Detect block gaps with unknwon parents
lexnv Sep 27, 2024
5c80006
chainHead: Simplify state tracking
lexnv Sep 27, 2024
7e74e4a
prdoc: Add prdoc
lexnv Sep 27, 2024
fbb8b16
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Oct 1, 2024
2b44eaf
chainHead: Move finalized blocks to a shorter lived LRU
lexnv Oct 2, 2024
e97b590
chainHead: Use LRU::peak instead of get to not change order
lexnv Oct 2, 2024
45f2bce
chainHead: Extra sanity check for subscription pinning and races
lexnv Oct 2, 2024
28911be
chainHead: Move `pin_block` logic above
lexnv Oct 2, 2024
1d5af1a
chainHead: Add most recently finalized blocks wrapper
lexnv Oct 3, 2024
308d388
chainHead: Apply suggestion
lexnv Oct 3, 2024
a05ae72
chainHead: Move comment to the top of the struct
lexnv Oct 4, 2024
d8e9a35
Merge remote-tracking branch 'origin/master' into lexnv/chain-head-state
lexnv Oct 4, 2024
5135e89
chainHead/tests: Adjust interface to latest origin/master
lexnv Oct 4, 2024
a5bf18e
chainHead/tests: Use mock client instead of client
lexnv Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions prdoc/pr_5856.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Extend state tracking of chainHead to capture notification gaps

doc:
- audience: Node Dev
description: |
This PR extends the state tracking of the RPC-v2 chainHead methods.
ChainHead tracks the reported blocks to detect notification gaps.
This state tracking ensures we can detect `NewBlock` events for
which we did not report previously the parent hash.

crates:
- name: sc-rpc-spec-v2
bump: minor

150 changes: 132 additions & 18 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionManagement, SubscriptionManagementError},
subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
Expand Down Expand Up @@ -53,8 +53,6 @@ use std::{
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;

use super::subscription::InsertedSubscriptionData;

/// Generates the events of the `chainHead_follow` method.
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -71,11 +69,75 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
current_best_block: Option<Block::Hash>,
/// LRU cache of pruned blocks.
pruned_blocks: LruMap<Block::Hash, ()>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Believe that pruned_blocks are no longer needed (#5605), will tackle this in another PR

/// LRU cache of announced blocks.
announced_blocks: AnnouncedBlocks<Block>,
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
}

struct AnnouncedBlocks<Block: BlockT> {
/// Unfinalized blocks.
blocks: LruMap<Block::Hash, ()>,
/// Finalized blocks.
finalized: MostRecentFinalizedBlocks<Block>,
}

/// Wrapper over LRU to efficiently lookup hashes and remove elements as FIFO queue.
struct MostRecentFinalizedBlocks<Block: BlockT>(LruMap<Block::Hash, ()>);

impl<Block: BlockT> MostRecentFinalizedBlocks<Block> {
/// Insert the finalized block hash into the LRU cache.
fn insert(&mut self, block: Block::Hash) {
self.0.insert(block, ());
}

/// Check if the block is contained in the LRU cache.
fn contains(&mut self, block: &Block::Hash) -> Option<&()> {
// For the finalized blocks we use `peek` to avoid moving the block counter to the front.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// This effectively means that the LRU acts as a FIFO queue. Otherwise, we might
// end up with scenarios where the "finalized block" in the end of LRU is overwritten which
// may not necessarily be the oldest finalized block i.e, possible that "get" promotes an
// older finalized block because it was accessed more recently.
self.0.peek(block)
}
}

impl<Block: BlockT> AnnouncedBlocks<Block> {
/// Creates a new `AnnouncedBlocks`.
fn new() -> Self {
Self {
// The total number of pinned blocks is `MAX_PINNED_BLOCKS`, ensure we don't
// exceed the limit.
blocks: LruMap::new(ByLength::new((MAX_PINNED_BLOCKS - MAX_FINALIZED_BLOCKS) as u32)),
// We are keeping a smaller number of announced finalized blocks in memory.
// This is because the `Finalized` event might be triggered before the `NewBlock` event.
finalized: MostRecentFinalizedBlocks(LruMap::new(ByLength::new(
MAX_FINALIZED_BLOCKS as u32,
))),
}
}

/// Insert the block into the announced blocks.
fn insert(&mut self, block: Block::Hash, finalized: bool) {
if finalized {
// When a block is declared as finalized, it is removed from the unfinalized blocks.
//
// Given that the finalized blocks are bounded to `MAX_FINALIZED_BLOCKS`,
// this ensures we keep the minimum number of blocks in memory.
self.blocks.remove(&block);
self.finalized.insert(block);
} else {
self.blocks.insert(block, ());
}
}

/// Check if the block was previously announced.
fn was_announced(&mut self, block: &Block::Hash) -> bool {
self.blocks.get(block).is_some() || self.finalized.contains(block).is_some()
}
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
/// Create a new [`ChainHeadFollower`].
pub fn new(
Expand All @@ -96,6 +158,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
pruned_blocks: LruMap::new(ByLength::new(
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
announced_blocks: AnnouncedBlocks::new(),
max_lagging_distance,
}
}
Expand Down Expand Up @@ -214,7 +277,7 @@ where
///
/// This edge-case can happen for parachains where the relay chain syncs slower to
/// the head of the chain than the parachain node that is synced already.
fn distace_within_reason(
fn distance_within_reason(
&self,
block: Block::Hash,
finalized: Block::Hash,
Expand Down Expand Up @@ -250,7 +313,7 @@ where
// Ensure all leaves are within a reasonable distance from the finalized block,
// before traversing the tree.
for leaf in &leaves {
self.distace_within_reason(*leaf, finalized)?;
self.distance_within_reason(*leaf, finalized)?;
}

for leaf in leaves {
Expand Down Expand Up @@ -326,6 +389,10 @@ where
let finalized_block_hash = startup_point.finalized_hash;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);

for finalized in &finalized_block_hashes {
self.announced_blocks.insert(*finalized, true);
}

let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hashes: finalized_block_hashes.into(),
finalized_block_runtime,
Expand All @@ -336,6 +403,13 @@ where

finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
// If the parent was not announced we have a gap currently.
// This can happen during a WarpSync.
if !self.announced_blocks.was_announced(&parent) {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
}
self.announced_blocks.insert(child, false);

let new_runtime = self.generate_runtime_event(child, Some(parent));

let event = FollowEvent::NewBlock(NewBlock {
Expand All @@ -351,6 +425,11 @@ where
// Generate a new best block event.
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
if !self.announced_blocks.was_announced(&best_block_hash) {
return Err(SubscriptionManagementError::BlockHeaderAbsent);
}
self.announced_blocks.insert(best_block_hash, true);

let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.current_best_block = Some(best_block_hash);
finalized_block_descendants.push(best_block);
Expand Down Expand Up @@ -408,21 +487,41 @@ where
notification: BlockImportNotification<Block>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
// The block was already pinned by the initial block events or by the finalized event.
if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
return Ok(Default::default())
}
let block_hash = notification.hash;

// Ensure we are only reporting blocks after the starting point.
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}

Ok(self.generate_import_events(
notification.hash,
*notification.header.parent_hash(),
notification.is_new_best,
))
// Ensure the block can be pinned before generating the events.
if !self.sub_handle.pin_block(&self.sub_id, block_hash)? {
// The block is already pinned, this is similar to the check above.
//
// The `SubscriptionManagement` ensures the block is tracked until (short lived):
// - 2 calls to `pin_block` are made (from `Finalized` and `NewBlock` branches).
// - the block is unpinned by the user
//
// This is rather a sanity checks for edge-cases (in theory), where
// [`MAX_FINALIZED_BLOCKS` + 1] finalized events are triggered before the `NewBlock`
// event of the first `Finalized` event.
return Ok(Default::default())
}

if self.announced_blocks.was_announced(&block_hash) {
// Block was already reported by the finalized branch.
return Ok(Default::default())
}

// Double check the parent hash. If the parent hash is not reported, we have a gap.
let parent_block_hash = *notification.header.parent_hash();
if !self.announced_blocks.was_announced(&parent_block_hash) {
// The parent block was not reported, we have a gap.
return Err(SubscriptionManagementError::Custom("Parent block was not reported".into()))
}

self.announced_blocks.insert(block_hash, false);
Ok(self.generate_import_events(block_hash, parent_block_hash, notification.is_new_best))
}

/// Generates new block events from the given finalized hashes.
Expand All @@ -448,19 +547,29 @@ where
return Err(SubscriptionManagementError::BlockHeaderAbsent)
};

if !self.announced_blocks.was_announced(first_header.parent_hash()) {
return Err(SubscriptionManagementError::Custom(
"Parent block was not reported for a finalized block".into(),
));
}

let parents =
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
// Check if the block was already reported and thus, is already pinned.
if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
continue
// Ensure the block is pinned before generating the events.
self.sub_handle.pin_block(&self.sub_id, *hash)?;

// Check if the block was already reported.
if self.announced_blocks.was_announced(hash) {
continue;
}

// Generate `NewBlock` events for all blocks beside the last block in the list
let is_last = i + 1 == finalized_block_hashes.len();
if !is_last {
// Generate only the `NewBlock` event for this block.
events.extend(self.generate_import_events(*hash, *parent, false));
self.announced_blocks.insert(*hash, true);
continue;
}

Expand All @@ -483,7 +592,8 @@ where
}

// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
events.extend(self.generate_import_events(*hash, *parent, true))
events.extend(self.generate_import_events(*hash, *parent, true));
self.announced_blocks.insert(*hash, true);
}

Ok(events)
Expand Down Expand Up @@ -545,6 +655,10 @@ where
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;

for finalized in &finalized_block_hashes {
self.announced_blocks.insert(*finalized, true);
}

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
Expand Down
69 changes: 69 additions & 0 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4052,3 +4052,72 @@ async fn follow_report_best_block_of_a_known_block() {
});
assert_eq!(event, expected);
}

#[tokio::test]
async fn follow_event_with_unknown_parent() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let client = Arc::new(builder.build());

let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));

let api = ChainHead::new(
client_mock.clone(),
backend,
Arc::new(TaskExecutor::default()),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
.into_rpc();

let finalized_hash = client.info().finalized_hash;
let mut sub = api.subscribe_unbounded("chainHead_v1_follow", [false]).await.unwrap();
// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
finalized_block_runtime: None,
with_runtime: false,
});
assert_eq!(event, expected);

// Block tree:
//
// finalized -> (gap: block 1) -> block 2
//
// Block 1 is not announced yet. ChainHead should report the stop
// event when encountering an unknown parent of block 2.

// Note: `client` is used just for constructing the blocks.
// The blocks are imported to chainHead using the `client_mock`.
let block_1 = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_1_hash = block_1.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

let block_2 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block_1_hash)
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

run_with_timeout(client_mock.trigger_import_stream(block_2.header)).await;
// When importing the block 2, chainHead detects a gap in our blocks and stops.
assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
}
Loading