diff --git a/examples/transfer_subscribe.rs b/examples/transfer_subscribe.rs index 11487133ac4..bc9666e876a 100644 --- a/examples/transfer_subscribe.rs +++ b/examples/transfer_subscribe.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { let client = ClientBuilder::::new().build().await?; let sub = client.subscribe_events().await?; let decoder = client.events_decoder(); - let mut sub = EventSubscription::::new(sub, decoder); + let mut sub = EventSubscription::::new(sub, decoder); sub.filter_event::>(); client.transfer(&signer, &dest, 10_000).await?; let raw = sub.next().await.unwrap().unwrap(); diff --git a/src/frame/balances.rs b/src/frame/balances.rs index 501eb8a4b8e..d054670b6b3 100644 --- a/src/frame/balances.rs +++ b/src/frame/balances.rs @@ -298,7 +298,7 @@ mod tests { let (client, _) = test_client().await; let sub = client.subscribe_events().await.unwrap(); let decoder = client.events_decoder(); - let mut sub = EventSubscription::::new(sub, &decoder); + let mut sub = EventSubscription::::new(sub, &decoder); sub.filter_event::>(); client.transfer(&alice, &bob_addr, 10_000).await.unwrap(); let raw = sub.next().await.unwrap().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 5b95d83cd25..c3aab82e99f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -454,10 +454,16 @@ impl Client { } /// Subscribe to events. - pub async fn subscribe_events( - &self, - ) -> Result>, Error> { + pub async fn subscribe_events(&self) -> Result, Error> { let events = self.rpc.subscribe_events().await?; + Ok(StorageSubscription(events)) + } + + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + let events = self.rpc.subscribe_finalized_events().await?; Ok(events) } diff --git a/src/rpc.rs b/src/rpc.rs index 6aa0e982c77..d18cc767442 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -75,7 +75,7 @@ use crate::{ }, metadata::Metadata, runtimes::Runtime, - subscription::EventSubscription, + subscription::*, }; pub type ChainBlock = @@ -362,6 +362,16 @@ impl Rpc { Ok(subscription) } + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + Ok(FinalizedStorageChanges::new( + self.clone(), + self.subscribe_finalized_blocks().await?, + )) + } + /// Subscribe to blocks. pub async fn subscribe_blocks(&self) -> Result, Error> { let subscription = self @@ -431,7 +441,7 @@ impl Rpc { let ext_hash = T::Hashing::hash_of(&extrinsic); log::info!("Submitting Extrinsic `{:?}`", ext_hash); - let events_sub = self.subscribe_events().await?; + let events_sub = self.subscribe_finalized_events().await?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; while let status = xt_sub.next().await { diff --git a/src/subscription.rs b/src/subscription.rs index aa74852199f..28cdbda7ec0 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -14,8 +14,20 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . +use crate::rpc::Rpc; +use core::{ + future::Future, + pin::Pin, +}; use jsonrpsee::client::Subscription; -use sp_core::storage::StorageChangeSet; +use sp_core::{ + storage::{ + StorageChangeSet, + StorageKey, + }, + twox_128, +}; +use sp_runtime::traits::Header; use std::collections::VecDeque; use crate::{ @@ -32,10 +44,38 @@ use crate::{ runtimes::Runtime, }; +/// Stream storage set changes +pub trait StorageChangeStream { + /// Type to return from next. + type Item; + + /// Fetch the next storage item. + fn next<'a>(&'a mut self) -> Pin + Send + 'a>>; +} + +/// Wrapper over storage subscription +pub struct StorageSubscription( + pub(crate) Subscription>, +); + +impl StorageChangeStream for StorageSubscription { + type Item = StorageChangeSet; + + fn next<'a>(&'a mut self) -> Pin + Send + 'a>> { + async fn run( + _self: &mut StorageSubscription, + ) -> StorageChangeSet { + _self.0.next().await + } + + Box::pin(run(self)) + } +} + /// Event subscription simplifies filtering a storage change set stream for /// events of interest. -pub struct EventSubscription<'a, T: Runtime> { - subscription: Subscription>, +pub struct EventSubscription<'a, T: Runtime, S: StorageChangeStream> { + subscription: S, decoder: &'a EventsDecoder, block: Option, extrinsic: Option, @@ -44,12 +84,11 @@ pub struct EventSubscription<'a, T: Runtime> { finished: bool, } -impl<'a, T: Runtime> EventSubscription<'a, T> { +impl<'a, T: Runtime, S: StorageChangeStream>> + EventSubscription<'a, T, S> +{ /// Creates a new event subscription. - pub fn new( - subscription: Subscription>, - decoder: &'a EventsDecoder, - ) -> Self { + pub fn new(subscription: S, decoder: &'a EventsDecoder) -> Self { Self { subscription, decoder, @@ -124,3 +163,54 @@ impl<'a, T: Runtime> EventSubscription<'a, T> { } } } + +/// Event subscription to only fetch finalized sotrage changes. +pub struct FinalizedStorageChanges { + rpc: Rpc, + subscription: Subscription, + storage_changes: VecDeque>, + storage_key: StorageKey, +} + +impl FinalizedStorageChanges { + /// Creates a new event subscription. + pub fn new(rpc: Rpc, subscription: Subscription) -> Self { + let mut storage_key = twox_128(b"System").to_vec(); + storage_key.extend(twox_128(b"Events").to_vec()); + log::debug!("Events storage key {:?}", hex::encode(&storage_key)); + + Self { + rpc, + subscription, + storage_changes: Default::default(), + storage_key: StorageKey(storage_key), + } + } +} + +impl StorageChangeStream for FinalizedStorageChanges { + type Item = StorageChangeSet; + + /// Gets the next event. + fn next<'a>(&'a mut self) -> Pin + Send + 'a>> { + async fn run( + _self: &mut FinalizedStorageChanges, + ) -> StorageChangeSet { + loop { + if let Some(storage_change) = _self.storage_changes.pop_front() { + return storage_change + } + let header: T::Header = _self.subscription.next().await; + if let Ok(storage_changes) = _self + .rpc + .query_storage_at(&[_self.storage_key.clone()], Some(header.hash())) + .await + { + _self.storage_changes.extend(storage_changes); + } + } + } + + Box::pin(run(self)) + } +}