diff --git a/Cargo.lock b/Cargo.lock index a9b6c68b50fb..c57a5ce1393d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15970,6 +15970,7 @@ dependencies = [ "sp-consensus", "sp-core", "sp-maybe-compressed-blob", + "sp-rpc", "sp-runtime", "sp-version", "substrate-test-runtime", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index cfe7f8a117dd..ca61286ddfa0 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" } sp-core = { path = "../../primitives/core" } sp-runtime = { path = "../../primitives/runtime" } sp-api = { path = "../../primitives/api" } +sp-rpc = { path = "../../primitives/rpc" } sp-blockchain = { path = "../../primitives/blockchain" } sp-version = { path = "../../primitives/version" } sc-client-api = { path = "../api" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index d93c4018b60f..427b5499bc1e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -21,6 +21,7 @@ //! API trait of the chain head. use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use sp_rpc::list::ListOrValue; #[rpc(client, server)] pub trait ChainHeadApi { @@ -109,16 +110,22 @@ pub trait ChainHeadApi { call_parameters: String, ) -> RpcResult; - /// Unpin a block reported by the `follow` method. + /// Unpin a block or multiple blocks reported by the `follow` method. /// /// Ongoing operations that require the provided block /// will continue normally. /// + /// When this method returns an error, it is guaranteed that no blocks have been unpinned. + /// /// # Unstable /// /// This method is unstable and subject to change in the future. #[method(name = "chainHead_unstable_unpin", blocking)] - fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>; + fn chain_head_unstable_unpin( + &self, + follow_subscription: String, + hash_or_hashes: ListOrValue, + ) -> RpcResult<()>; /// Resumes a storage fetch started with `chainHead_storage` after it has generated an /// `operationWaitingForContinue` event. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index a8c1c4f7e083..2d01c302037c 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -48,6 +48,7 @@ use sc_client_api::{ use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{traits::CallContext, Bytes}; +use sp_rpc::list::ListOrValue; use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc, time::Duration}; @@ -432,9 +433,16 @@ where fn chain_head_unstable_unpin( &self, follow_subscription: String, - hash: Block::Hash, + hash_or_hashes: ListOrValue, ) -> RpcResult<()> { - match self.subscriptions.unpin_block(&follow_subscription, hash) { + let result = match hash_or_hashes { + ListOrValue::Value(hash) => + self.subscriptions.unpin_blocks(&follow_subscription, [hash]), + ListOrValue::List(hashes) => + self.subscriptions.unpin_blocks(&follow_subscription, hashes), + }; + + match result { Ok(()) => Ok(()), Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 8a75029a9943..abd42ad96785 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -750,22 +750,36 @@ impl> SubscriptionsInner { } } - pub fn unpin_block( + pub fn unpin_blocks( &mut self, sub_id: &str, - hash: Block::Hash, + hashes: impl IntoIterator + Clone, ) -> Result<(), SubscriptionManagementError> { let Some(sub) = self.subs.get_mut(sub_id) else { return Err(SubscriptionManagementError::SubscriptionAbsent) }; - // Check that unpin was not called before and the block was pinned - // for this subscription. - if !sub.unregister_block(hash) { - return Err(SubscriptionManagementError::BlockHashAbsent) + // Ensure that all blocks are part of the subscription before removing individual + // blocks. + for hash in hashes.clone() { + if !sub.contains_block(hash) { + return Err(SubscriptionManagementError::BlockHashAbsent); + } + } + + // Note: this needs to be separate from the global mappings to avoid barrow checker + // thinking we borrow `&mut self` twice: once from `self.subs.get_mut` and once from + // `self.global_unregister_block`. Although the borrowing is correct, since different + // fields of the structure are borrowed, one at a time. + for hash in hashes.clone() { + sub.unregister_block(hash); + } + + // Block have been removed from the subscription. Remove them from the global tracking. + for hash in hashes { + self.global_unregister_block(hash); } - self.global_unregister_block(hash); Ok(()) } @@ -1029,11 +1043,11 @@ mod tests { assert_eq!(block.has_runtime(), true); let invalid_id = "abc-invalid".to_string(); - let err = subs.unpin_block(&invalid_id, hash).unwrap_err(); + let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); // Unpin the block. - subs.unpin_block(&id, hash).unwrap(); + subs.unpin_blocks(&id, vec![hash]).unwrap(); let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); } @@ -1077,13 +1091,13 @@ mod tests { // Ensure the block propagated to the subscription. subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap(); - subs.unpin_block(&id, hash).unwrap(); + subs.unpin_blocks(&id, vec![hash]).unwrap(); assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); // Cannot unpin a block twice for the same subscription. - let err = subs.unpin_block(&id, hash).unwrap_err(); + let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); - subs.unpin_block(&id_second, hash).unwrap(); + subs.unpin_blocks(&id_second, vec![hash]).unwrap(); // Block unregistered from the memory. assert!(subs.global_blocks.get(&hash).is_none()); } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index b25b1a4913b4..c830e662da2e 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -94,22 +94,23 @@ impl> SubscriptionManagement { inner.pin_block(sub_id, hash) } - /// Unpin the block from the subscription. + /// Unpin the blocks from the subscription. /// - /// The last subscription that unpins the block is also unpinning the block - /// from the backend. + /// Blocks are reference counted and when the last subscription unpins a given block, the block + /// is also unpinned from the backend. /// /// This method is called only once per subscription. /// - /// Returns an error if the block is not pinned for the subscription or - /// the subscription ID is invalid. - pub fn unpin_block( + /// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned + /// for the subscriptions. When an error is returned, it is guaranteed that no blocks have + /// been unpinned. + pub fn unpin_blocks( &self, sub_id: &str, - hash: Block::Hash, + hashes: impl IntoIterator + Clone, ) -> Result<(), SubscriptionManagementError> { let mut inner = self.inner.write(); - inner.unpin_block(sub_id, hash) + inner.unpin_blocks(sub_id, hashes) } /// Ensure the block remains pinned until the return object is dropped. diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index c3f5564ebc4e..15b258c47567 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1591,14 +1591,17 @@ async fn follow_with_unpin() { // Unpin an invalid subscription ID must return Ok(()). let invalid_hash = hex_string(&INVALID_HASH); let _res: () = api - .call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash]) + .call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash]) .await .unwrap(); // Valid subscription with invalid block hash. let invalid_hash = hex_string(&INVALID_HASH); let err = api - .call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash]) + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &invalid_hash], + ) .await .unwrap_err(); assert_matches!(err, @@ -1606,7 +1609,10 @@ async fn follow_with_unpin() { ); // To not exceed the number of pinned blocks, we need to unpin before the next import. - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash]) + .await + .unwrap(); // Block tree: // finalized_block -> block -> block2 @@ -1645,6 +1651,160 @@ async fn follow_with_unpin() { assert!(sub.next::>().await.is_none()); } +#[tokio::test] +async fn follow_with_multiple_unpin_hashes() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import 3 blocks. + let block_1 = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().genesis_hash) + .with_parent_block_number(0) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_2 = BlockBuilderBuilder::new(&*client) + .on_parent_block(block_1.hash()) + .with_parent_block_number(1) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_2_hash = block_2.header.hash(); + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + let block_3 = BlockBuilderBuilder::new(&*client) + .on_parent_block(block_2.hash()) + .with_parent_block_number(2) + .build() + .unwrap() + .build() + .unwrap() + .block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Unpin an invalid subscription ID must return Ok(()). + let invalid_hash = hex_string(&INVALID_HASH); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Valid subscription with invalid block hash. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &invalid_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash]) + .await + .unwrap(); + + // One block hash is invalid. Block 1 is already unpinned. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + // Unpin multiple blocks. + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]]) + .await + .unwrap(); + + // Check block 2 and 3 are unpinned. + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &block_2_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); + + let err = api + .call::<_, serde_json::Value>( + "chainHead_unstable_unpin", + rpc_params![&sub_id, &block_3_hash], + ) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" + ); +} + #[tokio::test] async fn follow_prune_best_block() { let builder = TestClientBuilder::new(); @@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() { let sub_id = sub.subscription_id(); let sub_id = serde_json::to_string(&sub_id).unwrap(); let hash = format!("{:?}", block_2_hash); - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap(); + let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap(); } #[tokio::test] @@ -2305,7 +2465,10 @@ async fn pin_block_references() { wait_pinned_references(&backend, &hash, 1).await; // To not exceed the number of pinned blocks, we need to unpin before the next import. - let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + let _res: () = api + .call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash]) + .await + .unwrap(); // Make sure unpin clears out the reference. let refs = backend.pin_refs(&hash).unwrap();