Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chainHead: Support multiple hashes for chainHead_unpin method #2295

Merged
merged 10 commits into from
Nov 14, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ sc-transaction-pool-api = { path = "../transaction-pool/api" }
sp-core = { path = "../../primitives/core" }
sp-runtime = { path = "../../primitives/runtime" }
sp-api = { path = "../../primitives/api" }
sp-rpc = { path = "../../primitives/rpc" }
sp-blockchain = { path = "../../primitives/blockchain" }
sp-version = { path = "../../primitives/version" }
sc-client-api = { path = "../api" }
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! API trait of the chain head.
use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sp_rpc::list::ListOrValue;

#[rpc(client, server)]
pub trait ChainHeadApi<Hash> {
Expand Down Expand Up @@ -109,16 +110,22 @@ pub trait ChainHeadApi<Hash> {
call_parameters: String,
) -> RpcResult<MethodResponse>;

/// Unpin a block reported by the `follow` method.
/// Unpin a block or multiple blocks reported by the `follow` method.
///
/// Ongoing operations that require the provided block
/// will continue normally.
///
/// When this method returns an error, it is guaranteed that no blocks have been unpinned.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash_or_hashes: ListOrValue<Hash>,
) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
Expand Down
10 changes: 8 additions & 2 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use sc_client_api::{
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
use sp_rpc::list::ListOrValue;
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};

Expand Down Expand Up @@ -432,9 +433,14 @@ where
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash: Block::Hash,
hash_or_hashes: ListOrValue<Block::Hash>,
) -> RpcResult<()> {
match self.subscriptions.unpin_block(&follow_subscription, hash) {
let hashes = match hash_or_hashes {
ListOrValue::Value(hash) => vec![hash],
lexnv marked this conversation as resolved.
Show resolved Hide resolved
ListOrValue::List(hashes) => hashes,
};

match self.subscriptions.unpin_blocks(&follow_subscription, hashes) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Expand Down
42 changes: 27 additions & 15 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,22 +750,34 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}

pub fn unpin_block(
pub fn unpin_blocks(
&mut self,
sub_id: &str,
hash: Block::Hash,
hashes: Vec<Block::Hash>,
) -> Result<(), SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};
{
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
};

// Check that unpin was not called before and the block was pinned
// for this subscription.
if !sub.unregister_block(hash) {
return Err(SubscriptionManagementError::BlockHashAbsent)
// Ensure that all blocks are part of the subscription before removing individual
// blocks.
for hash in &hashes {
if !sub.contains_block(*hash) {
return Err(SubscriptionManagementError::BlockHashAbsent);
}
}

for hash in &hashes {
sub.unregister_block(*hash);
}
}

// Block have been removed from the subscription. Remove them from the global tracking.
for hash in hashes {
self.global_unregister_block(hash);
}

self.global_unregister_block(hash);
Ok(())
}

Expand Down Expand Up @@ -1029,11 +1041,11 @@ mod tests {
assert_eq!(block.has_runtime(), true);

let invalid_id = "abc-invalid".to_string();
let err = subs.unpin_block(&invalid_id, hash).unwrap_err();
let err = subs.unpin_blocks(&invalid_id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}
Expand Down Expand Up @@ -1077,13 +1089,13 @@ mod tests {
// Ensure the block propagated to the subscription.
subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap();

subs.unpin_block(&id, hash).unwrap();
subs.unpin_blocks(&id, vec![hash]).unwrap();
assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1);
// Cannot unpin a block twice for the same subscription.
let err = subs.unpin_block(&id, hash).unwrap_err();
let err = subs.unpin_blocks(&id, vec![hash]).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

subs.unpin_block(&id_second, hash).unwrap();
subs.unpin_blocks(&id_second, vec![hash]).unwrap();
// Block unregistered from the memory.
assert!(subs.global_blocks.get(&hash).is_none());
}
Expand Down
17 changes: 9 additions & 8 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,23 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
inner.pin_block(sub_id, hash)
}

