Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

rpc: Use the blocks pinning API for chainHead methods #13233

Merged
merged 48 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
a72b8a8
rpc/chain_head: Add backend to subscription management
lexnv Jan 24, 2023
d672047
rpc/chain_head: Pin blocks internally and adjust testing
lexnv Jan 24, 2023
c9a0d7e
client/in_mem: Reference for the number of pinned blocks
lexnv Jan 24, 2023
3536c94
rpc/tests: Check in-memory references to pinned blocks
lexnv Jan 24, 2023
bdfdb7d
rpc/chain_head: Fix clippy
lexnv Jan 25, 2023
43d6825
rpc/chain_head: Remove unused comment
lexnv Jan 25, 2023
6bc2e87
rpc/chain_head: Place subscription handle under `Arc` and unpin block…
lexnv Jan 25, 2023
9015061
rpc/tests: Check all pinned blocks are unpinned on drop
lexnv Jan 25, 2023
09773d3
Apply suggestions from code review
lexnv Jan 27, 2023
4a38133
Update client/rpc-spec-v2/src/chain_head/subscription.rs
lexnv Jan 27, 2023
e509399
rpc/tests: Retry fetching the pinned references for CI correctness
lexnv Jan 27, 2023
3c2cc6c
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Jan 30, 2023
c6bfb6f
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
Feb 7, 2023
09fe732
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
Feb 14, 2023
b7dcacd
client/service: Use 512 as maximum number of pinned blocks
lexnv Feb 14, 2023
d67c6ed
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Mar 17, 2023
fa5323f
chain_head: Fix merging conflicts
lexnv Mar 17, 2023
852c302
rpc/chain_head: Adjust subscriptions to use pinning API
lexnv Mar 20, 2023
cdd1428
rpc/chain_head/tests: Test subscription management
lexnv Mar 20, 2023
ad9e921
rpc/chain_head: Adjust chain_head follow to the new API
lexnv Mar 20, 2023
866bb87
rpc/chain_head: Adjust chain_head.rs to the new API
lexnv Mar 20, 2023
b2e08ce
rpc/chain_head/tests: Adjust test.rs to the new API
lexnv Mar 20, 2023
b63f102
client/builder: Use new chainHead API
lexnv Mar 20, 2023
c46d7d4
rpc/chain_head: Fix documentation
lexnv Mar 20, 2023
7da0406
rpc/chain_head: Fix clippy
lexnv Mar 20, 2023
5561e56
client/in_mem: ChainHead no longer uses `in_mem::children`
lexnv Mar 21, 2023
d36f748
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Mar 23, 2023
ff3c9d5
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Mar 28, 2023
9781584
Update client/rpc-spec-v2/src/chain_head/subscription.rs
lexnv Mar 30, 2023
cc31043
Update client/rpc-spec-v2/src/chain_head/subscription.rs
lexnv Mar 30, 2023
efb14c4
Update client/rpc-spec-v2/src/chain_head/subscription.rs
lexnv Mar 30, 2023
10b61d5
Update client/rpc-spec-v2/src/chain_head/subscription.rs
lexnv Mar 30, 2023
c5266cc
chain_head: Add block state machine
lexnv Mar 30, 2023
f5a911f
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Mar 30, 2023
7bf9da0
Address feedback
lexnv Apr 24, 2023
6c054ba
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Apr 24, 2023
6a3f9e5
Use new_native_or_wasm_executor
lexnv Apr 24, 2023
d287533
chain_head: Remove 'static on Backend
lexnv Apr 25, 2023
6073ae2
chain_head: Add documentation
lexnv Apr 25, 2023
871dcb8
chain_head: Lock blocks before async blocks
lexnv Apr 25, 2023
083e3c3
chain_head_follower: Remove static on backend
lexnv Apr 25, 2023
1bb4b00
Update client/service/src/builder.rs
lexnv Apr 25, 2023
04a167c
Update client/service/src/builder.rs
lexnv Apr 25, 2023
cc24ad1
chain_head: Add BlockHeaderAbsent to the PartialEq impl
lexnv Apr 25, 2023
b3ca736
client: Add better documentation around pinning constants
lexnv Apr 25, 2023
3b1764f
chain_head: Move subscription to dedicated module
lexnv Apr 25, 2023
9cd53f1
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pin…
lexnv Apr 25, 2023
8b0eaff
subscription: Rename global pin / unpin functions
lexnv Apr 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,20 +624,36 @@ where
states: RwLock<HashMap<Block::Hash, InMemoryBackend<HashFor<Block>>>>,
blockchain: Blockchain<Block>,
import_lock: RwLock<()>,
pinned_blocks: RwLock<HashMap<Block::Hash, i64>>,
}

impl<Block: BlockT> Backend<Block>
where
Block::Hash: Ord,
{
/// Create a new instance of in-mem backend.
///
/// # Warning
///
/// For testing purposes only!
pub fn new() -> Self {
Backend {
states: RwLock::new(HashMap::new()),
blockchain: Blockchain::new(),
import_lock: Default::default(),
pinned_blocks: Default::default(),
}
}

/// Return the number of references active for a pinned block.
///
/// # Warning
///
/// For testing purposes only!
pub fn pin_refs(&self, hash: &<Block as BlockT>::Hash) -> Option<i64> {
let blocks = self.pinned_blocks.read();
blocks.get(hash).map(|value| *value)
}
}

impl<Block: BlockT> backend::AuxStore for Backend<Block>
Expand Down Expand Up @@ -787,11 +803,16 @@ where
false
}

fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
let mut blocks = self.pinned_blocks.write();
*blocks.entry(hash).or_default() += 1;
Ok(())
}

fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
let mut blocks = self.pinned_blocks.write();
blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1);
}
}

impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
Expand Down
1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
assert_matches = "1.3.0"
152 changes: 92 additions & 60 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
subscription::SubscriptionManagement,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
};
Expand All @@ -44,46 +44,48 @@ use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// An API for chain head RPC calls.
pub struct ChainHead<BE, Block: BlockT, Client> {
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// Keep track of the pinned blocks for each subscription.
subscriptions: Arc<SubscriptionManagement<Block>>,
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of pinned blocks allowed per connection.
max_pinned_blocks: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}

impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
/// Create a new [`ChainHead`].
pub fn new<GenesisHash: AsRef<[u8]>>(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));

Self {
client,
backend,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new()),
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
backend,
)),
genesis_hash,
max_pinned_blocks,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -159,9 +161,8 @@ where
return Err(err)
},
};

// Keep track of the subscription.
let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else {
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
Expand All @@ -177,7 +178,7 @@ where
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
sub_handle,
subscriptions.clone(),
runtime_updates,
sub_id.clone(),
);
Expand All @@ -202,19 +203,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
let _block_guard = block_guard;
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
Expand All @@ -226,10 +236,10 @@ where
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
follow_subscription,
&follow_subscription,
hash
);
handle.stop();
subscriptions.remove_subscription(&follow_subscription);
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
Expand All @@ -246,16 +256,19 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(None)
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
}

self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
Expand Down Expand Up @@ -286,19 +299,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
let _block_guard = block_guard;
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
Expand Down Expand Up @@ -367,21 +389,29 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
// Reject subscription if runtime_updates is false.
if !handle.has_runtime_updates() {
if !block_guard.has_runtime_updates() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
Expand Down Expand Up @@ -417,15 +447,17 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<()> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(())
};

if !handle.unpin_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
match self.subscriptions.unpin_block(&follow_subscription, hash) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}

Ok(())
}
}
Loading