diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs index 9474f7a302..71af824831 100644 --- a/subxt/src/backend/unstable/follow_stream.rs +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -243,7 +243,7 @@ pub(super) mod test_utils { /// An initialized event pub fn ev_initialized(n: u64) -> FollowEvent { FollowEvent::Initialized(Initialized { - finalized_block_hash: H256::from_low_u64_le(n), + finalized_block_hashes: vec![H256::from_low_u64_le(n)], finalized_block_runtime: None, }) } diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index d32c98a104..e85336c314 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -267,9 +267,9 @@ impl Shared { shared.seen_runtime_events.clear(); - if let Some(finalized) = finalized_ev.finalized_block_hashes.last() { - init_message.finalized_block_hash = finalized.clone(); - } + init_message.finalized_block_hashes = + finalized_ev.finalized_block_hashes.clone(); + if let Some(runtime_ev) = newest_runtime { init_message.finalized_block_runtime = Some(runtime_ev); } diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs index ca00e37690..c128a86cf3 100644 --- a/subxt/src/backend/unstable/follow_stream_unpin.rs +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -10,6 +10,7 @@ use crate::backend::unstable::rpc_methods::{ use crate::config::{BlockHash, Config}; use crate::error::Error; use futures::stream::{FuturesUnordered, Stream, StreamExt}; + use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; @@ -34,9 +35,11 @@ pub struct FollowStreamUnpin { // Futures for sending unpin events that we'll poll to completion as // part of polling the stream as a whole. unpin_futs: FuturesUnordered, - // Each new finalized block increments this. Allows us to track - // the age of blocks so that we can unpin old ones. - rel_block_num: usize, + // Each time a new finalized block is seen, we give it an age of `next_rel_block_age`, + // and then increment this ready for the next finalized block. So, the first finalized + // block will have an age of 0, the next 1, 2, 3 and so on. We can then use `max_block_life` + // to say "unpin all blocks with an age < (next_rel_block_age-1) - max_block_life". + next_rel_block_age: usize, // The latest ID of the FollowStream subscription, which we can use // to unpin blocks. subscription_id: Option>, @@ -113,15 +116,23 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Ready(subscription_id) } FollowStreamMsg::Event(FollowEvent::Initialized(details)) => { - // The first finalized block gets the starting block_num. - let rel_block_num = this.rel_block_num; - // Pin this block, but note that it can be unpinned any time since it won't show up again (except - // as a parent block, which we are ignoring at the moment). - let block_ref = - this.pin_unpinnable_block_at(rel_block_num, details.finalized_block_hash); + let mut finalized_block_hashes = + Vec::with_capacity(details.finalized_block_hashes.len()); + + // Pin each of the finalized blocks. None of them will show up again (except as a + // parent block), and so they can all be unpinned immediately at any time. Increment + // the block age for each one, so that older finalized blocks are pruned first. + for finalized_block in &details.finalized_block_hashes { + let rel_block_age = this.next_rel_block_age; + let block_ref = + this.pin_unpinnable_block_at(rel_block_age, *finalized_block); + + finalized_block_hashes.push(block_ref); + this.next_rel_block_age += 1; + } FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { - finalized_block_hash: block_ref, + finalized_block_hashes, finalized_block_runtime: details.finalized_block_runtime, })) } @@ -129,15 +140,15 @@ impl Stream for FollowStreamUnpin { // One bigger than our parent, and if no parent seen (maybe it was // unpinned already), then one bigger than the last finalized block num // as a best guess. - let parent_rel_block_num = this + let parent_rel_block_age = this .pinned .get(&details.parent_block_hash) - .map(|p| p.rel_block_num) - .unwrap_or(this.rel_block_num); + .map(|p| p.rel_block_age) + .unwrap_or(this.next_rel_block_age.saturating_sub(1)); - let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash); + let block_ref = this.pin_block_at(parent_rel_block_age + 1, details.block_hash); let parent_block_ref = - this.pin_block_at(parent_rel_block_num, details.parent_block_hash); + this.pin_block_at(parent_rel_block_age, details.parent_block_hash); FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock { block_hash: block_ref, @@ -148,8 +159,8 @@ impl Stream for FollowStreamUnpin { FollowStreamMsg::Event(FollowEvent::BestBlockChanged(details)) => { // We expect this block to already exist, so it'll keep its existing block_num, // but worst case it'll just get the current finalized block_num + 1. - let rel_block_num = this.rel_block_num + 1; - let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash); + let rel_block_age = this.next_rel_block_age; + let block_ref = this.pin_block_at(rel_block_age, details.best_block_hash); FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_ref, @@ -167,14 +178,14 @@ impl Stream for FollowStreamUnpin { // // `pin_unpinnable_block_at` indicates that the block will not show up in future events // (They will show up as a parent block, but we don't care about that right now). - let rel_block_num = this.rel_block_num + idx + 1; - this.pin_unpinnable_block_at(rel_block_num, hash) + let rel_block_age = this.next_rel_block_age + idx; + this.pin_unpinnable_block_at(rel_block_age, hash) }) .collect(); // Our relative block height is increased by however many finalized // blocks we've seen. - this.rel_block_num += finalized_block_refs.len(); + this.next_rel_block_age += finalized_block_refs.len(); let pruned_block_refs: Vec<_> = details .pruned_block_hashes @@ -183,8 +194,8 @@ impl Stream for FollowStreamUnpin { // We should know about these, too, and if not we set their age to last_finalized + 1. // // `pin_unpinnable_block_at` indicates that the block will not show up in future events. - let rel_block_num = this.rel_block_num + 1; - this.pin_unpinnable_block_at(rel_block_num, hash) + let rel_block_age = this.next_rel_block_age; + this.pin_unpinnable_block_at(rel_block_age, hash) }) .collect(); @@ -208,7 +219,7 @@ impl Stream for FollowStreamUnpin { this.pinned.clear(); this.unpin_futs.clear(); this.unpin_flags.lock().unwrap().clear(); - this.rel_block_num = 0; + this.next_rel_block_age = 0; FollowStreamMsg::Event(FollowEvent::Stop) } @@ -255,7 +266,7 @@ impl FollowStreamUnpin { max_block_life, pinned: Default::default(), subscription_id: None, - rel_block_num: 0, + next_rel_block_age: 0, unpin_flags: Default::default(), unpin_futs: Default::default(), } @@ -287,21 +298,21 @@ impl FollowStreamUnpin { /// Pin a block, or return the reference to an already-pinned block. If the block has been registered to /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already /// been sent though, then the block will be unpinned. - fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { - self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, false) + fn pin_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef { + self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, false) } /// Pin a block, or return the reference to an already-pinned block. /// /// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now, /// which should be done for any block that will no longer be seen in future events. - fn pin_unpinnable_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { - self.pin_block_at_setting_unpinnable_flag(rel_block_num, hash, true) + fn pin_unpinnable_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef { + self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, true) } fn pin_block_at_setting_unpinnable_flag( &mut self, - rel_block_num: usize, + rel_block_age: usize, hash: Hash, can_be_unpinned: bool, ) -> BlockRef { @@ -317,7 +328,7 @@ impl FollowStreamUnpin { }) // If there's not an entry already, make one and return it. .or_insert_with(|| PinnedDetails { - rel_block_num, + rel_block_age, block_ref: BlockRef { inner: Arc::new(BlockRefInner { hash, @@ -333,7 +344,9 @@ impl FollowStreamUnpin { /// Unpin any blocks that are either too old, or have the unpin flag set and are old enough. fn unpin_blocks(&mut self, waker: &Waker) { let mut unpin_flags = self.unpin_flags.lock().unwrap(); - let rel_block_num = self.rel_block_num; + + // This gets the age of the last finalized block. + let rel_block_age = self.next_rel_block_age.saturating_sub(1); // If we asked to unpin and there was no subscription_id, then there's nothing we can do, // and nothing will need unpinning now anyway. @@ -343,7 +356,7 @@ impl FollowStreamUnpin { let mut blocks_to_unpin = vec![]; for (hash, details) in &self.pinned { - if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life + if rel_block_age.saturating_sub(details.rel_block_age) >= self.max_block_life || (unpin_flags.contains(hash) && details.can_be_unpinned) { // The block is too old, or it's been flagged to be unpinned and won't be in a future @@ -381,8 +394,10 @@ type UnpinFlags = Arc>>; #[derive(Debug)] struct PinnedDetails { - /// How old is the block? - rel_block_num: usize, + /// Realtively speaking, how old is the block? When we start following + /// blocks, the first finalized block gets an age of 0, the second an age + /// of 1 and so on. + rel_block_age: usize, /// A block ref we can hand out to keep blocks pinned. /// Because we store one here until it's unpinned, the live count /// will only drop to 1 when no external refs are left. @@ -502,7 +517,7 @@ pub(super) mod test_utils { /// An initialized event containing a BlockRef (useful for comparisons) pub fn ev_initialized_ref(n: u64) -> FollowEvent> { FollowEvent::Initialized(Initialized { - finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)), + finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))], finalized_block_runtime: None, }) } diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 21a9cca504..7095b90af1 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -321,7 +321,9 @@ impl Backend for UnstableBackend { .events() .filter_map(|ev| { let out = match ev { - FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()), + FollowEvent::Initialized(init) => { + init.finalized_block_hashes.last().map(|b| b.clone().into()) + } _ => None, }; std::future::ready(out) @@ -353,7 +355,10 @@ impl Backend for UnstableBackend { .filter_map(move |ev| { let output = match ev { FollowEvent::Initialized(ev) => { - runtimes.remove(&ev.finalized_block_hash.hash()); + for finalized_block in ev.finalized_block_hashes { + runtimes.remove(&finalized_block.hash()); + } + ev.finalized_block_runtime } FollowEvent::NewBlock(ev) => { @@ -422,9 +427,11 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), - FollowEvent::NewBlock(ev) => Some(ev.block_hash), - _ => None, + FollowEvent::Initialized(init) => init.finalized_block_hashes, + FollowEvent::NewBlock(ev) => { + vec![ev.block_hash] + } + _ => vec![], }) .await } @@ -433,9 +440,9 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), - FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash), - _ => None, + FollowEvent::Initialized(init) => init.finalized_block_hashes, + FollowEvent::BestBlockChanged(ev) => vec![ev.best_block_hash], + _ => vec![], }) .await } @@ -444,9 +451,7 @@ impl Backend for UnstableBackend { &self, ) -> Result)>, Error> { self.stream_headers(|ev| match ev { - FollowEvent::Initialized(ev) => { - vec![ev.finalized_block_hash] - } + FollowEvent::Initialized(init) => init.finalized_block_hashes, FollowEvent::Finalized(ev) => ev.finalized_block_hashes, _ => vec![], }) diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index c51e5020cb..a1b04cad4a 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -359,8 +359,8 @@ pub enum FollowEvent { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Initialized { - /// The hash of the latest finalized block. - pub finalized_block_hash: Hash, + /// The hashes of the last finalized blocks. + pub finalized_block_hashes: Vec, /// The runtime version of the finalized block. /// /// # Note diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 084c1223a8..66fcffdc9d 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -32,7 +32,7 @@ async fn chainhead_unstable_follow() { assert_eq!( event, FollowEvent::Initialized(Initialized { - finalized_block_hash, + finalized_block_hashes: vec![finalized_block_hash], finalized_block_runtime: None, }) ); @@ -47,7 +47,7 @@ async fn chainhead_unstable_follow() { assert_matches!( event, FollowEvent::Initialized(init) => { - assert_eq!(init.finalized_block_hash, finalized_block_hash); + assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]); if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime { assert_eq!(spec.spec_version, runtime_version.spec_version); assert_eq!(spec.transaction_version, runtime_version.transaction_version); @@ -66,7 +66,7 @@ async fn chainhead_unstable_body() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(), _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); @@ -95,7 +95,7 @@ async fn chainhead_unstable_header() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(), _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); @@ -123,7 +123,7 @@ async fn chainhead_unstable_storage() { let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(), _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); @@ -168,7 +168,7 @@ async fn chainhead_unstable_call() { let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(), _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap(); @@ -205,7 +205,7 @@ async fn chainhead_unstable_unpin() { let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap(); let event = blocks.next().await.unwrap().unwrap(); let hash = match event { - FollowEvent::Initialized(init) => init.finalized_block_hash, + FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(), _ => panic!("Unexpected event"), }; let sub_id = blocks.subscription_id().unwrap();