diff --git a/Cargo.lock b/Cargo.lock index 8d8d2d59a3c88..765ce24556884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9440,6 +9440,7 @@ dependencies = [ "sc-block-builder", "sc-chain-spec", "sc-client-api", + "sc-service", "sc-transaction-pool-api", "sc-utils", "serde", diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 27a74ddd79ed6..cd2a96c7f40f3 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -624,6 +624,7 @@ where states: RwLock>>>, blockchain: Blockchain, import_lock: RwLock<()>, + pinned_blocks: RwLock>, } impl Backend @@ -631,13 +632,28 @@ where Block::Hash: Ord, { /// Create a new instance of in-mem backend. + /// + /// # Warning + /// + /// For testing purposes only! pub fn new() -> Self { Backend { states: RwLock::new(HashMap::new()), blockchain: Blockchain::new(), import_lock: Default::default(), + pinned_blocks: Default::default(), } } + + /// Return the number of references active for a pinned block. + /// + /// # Warning + /// + /// For testing purposes only! + pub fn pin_refs(&self, hash: &::Hash) -> Option { + let blocks = self.pinned_blocks.read(); + blocks.get(hash).map(|value| *value) + } } impl backend::AuxStore for Backend @@ -787,11 +803,16 @@ where false } - fn pin_block(&self, _: ::Hash) -> blockchain::Result<()> { + fn pin_block(&self, hash: ::Hash) -> blockchain::Result<()> { + let mut blocks = self.pinned_blocks.write(); + *blocks.entry(hash).or_default() += 1; Ok(()) } - fn unpin_block(&self, _: ::Hash) {} + fn unpin_block(&self, hash: ::Hash) { + let mut blocks = self.pinned_blocks.write(); + blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1); + } } impl backend::LocalBackend for Backend where Block::Hash: Ord {} diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 23b96877f3b17..1f0cac18d324e 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -43,5 +43,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } +sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" } sc-utils = { version = "4.0.0-dev", path = "../utils" } assert_matches = "1.3.0" diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index c63e874c1bc15..763fc5d9acc5d 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -24,7 +24,7 @@ use crate::{ chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig}, - subscription::SubscriptionManagement, + subscription::{SubscriptionManagement, SubscriptionManagementError}, }, SubscriptionTaskExecutor, }; @@ -44,12 +44,12 @@ use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes}; use sp_runtime::traits::Block as BlockT; -use std::{marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; /// An API for chain head RPC calls. -pub struct ChainHead { +pub struct ChainHead, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. @@ -57,16 +57,14 @@ pub struct ChainHead { /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, /// Keep track of the pinned blocks for each subscription. - subscriptions: Arc>, + subscriptions: Arc>, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, - /// The maximum number of pinned blocks allowed per connection. - max_pinned_blocks: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } -impl ChainHead { +impl, Block: BlockT, Client> ChainHead { /// Create a new [`ChainHead`]. pub fn new>( client: Arc, @@ -74,16 +72,20 @@ impl ChainHead { executor: SubscriptionTaskExecutor, genesis_hash: GenesisHash, max_pinned_blocks: usize, + max_pinned_duration: Duration, ) -> Self { let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref())); Self { client, - backend, + backend: backend.clone(), executor, - subscriptions: Arc::new(SubscriptionManagement::new()), + subscriptions: Arc::new(SubscriptionManagement::new( + max_pinned_blocks, + max_pinned_duration, + backend, + )), genesis_hash, - max_pinned_blocks, _phantom: PhantomData, } } @@ -159,9 +161,8 @@ where return Err(err) }, }; - // Keep track of the subscription. - let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else { + let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); @@ -177,7 +178,7 @@ where let mut chain_head_follow = ChainHeadFollower::new( client, backend, - sub_handle, + subscriptions.clone(), runtime_updates, sub_id.clone(), ); @@ -202,19 +203,28 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }; - - // Block is not part of the subscription. - if !handle.contains_block(&hash) { + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { + let _block_guard = block_guard; let event = match client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block.block.extrinsics(); @@ -226,10 +236,10 @@ where debug!( target: LOG_TARGET, "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - follow_subscription, + &follow_subscription, hash ); - handle.stop(); + subscriptions.remove_subscription(&follow_subscription); ChainHeadEvent::::Disjoint }, Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), @@ -246,16 +256,19 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult> { - let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - return Ok(None) + let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + return Ok(None) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + return Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - // Block is not part of the subscription. - if !handle.contains_block(&hash) { - return Err(ChainHeadRpcError::InvalidBlock.into()) - } - self.client .header(hash) .map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode())))) @@ -286,19 +299,28 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }; - - // Block is not part of the subscription. - if !handle.contains_block(&hash) { + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { + let _block_guard = block_guard; // The child key is provided, use the key to query the child trie. if let Some(child_key) = child_key { // The child key must not be prefixed with ":child_storage:" nor @@ -367,21 +389,29 @@ where let client = self.client.clone(); let subscriptions = self.subscriptions.clone(); - let fut = async move { - let Some(handle) = subscriptions.get_subscription(&follow_subscription) else { + let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. let _ = sink.send(&ChainHeadEvent::::Disjoint); - return - }; - - // Block is not part of the subscription. - if !handle.contains_block(&hash) { + return Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return - } + return Ok(()) + }, + Err(error) => { + let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { + error: error.to_string(), + })); + return Ok(()) + }, + }; + let fut = async move { // Reject subscription if runtime_updates is false. - if !handle.has_runtime_updates() { + if !block_guard.has_runtime_updates() { let _ = sink.reject(ChainHeadRpcError::InvalidParam( "The runtime updates flag must be set".into(), )); @@ -417,15 +447,17 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult<()> { - let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else { - // Invalid invalid subscription ID. - return Ok(()) - }; - - if !handle.unpin_block(&hash) { - return Err(ChainHeadRpcError::InvalidBlock.into()) + match self.subscriptions.unpin_block(&follow_subscription, hash) { + Ok(()) => Ok(()), + Err(SubscriptionManagementError::SubscriptionAbsent) => { + // Invalid invalid subscription ID. + Ok(()) + }, + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()), } - - Ok(()) } } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index a0d19654e7959..f496f07a37b18 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -24,7 +24,7 @@ use crate::chain_head::{ BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, }, - subscription::{SubscriptionHandle, SubscriptionManagementError}, + subscription::{SubscriptionManagement, SubscriptionManagementError}, }; use futures::{ channel::oneshot, @@ -44,13 +44,13 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{collections::HashSet, sync::Arc}; /// Generates the events of the `chainHead_follow` method. -pub struct ChainHeadFollower { +pub struct ChainHeadFollower, Block: BlockT, Client> { /// Substrate client. client: Arc, /// Backend of the chain. backend: Arc, - /// Subscription handle. - sub_handle: SubscriptionHandle, + /// Subscriptions handle. + sub_handle: Arc>, /// Subscription was started with the runtime updates flag. runtime_updates: bool, /// Subscription ID. @@ -59,12 +59,12 @@ pub struct ChainHeadFollower { best_block_cache: Option, } -impl ChainHeadFollower { +impl, Block: BlockT, Client> ChainHeadFollower { /// Create a new [`ChainHeadFollower`]. pub fn new( client: Arc, backend: Arc, - sub_handle: SubscriptionHandle, + sub_handle: Arc>, runtime_updates: bool, sub_id: String, ) -> Self { @@ -221,7 +221,7 @@ where // The initialized event is the first one sent. let finalized_block_hash = startup_point.finalized_hash; - self.sub_handle.pin_block(finalized_block_hash)?; + self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?; let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None); @@ -235,7 +235,7 @@ where finalized_block_descendants.push(initialized_event); for (child, parent) in initial_blocks.into_iter() { - self.sub_handle.pin_block(child)?; + self.sub_handle.pin_block(&self.sub_id, child)?; let new_runtime = self.generate_runtime_event(child, Some(parent)); @@ -310,7 +310,7 @@ where startup_point: &StartupPoint, ) -> Result>, SubscriptionManagementError> { // The block was already pinned by the initial block events or by the finalized event. - if !self.sub_handle.pin_block(notification.hash)? { + if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? { return Ok(Default::default()) } @@ -352,7 +352,7 @@ where std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter()); for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() { // Check if the block was already reported and thus, is already pinned. - if !self.sub_handle.pin_block(*hash)? { + if !self.sub_handle.pin_block(&self.sub_id, *hash)? { continue } @@ -564,6 +564,10 @@ where stream_item = stream.next(); stop_event = next_stop_event; } + + // If we got here either the substrate streams have closed + // or the `Stop` receiver was triggered. + let _ = sink.send(&FollowEvent::::Stop); } /// Generate the block events for the `chainHead_follow` method. diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs deleted file mode 100644 index 687374bba5e00..0000000000000 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ /dev/null @@ -1,285 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Subscription management for tracking subscription IDs to pinned blocks. - -use futures::channel::oneshot; -use parking_lot::RwLock; -use sp_blockchain::Error; -use sp_runtime::traits::Block as BlockT; -use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - sync::Arc, -}; - -/// Subscription management error. -#[derive(Debug)] -pub enum SubscriptionManagementError { - /// The block cannot be pinned into memory because - /// the subscription has exceeded the maximum number - /// of blocks pinned. - ExceededLimits, - /// Error originated from the blockchain (client or backend). - Blockchain(Error), - /// The database does not contain a block header. - BlockHeaderAbsent, - /// Custom error. - Custom(String), -} - -impl From for SubscriptionManagementError { - fn from(err: Error) -> Self { - SubscriptionManagementError::Blockchain(err) - } -} - -/// Inner subscription data structure. -struct SubscriptionInner { - /// The `runtime_updates` parameter flag of the subscription. - runtime_updates: bool, - /// Signals the "Stop" event. - tx_stop: Option>, - /// The blocks pinned. - blocks: HashSet, - /// The maximum number of pinned blocks allowed per subscription. - max_pinned_blocks: usize, -} - -/// Manage the blocks of a specific subscription ID. -#[derive(Clone)] -pub struct SubscriptionHandle { - inner: Arc>>, -} - -impl SubscriptionHandle { - /// Construct a new [`SubscriptionHandle`]. - fn new(runtime_updates: bool, tx_stop: oneshot::Sender<()>, max_pinned_blocks: usize) -> Self { - SubscriptionHandle { - inner: Arc::new(RwLock::new(SubscriptionInner { - runtime_updates, - tx_stop: Some(tx_stop), - blocks: HashSet::new(), - max_pinned_blocks, - })), - } - } - - /// Trigger the stop event for the current subscription. - /// - /// This can happen on internal failure (ie, the pruning deleted the block from memory) - /// or if the user exceeded the amount of available pinned blocks. - pub fn stop(&self) { - let mut inner = self.inner.write(); - - if let Some(tx_stop) = inner.tx_stop.take() { - let _ = tx_stop.send(()); - } - } - - /// Pin a new block for the current subscription ID. - /// - /// Returns whether the value was newly inserted if the block can be pinned. - /// Otherwise, returns an error if the maximum number of blocks has been exceeded. - pub fn pin_block(&self, hash: Block::Hash) -> Result { - let mut inner = self.inner.write(); - - if inner.blocks.len() == inner.max_pinned_blocks { - // We have reached the limit. However, the block can be already inserted. - if inner.blocks.contains(&hash) { - return Ok(false) - } else { - return Err(SubscriptionManagementError::ExceededLimits) - } - } - - Ok(inner.blocks.insert(hash)) - } - - /// Unpin a new block for the current subscription ID. - /// - /// Returns whether the value was present in the set. - pub fn unpin_block(&self, hash: &Block::Hash) -> bool { - let mut inner = self.inner.write(); - inner.blocks.remove(hash) - } - - /// Check if the block hash is present for the provided subscription ID. - /// - /// Returns `true` if the set contains the block. - pub fn contains_block(&self, hash: &Block::Hash) -> bool { - let inner = self.inner.read(); - inner.blocks.contains(hash) - } - - /// Get the `runtime_updates` flag of this subscription. - pub fn has_runtime_updates(&self) -> bool { - let inner = self.inner.read(); - inner.runtime_updates - } -} - -/// Manage block pinning / unpinning for subscription IDs. -pub struct SubscriptionManagement { - /// Manage subscription by mapping the subscription ID - /// to a set of block hashes. - inner: RwLock>>, -} - -impl SubscriptionManagement { - /// Construct a new [`SubscriptionManagement`]. - pub fn new() -> Self { - SubscriptionManagement { inner: RwLock::new(HashMap::new()) } - } - - /// Insert a new subscription ID. - /// - /// If the subscription was not previously inserted, the method returns a tuple of - /// the receiver that is triggered upon the "Stop" event and the subscription - /// handle. Otherwise, when the subscription ID was already inserted returns none. - pub fn insert_subscription( - &self, - subscription_id: String, - runtime_updates: bool, - max_pinned_blocks: usize, - ) -> Option<(oneshot::Receiver<()>, SubscriptionHandle)> { - let mut subs = self.inner.write(); - - if let Entry::Vacant(entry) = subs.entry(subscription_id) { - let (tx_stop, rx_stop) = oneshot::channel(); - let handle = - SubscriptionHandle::::new(runtime_updates, tx_stop, max_pinned_blocks); - entry.insert(handle.clone()); - Some((rx_stop, handle)) - } else { - None - } - } - - /// Remove the subscription ID with associated pinned blocks. - pub fn remove_subscription(&self, subscription_id: &String) { - let mut subs = self.inner.write(); - subs.remove(subscription_id); - } - - /// Obtain the specific subscription handle. - pub fn get_subscription(&self, subscription_id: &String) -> Option> { - let subs = self.inner.write(); - subs.get(subscription_id).and_then(|handle| Some(handle.clone())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use sp_core::H256; - use substrate_test_runtime_client::runtime::Block; - - #[test] - fn subscription_check_id() { - let subs = SubscriptionManagement::::new(); - - let id = "abc".to_string(); - let hash = H256::random(); - - let handle = subs.get_subscription(&id); - assert!(handle.is_none()); - - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); - assert!(!handle.contains_block(&hash)); - - subs.remove_subscription(&id); - - let handle = subs.get_subscription(&id); - assert!(handle.is_none()); - } - - #[test] - fn subscription_check_block() { - let subs = SubscriptionManagement::::new(); - - let id = "abc".to_string(); - let hash = H256::random(); - - // Check with subscription. - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); - assert!(!handle.contains_block(&hash)); - assert!(!handle.unpin_block(&hash)); - - handle.pin_block(hash).unwrap(); - assert!(handle.contains_block(&hash)); - // Unpin an invalid block. - assert!(!handle.unpin_block(&H256::random())); - - // Unpin the valid block. - assert!(handle.unpin_block(&hash)); - assert!(!handle.contains_block(&hash)); - } - - #[test] - fn subscription_check_stop_event() { - let subs = SubscriptionManagement::::new(); - - let id = "abc".to_string(); - - // Check with subscription. - let (mut rx_stop, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); - - // Check the stop signal was not received. - let res = rx_stop.try_recv().unwrap(); - assert!(res.is_none()); - - // Inserting a second time returns None. - let res = subs.insert_subscription(id.clone(), false, 10); - assert!(res.is_none()); - - handle.stop(); - - // Check the signal was received. - let res = rx_stop.try_recv().unwrap(); - assert!(res.is_some()); - } - - #[test] - fn subscription_check_data() { - let subs = SubscriptionManagement::::new(); - - let id = "abc".to_string(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 10).unwrap(); - assert!(!handle.has_runtime_updates()); - - let id2 = "abcd".to_string(); - let (_, handle) = subs.insert_subscription(id2.clone(), true, 10).unwrap(); - assert!(handle.has_runtime_updates()); - } - - #[test] - fn subscription_check_max_pinned() { - let subs = SubscriptionManagement::::new(); - - let id = "abc".to_string(); - let hash = H256::random(); - let hash_2 = H256::random(); - let (_, handle) = subs.insert_subscription(id.clone(), false, 1).unwrap(); - - handle.pin_block(hash).unwrap(); - // The same block can be pinned multiple times. - handle.pin_block(hash).unwrap(); - // Exceeded number of pinned blocks. - handle.pin_block(hash_2).unwrap_err(); - } -} diff --git a/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/client/rpc-spec-v2/src/chain_head/subscription/error.rs new file mode 100644 index 0000000000000..443ee9fb87a25 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/subscription/error.rs @@ -0,0 +1,66 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use sp_blockchain::Error; + +/// Subscription management error. +#[derive(Debug, thiserror::Error)] +pub enum SubscriptionManagementError { + /// The block cannot be pinned into memory because + /// the subscription has exceeded the maximum number + /// of blocks pinned. + #[error("Exceeded pinning limits")] + ExceededLimits, + /// Error originated from the blockchain (client or backend). + #[error("Blockchain error {0}")] + Blockchain(Error), + /// The database does not contain a block hash. + #[error("Block hash is absent")] + BlockHashAbsent, + /// The database does not contain a block header. + #[error("Block header is absent")] + BlockHeaderAbsent, + /// The specified subscription ID is not present. + #[error("Subscription is absent")] + SubscriptionAbsent, + /// Custom error. + #[error("Subscription error {0}")] + Custom(String), +} + +// Blockchain error does not implement `PartialEq` needed for testing. +impl PartialEq for SubscriptionManagementError { + fn eq(&self, other: &SubscriptionManagementError) -> bool { + match (self, other) { + (Self::ExceededLimits, Self::ExceededLimits) | + // Not needed for testing. + (Self::Blockchain(_), Self::Blockchain(_)) | + (Self::BlockHashAbsent, Self::BlockHashAbsent) | + (Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) | + (Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true, + (Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs, + _ => false, + } + } +} + +impl From for SubscriptionManagementError { + fn from(err: Error) -> Self { + SubscriptionManagementError::Blockchain(err) + } +} diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs new file mode 100644 index 0000000000000..8865daa83cba2 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -0,0 +1,940 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::channel::oneshot; +use sc_client_api::Backend; +use sp_runtime::traits::Block as BlockT; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, + time::{Duration, Instant}, +}; + +use crate::chain_head::subscription::SubscriptionManagementError; + +/// The state machine of a block of a single subscription ID. +/// +/// # Motivation +/// +/// Each block is registered twice: once from the `BestBlock` event +/// and once from the `Finalized` event. +/// +/// The state of a block must be tracked until both events register the +/// block and the user calls `unpin`. +/// +/// Otherwise, the following race might happen: +/// T0. BestBlock event: hash is tracked and pinned in backend. +/// T1. User calls unpin: hash is untracked and unpinned in backend. +/// T2. Finalized event: hash is tracked (no previous history) and pinned again. +/// +/// # State Machine Transition +/// +/// ```ignore +/// (register) +/// [ REGISTERED ] ---------------> [ FULLY REGISTERED ] +/// | | +/// | (unpin) | (unpin) +/// | | +/// V (register) V +/// [ UNPINNED ] -----------------> [ FULLY UNPINNED ] +/// ``` +#[derive(Debug, Clone, PartialEq)] +enum BlockStateMachine { + /// The block was registered by one event (either `Finalized` or `BestBlock` event). + /// + /// Unpin was not called. + Registered, + /// The block was registered by both events (`Finalized` and `BestBlock` events). + /// + /// Unpin was not called. + FullyRegistered, + /// The block was registered by one event (either `Finalized` or `BestBlock` event), + /// + /// Unpin __was__ called. + Unpinned, + /// The block was registered by both events (`Finalized` and `BestBlock` events). + /// + /// Unpin __was__ called. + FullyUnpinned, +} + +impl BlockStateMachine { + fn new() -> Self { + BlockStateMachine::Registered + } + + fn advance_register(&mut self) { + match self { + BlockStateMachine::Registered => *self = BlockStateMachine::FullyRegistered, + BlockStateMachine::Unpinned => *self = BlockStateMachine::FullyUnpinned, + _ => (), + } + } + + fn advance_unpin(&mut self) { + match self { + BlockStateMachine::Registered => *self = BlockStateMachine::Unpinned, + BlockStateMachine::FullyRegistered => *self = BlockStateMachine::FullyUnpinned, + _ => (), + } + } + + fn was_unpinned(&self) -> bool { + match self { + BlockStateMachine::Unpinned => true, + BlockStateMachine::FullyUnpinned => true, + _ => false, + } + } +} + +struct BlockState { + /// The state machine of this block. + state_machine: BlockStateMachine, + /// The timestamp when the block was inserted. + timestamp: Instant, +} + +/// The state of a single subscription ID. +struct SubscriptionState { + /// The `runtime_updates` parameter flag of the subscription. + runtime_updates: bool, + /// Signals the "Stop" event. + tx_stop: Option>, + /// Track the block hashes available for this subscription. + /// + /// This implementation assumes: + /// - most of the time subscriptions keep a few blocks of the chain's head pinned + /// - iteration through the blocks happens only when the hard limit is exceeded. + /// + /// Considering the assumption, iterating (in the unlike case) the hashmap O(N) is + /// more time efficient and code friendly than paying for: + /// - extra space: an extra BTreeMap to older hashes by oldest insertion + /// - extra time: O(log(N)) for insert/remove/find each `pin` block time per subscriptions + blocks: HashMap, +} + +impl SubscriptionState { + /// Trigger the stop event for the current subscription. + /// + /// This can happen on internal failure (ie, the pruning deleted the block from memory) + /// or if the subscription exceeded the available pinned blocks. + fn stop(&mut self) { + if let Some(tx_stop) = self.tx_stop.take() { + let _ = tx_stop.send(()); + } + } + + /// Keep track of the given block hash for this subscription. + /// + /// This does not handle pinning in the backend. + /// + /// Returns: + /// - true if this is the first time that the block is registered + /// - false if the block was already registered + fn register_block(&mut self, hash: Block::Hash) -> bool { + match self.blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + let block_state = occupied.get_mut(); + + block_state.state_machine.advance_register(); + // Block was registered twice and unpin was called. + if block_state.state_machine == BlockStateMachine::FullyUnpinned { + occupied.remove(); + } + + // Second time we register this block. + false + }, + Entry::Vacant(vacant) => { + vacant.insert(BlockState { + state_machine: BlockStateMachine::new(), + timestamp: Instant::now(), + }); + + // First time we register this block. + true + }, + } + } + + /// A block is unregistered when the user calls `unpin`. + /// + /// Returns: + /// - true if the block can be unpinned. + /// - false if the subscription does not contain the block or it was unpinned. + fn unregister_block(&mut self, hash: Block::Hash) -> bool { + match self.blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + let block_state = occupied.get_mut(); + + // Cannot unpin a block twice. + if block_state.state_machine.was_unpinned() { + return false + } + + block_state.state_machine.advance_unpin(); + // Block was registered twice and unpin was called. + if block_state.state_machine == BlockStateMachine::FullyUnpinned { + occupied.remove(); + } + + true + }, + // Block was not tracked. + Entry::Vacant(_) => false, + } + } + + /// A subscription contains a block when the block was + /// registered (`pin` was called) and the block was not `unpinned` yet. + /// + /// Returns `true` if the subscription contains the block. + fn contains_block(&self, hash: Block::Hash) -> bool { + let Some(state) = self.blocks.get(&hash) else { + // Block was not tracked. + return false + }; + + // Subscription no longer contains the block if `unpin` was called. + !state.state_machine.was_unpinned() + } + + /// Get the timestamp of the oldest inserted block. + /// + /// # Note + /// + /// This iterates over all the blocks of the subscription. + fn find_oldest_block_timestamp(&self) -> Instant { + let mut timestamp = Instant::now(); + for (_, state) in self.blocks.iter() { + timestamp = std::cmp::min(timestamp, state.timestamp); + } + timestamp + } +} + +/// Keeps a specific block pinned while the handle is alive. +/// This object ensures that the block is not unpinned while +/// executing an RPC method call. +pub struct BlockGuard> { + hash: Block::Hash, + runtime_updates: bool, + backend: Arc, +} + +// Custom implementation of Debug to avoid bounds on `backend: Debug` for `unwrap_err()` needed for +// testing. +impl> std::fmt::Debug for BlockGuard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlockGuard hash {:?} runtime_updates {:?}", self.hash, self.runtime_updates) + } +} + +impl> BlockGuard { + /// Construct a new [`BlockGuard`] . + fn new( + hash: Block::Hash, + runtime_updates: bool, + backend: Arc, + ) -> Result { + backend + .pin_block(hash) + .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; + + Ok(Self { hash, runtime_updates, backend }) + } + + /// The `runtime_updates` flag of the subscription. + pub fn has_runtime_updates(&self) -> bool { + self.runtime_updates + } +} + +impl> Drop for BlockGuard { + fn drop(&mut self) { + self.backend.unpin_block(self.hash); + } +} + +pub struct SubscriptionsInner> { + /// Reference count the block hashes across all subscriptions. + /// + /// The pinned blocks cannot exceed the [`Self::global_limit`] limit. + /// When the limit is exceeded subscriptions are stopped via the `Stop` event. + global_blocks: HashMap, + /// The maximum number of pinned blocks across all subscriptions. + global_max_pinned_blocks: usize, + /// The maximum duration that a block is allowed to be pinned per subscription. + local_max_pin_duration: Duration, + /// Map the subscription ID to internal details of the subscription. + subs: HashMap>, + /// Backend pinning / unpinning blocks. + /// + /// The `Arc` is handled one level-above, but substrate exposes the backend as Arc. + backend: Arc, +} + +impl> SubscriptionsInner { + /// Construct a new [`SubscriptionsInner`] from the specified limits. + pub fn new( + global_max_pinned_blocks: usize, + local_max_pin_duration: Duration, + backend: Arc, + ) -> Self { + SubscriptionsInner { + global_blocks: Default::default(), + global_max_pinned_blocks, + local_max_pin_duration, + subs: Default::default(), + backend, + } + } + + /// Insert a new subscription ID. + pub fn insert_subscription( + &mut self, + sub_id: String, + runtime_updates: bool, + ) -> Option> { + if let Entry::Vacant(entry) = self.subs.entry(sub_id) { + let (tx_stop, rx_stop) = oneshot::channel(); + let state = SubscriptionState:: { + runtime_updates, + tx_stop: Some(tx_stop), + blocks: Default::default(), + }; + entry.insert(state); + Some(rx_stop) + } else { + None + } + } + + /// Remove the subscription ID with associated pinned blocks. + pub fn remove_subscription(&mut self, sub_id: &str) { + let Some(mut sub) = self.subs.remove(sub_id) else { + return + }; + + // The `Stop` event can be generated only once. + sub.stop(); + + for (hash, state) in sub.blocks.iter() { + if !state.state_machine.was_unpinned() { + self.global_unregister_block(*hash); + } + } + } + + /// Ensure that a new block could be pinned. + /// + /// If the global number of blocks has been reached this method + /// will remove all subscriptions that have blocks older than the + /// specified pin duration. + /// + /// If after removing all subscriptions that exceed the pin duration + /// there is no space for pinning a new block, then all subscriptions + /// are terminated. + /// + /// Returns true if the given subscription is also terminated. + fn ensure_block_space(&mut self, request_sub_id: &str) -> bool { + if self.global_blocks.len() < self.global_max_pinned_blocks { + return false + } + + // Terminate all subscriptions that have blocks older than + // the specified pin duration. + let now = Instant::now(); + + let to_remove: Vec<_> = self + .subs + .iter_mut() + .filter_map(|(sub_id, sub)| { + let sub_time = sub.find_oldest_block_timestamp(); + // Subscriptions older than the specified pin duration should be removed. + let should_remove = match now.checked_duration_since(sub_time) { + Some(duration) => duration > self.local_max_pin_duration, + None => true, + }; + should_remove.then(|| sub_id.clone()) + }) + .collect(); + + let mut is_terminated = false; + for sub_id in to_remove { + if sub_id == request_sub_id { + is_terminated = true; + } + self.remove_subscription(&sub_id); + } + + // Make sure we have enough space after first pass of terminating subscriptions. + if self.global_blocks.len() < self.global_max_pinned_blocks { + return is_terminated + } + + // Sanity check: cannot uphold `chainHead` guarantees anymore. We have not + // found any subscriptions that have older pinned blocks to terminate. + let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect(); + for sub_id in to_remove { + if sub_id == request_sub_id { + is_terminated = true; + } + self.remove_subscription(&sub_id); + } + return is_terminated + } + + pub fn pin_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result { + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + // Block was already registered for this subscription and therefore + // globally tracked. + if !sub.register_block(hash) { + return Ok(false) + } + + // Ensure we have enough space only if the hash is not globally registered. + if !self.global_blocks.contains_key(&hash) { + // Subscription ID was terminated while ensuring enough space. + if self.ensure_block_space(sub_id) { + return Err(SubscriptionManagementError::ExceededLimits) + } + } + + self.global_register_block(hash)?; + Ok(true) + } + + /// Register the block internally. + /// + /// If the block is present the reference counter is increased. + /// If this is a new block, the block is pinned in the backend. + fn global_register_block( + &mut self, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + match self.global_blocks.entry(hash) { + Entry::Occupied(mut occupied) => { + *occupied.get_mut() += 1; + }, + Entry::Vacant(vacant) => { + self.backend + .pin_block(hash) + .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; + + vacant.insert(1); + }, + }; + Ok(()) + } + + /// Unregister the block internally. + /// + /// If the block is present the reference counter is decreased. + /// If this is the last reference of the block, the block + /// is unpinned from the backend and removed from internal tracking. + fn global_unregister_block(&mut self, hash: Block::Hash) { + if let Entry::Occupied(mut occupied) = self.global_blocks.entry(hash) { + let counter = occupied.get_mut(); + if *counter == 1 { + // Unpin the block from the backend. + self.backend.unpin_block(hash); + occupied.remove(); + } else { + *counter -= 1; + } + } + } + + pub fn unpin_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let Some(sub) = self.subs.get_mut(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + // Check that unpin was not called before and the block was pinned + // for this subscription. + if !sub.unregister_block(hash) { + return Err(SubscriptionManagementError::BlockHashAbsent) + } + + self.global_unregister_block(hash); + Ok(()) + } + + pub fn lock_block( + &mut self, + sub_id: &str, + hash: Block::Hash, + ) -> Result, SubscriptionManagementError> { + let Some(sub) = self.subs.get(sub_id) else { + return Err(SubscriptionManagementError::SubscriptionAbsent) + }; + + if !sub.contains_block(hash) { + return Err(SubscriptionManagementError::BlockHashAbsent) + } + + BlockGuard::new(hash, sub.runtime_updates, self.backend.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sc_block_builder::BlockBuilderProvider; + use sc_service::client::new_in_mem; + use sp_consensus::BlockOrigin; + use sp_core::{testing::TaskExecutor, H256}; + use substrate_test_runtime_client::{ + prelude::*, + runtime::{Block, RuntimeApi}, + Client, ClientBlockImportExt, GenesisInit, + }; + + fn init_backend() -> ( + Arc>, + Arc>>, + ) { + let backend = Arc::new(sc_client_api::in_mem::Backend::new()); + let executor = substrate_test_runtime_client::new_native_or_wasm_executor(); + let client_config = sc_service::ClientConfig::default(); + let genesis_block_builder = sc_service::GenesisBlockBuilder::new( + &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), + !client_config.no_genesis, + backend.clone(), + executor.clone(), + ) + .unwrap(); + let client = Arc::new( + new_in_mem::<_, Block, _, RuntimeApi>( + backend.clone(), + executor, + genesis_block_builder, + None, + None, + None, + Box::new(TaskExecutor::new()), + client_config, + ) + .unwrap(), + ); + (backend, client) + } + + #[test] + fn block_state_machine_register_unpin() { + let mut state = BlockStateMachine::new(); + // Starts in `Registered` state. + assert_eq!(state, BlockStateMachine::Registered); + + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyRegistered); + + // Can call register multiple times. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyRegistered); + + assert!(!state.was_unpinned()); + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Can call unpin multiple times. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Nothing to advance. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + } + + #[test] + fn block_state_machine_unpin_register() { + let mut state = BlockStateMachine::new(); + // Starts in `Registered` state. + assert_eq!(state, BlockStateMachine::Registered); + + assert!(!state.was_unpinned()); + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::Unpinned); + assert!(state.was_unpinned()); + + // Can call unpin multiple times. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::Unpinned); + assert!(state.was_unpinned()); + + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + + // Nothing to advance. + state.advance_register(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + // Nothing to unpin. + state.advance_unpin(); + assert_eq!(state, BlockStateMachine::FullyUnpinned); + assert!(state.was_unpinned()); + } + + #[test] + fn sub_state_register_twice() { + let mut sub_state = SubscriptionState:: { + runtime_updates: false, + tx_stop: None, + blocks: Default::default(), + }; + + let hash = H256::random(); + assert_eq!(sub_state.register_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + // Did not call `register_block` twice. + assert_eq!(block_state.state_machine, BlockStateMachine::Registered); + + assert_eq!(sub_state.register_block(hash), false); + let block_state = sub_state.blocks.get(&hash).unwrap(); + assert_eq!(block_state.state_machine, BlockStateMachine::FullyRegistered); + + // Block is no longer tracked when: `register_block` is called twice and + // `unregister_block` is called once. + assert_eq!(sub_state.unregister_block(hash), true); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + } + + #[test] + fn sub_state_register_unregister() { + let mut sub_state = SubscriptionState:: { + runtime_updates: false, + tx_stop: None, + blocks: Default::default(), + }; + + let hash = H256::random(); + // Block was not registered before. + assert_eq!(sub_state.unregister_block(hash), false); + + assert_eq!(sub_state.register_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + // Did not call `register_block` twice. + assert_eq!(block_state.state_machine, BlockStateMachine::Registered); + + // Unregister block before the second `register_block`. + assert_eq!(sub_state.unregister_block(hash), true); + let block_state = sub_state.blocks.get(&hash).unwrap(); + assert_eq!(block_state.state_machine, BlockStateMachine::Unpinned); + + assert_eq!(sub_state.register_block(hash), false); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + + // Block is no longer tracked when: `register_block` is called twice and + // `unregister_block` is called once. + assert_eq!(sub_state.unregister_block(hash), false); + let block_state = sub_state.blocks.get(&hash); + assert!(block_state.is_none()); + } + + #[test] + fn subscription_lock_block() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + + let id = "abc".to_string(); + let hash = H256::random(); + + // Subscription not inserted. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + // Cannot insert the same subscription ID twice. + assert!(subs.insert_subscription(id.clone(), true).is_none()); + + // No block hash. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); + + subs.remove_subscription(&id); + + // No subscription. + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + } + + #[test] + fn subscription_check_block() { + let (backend, mut client) = init_backend(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let id = "abc".to_string(); + + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + + // First time we are pinning the block. + assert_eq!(subs.pin_block(&id, hash).unwrap(), true); + + let block = subs.lock_block(&id, hash).unwrap(); + // Subscription started with runtime updates + assert_eq!(block.has_runtime_updates(), true); + + let invalid_id = "abc-invalid".to_string(); + let err = subs.unpin_block(&invalid_id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + // Unpin the block. + subs.unpin_block(&id, hash).unwrap(); + let err = subs.lock_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); + } + + #[test] + fn subscription_ref_count() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let id = "abc".to_string(); + + let _stop = subs.insert_subscription(id.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id, hash).unwrap(), true); + // Check the global ref count. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + // Ensure the block propagated to the subscription. + subs.subs.get(&id).unwrap().blocks.get(&hash).unwrap(); + + // Insert the block for the same subscription again (simulate NewBlock + Finalized pinning) + assert_eq!(subs.pin_block(&id, hash).unwrap(), false); + // Check the global ref count should not get incremented. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + + // Ensure the hash propagates for the second subscription. + let id_second = "abcd".to_string(); + let _stop = subs.insert_subscription(id_second.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_second, hash).unwrap(), true); + // Check the global ref count. + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 2); + // Ensure the block propagated to the subscription. + subs.subs.get(&id_second).unwrap().blocks.get(&hash).unwrap(); + + subs.unpin_block(&id, hash).unwrap(); + assert_eq!(*subs.global_blocks.get(&hash).unwrap(), 1); + // Cannot unpin a block twice for the same subscription. + let err = subs.unpin_block(&id, hash).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); + + subs.unpin_block(&id_second, hash).unwrap(); + // Block unregistered from the memory. + assert!(subs.global_blocks.get(&hash).is_none()); + } + + #[test] + fn subscription_remove_subscription() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + // Pin all blocks for the first subscription. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true); + + // Pin only block 2 for the second subscription. + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1); + + subs.remove_subscription(&id_1); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1); + assert!(subs.global_blocks.get(&hash_3).is_none()); + + subs.remove_subscription(&id_2); + + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert_eq!(subs.global_blocks.len(), 0); + } + + #[test] + fn subscription_check_limits() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + // Maximum number of pinned blocks is 2. + let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + // Both subscriptions can pin the maximum limit. + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2); + + // Block 3 pinning will exceed the limit and both subscriptions + // are terminated because no subscription with older blocks than 10 + // seconds are present. + let err = subs.pin_block(&id_1, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); + + // Ensure both subscriptions are removed. + let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let err = subs.lock_block(&id_2, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 0); + } + + #[test] + fn subscription_check_limits_with_duration() { + let (backend, mut client) = init_backend(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_1 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_2 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash_3 = block.header.hash(); + futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. + let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend); + let id_1 = "abc".to_string(); + let id_2 = "abcd".to_string(); + + let _stop = subs.insert_subscription(id_1.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true); + assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true); + + // Maximum pin duration is 5 second, sleep 5 seconds to ensure we clean up + // the first subscription. + std::thread::sleep(std::time::Duration::from_secs(5)); + + let _stop = subs.insert_subscription(id_2.clone(), true).unwrap(); + assert_eq!(subs.pin_block(&id_2, hash_1).unwrap(), true); + + // Check reference count. + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 2); + assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1); + + // Second subscription has only 1 block pinned. Only the first subscription is terminated. + let err = subs.pin_block(&id_1, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); + + // Ensure both subscriptions are removed. + let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); + + let _block_guard = subs.lock_block(&id_2, hash_1).unwrap(); + + assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 1); + + // Force second subscription to get terminated. + assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true); + let err = subs.pin_block(&id_2, hash_3).unwrap_err(); + assert_eq!(err, SubscriptionManagementError::ExceededLimits); + + assert!(subs.global_blocks.get(&hash_1).is_none()); + assert!(subs.global_blocks.get(&hash_2).is_none()); + assert!(subs.global_blocks.get(&hash_3).is_none()); + assert_eq!(subs.global_blocks.len(), 0); + } + + #[test] + fn subscription_check_stop_event() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + + let id = "abc".to_string(); + + let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap(); + + // Check the stop signal was not received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_none()); + + let sub = subs.subs.get_mut(&id).unwrap(); + sub.stop(); + + // Check the signal was received. + let res = rx_stop.try_recv().unwrap(); + assert!(res.is_some()); + } +} diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs new file mode 100644 index 0000000000000..86e55acc4c176 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -0,0 +1,125 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use futures::channel::oneshot; +use parking_lot::RwLock; +use sc_client_api::Backend; +use sp_runtime::traits::Block as BlockT; +use std::{sync::Arc, time::Duration}; + +mod error; +mod inner; + +pub use error::SubscriptionManagementError; +pub use inner::BlockGuard; +use inner::SubscriptionsInner; + +/// Manage block pinning / unpinning for subscription IDs. +pub struct SubscriptionManagement> { + /// Manage subscription by mapping the subscription ID + /// to a set of block hashes. + inner: RwLock>, +} + +impl> SubscriptionManagement { + /// Construct a new [`SubscriptionManagement`]. + pub fn new( + global_max_pinned_blocks: usize, + local_max_pin_duration: Duration, + backend: Arc, + ) -> Self { + SubscriptionManagement { + inner: RwLock::new(SubscriptionsInner::new( + global_max_pinned_blocks, + local_max_pin_duration, + backend, + )), + } + } + + /// Insert a new subscription ID. + /// + /// If the subscription was not previously inserted, returns the receiver that is + /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already + /// inserted returns none. + pub fn insert_subscription( + &self, + sub_id: String, + runtime_updates: bool, + ) -> Option> { + let mut inner = self.inner.write(); + inner.insert_subscription(sub_id, runtime_updates) + } + + /// Remove the subscription ID with associated pinned blocks. + pub fn remove_subscription(&self, sub_id: &str) { + let mut inner = self.inner.write(); + inner.remove_subscription(sub_id) + } + + /// The block is pinned in the backend only once when the block's hash is first encountered. + /// + /// Each subscription is expected to call this method twice: + /// - once from the `NewBlock` import + /// - once from the `Finalized` import + /// + /// Returns + /// - Ok(true) if the subscription did not previously contain this block + /// - Ok(false) if the subscription already contained this this + /// - Error if the backend failed to pin the block or the subscription ID is invalid + pub fn pin_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result { + let mut inner = self.inner.write(); + inner.pin_block(sub_id, hash) + } + + /// Unpin the block from the subscription. + /// + /// The last subscription that unpins the block is also unpinning the block + /// from the backend. + /// + /// This method is called only once per subscription. + /// + /// Returns an error if the block is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn unpin_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result<(), SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.unpin_block(sub_id, hash) + } + + /// Ensure the block remains pinned until the return object is dropped. + /// + /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner. + /// Returns an error if the block hash is not pinned for the subscription or + /// the subscription ID is invalid. + pub fn lock_block( + &self, + sub_id: &str, + hash: Block::Hash, + ) -> Result, SubscriptionManagementError> { + let mut inner = self.inner.write(); + inner.lock_block(sub_id, hash) + } +} diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 1d5cb8da26305..76644ccb472e8 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -11,6 +11,8 @@ use jsonrpsee::{ }; use sc_block_builder::BlockBuilderProvider; use sc_client_api::ChildInfo; +use sc_service::client::new_in_mem; +use sp_api::BlockT; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; use sp_core::{ @@ -19,15 +21,17 @@ use sp_core::{ testing::TaskExecutor, }; use sp_version::RuntimeVersion; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ - prelude::*, runtime, Backend, BlockBuilderExt, Client, ClientBlockImportExt, + prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, + ClientBlockImportExt, GenesisInit, }; type Header = substrate_test_runtime_client::runtime::Header; type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; +const MAX_PINNED_SECS: u64 = 60; const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; @@ -72,6 +76,7 @@ async fn setup_api() -> ( Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -111,6 +116,7 @@ async fn follow_subscription_produces_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -168,6 +174,7 @@ async fn follow_with_runtime() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -273,6 +280,7 @@ async fn get_genesis() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -457,6 +465,7 @@ async fn call_runtime_without_flag() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -631,6 +640,7 @@ async fn follow_generates_initial_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -758,6 +768,7 @@ async fn follow_exceeding_pinned_blocks() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, 2, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -808,6 +819,7 @@ async fn follow_with_unpin() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, 2, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -888,6 +900,7 @@ async fn follow_prune_best_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1044,6 +1057,7 @@ async fn follow_forks_pruned_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1157,6 +1171,7 @@ async fn follow_report_multiple_pruned_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); @@ -1327,6 +1342,137 @@ async fn follow_report_multiple_pruned_block() { assert_eq!(event, expected); } +#[tokio::test] +async fn pin_block_references() { + // Manually construct an in-memory backend and client. + let backend = Arc::new(sc_client_api::in_mem::Backend::new()); + let executor = substrate_test_runtime_client::new_native_or_wasm_executor(); + let client_config = sc_service::ClientConfig::default(); + + let genesis_block_builder = sc_service::GenesisBlockBuilder::new( + &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), + !client_config.no_genesis, + backend.clone(), + executor.clone(), + ) + .unwrap(); + + let mut client = Arc::new( + new_in_mem::<_, Block, _, RuntimeApi>( + backend.clone(), + executor, + genesis_block_builder, + None, + None, + None, + Box::new(TaskExecutor::new()), + client_config, + ) + .unwrap(), + ); + + let api = ChainHead::new( + client.clone(), + backend.clone(), + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + 3, + Duration::from_secs(MAX_PINNED_SECS), + ) + .into_rpc(); + + async fn wait_pinned_references( + backend: &Arc>, + hash: &Block::Hash, + target: i64, + ) { + // Retry for at most 2 minutes. + let mut retries = 120; + while backend.pin_refs(hash).unwrap() != target { + if retries == 0 { + panic!("Expected target={} pinned references for hash={:?}", target, hash); + } + retries -= 1; + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + let block_hash = format!("{:?}", hash); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // We need to wait a bit for: + // 1. `NewBlock` and `BestBlockChanged` notifications to propagate to the chainHead + // subscription. (pin_refs == 2) + // 2. The chainHead to call `pin_blocks` only once for the `NewBlock` + // notification (pin_refs == 3) + // 3. Both notifications to go out of scope (pin_refs == 1 (total 3 - dropped 2)). + wait_pinned_references(&backend, &hash, 1).await; + + // To not exceed the number of pinned blocks, we need to unpin before the next import. + let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &block_hash]).await.unwrap(); + + // Make sure unpin clears out the reference. + let refs = backend.pin_refs(&hash).unwrap(); + assert_eq!(refs, 0); + + // Add another 2 blocks and make sure we drop the subscription with the blocks pinned. + let mut hashes = Vec::new(); + for _ in 0..2 { + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + let hash = block.header.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + hashes.push(hash); + } + + // Make sure the pin was propagated. + for hash in &hashes { + wait_pinned_references(&backend, hash, 1).await; + } + + // Drop the subscription and expect the pinned blocks to be released. + drop(sub); + // The `chainHead` detects the subscription was terminated when it tries + // to send another block. + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + for hash in &hashes { + wait_pinned_references(&backend, &hash, 0).await; + } +} + #[tokio::test] async fn follow_finalized_before_new_block() { let builder = TestClientBuilder::new(); @@ -1341,6 +1487,7 @@ async fn follow_finalized_before_new_block() { Arc::new(TaskExecutor::default()), CHAIN_GENESIS, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), ) .into_rpc(); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 2208c55cf4090..d338ceb78958d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -70,7 +70,11 @@ use sp_consensus::block_validation::{ use sp_core::traits::{CodeExecutor, SpawnNamed}; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; -use std::{str::FromStr, sync::Arc, time::SystemTime}; +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; /// Full client type. pub type TFullClient = @@ -667,16 +671,24 @@ where ) .into_rpc(); - // Maximum pinned blocks per connection. - // This number is large enough to consider immediate blocks, - // but it will change to facilitate adequate limits for the pinning API. - const MAX_PINNED_BLOCKS: usize = 4096; + // Maximum pinned blocks across all connections. + // This number is large enough to consider immediate blocks. + // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. + const MAX_PINNED_BLOCKS: usize = 512; + + // Any block of any subscription should not be pinned more than + // this constant. When a subscription contains a block older than this, + // the subscription becomes subject to termination. + // Note: This should be enough for immediate blocks. + const MAX_PINNED_SECONDS: u64 = 60; + let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), backend.clone(), task_executor.clone(), client.info().genesis_hash, MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECONDS), ) .into_rpc();