/// Unpin the block from the subscription.
/// Unpin the blocks from the subscription.
///
/// The last subscription that unpins the block is also unpinning the block
/// from the backend.
/// Blocks are reference counted and when the last subscription unpins a given block, the block
/// is also unpinned from the backend.
///
/// This method is called only once per subscription.
///
/// Returns an error if the block is not pinned for the subscription or
/// the subscription ID is invalid.
pub fn unpin_block(
/// Returns an error if the subscription ID is invalid, or any of the blocks are not pinned
/// for the subscriptions. When an error is returned, it is guaranteed that no blocks have
/// been unpinned.
pub fn unpin_blocks(
&self,
sub_id: &str,
hash: Block::Hash,
hashes: Vec<Block::Hash>,
) -> Result<(), SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.unpin_block(sub_id, hash)
inner.unpin_blocks(sub_id, hashes)
}

/// Ensure the block remains pinned until the return object is dropped.
Expand Down
173 changes: 168 additions & 5 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,22 +1591,28 @@ async fn follow_with_unpin() {
// Unpin an invalid subscription ID must return Ok(()).
let invalid_hash = hex_string(&INVALID_HASH);
let _res: () = api
.call("chainHead_unstable_unpin", ["invalid_sub_id", &invalid_hash])
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
.await
.unwrap();

// Valid subscription with invalid block hash.
let invalid_hash = hex_string(&INVALID_HASH);
let err = api
.call::<_, serde_json::Value>("chainHead_unstable_unpin", [&sub_id, &invalid_hash])
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &invalid_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

// To not exceed the number of pinned blocks, we need to unpin before the next import.
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
.await
.unwrap();

// Block tree:
// finalized_block -> block -> block2
Expand Down Expand Up @@ -1645,6 +1651,160 @@ async fn follow_with_unpin() {
assert!(sub.next::<FollowEvent<String>>().await.is_none());
}

#[tokio::test]
async fn follow_with_multiple_unpin_hashes() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());

let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
operation_max_storage_items: MAX_PAGINATION_LIMIT,
},
)
.into_rpc();

let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();

// Import 3 blocks.
let block_1 = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().genesis_hash)
.with_parent_block_number(0)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_1_hash = block_1.header.hash();
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

let block_2 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block_1.hash())
.with_parent_block_number(1)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_2_hash = block_2.header.hash();
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

let block_3 = BlockBuilderBuilder::new(&*client)
.on_parent_block(block_2.hash())
.with_parent_block_number(2)
.build()
.unwrap()
.build()
.unwrap()
.block;
let block_3_hash = block_3.header.hash();
client.import(BlockOrigin::Own, block_3.clone()).await.unwrap();

// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);

// Unpin an invalid subscription ID must return Ok(()).
let invalid_hash = hex_string(&INVALID_HASH);
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params!["invalid_sub_id", &invalid_hash])
.await
.unwrap();

// Valid subscription with invalid block hash.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &invalid_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_1_hash])
.await
.unwrap();

// One block hash is invalid. Block 1 is already unpinned.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, vec![&block_1_hash, &block_2_hash, &block_3_hash]],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

// Unpin multiple blocks.
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_2_hash, &block_3_hash]])
.await
.unwrap();

// Check block 2 and 3 are unpinned.
let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &block_2_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);

let err = api
.call::<_, serde_json::Value>(
"chainHead_unstable_unpin",
rpc_params![&sub_id, &block_3_hash],
)
.await
.unwrap_err();
assert_matches!(err,
Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
);
}

#[tokio::test]
async fn follow_prune_best_block() {
let builder = TestClientBuilder::new();
Expand Down Expand Up @@ -1828,7 +1988,7 @@ async fn follow_prune_best_block() {
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let hash = format!("{:?}", block_2_hash);
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap();
let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -2305,7 +2465,10 @@ async fn pin_block_references() {
wait_pinned_references(&backend, &hash, 1).await;

// To not exceed the number of pinned blocks, we need to unpin before the next import.
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap();
let _res: () = api
.call("chainHead_unstable_unpin", rpc_params![&sub_id, &block_hash])
.await
.unwrap();

// Make sure unpin clears out the reference.
let refs = backend.pin_refs(&hash).unwrap();
Expand Down