From 6b95f4ee3d26ab7a41bc137c2a08dceec29679ac Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 13:45:19 +0200 Subject: [PATCH 1/9] chainHead/unpin: Support multiple hashes for unpinning Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/chain_head/api.rs | 8 +++- .../rpc-spec-v2/src/chain_head/chain_head.rs | 3 +- .../client/rpc-spec-v2/src/chain_head/mod.rs | 11 +++++ .../src/chain_head/subscription/inner.rs | 45 +++++++++++++++---- .../src/chain_head/subscription/mod.rs | 4 +- 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index d93c4018b60f..659cbff153ef 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -22,6 +22,8 @@ use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use super::HashOrHashes; + #[rpc(client, server)] pub trait ChainHeadApi { /// Track the state of the head of the chain: the finalized, non-finalized, and best blocks. @@ -118,7 +120,11 @@ pub trait ChainHeadApi { /// /// This method is unstable and subject to change in the future. #[method(name = "chainHead_unstable_unpin", blocking)] - fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>; + fn chain_head_unstable_unpin( + &self, + follow_subscription: String, + hash: HashOrHashes, + ) -> RpcResult<()>; /// Resumes a storage fetch started with `chainHead_storage` after it has generated an /// `operationWaitingForContinue` event. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index a8c1c4f7e083..4ee07001d79f 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -21,6 +21,7 @@ use super::{ chain_head_storage::ChainHeadStorage, event::{MethodResponseStarted, OperationBodyDone, OperationCallDone}, + HashOrHashes, }; use crate::{ chain_head::{ @@ -432,7 +433,7 @@ where fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: Block::Hash, + hash: HashOrHashes, ) -> RpcResult<()> { match self.subscriptions.unpin_block(&follow_subscription, hash) { Ok(()) => Ok(()), diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index 1bd228857802..410fe7bb7223 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -43,9 +43,20 @@ pub use event::{ RuntimeVersionEvent, }; +use serde::{Deserialize, Serialize}; use sp_core::hexdisplay::{AsBytesRef, HexDisplay}; /// Util function to print the results of `chianHead` as hex string pub(crate) fn hex_string(data: &Data) -> String { format!("0x{:?}", HexDisplay::from(data)) } + +/// A parameter type that can be either a single hash or a list of hashes. +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum HashOrHashes { + /// Single hash. + Hash(Hash), + /// List of hashes. + List(Vec), +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 8a75029a9943..2179dc348448 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -27,7 +27,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; +use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent, HashOrHashes}; /// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. const QUEUE_SIZE_WARNING: usize = 512; @@ -753,19 +753,48 @@ impl> SubscriptionsInner { pub fn unpin_block( &mut self, sub_id: &str, - hash: Block::Hash, + hash: HashOrHashes, ) -> Result<(), SubscriptionManagementError> { - let Some(sub) = self.subs.get_mut(sub_id) else { + // Check if the subscription ID is valid or not. + if self.subs.get(sub_id).is_none() { return Err(SubscriptionManagementError::SubscriptionAbsent) }; - // Check that unpin was not called before and the block was pinned - // for this subscription. - if !sub.unregister_block(hash) { - return Err(SubscriptionManagementError::BlockHashAbsent) + let hashes = match hash { + HashOrHashes::Hash(hash) => vec![hash], + HashOrHashes::List(hashes) => hashes, + }; + + // Ensure that all blocks are part of the subscription. + { + let sub = self.subs.get(sub_id).expect("Subscription ID is present from above; qed"); + + for hash in &hashes { + if !sub.contains_block(*hash) { + return Err(SubscriptionManagementError::BlockHashAbsent); + } + } + } + + for hash in hashes { + // Get a short-lived `&mut SubscriptionState` to avoid borrowing self twice. + // - once from `self.subs` + // - once from `self.global_unregister_block()`. + // + // Borrowing `self.sub` and `self.global_unregister_block` is again safe because they + // operate on different fields, but the compiler is not capable of introspecting at that + // level, not even with `#[inline(always)]` on `global_unregister_block`. + // + // This is safe because we already checked that the subscription ID is valid and we + // operate under a lock. + let sub = + self.subs.get_mut(sub_id).expect("Subscription ID is present from above; qed"); + + // Block was checked above for presence. + sub.unregister_block(hash); + self.global_unregister_block(hash); } - self.global_unregister_block(hash); Ok(()) } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index b25b1a4913b4..37886840d037 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -30,6 +30,8 @@ pub use self::inner::OperationState; pub use error::SubscriptionManagementError; pub use inner::{BlockGuard, InsertedSubscriptionData}; +use super::HashOrHashes; + /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement> { /// Manage subscription by mapping the subscription ID @@ -106,7 +108,7 @@ impl> SubscriptionManagement { pub fn unpin_block( &self, sub_id: &str, - hash: Block::Hash, + hash: HashOrHashes, ) -> Result<(), SubscriptionManagementError> { let mut inner = self.inner.write(); inner.unpin_block(sub_id, hash) From 8d3d1017969cf05a80ccc44013f054b69a0f230d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 13:50:53 +0200 Subject: [PATCH 2/9] chainHead/unpin: Adjust inner API to use a list of blocks Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/chain_head/api.rs | 4 +++- .../rpc-spec-v2/src/chain_head/chain_head.rs | 7 ++++++- .../src/chain_head/subscription/inner.rs | 11 +++-------- .../src/chain_head/subscription/mod.rs | 19 +++++++++---------- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index 659cbff153ef..c4f2eee17299 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -111,11 +111,13 @@ pub trait ChainHeadApi { call_parameters: String, ) -> RpcResult; - /// Unpin a block reported by the `follow` method. + /// Unpin a block or multiple blocks reported by the `follow` method. /// /// Ongoing operations that require the provided block /// will continue normally. /// + /// When this method returns an error, it is guaranteed that no blocks have been unpinned. + /// /// # Unstable /// /// This method is unstable and subject to change in the future. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 4ee07001d79f..2c7bcd04f847 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -435,7 +435,12 @@ where follow_subscription: String, hash: HashOrHashes, ) -> RpcResult<()> { - match self.subscriptions.unpin_block(&follow_subscription, hash) { + let hashes = match hash { + HashOrHashes::Hash(hash) => vec![hash], + HashOrHashes::List(hashes) => hashes, + }; + + match self.subscriptions.unpin_blocks(&follow_subscription, hashes) { Ok(()) => Ok(()), Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 2179dc348448..cdc7e73f1ef2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -27,7 +27,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent, HashOrHashes}; +use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; /// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. const QUEUE_SIZE_WARNING: usize = 512; @@ -750,21 +750,16 @@ impl> SubscriptionsInner { } } - pub fn unpin_block( + pub fn unpin_blocks( &mut self, sub_id: &str, - hash: HashOrHashes, + hashes: Vec, ) -> Result<(), SubscriptionManagementError> { // Check if the subscription ID is valid or not. if self.subs.get(sub_id).is_none() { return Err(SubscriptionManagementError::SubscriptionAbsent) }; - let hashes = match hash { - HashOrHashes::Hash(hash) => vec![hash], - HashOrHashes::List(hashes) => hashes, - }; - // Ensure that all blocks are part of the subscription. { let sub = self.subs.get(sub_id).expect("Subscription ID is present from above; qed"); diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 37886840d037..2a3331fafe29 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -30,8 +30,6 @@ pub use self::inner::OperationState; pub use error::SubscriptionManagementError; pub use inner::{BlockGuard, InsertedSubscriptionData}; -use super::HashOrHashes; - /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement> { /// Manage subscription by mapping the subscription ID @@ -96,22 +94,23 @@ impl> SubscriptionManagement { inner.pin_block(sub_id, hash) } - /// Unpin the block from the subscription. + /// Unpin the blocks from the subscription. /// - /// The last subscription that unpins the block is also unpinning the block - /// from the backend. + /// Blocks are reference counted and when the last subscription unpins a given block, the block + /// is also unpinned from the backend. /// /// This method is called only once per subscription. /// - /// Returns an error if the block is not pinned for the subscription or - /// the subscription ID is invalid. - pub fn unpin_block( + /// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned + /// for the subscriptions. When an error is returned, it is guaranteed that no blocks have + /// been unpinned. + pub fn unpin_blocks( &self, sub_id: &str, - hash: HashOrHashes, + hashes: Vec, ) -> Result<(), SubscriptionManagementError> { let mut inner = self.inner.write(); - inner.unpin_block(sub_id, hash) + inner.unpin_blocks(sub_id, hashes) } /// Ensure the block remains pinned until the return object is dropped. From 2b421fc274e4fba0494dea4e6b5837077c2e044e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 14:52:13 +0200 Subject: [PATCH 3/9] chainHead/tests: Adjust subscription unit tests Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/subscription/inner.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index cdc7e73f1ef2..c08b0a5702a8 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1053,11 +1053,11 @@ mod tests { assert_eq!(block.has_runtime(), true); let invalid_id = "abc-invalid".to_string(); - let err = subs.unpin_block(&invalid_id, hash).unwrap_err(); + let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); // Unpin the block. - subs.unpin_block(&id, hash).unwrap(); + subs.unpin_blocks(&id, vec![hash]).unwrap(); let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); } @@ -1101,13 +1101,13 @@ mod tests { // Ensure the block propagated to the subscription. subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap(); - subs.unpin_block(&id, hash).unwrap(); + subs.unpin_blocks(&id, vec![hash]).unwrap(); assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); // Cannot unpin a block twice for the same subscription. - let err = subs.unpin_block(&id, hash).unwrap_err(); + let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); - subs.unpin_block(&id_second, hash).unwrap(); + subs.unpin_blocks(&id_second, vec![hash]).unwrap(); // Block unregistered from the memory. assert!(subs.global_blocks.get(&hash).is_none()); } From 856d0f0ea2289c11f60e76ac96a6cc3d0c7136a1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 16:04:36 +0200 Subject: [PATCH 4/9] chainHead/tests: Check multiple hashes unpinned at once Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + substrate/client/rpc-spec-v2/Cargo.toml | 1 + .../client/rpc-spec-v2/src/chain_head/api.rs | 5 +- .../rpc-spec-v2/src/chain_head/chain_head.rs | 8 +- .../client/rpc-spec-v2/src/chain_head/mod.rs | 11 -- .../rpc-spec-v2/src/chain_head/tests.rs | 173 +++++++++++++++++- 6 files changed, 176 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a091ce6817d..42f1bce5baa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15978,6 +15978,7 @@ dependencies = [ "sp-consensus", "sp-core", "sp-maybe-compressed-blob", + "sp-rpc", "sp-runtime", "sp-version", "substrate-test-runtime", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index cfe7f8a117dd..ca61286ddfa0 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" } sp-core = { path = "../../primitives/core" } sp-runtime = { path = "../../primitives/runtime" } sp-api = { path = "../../primitives/api" } +sp-rpc = { path = "../../primitives/rpc" } sp-blockchain = { path = "../../primitives/blockchain" } sp-version = { path = "../../primitives/version" } sc-client-api = { path = "../api" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index c4f2eee17299..108d5cdac970 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -21,8 +21,7 @@ //! API trait of the chain head. use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; - -use super::HashOrHashes; +use sp_rpc::list::ListOrValue; #[rpc(client, server)] pub trait ChainHeadApi { @@ -125,7 +124,7 @@ pub trait ChainHeadApi { fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: HashOrHashes, + hash: ListOrValue, ) -> RpcResult<()>; /// Resumes a storage fetch started with `chainHead_storage` after it has generated an diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 2c7bcd04f847..b82dbd8e8cb8 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -21,7 +21,6 @@ use super::{ chain_head_storage::ChainHeadStorage, event::{MethodResponseStarted, OperationBodyDone, OperationCallDone}, - HashOrHashes, }; use crate::{ chain_head::{ @@ -49,6 +48,7 @@ use sc_client_api::{ use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{traits::CallContext, Bytes}; +use sp_rpc::list::ListOrValue; use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc, time::Duration}; @@ -433,11 +433,11 @@ where fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: HashOrHashes, + hash: ListOrValue, ) -> RpcResult<()> { let hashes = match hash { - HashOrHashes::Hash(hash) => vec![hash], - HashOrHashes::List(hashes) => hashes, + ListOrValue::Value(hash) => vec![hash], + ListOrValue::List(hashes) => hashes, }; match self.subscriptions.unpin_blocks(&follow_subscription, hashes) { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index 410fe7bb7223..1bd228857802 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -43,20 +43,9 @@ pub use event::{ RuntimeVersionEvent, }; -use serde::{Deserialize, Serialize}; use sp_core::hexdisplay::{AsBytesRef, HexDisplay}; /// Util function to print the results of `chianHead` as hex string pub(crate) fn hex_string(data: &Data) -> String { format!("0x{:?}", HexDisplay::from(data)) } - -/// A parameter type that can be either a single hash or a list of hashes. -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] -#[serde(untagged)] -pub enum HashOrHashes { - /// Single hash. - Hash(Hash), - /// List of hashes. - List(Vec), -} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index c3f5564ebc4e..15b258c47567 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1591,14 +1591,17 @@ async fn follow_with_unpin() { // Unpin an invalid subscription ID must return Ok(()). let invalid_hash = hex_string(&INVALID_HASH); let _res: () = api - .call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash]) + .call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash]) .await .unwrap(); // Valid subscription with invalid block hash. let invalid_hash = hex_string(&INVALID_HASH); let err = api - .call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash]) + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &invalid_hash], + ) .await .unwrap_err(); assert_matches!(err, @@ -1606,7 +1609,10 @@ async fn follow_with_unpin() { ); // To not exceed the number of pinned blocks, we need to unpin before the next import. - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash]) + .await + .unwrap(); // Block tree: // finalized_block -> block -> block2 @@ -1645,6 +1651,160 @@ async fn follow_with_unpin() { assert!(sub.next::>().await.is_none()); } +#[tokio::test] +async fn follow_with_multiple_unpin_hashes() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import 3 blocks. + let block_1 = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().genesis_hash) + .with_parent_block_number(0) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_2 = BlockBuilderBuilder::new(&*client) + .on_parent_block(block_1.hash()) + .with_parent_block_number(1) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_2_hash = block_2.header.hash(); + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + let block_3 = BlockBuilderBuilder::new(&*client) + .on_parent_block(block_2.hash()) + .with_parent_block_number(2) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Unpin an invalid subscription ID must return Ok(()). + let invalid_hash = hex_string(&INVALID_HASH); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Valid subscription with invalid block hash. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &invalid_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash]) + .await + .unwrap(); + + // One block hash is invalid. Block 1 is already unpinned. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Unpin multiple blocks. + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]]) + .await + .unwrap(); + + // Check block 2 and 3 are unpinned. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &block_2_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &block_3_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); +} + #[tokio::test] async fn follow_prune_best_block() { let builder = TestClientBuilder::new(); @@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() { let sub_id = sub.subscription_id(); let sub_id = serde_json::to_string(&sub_id).unwrap(); let hash = format!("{:?}", block_2_hash); - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap(); + let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap(); } #[tokio::test] @@ -2305,7 +2465,10 @@ async fn pin_block_references() { wait_pinned_references(&backend, &hash, 1).await; // To not exceed the number of pinned blocks, we need to unpin before the next import. - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash]) + .await + .unwrap(); // Make sure unpin clears out the reference. let refs = backend.pin_refs(&hash).unwrap(); From b8004de0a8f21b72e7f9e101032db2755f19e6b0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 16:06:29 +0200 Subject: [PATCH 5/9] chainHead/unpin: Rename param from hash to hash_or_hashes Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/chain_head/api.rs | 2 +- substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index 108d5cdac970..427b5499bc1e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -124,7 +124,7 @@ pub trait ChainHeadApi { fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: ListOrValue, + hash_or_hashes: ListOrValue, ) -> RpcResult<()>; /// Resumes a storage fetch started with `chainHead_storage` after it has generated an diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index b82dbd8e8cb8..0002bd1c247b 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -433,9 +433,9 @@ where fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: ListOrValue, + hash_or_hashes: ListOrValue, ) -> RpcResult<()> { - let hashes = match hash { + let hashes = match hash_or_hashes { ListOrValue::Value(hash) => vec![hash], ListOrValue::List(hashes) => hashes, }; From 0e7203113ac575051e42e4674f7d3b96e1d93d5f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 17:00:03 +0200 Subject: [PATCH 6/9] chainHead: Avoid HashMap::get method Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 32 ++++++------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c08b0a5702a8..d537979cd126 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -755,38 +755,26 @@ impl> SubscriptionsInner { sub_id: &str, hashes: Vec, ) -> Result<(), SubscriptionManagementError> { - // Check if the subscription ID is valid or not. - if self.subs.get(sub_id).is_none() { - return Err(SubscriptionManagementError::SubscriptionAbsent) - }; - - // Ensure that all blocks are part of the subscription. { - let sub = self.subs.get(sub_id).expect("Subscription ID is present from above; qed"); + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + // Ensure that all blocks are part of the subscription before removing individual + // blocks. for hash in &hashes { if !sub.contains_block(*hash) { return Err(SubscriptionManagementError::BlockHashAbsent); } } + + for hash in &hashes { + sub.unregister_block(*hash); + } } + // Block have been removed from the subscription. Remove them from the global tracking. for hash in hashes { - // Get a short-lived `&mut SubscriptionState` to avoid borrowing self twice. - // - once from `self.subs` - // - once from `self.global_unregister_block()`. - // - // Borrowing `self.sub` and `self.global_unregister_block` is again safe because they - // operate on different fields, but the compiler is not capable of introspecting at that - // level, not even with `#[inline(always)]` on `global_unregister_block`. - // - // This is safe because we already checked that the subscription ID is valid and we - // operate under a lock. - let sub = - self.subs.get_mut(sub_id).expect("Subscription ID is present from above; qed"); - - // Block was checked above for presence. - sub.unregister_block(hash); self.global_unregister_block(hash); } From cc2d6530dac3ddda0214ae3c70523625bac1f04d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 17:03:43 +0200 Subject: [PATCH 7/9] chainHead: Remove extra indentation block Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d537979cd126..c0808eafd266 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -755,22 +755,20 @@ impl> SubscriptionsInner { sub_id: &str, hashes: Vec, ) -> Result<(), SubscriptionManagementError> { - { - let Some(sub) = self.subs.get_mut(sub_id) else { - return Err(SubscriptionManagementError::SubscriptionAbsent) - }; + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; - // Ensure that all blocks are part of the subscription before removing individual - // blocks. - for hash in &hashes { - if !sub.contains_block(*hash) { - return Err(SubscriptionManagementError::BlockHashAbsent); - } + // Ensure that all blocks are part of the subscription before removing individual + // blocks. + for hash in &hashes { + if !sub.contains_block(*hash) { + return Err(SubscriptionManagementError::BlockHashAbsent); } + } - for hash in &hashes { - sub.unregister_block(*hash); - } + for hash in &hashes { + sub.unregister_block(*hash); } // Block have been removed from the subscription. Remove them from the global tracking. From f2a1cfaea926a822993cf787b98eca64a604e2a5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 17:05:23 +0200 Subject: [PATCH 8/9] chainHead: Add comment wrt borrow checker Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/chain_head/subscription/inner.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c0808eafd266..c13e2de623a7 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -767,6 +767,10 @@ impl> SubscriptionsInner { } } + // Note: this needs to be separate from the global mappings to avoid barrow checker + // thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from + // `self.global_unregister_block`. Although the borrowing is correct, since different + // fields of the structure are borrowed, one at a time. for hash in &hashes { sub.unregister_block(*hash); } From b552fb9a04542a9ddf3dc94c941aab40716e8872 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 13 Nov 2023 20:10:16 +0200 Subject: [PATCH 9/9] chainHead: Use IntoIter interface instead of Vec Signed-off-by: Alexandru Vasile --- .../client/rpc-spec-v2/src/chain_head/chain_head.rs | 10 ++++++---- .../rpc-spec-v2/src/chain_head/subscription/inner.rs | 10 +++++----- .../rpc-spec-v2/src/chain_head/subscription/mod.rs | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 0002bd1c247b..2d01c302037c 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -435,12 +435,14 @@ where follow_subscription: String, hash_or_hashes: ListOrValue, ) -> RpcResult<()> { - let hashes = match hash_or_hashes { - ListOrValue::Value(hash) => vec![hash], - ListOrValue::List(hashes) => hashes, + let result = match hash_or_hashes { + ListOrValue::Value(hash) => + self.subscriptions.unpin_blocks(&follow_subscription, [hash]), + ListOrValue::List(hashes) => + self.subscriptions.unpin_blocks(&follow_subscription, hashes), }; - match self.subscriptions.unpin_blocks(&follow_subscription, hashes) { + match result { Ok(()) => Ok(()), Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c13e2de623a7..abd42ad96785 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -753,7 +753,7 @@ impl> SubscriptionsInner { pub fn unpin_blocks( &mut self, sub_id: &str, - hashes: Vec, + hashes: impl IntoIterator + Clone, ) -> Result<(), SubscriptionManagementError> { let Some(sub) = self.subs.get_mut(sub_id) else { return Err(SubscriptionManagementError::SubscriptionAbsent) @@ -761,8 +761,8 @@ impl> SubscriptionsInner { // Ensure that all blocks are part of the subscription before removing individual // blocks. - for hash in &hashes { - if !sub.contains_block(*hash) { + for hash in hashes.clone() { + if !sub.contains_block(hash) { return Err(SubscriptionManagementError::BlockHashAbsent); } } @@ -771,8 +771,8 @@ impl> SubscriptionsInner { // thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from // `self.global_unregister_block`. Although the borrowing is correct, since different // fields of the structure are borrowed, one at a time. - for hash in &hashes { - sub.unregister_block(*hash); + for hash in hashes.clone() { + sub.unregister_block(hash); } // Block have been removed from the subscription. Remove them from the global tracking. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 2a3331fafe29..c830e662da2e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -107,7 +107,7 @@ impl> SubscriptionManagement { pub fn unpin_blocks( &self, sub_id: &str, - hashes: Vec, + hashes: impl IntoIterator + Clone, ) -> Result<(), SubscriptionManagementError> { let mut inner = self.inner.write(); inner.unpin_blocks(sub_id, hashes)