From a72b8a823c1a607eeb0bd5ca02f763dd1fc7df0f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Jan 2023 15:47:08 +0000 Subject: [PATCH 01/39] rpc/chain_head: Add backend to subscription management Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 21 ++++---- .../src/chain_head/subscription.rs | 51 +++++++++++++++---- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index c63d373e04f16..36be74447ff7a 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -60,7 +60,7 @@ use sp_runtime::{ use std::{marker::PhantomData, sync::Arc}; /// An API for chain head RPC calls. -pub struct ChainHead { +pub struct ChainHead + 'static, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. @@ -68,7 +68,7 @@ pub struct ChainHead { /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, /// Keep track of the pinned blocks for each subscription. - subscriptions: Arc>, + subscriptions: Arc>, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, /// The maximum number of pinned blocks allowed per connection. @@ -77,7 +77,7 @@ pub struct ChainHead { _phantom: PhantomData, } -impl ChainHead { +impl + 'static, Block: BlockT, Client> ChainHead { /// Create a new [`ChainHead`]. pub fn new>( client: Arc, @@ -129,7 +129,7 @@ impl ChainHead { fn generate_initial_events( client: &Arc, backend: &Arc, - handle: &SubscriptionHandle, + handle: &SubscriptionHandle, runtime_updates: bool, ) -> Result>, SubscriptionManagementError> where @@ -314,13 +314,14 @@ async fn submit_events( /// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for /// every notification. -fn handle_import_blocks( +fn handle_import_blocks( client: &Arc, - handle: &SubscriptionHandle, + handle: &SubscriptionHandle, runtime_updates: bool, notification: BlockImportNotification, ) -> Result<(FollowEvent, Option>), SubscriptionManagementError> where + BE: Backend + 'static, Block: BlockT + 'static, Client: CallApiAt + 'static, { @@ -370,12 +371,13 @@ where /// Generate the "Finalized" event and potentially the "BestBlockChanged" for /// every notification. -fn handle_finalized_blocks( +fn handle_finalized_blocks( client: &Arc, - handle: &SubscriptionHandle, + handle: &SubscriptionHandle, notification: FinalityNotification, ) -> Result<(FollowEvent, Option>), SubscriptionManagementError> where + BE: Backend + 'static, Block: BlockT + 'static, Client: HeaderBackend + HeaderMetadata + 'static, { @@ -474,9 +476,8 @@ where return Err(err) }, }; - // Keep track of the subscription. - let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else { + let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks, self.backend.clone()) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription already accepted", sub_id); diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 033db45ca755c..4c75f4bb5f017 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -20,6 +20,7 @@ use futures::channel::oneshot; use parking_lot::{RwLock, RwLockWriteGuard}; +use sc_client_api::Backend; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -50,18 +51,37 @@ struct SubscriptionInner { } /// Manage the blocks of a specific subscription ID. -#[derive(Clone)] -pub struct SubscriptionHandle { +// #[derive(Clone)] +pub struct SubscriptionHandle + 'static> { inner: Arc>>, /// The best reported block by this subscription. /// Have this as a separate variable to easily share /// the write guard with the RPC layer. best_block: Arc>>, + /// Backend pinning / unpinning blocks. + backend: Arc, } -impl SubscriptionHandle { +// We cannot derive `Clone` because that would imply the `Clone` bound on the `BE` type. +// However, that is not necessary since we hold an `Arc`. +impl + 'static> Clone for SubscriptionHandle { + fn clone(&self) -> SubscriptionHandle { + SubscriptionHandle { + inner: self.inner.clone(), + best_block: self.best_block.clone(), + backend: self.backend.clone(), + } + } +} + +impl + 'static> SubscriptionHandle { /// Construct a new [`SubscriptionHandle`]. - fn new(runtime_updates: bool, tx_stop: oneshot::Sender<()>, max_pinned_blocks: usize) -> Self { + fn new( + runtime_updates: bool, + tx_stop: oneshot::Sender<()>, + max_pinned_blocks: usize, + backend: Arc, + ) -> Self { SubscriptionHandle { inner: Arc::new(RwLock::new(SubscriptionInner { runtime_updates, @@ -70,6 +90,7 @@ impl SubscriptionHandle { max_pinned_blocks, })), best_block: Arc::new(RwLock::new(None)), + backend, } } @@ -133,13 +154,13 @@ impl SubscriptionHandle { } /// Manage block pinning / unpinning for subscription IDs. -pub struct SubscriptionManagement { +pub struct SubscriptionManagement + 'static> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. - inner: RwLock>>, + inner: RwLock>>, } -impl SubscriptionManagement { +impl + 'static> SubscriptionManagement { /// Construct a new [`SubscriptionManagement`]. pub fn new() -> Self { SubscriptionManagement { inner: RwLock::new(HashMap::new()) } @@ -155,13 +176,18 @@ impl SubscriptionManagement { subscription_id: String, runtime_updates: bool, max_pinned_blocks: usize, - ) -> Option<(oneshot::Receiver<()>, SubscriptionHandle)> { + backend: Arc, + ) -> Option<(oneshot::Receiver<()>, SubscriptionHandle)> { let mut subs = self.inner.write(); if let Entry::Vacant(entry) = subs.entry(subscription_id) { let (tx_stop, rx_stop) = oneshot::channel(); - let handle = - SubscriptionHandle::::new(runtime_updates, tx_stop, max_pinned_blocks); + let handle = SubscriptionHandle::::new( + runtime_updates, + tx_stop, + max_pinned_blocks, + backend, + ); entry.insert(handle.clone()); Some((rx_stop, handle)) } else { @@ -176,7 +202,10 @@ impl SubscriptionManagement { } /// Obtain the specific subscription handle. - pub fn get_subscription(&self, subscription_id: &String) -> Option> { + pub fn get_subscription( + &self, + subscription_id: &String, + ) -> Option> { let subs = self.inner.write(); subs.get(subscription_id).and_then(|handle| Some(handle.clone())) } From d672047919a3490fff3fe8d89ebd2befa751ac4a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Jan 2023 16:16:29 +0000 Subject: [PATCH 02/39] rpc/chain_head: Pin blocks internally and adjust testing Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 65 ++++++++++++++----- 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 4c75f4bb5f017..54c84ca695cb2 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -122,6 +122,9 @@ impl + 'static> SubscriptionHandle } } + self.backend + .pin_block(hash.clone()) + .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; Ok(inner.blocks.insert(hash)) } @@ -130,7 +133,13 @@ impl + 'static> SubscriptionHandle /// Returns whether the value was present in the set. pub fn unpin_block(&self, hash: &Block::Hash) -> bool { let mut inner = self.inner.write(); - inner.blocks.remove(hash) + if inner.blocks.remove(hash) { + // Unpin the block if it was previously pinned. + self.backend.unpin_block(*hash); + true + } else { + false + } } /// Check if the block hash is present for the provided subscription ID. @@ -214,12 +223,19 @@ impl + 'static> SubscriptionManagement::new(); + let subs = SubscriptionManagement::::new(); + let builder = TestClientBuilder::new(); + let backend = builder.backend(); let id = "abc".to_string(); let hash = H256::random(); @@ -227,7 +243,7 @@ mod tests { let handle = subs.get_subscription(&id); assert!(handle.is_none()); - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend).unwrap(); assert!(!handle.contains_block(&hash)); subs.remove_subscription(&id); @@ -238,13 +254,19 @@ mod tests { #[test] fn subscription_check_block() { - let subs = SubscriptionManagement::::new(); + let subs = SubscriptionManagement::::new(); + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); let id = "abc".to_string(); - let hash = H256::random(); // Check with subscription. - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend).unwrap(); assert!(!handle.contains_block(&hash)); assert!(!handle.unpin_block(&hash)); @@ -260,19 +282,22 @@ mod tests { #[test] fn subscription_check_stop_event() { - let subs = SubscriptionManagement::::new(); + let subs = SubscriptionManagement::::new(); + let builder = TestClientBuilder::new(); + let backend = builder.backend(); let id = "abc".to_string(); // Check with subscription. - let (mut rx_stop, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + let (mut rx_stop, handle) = + subs.insert_subscription(id.clone(), false, 10, backend.clone()).unwrap(); // Check the stop signal was not received. let res = rx_stop.try_recv().unwrap(); assert!(res.is_none()); // Inserting a second time returns None. - let res = subs.insert_subscription(id.clone(), false, 10); + let res = subs.insert_subscription(id.clone(), false, 10, backend); assert!(res.is_none()); handle.stop(); @@ -284,25 +309,33 @@ mod tests { #[test] fn subscription_check_data() { - let subs = SubscriptionManagement::::new(); + let subs = SubscriptionManagement::::new(); + let builder = TestClientBuilder::new(); + let backend = builder.backend(); let id = "abc".to_string(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend.clone()).unwrap(); assert!(!handle.has_runtime_updates()); let id2 = "abcd".to_string(); - let (_, handle) = subs.insert_subscription(id2.clone(), true, 10).unwrap(); + let (_, handle) = subs.insert_subscription(id2.clone(), true, 10, backend).unwrap(); assert!(handle.has_runtime_updates()); } #[test] fn subscription_check_max_pinned() { - let subs = SubscriptionManagement::::new(); + let subs = SubscriptionManagement::::new(); + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); let id = "abc".to_string(); - let hash = H256::random(); let hash_2 = H256::random(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 1).unwrap(); + let (_, handle) = subs.insert_subscription(id.clone(), false, 1, backend).unwrap(); handle.pin_block(hash).unwrap(); // The same block can be pinned multiple times. From c9a0d7e485ae3ef3b40658d9dc4e2500ea5a5b21 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Jan 2023 17:19:55 +0000 Subject: [PATCH 03/39] client/in_mem: Reference for the number of pinned blocks Signed-off-by: Alexandru Vasile --- client/api/src/in_mem.rs | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 5e82757e7d9cd..765b6fe00c7bd 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -440,7 +440,7 @@ impl blockchain::Backend for Blockchain { } fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result> { - unimplemented!() + Ok(Default::default()) } fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result>> { @@ -626,6 +626,7 @@ where states: RwLock>>>, blockchain: Blockchain, import_lock: RwLock<()>, + pinned_blocks: RwLock>, } impl Backend @@ -633,13 +634,28 @@ where Block::Hash: Ord, { /// Create a new instance of in-mem backend. + /// + /// # Warning + /// + /// For testing purposes only! pub fn new() -> Self { Backend { states: RwLock::new(HashMap::new()), blockchain: Blockchain::new(), import_lock: Default::default(), + pinned_blocks: Default::default(), } } + + /// Return the number of references active for a pinned block. + /// + /// # Warning + /// + /// For testing purposes only! + pub fn pin_refs(&self, hash: &::Hash) -> Option { + let blocks = self.pinned_blocks.read(); + blocks.get(hash).map(|value| *value) + } } impl backend::AuxStore for Backend @@ -789,11 +805,17 @@ where false } - fn pin_block(&self, _: ::Hash) -> blockchain::Result<()> { + fn pin_block(&self, hash: ::Hash) -> blockchain::Result<()> { + let mut blocks = self.pinned_blocks.write(); + blocks.entry(hash).and_modify(|counter| *counter += 1).or_insert(1); + Ok(()) } - fn unpin_block(&self, _: ::Hash) {} + fn unpin_block(&self, hash: ::Hash) { + let mut blocks = self.pinned_blocks.write(); + blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1); + } } impl backend::LocalBackend for Backend where Block::Hash: Ord {} From 3536c94a045f8268250471a7094061dbef9e4916 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Jan 2023 17:26:02 +0000 Subject: [PATCH 04/39] rpc/tests: Check in-memory references to pinned blocks Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + client/rpc-spec-v2/Cargo.toml | 1 + client/rpc-spec-v2/src/chain_head/tests.rs | 85 +++++++++++++++++++++- 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 08948ffbd970b..a4934947d1e72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8833,6 +8833,7 @@ dependencies = [ "sc-block-builder", "sc-chain-spec", "sc-client-api", + "sc-service", "sc-transaction-pool-api", "serde", "serde_json", diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 43fb189081bae..f6fff058d3512 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -43,4 +43,5 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } +sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" } assert_matches = "1.3.0" diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 4084075f0b321..b2d8684991555 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -8,6 +8,7 @@ use jsonrpsee::{ }; use sc_block_builder::BlockBuilderProvider; use sc_client_api::ChildInfo; +use sc_service::client::new_in_mem; use sp_api::BlockId; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; @@ -20,7 +21,8 @@ use sp_version::RuntimeVersion; use std::sync::Arc; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ - prelude::*, runtime, Backend, BlockBuilderExt, Client, ClientBlockImportExt, + prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, + ClientBlockImportExt, GenesisInit, }; type Header = substrate_test_runtime_client::runtime::Header; @@ -1020,3 +1022,84 @@ async fn follow_prune_best_block() { }); assert_eq!(event, expected); } + +#[tokio::test] +async fn pin_block_references() { + // Manually construct an in-memory backend and client. + let backend = Arc::new(sc_client_api::in_mem::Backend::new()); + let executor = substrate_test_runtime_client::new_native_executor(); + let client_config = sc_service::ClientConfig::default(); + + let genesis_block_builder = sc_service::GenesisBlockBuilder::new( + &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), + !client_config.no_genesis, + backend.clone(), + executor.clone(), + ) + .unwrap(); + + let mut client = Arc::new( + new_in_mem::<_, Block, _, RuntimeApi>( + backend.clone(), + executor, + genesis_block_builder, + None, + None, + None, + Box::new(TaskExecutor::new()), + client_config, + ) + .unwrap(), + ); + + let api = ChainHead::new( + client.clone(), + backend.clone(), + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + 2, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + let block_hash = format!("{:?}", hash); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // We need to wait a bit for: + // 1. `NewBlock` and `BestBlockChanged` notifications to propagate to the chainHead + // subscription. (pin_refs == 2) + // 2. The chainHead to call `pin_blocks` only once for the `NewBlock` + // notification (pin_refs == 3) + // 3. Both notifications to go out of scope (pin_refs == 1 (total 3 - dropped 2)). + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // The imported hash must be referenced once in the database (pin_block called once). + let refs = backend.pin_refs(&hash).unwrap(); + assert_eq!(refs, 1); + + // To not exceed the number of pinned blocks, we need to unpin before the next import. + let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + + // Make sure unpin clears out the reference. + let refs = backend.pin_refs(&hash).unwrap(); + assert_eq!(refs, 0); +} From bdfdb7d409c3c2b1b00d8958dd6d0a93246a8ae3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Jan 2023 07:20:30 +0000 Subject: [PATCH 05/39] rpc/chain_head: Fix clippy Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 54c84ca695cb2..bddb8840c6076 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -123,7 +123,7 @@ impl + 'static> SubscriptionHandle } self.backend - .pin_block(hash.clone()) + .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; Ok(inner.blocks.insert(hash)) } From 43d6825f94eaaa1d07c14f260b344a161b7c82aa Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Jan 2023 07:22:17 +0000 Subject: [PATCH 06/39] rpc/chain_head: Remove unused comment Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index bddb8840c6076..cd4fd193ad294 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -51,7 +51,6 @@ struct SubscriptionInner { } /// Manage the blocks of a specific subscription ID. -// #[derive(Clone)] pub struct SubscriptionHandle + 'static> { inner: Arc>>, /// The best reported block by this subscription. From 6bc2e8745b388cf75f34d56745defb475655ec91 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Jan 2023 12:50:07 +0000 Subject: [PATCH 07/39] rpc/chain_head: Place subscription handle under `Arc` and unpin blocks on drop Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index cd4fd193ad294..de66eed7654f1 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -52,27 +52,17 @@ struct SubscriptionInner { /// Manage the blocks of a specific subscription ID. pub struct SubscriptionHandle + 'static> { - inner: Arc>>, + inner: RwLock>, /// The best reported block by this subscription. /// Have this as a separate variable to easily share /// the write guard with the RPC layer. - best_block: Arc>>, + best_block: RwLock>, /// Backend pinning / unpinning blocks. + /// + /// The `Arc` is handled one level-above, but substrate exposes the backend as Arc. backend: Arc, } -// We cannot derive `Clone` because that would imply the `Clone` bound on the `BE` type. -// However, that is not necessary since we hold an `Arc`. -impl + 'static> Clone for SubscriptionHandle { - fn clone(&self) -> SubscriptionHandle { - SubscriptionHandle { - inner: self.inner.clone(), - best_block: self.best_block.clone(), - backend: self.backend.clone(), - } - } -} - impl + 'static> SubscriptionHandle { /// Construct a new [`SubscriptionHandle`]. fn new( @@ -82,13 +72,13 @@ impl + 'static> SubscriptionHandle backend: Arc, ) -> Self { SubscriptionHandle { - inner: Arc::new(RwLock::new(SubscriptionInner { + inner: RwLock::new(SubscriptionInner { runtime_updates, tx_stop: Some(tx_stop), blocks: HashSet::new(), max_pinned_blocks, - })), - best_block: Arc::new(RwLock::new(None)), + }), + best_block: RwLock::new(None), backend, } } @@ -114,6 +104,7 @@ impl + 'static> SubscriptionHandle if inner.blocks.len() == inner.max_pinned_blocks { // We have reached the limit. However, the block can be already inserted. + if inner.blocks.contains(&hash) { return Ok(false) } else { @@ -132,6 +123,7 @@ impl + 'static> SubscriptionHandle /// Returns whether the value was present in the set. pub fn unpin_block(&self, hash: &Block::Hash) -> bool { let mut inner = self.inner.write(); + if inner.blocks.remove(hash) { // Unpin the block if it was previously pinned. self.backend.unpin_block(*hash); @@ -161,11 +153,23 @@ impl + 'static> SubscriptionHandle } } +impl + 'static> Drop for SubscriptionHandle { + fn drop(&mut self) { + let mut inner = self.inner.write(); + + // Unpin any remaining blocks of this subscription. + for hash in &inner.blocks { + self.backend.unpin_block(*hash); + } + inner.blocks.clear(); + } +} + /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement + 'static> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. - inner: RwLock>>, + inner: RwLock>>>, } impl + 'static> SubscriptionManagement { @@ -185,17 +189,17 @@ impl + 'static> SubscriptionManagement, - ) -> Option<(oneshot::Receiver<()>, SubscriptionHandle)> { + ) -> Option<(oneshot::Receiver<()>, Arc>)> { let mut subs = self.inner.write(); if let Entry::Vacant(entry) = subs.entry(subscription_id) { let (tx_stop, rx_stop) = oneshot::channel(); - let handle = SubscriptionHandle::::new( + let handle = Arc::new(SubscriptionHandle::::new( runtime_updates, tx_stop, max_pinned_blocks, backend, - ); + )); entry.insert(handle.clone()); Some((rx_stop, handle)) } else { @@ -213,7 +217,7 @@ impl + 'static> SubscriptionManagement Option> { + ) -> Option>> { let subs = self.inner.write(); subs.get(subscription_id).and_then(|handle| Some(handle.clone())) } From 901506114ec3102d245049ab0f89d89190e30c21 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 25 Jan 2023 12:50:45 +0000 Subject: [PATCH 08/39] rpc/tests: Check all pinned blocks are unpinned on drop Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 46 +++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index b2d8684991555..dce06529aad65 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1057,7 +1057,7 @@ async fn pin_block_references() { backend.clone(), Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - 2, + 3, ) .into_rpc(); @@ -1102,4 +1102,48 @@ async fn pin_block_references() { // Make sure unpin clears out the reference. let refs = backend.pin_refs(&hash).unwrap(); assert_eq!(refs, 0); + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Add another 2 blocks and make sure we drop the subscription with the blocks pinned. + let mut hashes = Vec::new(); + for _ in 0..2 { + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + hashes.push(hash); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Make sure the pin was propagated. + for hash in &hashes { + let refs = backend.pin_refs(&hash).unwrap(); + assert_eq!(refs, 1); + } + + // Drop the subscription and expect the pinned blocks to be released. + drop(sub); + // The `chainHead` detects the subscription was terminated when it tries + // to send another block. + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + for hash in &hashes { + let refs = backend.pin_refs(&hash).unwrap(); + assert_eq!(refs, 0); + } } From 09773d354890e55d653147c484e2f044f128e56a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 Jan 2023 15:10:15 +0200 Subject: [PATCH 09/39] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index de66eed7654f1..c0d8cec0db0c0 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -103,7 +103,7 @@ impl + 'static> SubscriptionHandle let mut inner = self.inner.write(); if inner.blocks.len() == inner.max_pinned_blocks { - // We have reached the limit. However, the block can be already inserted. + // We reached the limit. However, the block could already be pinned. if inner.blocks.contains(&hash) { return Ok(false) From 4a38133750eae29eb1354cd58c68b5819d25a057 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 27 Jan 2023 15:10:40 +0200 Subject: [PATCH 10/39] Update client/rpc-spec-v2/src/chain_head/subscription.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/rpc-spec-v2/src/chain_head/subscription.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index c0d8cec0db0c0..9489f452fff7f 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -158,10 +158,9 @@ impl + 'static> Drop for SubscriptionHandle Date: Fri, 27 Jan 2023 13:31:12 +0000 Subject: [PATCH 11/39] rpc/tests: Retry fetching the pinned references for CI correctness Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 2 +- client/rpc-spec-v2/src/chain_head/tests.rs | 37 +++++++++++-------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 9489f452fff7f..c8cb1a5faa08d 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -159,7 +159,7 @@ impl + 'static> Drop for SubscriptionHandle( + backend: &Arc>, + hash: &Block::Hash, + target: i64, + ) { + // Retry for at most 2 minutes. + let mut retries = 120; + while backend.pin_refs(hash).unwrap() != target { + if retries == 0 { + panic!("Expected target={} pinned references for hash={:?}", target, hash); + } + retries -= 1; + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); let sub_id = sub.subscription_id(); let sub_id = serde_json::to_string(&sub_id).unwrap(); @@ -1090,11 +1107,7 @@ async fn pin_block_references() { // 2. The chainHead to call `pin_blocks` only once for the `NewBlock` // notification (pin_refs == 3) // 3. Both notifications to go out of scope (pin_refs == 1 (total 3 - dropped 2)). - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - - // The imported hash must be referenced once in the database (pin_block called once). - let refs = backend.pin_refs(&hash).unwrap(); - assert_eq!(refs, 1); + wait_pinned_references(&backend, &hash, 1).await; // To not exceed the number of pinned blocks, we need to unpin before the next import. let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); @@ -1103,8 +1116,6 @@ async fn pin_block_references() { let refs = backend.pin_refs(&hash).unwrap(); assert_eq!(refs, 0); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - // Add another 2 blocks and make sure we drop the subscription with the blocks pinned. let mut hashes = Vec::new(); for _ in 0..2 { @@ -1125,12 +1136,9 @@ async fn pin_block_references() { hashes.push(hash); } - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - // Make sure the pin was propagated. for hash in &hashes { - let refs = backend.pin_refs(&hash).unwrap(); - assert_eq!(refs, 1); + wait_pinned_references(&backend, hash, 1).await; } // Drop the subscription and expect the pinned blocks to be released. @@ -1140,10 +1148,7 @@ async fn pin_block_references() { let block = client.new_block(Default::default()).unwrap().build().unwrap().block; client.import(BlockOrigin::Own, block.clone()).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - for hash in &hashes { - let refs = backend.pin_refs(&hash).unwrap(); - assert_eq!(refs, 0); + wait_pinned_references(&backend, &hash, 0).await; } } From b7dcacd991aef0653ca70fe91143fc579ecbcbef Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 14 Feb 2023 14:14:29 +0200 Subject: [PATCH 12/39] client/service: Use 512 as maximum number of pinned blocks Signed-off-by: Alexandru Vasile --- client/service/src/builder.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 0b09f550ce338..d4b59b99f2354 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -698,9 +698,9 @@ where .into_rpc(); // Maximum pinned blocks per connection. - // This number is large enough to consider immediate blocks, - // but it will change to facilitate adequate limits for the pinning API. - const MAX_PINNED_BLOCKS: usize = 4096; + // This number is large enough to consider immediate blocks. + // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. + const MAX_PINNED_BLOCKS: usize = 512; let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), backend.clone(), From fa5323fe148102d9df02bb613c11cb385cfa2045 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Mar 2023 14:33:06 +0200 Subject: [PATCH 13/39] chain_head: Fix merging conflicts Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head_follow.rs | 8 ++++---- client/rpc-spec-v2/src/chain_head/subscription.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 9173b7340b7e5..21eef7e3f4fd0 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -47,13 +47,13 @@ use sp_runtime::{ use std::{collections::HashSet, sync::Arc}; /// Generates the events of the `chainHead_follow` method. -pub struct ChainHeadFollower { +pub struct ChainHeadFollower + 'static, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. backend: Arc, /// Subscription handle. - sub_handle: SubscriptionHandle, + sub_handle: Arc>, /// Subscription was started with the runtime updates flag. runtime_updates: bool, /// Subscription ID. @@ -62,12 +62,12 @@ pub struct ChainHeadFollower { best_block_cache: Option, } -impl ChainHeadFollower { +impl + 'static, Block: BlockT, Client> ChainHeadFollower { /// Create a new [`ChainHeadFollower`]. pub fn new( client: Arc, backend: Arc, - sub_handle: SubscriptionHandle, + sub_handle: Arc>, runtime_updates: bool, sub_id: String, ) -> Self { diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 416e7f58e0143..92fecccf5361c 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -19,7 +19,7 @@ //! Subscription management for tracking subscription IDs to pinned blocks. use futures::channel::oneshot; -use parking_lot::{RwLock, RwLockWriteGuard}; +use parking_lot::RwLock; use sc_client_api::Backend; use sp_blockchain::Error; use sp_runtime::traits::Block as BlockT; @@ -65,7 +65,7 @@ struct SubscriptionInner { /// Manage the blocks of a specific subscription ID. #[derive(Clone)] -pub struct SubscriptionHandle { +pub struct SubscriptionHandle + 'static> { inner: Arc>>, /// Backend pinning / unpinning blocks. /// @@ -82,12 +82,12 @@ impl + 'static> SubscriptionHandle backend: Arc, ) -> Self { SubscriptionHandle { - inner: RwLock::new(SubscriptionInner { + inner: Arc::new(RwLock::new(SubscriptionInner { runtime_updates, tx_stop: Some(tx_stop), blocks: HashSet::new(), max_pinned_blocks, - }), + })), backend, } } From 852c30260078866ae9752347e9f854009e57645f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:07:16 +0200 Subject: [PATCH 14/39] rpc/chain_head: Adjust subscriptions to use pinning API Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 565 ++++++++++++++---- 1 file changed, 456 insertions(+), 109 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 92fecccf5361c..ef35ab4fe95ff 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -24,147 +24,457 @@ use sc_client_api::Backend; use sp_blockchain::Error; use sp_runtime::traits::Block as BlockT; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, sync::Arc, + time::{Duration, Instant}, }; /// Subscription management error. -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum SubscriptionManagementError { /// The block cannot be pinned into memory because /// the subscription has exceeded the maximum number /// of blocks pinned. + #[error("Exceeded pinning limits")] ExceededLimits, /// Error originated from the blockchain (client or backend). + #[error("Blockchain error {0}")] Blockchain(Error), /// The database does not contain a block number. + #[error("Block number is absent")] BlockNumberAbsent, /// The database does not contain a block hash. + #[error("Block hash is absent")] BlockHashAbsent, + /// The specified subscription ID is not present. + #[error("Subscription is absent")] + SubscriptionAbsent, /// Custom error. + #[error("Subscription error {0}")] Custom(String), } +// Blockchain error does not implement `PartialEq` needed for testing. +impl PartialEq for SubscriptionManagementError { + fn eq(&self, other: &SubscriptionManagementError) -> bool { + match (self, other) { + (Self::ExceededLimits, Self::ExceededLimits) | + // Not needed for testing. + (Self::Blockchain(_), Self::Blockchain(_)) | + (Self::BlockNumberAbsent, Self::BlockNumberAbsent) | + (Self::BlockHashAbsent, Self::BlockHashAbsent) | + (Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true, + (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, + _ => false, + } + } +} + impl From for SubscriptionManagementError { fn from(err: Error) -> Self { SubscriptionManagementError::Blockchain(err) } } -/// Inner subscription data structure. -struct SubscriptionInner { +/// The state of a block of a single subscription ID. +struct BlockState { + /// True if the block can be unregistered from the + /// internal memory of the subscription. + /// + /// Each block is registered twice: once from the `BestBlock` event + /// and once from the `Finalized` event. This becomes true when + /// both events registered the block. + /// + /// Field is added to avoid losing track of the block for the following + /// timeline: + /// T0. BestBlock event: hash is tracked and pinned in backend. + /// T1. User calls unpin: hash is untracked and unpinned in backend. + /// T2. Finalized event: hash is tracked (no previous history) and pinned again. + can_unregister: bool, + /// True if the block was unpinned. + /// + /// Because of the previous condition, a block can be unregistered + /// only when both `can_unregister` and `was_unpinned` are true. + was_unpinned: bool, + /// The timestamp when the block was inserted. + timestamp: Instant, +} + +/// The state of a single subscription ID. +struct SubscriptionState { /// The `runtime_updates` parameter flag of the subscription. runtime_updates: bool, /// Signals the "Stop" event. tx_stop: Option>, - /// The blocks pinned. - blocks: HashSet, - /// The maximum number of pinned blocks allowed per subscription. - max_pinned_blocks: usize, + /// Track the block hashes available for this subscription. + /// + /// This implementation assumes: + /// - most of the time subscriptions keep a few blocks of the chain's head pinned + /// - iteration through the blocks happens only when the hard limit is exceeded. + /// + /// Considering the assumption, iterating (in the unlike case) the hashmap O(N) is + /// more time efficient and code friendly than paying for: + /// - extra space: an extra BTreeMap to older hashes by oldest insertion + /// - extra time: O(log(N)) for insert/remove/find each `pin` block time per subscriptions + blocks: HashMap, } -/// Manage the blocks of a specific subscription ID. -#[derive(Clone)] -pub struct SubscriptionHandle + 'static> { - inner: Arc>>, +impl SubscriptionState { + /// Trigger the stop event for the current subscription. + /// + /// This can happen on internal failure (ie, the pruning deleted the block from memory) + /// or if the subscription exceeded the available pinned blocks. + fn stop(&mut self) { + if let Some(tx_stop) = self.tx_stop.take() { + let _ = tx_stop.send(()); + } + } + + /// Keep track of the given block hash for this subscription. + /// + /// This does not handle pinning in the backend. + /// + /// Returns: + /// - true if this is the first time that the block is registered + /// - false if the block was already registered + fn register_block(&mut self, hash: Block::Hash) -> bool { + match self.blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + let mut block_state = occupied.get_mut(); + if block_state.was_unpinned { + // If `unpin` was called between two events + // unregister the block now. + occupied.remove(); + } else { + // Both `BestBlock` and `Finalized` events registered the block. + // Unregister the block on `unpin`. + block_state.can_unregister = true; + } + + false + }, + Entry::Vacant(vacant) => { + vacant.insert(BlockState { + can_unregister: false, + was_unpinned: false, + timestamp: Instant::now(), + }); + + true + }, + } + } + + /// A block is unregistered when the user calls `unpin`. + /// + /// Returns: + /// - true if the block can be unpinned. + /// - false if the subscription does not contain the block. + fn unregister_block(&mut self, hash: Block::Hash) -> bool { + match self.blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + let mut block_state = occupied.get_mut(); + + // Cannot unpin a block twice. + if block_state.was_unpinned { + return false + } + + // If `pin` was called twice unregister the block now. + if block_state.can_unregister { + occupied.remove(); + } else { + block_state.was_unpinned = true; + } + + true + }, + // Block was not tracked. + Entry::Vacant(_) => false, + } + } + + /// A subscription contains a block when the block was + /// registered (`pin` was called) and the block was not `unpinned` yet. + /// + /// Returns `true` if the subscription contains the block. + fn contains_block(&self, hash: Block::Hash) -> bool { + let Some(state) = self.blocks.get(&hash) else { + // Block was not tracked. + return false + }; + + // Subscription no longer contains the block if `unpin` was called. + !state.was_unpinned + } + + fn oldest_block_timestamp(&self) -> Instant { + let mut timestamp = Instant::now(); + for (_, state) in self.blocks.iter() { + timestamp = std::cmp::min(timestamp, state.timestamp); + } + timestamp + } +} + +pub struct BlockGuard + 'static> { + hash: Block::Hash, + runtime_updates: bool, + backend: Arc, +} + +// Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for +// testing. +impl + 'static> std::fmt::Debug for BlockGuard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlockGuard hash {:?} runtime_updates {:?}", self.hash, self.runtime_updates) + } +} + +impl + 'static> BlockGuard { + /// Construct a new [`BlockGuard`] . + fn new( + hash: Block::Hash, + runtime_updates: bool, + backend: Arc, + ) -> Result { + backend + .pin_block(hash) + .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; + + Ok(Self { hash, runtime_updates, backend }) + } + + /// The `runtime_updates` flag of the subscription. + pub fn has_runtime_updates(&self) -> bool { + self.runtime_updates + } +} + +impl + 'static> Drop for BlockGuard { + fn drop(&mut self) { + self.backend.unpin_block(self.hash); + } +} + +struct SubscriptionsInner + 'static> { + /// Reference count the block hashes across all subscriptions. + /// + /// The pinned blocks cannot exceed the [`Self::global_limit`] limit. + /// When the limit is exceeded subscriptions are stopped via the `Stop` event. + global_blocks: HashMap, + /// The maximum number of pinned blocks across all subscriptions. + global_max_pinned_blocks: usize, + /// The maximum duration that a block is allowed to be pinned per subscription. + local_max_pin_duration: Duration, + /// Map the subscription ID to internal details of the subscription. + subs: HashMap>, /// Backend pinning / unpinning blocks. /// /// The `Arc` is handled one level-above, but substrate exposes the backend as Arc. backend: Arc, } -impl + 'static> SubscriptionHandle { - /// Construct a new [`SubscriptionHandle`]. +impl + 'static> SubscriptionsInner { + /// Construct a new [`GlobalSubscriptionInner`] from the specified limits. fn new( - runtime_updates: bool, - tx_stop: oneshot::Sender<()>, - max_pinned_blocks: usize, + global_max_pinned_blocks: usize, + local_max_pin_duration: Duration, backend: Arc, ) -> Self { - SubscriptionHandle { - inner: Arc::new(RwLock::new(SubscriptionInner { + SubscriptionsInner { + global_blocks: Default::default(), + global_max_pinned_blocks, + local_max_pin_duration, + subs: Default::default(), + backend, + } + } + + /// Insert a new subscription ID. + fn insert_subscription( + &mut self, + sub_id: String, + runtime_updates: bool, + ) -> Option> { + if let Entry::Vacant(entry) = self.subs.entry(sub_id) { + let (tx_stop, rx_stop) = oneshot::channel(); + let state = SubscriptionState:: { runtime_updates, tx_stop: Some(tx_stop), - blocks: HashSet::new(), - max_pinned_blocks, - })), - backend, + blocks: Default::default(), + }; + entry.insert(state); + Some(rx_stop) + } else { + None } } - /// Trigger the stop event for the current subscription. - /// - /// This can happen on internal failure (ie, the pruning deleted the block from memory) - /// or if the user exceeded the amount of available pinned blocks. - pub fn stop(&self) { - let mut inner = self.inner.write(); + /// Remove the subscription ID with associated pinned blocks. + fn remove_subscription(&mut self, sub_id: &str) { + let Some(mut sub) = self.subs.remove(sub_id) else { + return + }; - if let Some(tx_stop) = inner.tx_stop.take() { - let _ = tx_stop.send(()); + // The `Stop` event can be generated only once. + sub.stop(); + + for (hash, state) in sub.blocks.iter() { + if !state.was_unpinned { + self.global_unpin_block(*hash); + } } } - /// Pin a new block for the current subscription ID. + /// Ensure that a new block could be pinned. /// - /// Returns whether the value was newly inserted if the block can be pinned. - /// Otherwise, returns an error if the maximum number of blocks has been exceeded. - pub fn pin_block(&self, hash: Block::Hash) -> Result { - let mut inner = self.inner.write(); - - if inner.blocks.len() == inner.max_pinned_blocks { - // We reached the limit. However, the block could already be pinned. + /// If the global number of blocks has been reached this method + /// will remove all subscriptions that have blocks older than the + /// specified pin duration. + /// + /// If after removing all subscriptions that exceed the pin duration + /// there is no space for pinning a new block, then all subscriptions + /// are terminated. + /// + /// Returns true if the given subscription is also terminated. + fn ensure_block_space(&mut self, request_sub_id: &str) -> bool { + if self.global_blocks.len() < self.global_max_pinned_blocks { + return false + } - if inner.blocks.contains(&hash) { - return Ok(false) - } else { - return Err(SubscriptionManagementError::ExceededLimits) + // Terminate all subscriptions that have blocks older than + // the specified pin duration. + let now = Instant::now(); + + let to_remove: Vec<_> = self + .subs + .iter_mut() + .filter_map(|(sub_id, sub)| { + let sub_time = sub.oldest_block_timestamp(); + // Subscriptions older than the specified pin duration should be removed. + let should_remove = match now.checked_duration_since(sub_time) { + Some(duration) => duration > self.local_max_pin_duration, + None => true, + }; + should_remove.then_some(sub_id.clone()) + }) + .collect(); + + let mut is_terminated = false; + for sub_id in to_remove.into_iter() { + if sub_id == request_sub_id { + is_terminated = true; } + self.remove_subscription(&sub_id); } - self.backend - .pin_block(hash) - .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(inner.blocks.insert(hash)) + // Make sure we have enough space after first pass of terminating subscriptions. + if self.global_blocks.len() < self.global_max_pinned_blocks { + return is_terminated + } + + // Sanity check: cannot uphold `chainHead` guarantees anymore. We have not + // found any subscriptions that have older pinned blocks to terminate. + let to_remove: Vec<_> = self.subs.iter().map(|(sub_id, _)| sub_id.clone()).collect(); + for sub_id in to_remove.into_iter() { + if sub_id == request_sub_id { + is_terminated = true; + } + self.remove_subscription(&sub_id); + } + return is_terminated } - /// Unpin a new block for the current subscription ID. - /// - /// Returns whether the value was present in the set. - pub fn unpin_block(&self, hash: &Block::Hash) -> bool { - let mut inner = self.inner.write(); + fn pin_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result { + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + // Block was already registered for this subscription and therefore + // globally tracked. + if !sub.register_block(hash) { + return Ok(false) + } - if inner.blocks.remove(hash) { - // Unpin the block if it was previously pinned. - self.backend.unpin_block(*hash); - true - } else { - false + // Ensure we have enough space only if the hash is not globally registered. + if !self.global_blocks.contains_key(&hash) { + // Subscription ID was terminated while ensuring enough space. + if self.ensure_block_space(sub_id) { + return Err(SubscriptionManagementError::ExceededLimits) + } } + + self.global_pin_block(hash)?; + Ok(true) } - /// Check if the block hash is present for the provided subscription ID. - /// - /// Returns `true` if the set contains the block. - pub fn contains_block(&self, hash: &Block::Hash) -> bool { - let inner = self.inner.read(); - inner.blocks.contains(hash) + fn global_pin_block(&mut self, hash: Block::Hash) -> Result<(), SubscriptionManagementError> { + match self.global_blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + *occupied.get_mut() += 1; + }, + Entry::Vacant(vacant) => { + self.backend + .pin_block(hash) + .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; + + vacant.insert(1); + }, + }; + Ok(()) } - /// Get the `runtime_updates` flag of this subscription. - pub fn has_runtime_updates(&self) -> bool { - let inner = self.inner.read(); - inner.runtime_updates + fn global_unpin_block(&mut self, hash: Block::Hash) { + if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) { + let counter = occupied.get_mut(); + if *counter == 1 { + // Unpin the block from the backend. + self.backend.unpin_block(hash); + occupied.remove(); + } else { + *counter -= 1; + } + } } -} -impl + 'static> Drop for SubscriptionHandle { - fn drop(&mut self) { - let mut inner = self.inner.write(); + fn unpin_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + // Check that unpin was not called before and the block was pinned + // for this subscription. + if !sub.unregister_block(hash) { + return Err(SubscriptionManagementError::BlockHashAbsent) + } - // Unpin any remaining blocks of this subscription. - for hash in inner.blocks.drain() { - self.backend.unpin_block(hash); + self.global_unpin_block(hash); + Ok(()) + } + + fn lock_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result, SubscriptionManagementError> { + let Some(sub) = self.subs.get(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + if !sub.contains_block(hash) { + return Err(SubscriptionManagementError::BlockHashAbsent) } + + BlockGuard::new(hash, sub.runtime_updates, self.backend.clone()) } } @@ -172,57 +482,94 @@ impl + 'static> Drop for SubscriptionHandle + 'static> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. - inner: RwLock>>>, + inner: RwLock>, } impl + 'static> SubscriptionManagement { /// Construct a new [`SubscriptionManagement`]. - pub fn new() -> Self { - SubscriptionManagement { inner: RwLock::new(HashMap::new()) } + pub fn new( + global_max_pinned_blocks: usize, + local_max_pin_duration: Duration, + backend: Arc, + ) -> Self { + SubscriptionManagement { + inner: RwLock::new(SubscriptionsInner::new( + global_max_pinned_blocks, + local_max_pin_duration, + backend, + )), + } } /// Insert a new subscription ID. /// - /// If the subscription was not previously inserted, the method returns a tuple of - /// the receiver that is triggered upon the "Stop" event and the subscription - /// handle. Otherwise, when the subscription ID was already inserted returns none. + /// If the subscription was not previously inserted, returns the receiver that is + /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already + /// inserted returns none. pub fn insert_subscription( &self, - subscription_id: String, + sub_id: String, runtime_updates: bool, - max_pinned_blocks: usize, - backend: Arc, - ) -> Option<(oneshot::Receiver<()>, Arc>)> { - let mut subs = self.inner.write(); - - if let Entry::Vacant(entry) = subs.entry(subscription_id) { - let (tx_stop, rx_stop) = oneshot::channel(); - let handle = Arc::new(SubscriptionHandle::::new( - runtime_updates, - tx_stop, - max_pinned_blocks, - backend, - )); - entry.insert(handle.clone()); - Some((rx_stop, handle)) - } else { - None - } + ) -> Option> { + let mut inner = self.inner.write(); + inner.insert_subscription(sub_id, runtime_updates) } /// Remove the subscription ID with associated pinned blocks. - pub fn remove_subscription(&self, subscription_id: &String) { - let mut subs = self.inner.write(); - subs.remove(subscription_id); + pub fn remove_subscription(&self, sub_id: &str) { + let mut inner = self.inner.write(); + inner.remove_subscription(sub_id) } - /// Obtain the specific subscription handle. - pub fn get_subscription( + /// The block is pinned in the backend only once when the block's hash is first encountered. + /// + /// Each subscription is expected to call this method twice: + /// - once from the `NewBlock` import + /// - once from the `Finalized` import + /// + /// Returns + /// - Ok(true) if the subscription did not previously contain this block + /// - Ok(false) if the subscription already contained this this + /// - Error if the backend failed to pin the block or the subscription ID is invalid + pub fn pin_block( &self, - subscription_id: &String, - ) -> Option>> { - let subs = self.inner.write(); - subs.get(subscription_id).and_then(|handle| Some(handle.clone())) + sub_id: &str, + hash: Block::Hash, + ) -> Result { + let mut inner = self.inner.write(); + inner.pin_block(sub_id, hash) + } + + /// Unpin the block from the subscription. + /// + /// The last subscription that unpins the block is also unpinning the block + /// from the backend. + /// + /// This method is called only once per subscription. + /// + /// Returns an error if the block is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn unpin_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.unpin_block(sub_id, hash) + } + + /// Ensure the block remains pinned until the return object is dropped. + /// + /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner. + /// Returns an error if the block hash is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn lock_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result, SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.lock_block(sub_id, hash) } } From cdd142830b868139f793ccbb059abaec885644a1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:07:44 +0200 Subject: [PATCH 15/39] rpc/chain_head/tests: Test subscription management Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 394 +++++++++++++++--- 1 file changed, 325 insertions(+), 69 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index ef35ab4fe95ff..936dc089acc1e 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -576,124 +576,380 @@ impl + 'static> SubscriptionManagement ( + Arc>, + Arc>>, + ) { + let backend = Arc::new(sc_client_api::in_mem::Backend::new()); + let executor = substrate_test_runtime_client::new_native_executor(); + let client_config = sc_service::ClientConfig::default(); + let genesis_block_builder = sc_service::GenesisBlockBuilder::new( + &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), + !client_config.no_genesis, + backend.clone(), + executor.clone(), + ) + .unwrap(); + let client = Arc::new( + new_in_mem::<_, Block, _, RuntimeApi>( + backend.clone(), + executor, + genesis_block_builder, + None, + None, + None, + Box::new(TaskExecutor::new()), + client_config, + ) + .unwrap(), + ); + (backend, client) + } + + #[test] + fn sub_state_register_twice() { + let mut sub_state = SubscriptionState:: { + runtime_updates: false, + tx_stop: None, + blocks: Default::default(), + }; + + let hash = H256::random(); + assert_eq!(sub_state.register_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + // Did not call `register_block` twice. + assert_eq!(block_state.can_unregister, false); + assert_eq!(block_state.was_unpinned, false); + + assert_eq!(sub_state.register_block(hash), false); + let block_state = sub_state.blocks.get(&hash).unwrap(); + assert_eq!(block_state.can_unregister, true); + assert_eq!(block_state.was_unpinned, false); + + // Block is no longer tracked when: `register_block` is called twice and + // `unregister_block` is called once. + assert_eq!(sub_state.unregister_block(hash), true); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + } + + #[test] + fn sub_state_register_unregister() { + let mut sub_state = SubscriptionState:: { + runtime_updates: false, + tx_stop: None, + blocks: Default::default(), + }; + + let hash = H256::random(); + // Block was not registered before. + assert_eq!(sub_state.unregister_block(hash), false); + + assert_eq!(sub_state.register_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + // Did not call `register_block` twice. + assert_eq!(block_state.can_unregister, false); + assert_eq!(block_state.was_unpinned, false); + + // Unregister block before the second `register_block`. + assert_eq!(sub_state.unregister_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + assert_eq!(block_state.can_unregister, false); + assert_eq!(block_state.was_unpinned, true); + + assert_eq!(sub_state.register_block(hash), false); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + + // Block is no longer tracked when: `register_block` is called twice and + // `unregister_block` is called once. + assert_eq!(sub_state.unregister_block(hash), false); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + } + #[test] - fn subscription_check_id() { - let subs = SubscriptionManagement::::new(); + fn subscription_lock_block() { let builder = TestClientBuilder::new(); let backend = builder.backend(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); let id = "abc".to_string(); let hash = H256::random(); - let handle = subs.get_subscription(&id); - assert!(handle.is_none()); + // Subscription not inserted. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + // Cannot insert the same subscription ID twice. + assert!(subs.insert_subscription(id.clone(), true).is_none()); - let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend).unwrap(); - assert!(!handle.contains_block(&hash)); + // No block hash. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); subs.remove_subscription(&id); - let handle = subs.get_subscription(&id); - assert!(handle.is_none()); + // No subscription. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); } #[test] fn subscription_check_block() { - let subs = SubscriptionManagement::::new(); - let builder = TestClientBuilder::new(); - let backend = builder.backend(); - let mut client = Arc::new(builder.build()); + let (backend, mut client) = init_backend(); let block = client.new_block(Default::default()).unwrap().build().unwrap().block; let hash = block.header.hash(); - block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); let id = "abc".to_string(); - // Check with subscription. - let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend).unwrap(); - assert!(!handle.contains_block(&hash)); - assert!(!handle.unpin_block(&hash)); + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + + // First time we are pinning the block. + assert_eq!(subs.pin_block(&id, hash).unwrap(), true); - handle.pin_block(hash).unwrap(); - assert!(handle.contains_block(&hash)); - // Unpin an invalid block. - assert!(!handle.unpin_block(&H256::random())); + let block = subs.lock_block(&id, hash).unwrap(); + // Subscription started with runtime updates + assert_eq!(block.has_runtime_updates(), true); - // Unpin the valid block. - assert!(handle.unpin_block(&hash)); - assert!(!handle.contains_block(&hash)); + let invalid_id = "abc-invalid".to_string(); + let err = subs.unpin_block(&invalid_id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + // Unpin the block. + subs.unpin_block(&id, hash).unwrap(); + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); } #[test] - fn subscription_check_stop_event() { - let subs = SubscriptionManagement::::new(); - let builder = TestClientBuilder::new(); - let backend = builder.backend(); + fn subscription_ref_count() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); let id = "abc".to_string(); - // Check with subscription. - let (mut rx_stop, handle) = - subs.insert_subscription(id.clone(), false, 10, backend.clone()).unwrap(); + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id, hash).unwrap(), true); + // Check the global ref count. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + // Ensure the block propagated to the subscription. + subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap(); + + // Insert the block for the same subscription again (simulate NewBlock + Finalized pinning) + assert_eq!(subs.pin_block(&id, hash).unwrap(), false); + // Check the global ref count should not get incremented. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + + // Ensure the hash propagates for the second subscription. + let id_second = "abcd".to_string(); + let _stop = subs.insert_subscription(id_second.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true); + // Check the global ref count. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2); + // Ensure the block propagated to the subscription. + subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap(); + + subs.unpin_block(&id, hash).unwrap(); + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + // Cannot unpin a block twice for the same subscription. + let err = subs.unpin_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); + + subs.unpin_block(&id_second, hash).unwrap(); + // Block unregistered from the memory. + assert!(subs.global_blocks.get(&hash).is_none()); + } - // Check the stop signal was not received. - let res = rx_stop.try_recv().unwrap(); - assert!(res.is_none()); + #[test] + fn subscription_remove_subscription() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - // Inserting a second time returns None. - let res = subs.insert_subscription(id.clone(), false, 10, backend); - assert!(res.is_none()); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); - handle.stop(); + // Pin all blocks for the first subscription. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true); - // Check the signal was received. - let res = rx_stop.try_recv().unwrap(); - assert!(res.is_some()); + // Pin only block 2 for the second subscription. + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1); + + subs.remove_subscription(&id_1); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1); + assert!(subs.global_blocks.get(&hash_3).is_none()); + + subs.remove_subscription(&id_2); + + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert_eq!(subs.global_blocks.len(), 0); } #[test] - fn subscription_check_data() { - let subs = SubscriptionManagement::::new(); - let builder = TestClientBuilder::new(); - let backend = builder.backend(); + fn subscription_check_limits() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + // Maximum number of pinned blocks is 2. + let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + // Both subscriptions can pin the maximum limit. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + + // Block 3 pinning will exceed the limit and both subscriptions + // are terminated because no subscription with older blocks than 10 + // seconds are present. + let err = subs.pin_block(&id_1, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); + + // Ensure both subscriptions are removed. + let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let err = subs.lock_block(&id_2, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 0); + } - let id = "abc".to_string(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 10, backend.clone()).unwrap(); - assert!(!handle.has_runtime_updates()); + #[test] + fn subscription_check_limits_with_duration() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. + let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + + // Maximum pin duration is 5 second, sleep 5 seconds to ensure we clean up + // the first subscription. + std::thread::sleep(std::time::Duration::from_secs(5)); + + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1); + + // Second subscription has only 1 block pinned. Only the first subscription is terminated. + let err = subs.pin_block(&id_1, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); - let id2 = "abcd".to_string(); - let (_, handle) = subs.insert_subscription(id2.clone(), true, 10, backend).unwrap(); - assert!(handle.has_runtime_updates()); + // Ensure both subscriptions are removed. + let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let _block_guard = subs.lock_block(&id_2, hash_1).unwrap(); + + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 1); + + // Force second subscription to get terminated. + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + let err = subs.pin_block(&id_2, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 0); } #[test] - fn subscription_check_max_pinned() { - let subs = SubscriptionManagement::::new(); + fn subscription_check_stop_event() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut client = Arc::new(builder.build()); - - let block = client.new_block(Default::default()).unwrap().build().unwrap().block; - let hash = block.header.hash(); - block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); let id = "abc".to_string(); - let hash_2 = H256::random(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 1, backend).unwrap(); - - handle.pin_block(hash).unwrap(); - // The same block can be pinned multiple times. - handle.pin_block(hash).unwrap(); - // Exceeded number of pinned blocks. - handle.pin_block(hash_2).unwrap_err(); + + let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap(); + + // Check the stop signal was not received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_none()); + + let sub = subs.subs.get_mut(&id).unwrap(); + sub.stop(); + + // Check the signal was received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_some()); } } From ad9e92109340f9e68eae169918453dc2211f2fd5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:08:30 +0200 Subject: [PATCH 16/39] rpc/chain_head: Adjust chain_head follow to the new API Signed-off-by: Alexandru Vasile --- .../src/chain_head/chain_head_follow.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 21eef7e3f4fd0..b095f5d005a6a 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -24,7 +24,7 @@ use crate::chain_head::{ BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, }, - subscription::{SubscriptionHandle, SubscriptionManagementError}, + subscription::{SubscriptionManagement, SubscriptionManagementError}, }; use futures::{ channel::oneshot, @@ -52,8 +52,8 @@ pub struct ChainHeadFollower + 'static, Block: BlockT, Client client: Arc, /// Backend of the chain. backend: Arc, - /// Subscription handle. - sub_handle: Arc>, + /// Subscriptions handle. + sub_handle: Arc>, /// Subscription was started with the runtime updates flag. runtime_updates: bool, /// Subscription ID. @@ -67,7 +67,7 @@ impl + 'static, Block: BlockT, Client> ChainHeadFollower, backend: Arc, - sub_handle: Arc>, + sub_handle: Arc>, runtime_updates: bool, sub_id: String, ) -> Self { @@ -224,7 +224,7 @@ where // The initialized event is the first one sent. let finalized_block_hash = startup_point.finalized_hash; - self.sub_handle.pin_block(finalized_block_hash)?; + self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?; let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None); @@ -238,7 +238,7 @@ where finalized_block_descendants.push(initialized_event); for (child, parent) in initial_blocks.into_iter() { - self.sub_handle.pin_block(child)?; + self.sub_handle.pin_block(&self.sub_id, child)?; let new_runtime = self.generate_runtime_event(child, Some(parent)); @@ -313,7 +313,7 @@ where startup_point: &StartupPoint, ) -> Result>, SubscriptionManagementError> { // The block was already pinned by the initial block events or by the finalized event. - if !self.sub_handle.pin_block(notification.hash)? { + if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? { return Ok(Default::default()) } @@ -363,7 +363,7 @@ where let parents = std::iter::once(&parent).chain(finalized_block_hashes.iter()); for (hash, parent) in finalized_block_hashes.iter().zip(parents) { // This block is already reported by the import notification. - if !self.sub_handle.pin_block(*hash)? { + if !self.sub_handle.pin_block(&self.sub_id, *hash)? { continue } @@ -600,6 +600,10 @@ where stream_item = stream.next(); stop_event = next_stop_event; } + + // If we got here either the substrate streams have closed + // or the `Stop` receiver was triggered. + let _ = sink.send(&FollowEvent::::Stop); } /// Generate the block events for the `chainHead_follow` method. From 866bb876c6a9e5ede350fb2fc654a94fb45f7090 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:09:48 +0200 Subject: [PATCH 17/39] rpc/chain_head: Adjust chain_head.rs to the new API Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 149 +++++++++++------- 1 file changed, 90 insertions(+), 59 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 1bd4fcdbff921..6f41a88ea2b70 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -24,7 +24,7 @@ use crate::{ chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig}, - subscription::SubscriptionManagement, + subscription::{SubscriptionManagement, SubscriptionManagementError}, }, SubscriptionTaskExecutor, }; @@ -44,7 +44,7 @@ use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes}; use sp_runtime::traits::Block as BlockT; -use std::{marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; @@ -60,8 +60,6 @@ pub struct ChainHead + 'static, Block: BlockT, Client> { subscriptions: Arc>, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, - /// The maximum number of pinned blocks allowed per connection. - max_pinned_blocks: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -74,16 +72,20 @@ impl + 'static, Block: BlockT, Client> ChainHead Self { let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref())); Self { client, - backend, + backend: backend.clone(), executor, - subscriptions: Arc::new(SubscriptionManagement::new()), + subscriptions: Arc::new(SubscriptionManagement::new( + max_pinned_blocks, + max_pinned_duration, + backend, + )), genesis_hash, - max_pinned_blocks, _phantom: PhantomData, } } @@ -160,7 +162,7 @@ where }, }; // Keep track of the subscription. - let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks, self.backend.clone()) else { + let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); @@ -176,7 +178,7 @@ where let mut chain_head_follow = ChainHeadFollower::new( client, backend, - sub_handle, + subscriptions.clone(), runtime_updates, sub_id.clone(), ); @@ -202,18 +204,26 @@ where let subscriptions = self.subscriptions.clone(); let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return + let _block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return + }, }; - // Block is not part of the subscription. - if !handle.contains_block(&hash) { - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } - let event = match client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block.block.extrinsics(); @@ -225,10 +235,10 @@ where debug!( target: LOG_TARGET, "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - follow_subscription, + &follow_subscription, hash ); - handle.stop(); + subscriptions.remove_subscription(&follow_subscription); ChainHeadEvent::::Disjoint }, Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), @@ -245,16 +255,19 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult> { - let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - return Ok(None) + let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + return Ok(None) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + return Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - // Block is not part of the subscription. - if !handle.contains_block(&hash) { - return Err(ChainHeadRpcError::InvalidBlock.into()) - } - self.client .header(hash) .map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode())))) @@ -286,18 +299,26 @@ where let subscriptions = self.subscriptions.clone(); let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return + let _block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return + }, }; - // Block is not part of the subscription. - if !handle.contains_block(&hash) { - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } - // The child key is provided, use the key to query the child trie. if let Some(child_key) = child_key { // The child key must not be prefixed with ":child_storage:" nor @@ -367,20 +388,28 @@ where let subscriptions = self.subscriptions.clone(); let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return + }, }; - // Block is not part of the subscription. - if !handle.contains_block(&hash) { - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } - // Reject subscription if runtime_updates is false. - if !handle.has_runtime_updates() { + if !block_guard.has_runtime_updates() { let _ = sink.reject(ChainHeadRpcError::InvalidParam( "The runtime updates flag must be set".into(), )); @@ -416,15 +445,17 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult<()> { - let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - return Ok(()) - }; - - if !handle.unpin_block(&hash) { - return Err(ChainHeadRpcError::InvalidBlock.into()) + match self.subscriptions.unpin_block(&follow_subscription, hash) { + Ok(()) => Ok(()), + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()), } - - Ok(()) } } From b2e08cef1d54fc58751be7cd6c093d9d09541d91 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:10:33 +0200 Subject: [PATCH 18/39] rpc/chain_head/tests: Adjust test.rs to the new API Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 51821098a668e..83ca4a40679ba 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -18,7 +18,7 @@ use sp_core::{ testing::TaskExecutor, }; use sp_version::RuntimeVersion; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -28,6 +28,7 @@ use substrate_test_runtime_client::{ type Header = substrate_test_runtime_client::runtime::Header; type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; +const MAX_PINNED_SECS: u64 = 60; const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; @@ -66,6 +67,7 @@ async fn setup_api() -> ( Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -105,6 +107,7 @@ async fn follow_subscription_produces_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -162,6 +165,7 @@ async fn follow_with_runtime() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -267,6 +271,7 @@ async fn get_genesis() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -451,6 +456,7 @@ async fn call_runtime_without_flag() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -625,6 +631,7 @@ async fn follow_generates_initial_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -752,6 +759,7 @@ async fn follow_exceeding_pinned_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, 2, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -802,6 +810,7 @@ async fn follow_with_unpin() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, 2, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -882,6 +891,7 @@ async fn follow_prune_best_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1038,6 +1048,7 @@ async fn follow_forks_pruned_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1151,6 +1162,7 @@ async fn follow_report_multiple_pruned_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1356,6 +1368,7 @@ async fn pin_block_references() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, 3, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); From b63f102ac3b88f9af5fa90c9cb1e06fefbbb8e4d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:11:07 +0200 Subject: [PATCH 19/39] client/builder: Use new chainHead API Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 1 + client/service/src/builder.rs | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 936dc089acc1e..474933115fac3 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -208,6 +208,7 @@ impl SubscriptionState { !state.was_unpinned } + /// Get the timestamp of the oldest inserted block. fn oldest_block_timestamp(&self) -> Instant { let mut timestamp = Instant::now(); for (_, state) in self.blocks.iter() { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index bd523ab417f4b..a7d29b499220a 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -69,7 +69,11 @@ use sp_consensus::block_validation::{ use sp_core::traits::{CodeExecutor, SpawnNamed}; use sp_keystore::{Keystore, KeystorePtr}; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; -use std::{str::FromStr, sync::Arc, time::SystemTime}; +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; /// Full client type. pub type TFullClient = @@ -690,16 +694,21 @@ where ) .into_rpc(); - // Maximum pinned blocks per connection. + // Maximum pinned blocks across all connections. // This number is large enough to consider immediate blocks. // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. const MAX_PINNED_BLOCKS: usize = 512; + // Maximum number of seconds that a block can be pinned before the + // subscription can be terminated. + // Note: This should be enough for immediate blocks. + const MAX_PINNED_SECONDS: u64 = 60; let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), backend.clone(), task_executor.clone(), client.info().genesis_hash, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECONDS), ) .into_rpc(); From c46d7d4a4c82cb4c8e0e001da96d746483c74908 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:15:45 +0200 Subject: [PATCH 20/39] rpc/chain_head: Fix documentation Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 474933115fac3..15e2448672959 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -277,7 +277,7 @@ struct SubscriptionsInner + 'static> { } impl + 'static> SubscriptionsInner { - /// Construct a new [`GlobalSubscriptionInner`] from the specified limits. + /// Construct a new [`SubscriptionsInner`] from the specified limits. fn new( global_max_pinned_blocks: usize, local_max_pin_duration: Duration, From 7da040688a03f2d7fba0cd9bcb2f04b3e6d866a1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Mar 2023 21:28:07 +0200 Subject: [PATCH 21/39] rpc/chain_head: Fix clippy Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 15e2448672959..421a84375be78 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -377,7 +377,7 @@ impl + 'static> SubscriptionsInner // Sanity check: cannot uphold `chainHead` guarantees anymore. We have not // found any subscriptions that have older pinned blocks to terminate. - let to_remove: Vec<_> = self.subs.iter().map(|(sub_id, _)| sub_id.clone()).collect(); + let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); for sub_id in to_remove.into_iter() { if sub_id == request_sub_id { is_terminated = true; From 5561e567658a03111306f07657538949ce0ae16b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 Mar 2023 11:03:53 +0200 Subject: [PATCH 22/39] client/in_mem: ChainHead no longer uses `in_mem::children` Signed-off-by: Alexandru Vasile --- client/api/src/in_mem.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 25a7ef1146301..815eff872a119 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -440,7 +440,7 @@ impl blockchain::Backend for Blockchain { } fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result> { - Ok(Default::default()) + unimplemented!() } fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result>> { From 9781584bbc3514604fa8aa698749fcfe9348ca06 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 30 Mar 2023 15:46:14 +0300 Subject: [PATCH 23/39] Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 38213332e0699..5e9017b5496d6 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -357,7 +357,7 @@ impl + 'static> SubscriptionsInner Some(duration) => duration > self.local_max_pin_duration, None => true, }; - should_remove.then_some(sub_id.clone()) + should_remove.then(|| sub_id.clone()) }) .collect(); From cc310432f059fa61d5d9c91d6afbe12c6051f912 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 30 Mar 2023 15:46:24 +0300 Subject: [PATCH 24/39] Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 5e9017b5496d6..69aaab52fc9ab 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -362,7 +362,7 @@ impl + 'static> SubscriptionsInner .collect(); let mut is_terminated = false; - for sub_id in to_remove.into_iter() { + for sub_id in to_remove { if sub_id == request_sub_id { is_terminated = true; } From efb14c47197de13f8b9b23518268c8c2e33b3c03 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 30 Mar 2023 15:46:36 +0300 Subject: [PATCH 25/39] Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 69aaab52fc9ab..2294f359d79f0 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -377,7 +377,7 @@ impl + 'static> SubscriptionsInner // Sanity check: cannot uphold `chainHead` guarantees anymore. We have not // found any subscriptions that have older pinned blocks to terminate. let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); - for sub_id in to_remove.into_iter() { + for sub_id in to_remove { if sub_id == request_sub_id { is_terminated = true; } From 10b61d533d7509b466be695e799e14b238415e67 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 30 Mar 2023 15:47:53 +0300 Subject: [PATCH 26/39] Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 2294f359d79f0..83a247ac21cf6 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -179,7 +179,7 @@ impl SubscriptionState { return false } - // If `pin` was called twice unregister the block now. + // If `register_block` was called twice unregister the block now. if block_state.can_unregister { occupied.remove(); } else { From c5266cc823d8036b8741396859edf5ca164c995b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 30 Mar 2023 16:47:12 +0300 Subject: [PATCH 27/39] chain_head: Add block state machine Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription.rs | 195 ++++++++++++++---- 1 file changed, 152 insertions(+), 43 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 83a247ac21cf6..dac0e7f411385 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -75,26 +75,85 @@ impl From for SubscriptionManagementError { } } -/// The state of a block of a single subscription ID. -struct BlockState { - /// True if the block can be unregistered from the - /// internal memory of the subscription. +/// The state machine of a block of a single subscription ID. +/// +/// # Motivation +/// +/// Each block is registered twice: once from the `BestBlock` event +/// and once from the `Finalized` event. +/// +/// The state of a block must be tracked until both events register the +/// block and the user calls `unpin`. +/// +/// Otherwise, the following race might happen: +/// T0. BestBlock event: hash is tracked and pinned in backend. +/// T1. User calls unpin: hash is untracked and unpinned in backend. +/// T2. Finalized event: hash is tracked (no previous history) and pinned again. +/// +/// # State Machine Transition +/// +/// ```ignore +/// (register) +/// [ REGISTERED ] ---------------> [ FULLY REGISTERED ] +/// | | +/// | (unpin) | (unpin) +/// | | +/// V (register) V +/// [ UNPINNED ] -----------------> [ FULLY UNPINNED ] +/// ``` +#[derive(Debug, Clone, PartialEq)] +enum BlockStateMachine { + /// The block was registered by one event (either `Finalized` or `BestBlock` event). + /// + /// Unpin was not called. + Registered, + /// The block was registered by both events (`Finalized` and `BestBlock` events). /// - /// Each block is registered twice: once from the `BestBlock` event - /// and once from the `Finalized` event. This becomes true when - /// both events registered the block. + /// Unpin was not called. + FullyRegistered, + /// The block was registered by one event (either `Finalized` or `BestBlock` event), /// - /// Field is added to avoid losing track of the block for the following - /// timeline: - /// T0. BestBlock event: hash is tracked and pinned in backend. - /// T1. User calls unpin: hash is untracked and unpinned in backend. - /// T2. Finalized event: hash is tracked (no previous history) and pinned again. - can_unregister: bool, - /// True if the block was unpinned. + /// Unpin __was__ called. + Unpinned, + /// The block was registered by both events (`Finalized` and `BestBlock` events). /// - /// Because of the previous condition, a block can be unregistered - /// only when both `can_unregister` and `was_unpinned` are true. - was_unpinned: bool, + /// Unpin __was__ called. + FullyUnpinned, +} + +impl BlockStateMachine { + fn new() -> Self { + BlockStateMachine::Registered + } + + fn advance_register(&mut self) { + match self { + BlockStateMachine::Registered => *self = BlockStateMachine::FullyRegistered, + BlockStateMachine::Unpinned => *self = BlockStateMachine::FullyUnpinned, + _ => (), + } + } + + fn advance_unpin(&mut self) { + match self { + BlockStateMachine::Registered => *self = BlockStateMachine::Unpinned, + BlockStateMachine::FullyRegistered => *self = BlockStateMachine::FullyUnpinned, + _ => (), + } + } + + fn was_unpinned(&self) -> bool { + match self { + BlockStateMachine::Unpinned => true, + BlockStateMachine::FullyUnpinned => true, + _ => false, + } + } +} + +struct BlockState { + /// The state machine of this block. + state_machine: BlockStateMachine, /// The timestamp when the block was inserted. timestamp: Instant, } @@ -139,26 +198,24 @@ impl SubscriptionState { fn register_block(&mut self, hash: Block::Hash) -> bool { match self.blocks.entry(hash) { Entry::Occupied(mut occupied) => { - let mut block_state = occupied.get_mut(); - if block_state.was_unpinned { - // If `unpin` was called between two events - // unregister the block now. + let block_state = occupied.get_mut(); + + block_state.state_machine.advance_register(); + // Block was registered twice and unpin was called. + if block_state.state_machine == BlockStateMachine::FullyUnpinned { occupied.remove(); - } else { - // Both `BestBlock` and `Finalized` events registered the block. - // Unregister the block on `unpin`. - block_state.can_unregister = true; } + // Second time we register this block. false }, Entry::Vacant(vacant) => { vacant.insert(BlockState { - can_unregister: false, - was_unpinned: false, + state_machine: BlockStateMachine::new(), timestamp: Instant::now(), }); + // First time we register this block. true }, } @@ -172,18 +229,17 @@ impl SubscriptionState { fn unregister_block(&mut self, hash: Block::Hash) -> bool { match self.blocks.entry(hash) { Entry::Occupied(mut occupied) => { - let mut block_state = occupied.get_mut(); + let block_state = occupied.get_mut(); // Cannot unpin a block twice. - if block_state.was_unpinned { + if block_state.state_machine.was_unpinned() { return false } - // If `register_block` was called twice unregister the block now. - if block_state.can_unregister { + block_state.state_machine.advance_unpin(); + // Block was registered twice and unpin was called. + if block_state.state_machine == BlockStateMachine::FullyUnpinned { occupied.remove(); - } else { - block_state.was_unpinned = true; } true @@ -204,7 +260,7 @@ impl SubscriptionState { }; // Subscription no longer contains the block if `unpin` was called. - !state.was_unpinned + !state.state_machine.was_unpinned() } /// Get the timestamp of the oldest inserted block. @@ -321,7 +377,7 @@ impl + 'static> SubscriptionsInner sub.stop(); for (hash, state) in sub.blocks.iter() { - if !state.was_unpinned { + if !state.state_machine.was_unpinned() { self.global_unpin_block(*hash); } } @@ -616,6 +672,63 @@ mod tests { (backend, client) } + #[test] + fn block_state_machine_register_unpin() { + let mut state = BlockStateMachine::new(); + // Starts in `Registered` state. + assert_eq!(state, BlockStateMachine::Registered); + + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyRegistered); + + // Can call register multiple times. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyRegistered); + + assert!(!state.was_unpinned()); + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Can call unpin multiple times. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Nothing to advance. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + } + + #[test] + fn block_state_machine_unpin_register() { + let mut state = BlockStateMachine::new(); + // Starts in `Registered` state. + assert_eq!(state, BlockStateMachine::Registered); + + assert!(!state.was_unpinned()); + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::Unpinned); + assert!(state.was_unpinned()); + + // Can call unpin multiple times. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::Unpinned); + assert!(state.was_unpinned()); + + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Nothing to advance. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + // Nothing to unpin. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + } + #[test] fn sub_state_register_twice() { let mut sub_state = SubscriptionState:: { @@ -628,13 +741,11 @@ mod tests { assert_eq!(sub_state.register_block(hash), true); let block_state = sub_state.blocks.get(&hash).unwrap(); // Did not call `register_block` twice. - assert_eq!(block_state.can_unregister, false); - assert_eq!(block_state.was_unpinned, false); + assert_eq!(block_state.state_machine, BlockStateMachine::Registered); assert_eq!(sub_state.register_block(hash), false); let block_state = sub_state.blocks.get(&hash).unwrap(); - assert_eq!(block_state.can_unregister, true); - assert_eq!(block_state.was_unpinned, false); + assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered); // Block is no longer tracked when: `register_block` is called twice and // `unregister_block` is called once. @@ -658,14 +769,12 @@ mod tests { assert_eq!(sub_state.register_block(hash), true); let block_state = sub_state.blocks.get(&hash).unwrap(); // Did not call `register_block` twice. - assert_eq!(block_state.can_unregister, false); - assert_eq!(block_state.was_unpinned, false); + assert_eq!(block_state.state_machine, BlockStateMachine::Registered); // Unregister block before the second `register_block`. assert_eq!(sub_state.unregister_block(hash), true); let block_state = sub_state.blocks.get(&hash).unwrap(); - assert_eq!(block_state.can_unregister, false); - assert_eq!(block_state.was_unpinned, true); + assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned); assert_eq!(sub_state.register_block(hash), false); let block_state = sub_state.blocks.get(&hash); From 7bf9da087fe86f4eef89ea68bbd07cdf598a4e8a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 24 Apr 2023 11:37:11 +0300 Subject: [PATCH 28/39] Address feedback Signed-off-by: Alexandru Vasile --- client/api/src/in_mem.rs | 3 +-- client/rpc-spec-v2/src/chain_head/subscription.rs | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 815eff872a119..cd2a96c7f40f3 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -805,8 +805,7 @@ where fn pin_block(&self, hash: ::Hash) -> blockchain::Result<()> { let mut blocks = self.pinned_blocks.write(); - blocks.entry(hash).and_modify(|counter| *counter += 1).or_insert(1); - + *blocks.entry(hash).or_default() += 1; Ok(()) } diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index dac0e7f411385..8f0baf4f0caeb 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -225,7 +225,7 @@ impl SubscriptionState { /// /// Returns: /// - true if the block can be unpinned. - /// - false if the subscription does not contain the block. + /// - false if the subscription does not contain the block or it was unpinned. fn unregister_block(&mut self, hash: Block::Hash) -> bool { match self.blocks.entry(hash) { Entry::Occupied(mut occupied) => { @@ -264,7 +264,11 @@ impl SubscriptionState { } /// Get the timestamp of the oldest inserted block. - fn oldest_block_timestamp(&self) -> Instant { + /// + /// # Note + /// + /// This iterates over all the blocks of the subscription. + fn find_oldest_block_timestamp(&self) -> Instant { let mut timestamp = Instant::now(); for (_, state) in self.blocks.iter() { timestamp = std::cmp::min(timestamp, state.timestamp); @@ -407,7 +411,7 @@ impl + 'static> SubscriptionsInner .subs .iter_mut() .filter_map(|(sub_id, sub)| { - let sub_time = sub.oldest_block_timestamp(); + let sub_time = sub.find_oldest_block_timestamp(); // Subscriptions older than the specified pin duration should be removed. let should_remove = match now.checked_duration_since(sub_time) { Some(duration) => duration > self.local_max_pin_duration, From 6a3f9e5dd60cc2e799335e1d299cdbf17859b56f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 24 Apr 2023 12:22:37 +0300 Subject: [PATCH 29/39] Use new_native_or_wasm_executor Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 2 +- client/rpc-spec-v2/src/chain_head/tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 8f0baf4f0caeb..f823644edc8e4 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -651,7 +651,7 @@ mod tests { Arc>>, ) { let backend = Arc::new(sc_client_api::in_mem::Backend::new()); - let executor = substrate_test_runtime_client::new_native_executor(); + let executor = substrate_test_runtime_client::new_native_or_wasm_executor(); let client_config = sc_service::ClientConfig::default(); let genesis_block_builder = sc_service::GenesisBlockBuilder::new( &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index b6805592592cc..76644ccb472e8 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1346,7 +1346,7 @@ async fn follow_report_multiple_pruned_block() { async fn pin_block_references() { // Manually construct an in-memory backend and client. let backend = Arc::new(sc_client_api::in_mem::Backend::new()); - let executor = substrate_test_runtime_client::new_native_executor(); + let executor = substrate_test_runtime_client::new_native_or_wasm_executor(); let client_config = sc_service::ClientConfig::default(); let genesis_block_builder = sc_service::GenesisBlockBuilder::new( From d287533c0270e2e1699f85dfdff10bfb97e4f6f3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 10:37:00 +0300 Subject: [PATCH 30/39] chain_head: Remove 'static on Backend Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head.rs | 4 ++-- .../rpc-spec-v2/src/chain_head/subscription.rs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 6f41a88ea2b70..ff9a805299d94 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -49,7 +49,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; /// An API for chain head RPC calls. -pub struct ChainHead + 'static, Block: BlockT, Client> { +pub struct ChainHead, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. @@ -64,7 +64,7 @@ pub struct ChainHead + 'static, Block: BlockT, Client> { _phantom: PhantomData, } -impl + 'static, Block: BlockT, Client> ChainHead { +impl, Block: BlockT, Client> ChainHead { /// Create a new [`ChainHead`]. pub fn new>( client: Arc, diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index f823644edc8e4..b4c5e3cbe0728 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -277,7 +277,7 @@ impl SubscriptionState { } } -pub struct BlockGuard + 'static> { +pub struct BlockGuard> { hash: Block::Hash, runtime_updates: bool, backend: Arc, @@ -285,13 +285,13 @@ pub struct BlockGuard + 'static> { // Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for // testing. -impl + 'static> std::fmt::Debug for BlockGuard { +impl> std::fmt::Debug for BlockGuard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "BlockGuard hash {:?} runtime_updates {:?}", self.hash, self.runtime_updates) } } -impl + 'static> BlockGuard { +impl> BlockGuard { /// Construct a new [`BlockGuard`] . fn new( hash: Block::Hash, @@ -311,13 +311,13 @@ impl + 'static> BlockGuard { } } -impl + 'static> Drop for BlockGuard { +impl> Drop for BlockGuard { fn drop(&mut self) { self.backend.unpin_block(self.hash); } } -struct SubscriptionsInner + 'static> { +struct SubscriptionsInner> { /// Reference count the block hashes across all subscriptions. /// /// The pinned blocks cannot exceed the [`Self::global_limit`] limit. @@ -335,7 +335,7 @@ struct SubscriptionsInner + 'static> { backend: Arc, } -impl + 'static> SubscriptionsInner { +impl> SubscriptionsInner { /// Construct a new [`SubscriptionsInner`] from the specified limits. fn new( global_max_pinned_blocks: usize, @@ -539,13 +539,13 @@ impl + 'static> SubscriptionsInner } /// Manage block pinning / unpinning for subscription IDs. -pub struct SubscriptionManagement + 'static> { +pub struct SubscriptionManagement> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. inner: RwLock>, } -impl + 'static> SubscriptionManagement { +impl> SubscriptionManagement { /// Construct a new [`SubscriptionManagement`]. pub fn new( global_max_pinned_blocks: usize, From 6073ae2fac7fe9e8ab531619d82b3d7b43739c2c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 11:15:22 +0300 Subject: [PATCH 31/39] chain_head: Add documentation Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index b4c5e3cbe0728..003ae52dda058 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -277,6 +277,9 @@ impl SubscriptionState { } } +/// Keeps a specific block pinned while the handle is alive. +/// This object ensures that the block is not unpinned while +/// executing an RPC method call. pub struct BlockGuard> { hash: Block::Hash, runtime_updates: bool, From 871dcb850505fb35b7911ea9a3694cd4f4ab7dcc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 11:15:58 +0300 Subject: [PATCH 32/39] chain_head: Lock blocks before async blocks Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 122 +++++++++--------- 1 file changed, 62 insertions(+), 60 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index ff9a805299d94..763fc5d9acc5d 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -203,27 +203,28 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let _block_guard = match subscriptions.lock_block(&follow_subscription, hash) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }, - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return - }, - }; + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { + let _block_guard = block_guard; let event = match client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block.block.extrinsics(); @@ -298,27 +299,28 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let _block_guard = match subscriptions.lock_block(&follow_subscription, hash) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }, - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return - }, - }; + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { + let _block_guard = block_guard; // The child key is provided, use the key to query the child trie. if let Some(child_key) = child_key { // The child key must not be prefixed with ":child_storage:" nor @@ -387,27 +389,27 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }, - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return - }, - }; + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + let _ = sink.send(&ChainHeadEvent::::Disjoint); + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + let _ = sink.reject(ChainHeadRpcError::InvalidBlock); + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { // Reject subscription if runtime_updates is false. if !block_guard.has_runtime_updates() { let _ = sink.reject(ChainHeadRpcError::InvalidParam( From 083e3c3a7d20edd0cd320b113074828017c95794 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 11:18:38 +0300 Subject: [PATCH 33/39] chain_head_follower: Remove static on backend Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head_follow.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 37a988501da29..f496f07a37b18 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -44,7 +44,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{collections::HashSet, sync::Arc}; /// Generates the events of the `chainHead_follow` method. -pub struct ChainHeadFollower + 'static, Block: BlockT, Client> { +pub struct ChainHeadFollower, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. @@ -59,7 +59,7 @@ pub struct ChainHeadFollower + 'static, Block: BlockT, Client best_block_cache: Option, } -impl + 'static, Block: BlockT, Client> ChainHeadFollower { +impl, Block: BlockT, Client> ChainHeadFollower { /// Create a new [`ChainHeadFollower`]. pub fn new( client: Arc, From 1bb4b00bf13ae210f22865e131a6aba7760e5efd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 25 Apr 2023 11:34:32 +0300 Subject: [PATCH 34/39] Update client/service/src/builder.rs Co-authored-by: Davide Galassi --- client/service/src/builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index c7d0148f30354..c64d891d201ad 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -679,6 +679,7 @@ where // subscription can be terminated. // Note: This should be enough for immediate blocks. const MAX_PINNED_SECONDS: u64 = 60; + let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), backend.clone(), From 04a167c4c6b613f0d47bcecb6bafdb6e8efe8d12 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 25 Apr 2023 11:34:44 +0300 Subject: [PATCH 35/39] Update client/service/src/builder.rs Co-authored-by: Davide Galassi --- client/service/src/builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index c64d891d201ad..8d4532a72fd3a 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -675,6 +675,7 @@ where // This number is large enough to consider immediate blocks. // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. const MAX_PINNED_BLOCKS: usize = 512; + // Maximum number of seconds that a block can be pinned before the // subscription can be terminated. // Note: This should be enough for immediate blocks. From cc24ad1f330908c8671634278ea8167d6e4e282c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 11:36:43 +0300 Subject: [PATCH 36/39] chain_head: Add BlockHeaderAbsent to the PartialEq impl Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index 003ae52dda058..e33ce2bbd302b 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -62,6 +62,7 @@ impl PartialEq for SubscriptionManagementError { // Not needed for testing. (Self::Blockchain(_), Self::Blockchain(_)) | (Self::BlockHashAbsent, Self::BlockHashAbsent) | + (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | (Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true, (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, _ => false, From b3ca736d91c432c4e67cc46faa3b29a585cbe374 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 15:20:50 +0300 Subject: [PATCH 37/39] client: Add better documentation around pinning constants Signed-off-by: Alexandru Vasile --- client/service/src/builder.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 8d4532a72fd3a..d338ceb78958d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -676,8 +676,9 @@ where // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. const MAX_PINNED_BLOCKS: usize = 512; - // Maximum number of seconds that a block can be pinned before the - // subscription can be terminated. + // Any block of any subscription should not be pinned more than + // this constant. When a subscription contains a block older than this, + // the subscription becomes subject to termination. // Note: This should be enough for immediate blocks. const MAX_PINNED_SECONDS: u64 = 60; From 3b1764f7c155ec378f0abc8d39bda98908e3c2ae Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Apr 2023 15:57:02 +0300 Subject: [PATCH 38/39] chain_head: Move subscription to dedicated module Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/error.rs | 66 ++++++++ .../inner.rs} | 160 +----------------- .../src/chain_head/subscription/mod.rs | 125 ++++++++++++++ 3 files changed, 199 insertions(+), 152 deletions(-) create mode 100644 client/rpc-spec-v2/src/chain_head/subscription/error.rs rename client/rpc-spec-v2/src/chain_head/{subscription.rs => subscription/inner.rs} (86%) create mode 100644 client/rpc-spec-v2/src/chain_head/subscription/mod.rs diff --git a/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/client/rpc-spec-v2/src/chain_head/subscription/error.rs new file mode 100644 index 0000000000000..443ee9fb87a25 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/subscription/error.rs @@ -0,0 +1,66 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use sp_blockchain::Error; + +/// Subscription management error. +#[derive(Debug, thiserror::Error)] +pub enum SubscriptionManagementError { + /// The block cannot be pinned into memory because + /// the subscription has exceeded the maximum number + /// of blocks pinned. + #[error("Exceeded pinning limits")] + ExceededLimits, + /// Error originated from the blockchain (client or backend). + #[error("Blockchain error {0}")] + Blockchain(Error), + /// The database does not contain a block hash. + #[error("Block hash is absent")] + BlockHashAbsent, + /// The database does not contain a block header. + #[error("Block header is absent")] + BlockHeaderAbsent, + /// The specified subscription ID is not present. + #[error("Subscription is absent")] + SubscriptionAbsent, + /// Custom error. + #[error("Subscription error {0}")] + Custom(String), +} + +// Blockchain error does not implement `PartialEq` needed for testing. +impl PartialEq for SubscriptionManagementError { + fn eq(&self, other: &SubscriptionManagementError) -> bool { + match (self, other) { + (Self::ExceededLimits, Self::ExceededLimits) | + // Not needed for testing. + (Self::Blockchain(_), Self::Blockchain(_)) | + (Self::BlockHashAbsent, Self::BlockHashAbsent) | + (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | + (Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true, + (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, + _ => false, + } + } +} + +impl From for SubscriptionManagementError { + fn from(err: Error) -> Self { + SubscriptionManagementError::Blockchain(err) + } +} diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs similarity index 86% rename from client/rpc-spec-v2/src/chain_head/subscription.rs rename to client/rpc-spec-v2/src/chain_head/subscription/inner.rs index e33ce2bbd302b..1b167a2af0325 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -16,12 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! Subscription management for tracking subscription IDs to pinned blocks. - use futures::channel::oneshot; -use parking_lot::RwLock; use sc_client_api::Backend; -use sp_blockchain::Error; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, @@ -29,52 +25,7 @@ use std::{ time::{Duration, Instant}, }; -/// Subscription management error. -#[derive(Debug, thiserror::Error)] -pub enum SubscriptionManagementError { - /// The block cannot be pinned into memory because - /// the subscription has exceeded the maximum number - /// of blocks pinned. - #[error("Exceeded pinning limits")] - ExceededLimits, - /// Error originated from the blockchain (client or backend). - #[error("Blockchain error {0}")] - Blockchain(Error), - /// The database does not contain a block hash. - #[error("Block hash is absent")] - BlockHashAbsent, - /// The database does not contain a block header. - #[error("Block header is absent")] - BlockHeaderAbsent, - /// The specified subscription ID is not present. - #[error("Subscription is absent")] - SubscriptionAbsent, - /// Custom error. - #[error("Subscription error {0}")] - Custom(String), -} - -// Blockchain error does not implement `PartialEq` needed for testing. -impl PartialEq for SubscriptionManagementError { - fn eq(&self, other: &SubscriptionManagementError) -> bool { - match (self, other) { - (Self::ExceededLimits, Self::ExceededLimits) | - // Not needed for testing. - (Self::Blockchain(_), Self::Blockchain(_)) | - (Self::BlockHashAbsent, Self::BlockHashAbsent) | - (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | - (Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true, - (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, - _ => false, - } - } -} - -impl From for SubscriptionManagementError { - fn from(err: Error) -> Self { - SubscriptionManagementError::Blockchain(err) - } -} +use crate::chain_head::subscription::SubscriptionManagementError; /// The state machine of a block of a single subscription ID. /// @@ -321,7 +272,7 @@ impl> Drop for BlockGuard { } } -struct SubscriptionsInner> { +pub struct SubscriptionsInner> { /// Reference count the block hashes across all subscriptions. /// /// The pinned blocks cannot exceed the [`Self::global_limit`] limit. @@ -341,7 +292,7 @@ struct SubscriptionsInner> { impl> SubscriptionsInner { /// Construct a new [`SubscriptionsInner`] from the specified limits. - fn new( + pub fn new( global_max_pinned_blocks: usize, local_max_pin_duration: Duration, backend: Arc, @@ -356,7 +307,7 @@ impl> SubscriptionsInner { } /// Insert a new subscription ID. - fn insert_subscription( + pub fn insert_subscription( &mut self, sub_id: String, runtime_updates: bool, @@ -376,7 +327,7 @@ impl> SubscriptionsInner { } /// Remove the subscription ID with associated pinned blocks. - fn remove_subscription(&mut self, sub_id: &str) { + pub fn remove_subscription(&mut self, sub_id: &str) { let Some(mut sub) = self.subs.remove(sub_id) else { return }; @@ -450,7 +401,7 @@ impl> SubscriptionsInner { return is_terminated } - fn pin_block( + pub fn pin_block( &mut self, sub_id: &str, hash: Block::Hash, @@ -506,7 +457,7 @@ impl> SubscriptionsInner { } } - fn unpin_block( + pub fn unpin_block( &mut self, sub_id: &str, hash: Block::Hash, @@ -525,7 +476,7 @@ impl> SubscriptionsInner { Ok(()) } - fn lock_block( + pub fn lock_block( &mut self, sub_id: &str, hash: Block::Hash, @@ -542,101 +493,6 @@ impl> SubscriptionsInner { } } -/// Manage block pinning / unpinning for subscription IDs. -pub struct SubscriptionManagement> { - /// Manage subscription by mapping the subscription ID - /// to a set of block hashes. - inner: RwLock>, -} - -impl> SubscriptionManagement { - /// Construct a new [`SubscriptionManagement`]. - pub fn new( - global_max_pinned_blocks: usize, - local_max_pin_duration: Duration, - backend: Arc, - ) -> Self { - SubscriptionManagement { - inner: RwLock::new(SubscriptionsInner::new( - global_max_pinned_blocks, - local_max_pin_duration, - backend, - )), - } - } - - /// Insert a new subscription ID. - /// - /// If the subscription was not previously inserted, returns the receiver that is - /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already - /// inserted returns none. - pub fn insert_subscription( - &self, - sub_id: String, - runtime_updates: bool, - ) -> Option> { - let mut inner = self.inner.write(); - inner.insert_subscription(sub_id, runtime_updates) - } - - /// Remove the subscription ID with associated pinned blocks. - pub fn remove_subscription(&self, sub_id: &str) { - let mut inner = self.inner.write(); - inner.remove_subscription(sub_id) - } - - /// The block is pinned in the backend only once when the block's hash is first encountered. - /// - /// Each subscription is expected to call this method twice: - /// - once from the `NewBlock` import - /// - once from the `Finalized` import - /// - /// Returns - /// - Ok(true) if the subscription did not previously contain this block - /// - Ok(false) if the subscription already contained this this - /// - Error if the backend failed to pin the block or the subscription ID is invalid - pub fn pin_block( - &self, - sub_id: &str, - hash: Block::Hash, - ) -> Result { - let mut inner = self.inner.write(); - inner.pin_block(sub_id, hash) - } - - /// Unpin the block from the subscription. - /// - /// The last subscription that unpins the block is also unpinning the block - /// from the backend. - /// - /// This method is called only once per subscription. - /// - /// Returns an error if the block is not pinned for the subscription or - /// the subscription ID is invalid. - pub fn unpin_block( - &self, - sub_id: &str, - hash: Block::Hash, - ) -> Result<(), SubscriptionManagementError> { - let mut inner = self.inner.write(); - inner.unpin_block(sub_id, hash) - } - - /// Ensure the block remains pinned until the return object is dropped. - /// - /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner. - /// Returns an error if the block hash is not pinned for the subscription or - /// the subscription ID is invalid. - pub fn lock_block( - &self, - sub_id: &str, - hash: Block::Hash, - ) -> Result, SubscriptionManagementError> { - let mut inner = self.inner.write(); - inner.lock_block(sub_id, hash) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs new file mode 100644 index 0000000000000..86e55acc4c176 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -0,0 +1,125 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::channel::oneshot; +use parking_lot::RwLock; +use sc_client_api::Backend; +use sp_runtime::traits::Block as BlockT; +use std::{sync::Arc, time::Duration}; + +mod error; +mod inner; + +pub use error::SubscriptionManagementError; +pub use inner::BlockGuard; +use inner::SubscriptionsInner; + +/// Manage block pinning / unpinning for subscription IDs. +pub struct SubscriptionManagement> { + /// Manage subscription by mapping the subscription ID + /// to a set of block hashes. + inner: RwLock>, +} + +impl> SubscriptionManagement { + /// Construct a new [`SubscriptionManagement`]. + pub fn new( + global_max_pinned_blocks: usize, + local_max_pin_duration: Duration, + backend: Arc, + ) -> Self { + SubscriptionManagement { + inner: RwLock::new(SubscriptionsInner::new( + global_max_pinned_blocks, + local_max_pin_duration, + backend, + )), + } + } + + /// Insert a new subscription ID. + /// + /// If the subscription was not previously inserted, returns the receiver that is + /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already + /// inserted returns none. + pub fn insert_subscription( + &self, + sub_id: String, + runtime_updates: bool, + ) -> Option> { + let mut inner = self.inner.write(); + inner.insert_subscription(sub_id, runtime_updates) + } + + /// Remove the subscription ID with associated pinned blocks. + pub fn remove_subscription(&self, sub_id: &str) { + let mut inner = self.inner.write(); + inner.remove_subscription(sub_id) + } + + /// The block is pinned in the backend only once when the block's hash is first encountered. + /// + /// Each subscription is expected to call this method twice: + /// - once from the `NewBlock` import + /// - once from the `Finalized` import + /// + /// Returns + /// - Ok(true) if the subscription did not previously contain this block + /// - Ok(false) if the subscription already contained this this + /// - Error if the backend failed to pin the block or the subscription ID is invalid + pub fn pin_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result { + let mut inner = self.inner.write(); + inner.pin_block(sub_id, hash) + } + + /// Unpin the block from the subscription. + /// + /// The last subscription that unpins the block is also unpinning the block + /// from the backend. + /// + /// This method is called only once per subscription. + /// + /// Returns an error if the block is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn unpin_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.unpin_block(sub_id, hash) + } + + /// Ensure the block remains pinned until the return object is dropped. + /// + /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner. + /// Returns an error if the block hash is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn lock_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result, SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.lock_block(sub_id, hash) + } +} From 8b0eaff0e911f583dc655c7e0de130d70ceb717b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 26 Apr 2023 21:30:11 +0300 Subject: [PATCH 39/39] subscription: Rename global pin / unpin functions Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 1b167a2af0325..8865daa83cba2 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -337,7 +337,7 @@ impl> SubscriptionsInner { for (hash, state) in sub.blocks.iter() { if !state.state_machine.was_unpinned() { - self.global_unpin_block(*hash); + self.global_unregister_block(*hash); } } } @@ -424,11 +424,18 @@ impl> SubscriptionsInner { } } - self.global_pin_block(hash)?; + self.global_register_block(hash)?; Ok(true) } - fn global_pin_block(&mut self, hash: Block::Hash) -> Result<(), SubscriptionManagementError> { + /// Register the block internally. + /// + /// If the block is present the reference counter is increased. + /// If this is a new block, the block is pinned in the backend. + fn global_register_block( + &mut self, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { match self.global_blocks.entry(hash) { Entry::Occupied(mut occupied) => { *occupied.get_mut() += 1; @@ -444,7 +451,12 @@ impl> SubscriptionsInner { Ok(()) } - fn global_unpin_block(&mut self, hash: Block::Hash) { + /// Unregister the block internally. + /// + /// If the block is present the reference counter is decreased. + /// If this is the last reference of the block, the block + /// is unpinned from the backend and removed from internal tracking. + fn global_unregister_block(&mut self, hash: Block::Hash) { if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) { let counter = occupied.get_mut(); if *counter == 1 { @@ -472,7 +484,7 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - self.global_unpin_block(hash); + self.global_unregister_block(hash); Ok(()) }