From cc291562af0e44190ab379dc1d4cda31a0c4a60a Mon Sep 17 00:00:00 2001 From: Gregory Hill Date: Wed, 10 Mar 2021 10:05:14 +0000 Subject: [PATCH] implement variant of subscription that returns finalized storage changes Signed-off-by: Gregory Hill --- src/lib.rs | 34 ++++++++-- src/rpc.rs | 153 +++++++++++++++++++++++++++----------------- src/subscription.rs | 86 ++++++++++++++++++++++++- 3 files changed, 208 insertions(+), 65 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ad1f08db87..76508a7e56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,7 +110,11 @@ pub use crate::{ SystemProperties, }, runtimes::*, - subscription::*, + subscription::{ + EventStorageSubscription, + EventSubscription, + FinalizedEventStorageSubscription, + }, substrate_subxt_proc_macro::*, }; use crate::{ @@ -133,6 +137,7 @@ pub struct ClientBuilder { page_size: Option, event_type_registry: EventTypeRegistry, skip_type_sizes_check: bool, + accept_weak_inclusion: bool, } impl ClientBuilder { @@ -144,6 +149,7 @@ impl ClientBuilder { page_size: None, event_type_registry: EventTypeRegistry::new(), skip_type_sizes_check: false, + accept_weak_inclusion: false, } } @@ -187,6 +193,12 @@ impl ClientBuilder { self } + /// Only check that transactions are InBlock on submit. + pub fn accept_weak_inclusion(mut self) -> Self { + self.accept_weak_inclusion = true; + self + } + /// Creates a new Client. pub async fn build<'a>(self) -> Result, Error> { let client = if let Some(client) = self.client { @@ -202,7 +214,10 @@ impl ClientBuilder { RpcClient::Http(Arc::new(client)) } }; - let rpc = Rpc::new(client); + let mut rpc = Rpc::new(client); + if self.accept_weak_inclusion { + rpc.accept_weak_inclusion(); + } let (metadata, genesis_hash, runtime_version, properties) = future::join4( rpc.metadata(), rpc.genesis_hash(), @@ -466,13 +481,22 @@ impl Client { } /// Subscribe to events. - pub async fn subscribe_events( - &self, - ) -> Result>, Error> { + /// + /// *WARNING* these may not be included in the finalized chain, use + /// `subscribe_finalized_events` to ensure events are finalized. + pub async fn subscribe_events(&self) -> Result, Error> { let events = self.rpc.subscribe_events().await?; Ok(events) } + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + let events = self.rpc.subscribe_finalized_events().await?; + Ok(events) + } + /// Subscribe to new blocks. pub async fn subscribe_blocks(&self) -> Result, Error> { let headers = self.rpc.subscribe_blocks().await?; diff --git a/src/rpc.rs b/src/rpc.rs index 0482c700de..3a2f5af9a3 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -58,7 +58,6 @@ use sp_core::{ StorageData, StorageKey, }, - twox_128, Bytes, }; use sp_rpc::{ @@ -86,7 +85,12 @@ use crate::{ }, metadata::Metadata, runtimes::Runtime, - subscription::EventSubscription, + subscription::{ + EventStorageSubscription, + EventSubscription, + FinalizedEventStorageSubscription, + SystemEvents, + }, }; pub type ChainBlock = @@ -256,6 +260,7 @@ pub struct ReadProof { pub struct Rpc { client: RpcClient, marker: PhantomData, + accept_weak_inclusion: bool, } impl Clone for Rpc { @@ -263,6 +268,7 @@ impl Clone for Rpc { Self { client: self.client.clone(), marker: PhantomData, + accept_weak_inclusion: self.accept_weak_inclusion, } } } @@ -272,9 +278,16 @@ impl Rpc { Self { client, marker: PhantomData, + accept_weak_inclusion: false, } } + /// Configure the Rpc to accept non-finalized blocks + /// in `submit_and_watch_extrinsic` + pub fn accept_weak_inclusion(&mut self) { + self.accept_weak_inclusion = true; + } + /// Fetch a storage key pub async fn storage( &self, @@ -439,22 +452,31 @@ impl Rpc { Ok(version) } - /// Subscribe to substrate System Events - pub async fn subscribe_events( - &self, - ) -> Result>, Error> { - let mut storage_key = twox_128(b"System").to_vec(); - storage_key.extend(twox_128(b"Events").to_vec()); - log::debug!("Events storage key {:?}", hex::encode(&storage_key)); - - let keys = Some(vec![StorageKey(storage_key)]); + /// Subscribe to System Events that are imported into blocks. + /// + /// *WARNING* these may not be included in the finalized chain, use + /// `subscribe_finalized_events` to ensure events are finalized. + pub async fn subscribe_events(&self) -> Result, Error> { + let keys = Some(vec![StorageKey::from(SystemEvents::new())]); let params = Params::Array(vec![to_json_value(keys)?]); let subscription = self .client .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage") .await?; - Ok(subscription) + Ok(EventStorageSubscription::Imported(subscription)) + } + + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + Ok(EventStorageSubscription::Finalized( + FinalizedEventStorageSubscription::new( + self.clone(), + self.subscribe_finalized_blocks().await?, + ), + )) } /// Subscribe to blocks. @@ -464,7 +486,7 @@ impl Rpc { .subscribe( "chain_subscribeNewHeads", Params::None, - "chain_subscribeNewHeads", + "chain_unsubscribeNewHeads", ) .await?; @@ -480,7 +502,7 @@ impl Rpc { .subscribe( "chain_subscribeFinalizedHeads", Params::None, - "chain_subscribeFinalizedHeads", + "chain_unsubscribeFinalizedHeads", ) .await?; Ok(subscription) @@ -526,56 +548,27 @@ impl Rpc { let ext_hash = T::Hashing::hash_of(&extrinsic); log::info!("Submitting Extrinsic `{:?}`", ext_hash); - let events_sub = self.subscribe_events().await?; + let events_sub = if self.accept_weak_inclusion { + self.subscribe_events().await + } else { + self.subscribe_finalized_events().await + }?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; while let Some(status) = xt_sub.next().await { - // log::info!("received status {:?}", status); + log::info!("received status {:?}", status); match status { // ignore in progress extrinsic for now TransactionStatus::Future | TransactionStatus::Ready | TransactionStatus::Broadcast(_) => continue, TransactionStatus::InBlock(block_hash) => { - log::info!("Fetching block {:?}", block_hash); - let block = self.block(Some(block_hash)).await?; - return match block { - Some(signed_block) => { - log::info!( - "Found block {:?}, with {} extrinsics", - block_hash, - signed_block.block.extrinsics.len() - ); - let ext_index = signed_block - .block - .extrinsics - .iter() - .position(|ext| { - let hash = T::Hashing::hash_of(ext); - hash == ext_hash - }) - .ok_or_else(|| { - Error::Other(format!( - "Failed to find Extrinsic with hash {:?}", - ext_hash, - )) - })?; - let mut sub = EventSubscription::new(events_sub, &decoder); - sub.filter_extrinsic(block_hash, ext_index); - let mut events = vec![]; - while let Some(event) = sub.next().await { - events.push(event?); - } - Ok(ExtrinsicSuccess { - block: block_hash, - extrinsic: ext_hash, - events, - }) - } - None => { - Err(format!("Failed to find block {:?}", block_hash).into()) - } + if self.accept_weak_inclusion { + return self + .process_block(events_sub, decoder, block_hash, ext_hash) + .await } + continue } TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()), TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()), @@ -583,9 +576,11 @@ impl Rpc { TransactionStatus::Retracted(_) => { return Err("Extrinsic Retracted".into()) } - // should have made it `InBlock` before either of these - TransactionStatus::Finalized(_) => { - return Err("Extrinsic Finalized".into()) + TransactionStatus::Finalized(block_hash) => { + // read finalized blocks by default + return self + .process_block(events_sub, decoder, block_hash, ext_hash) + .await } TransactionStatus::FinalityTimeout(_) => { return Err("Extrinsic FinalityTimeout".into()) @@ -595,6 +590,50 @@ impl Rpc { Err(RpcError::Custom("RPC subscription dropped".into()).into()) } + async fn process_block<'a>( + &self, + events_sub: EventStorageSubscription, + decoder: &'a EventsDecoder, + block_hash: T::Hash, + ext_hash: T::Hash, + ) -> Result, Error> { + log::info!("Fetching block {:?}", block_hash); + if let Some(signed_block) = self.block(Some(block_hash)).await? { + log::info!( + "Found block {:?}, with {} extrinsics", + block_hash, + signed_block.block.extrinsics.len() + ); + let ext_index = signed_block + .block + .extrinsics + .iter() + .position(|ext| { + let hash = T::Hashing::hash_of(ext); + hash == ext_hash + }) + .ok_or_else(|| { + Error::Other(format!( + "Failed to find Extrinsic with hash {:?}", + ext_hash, + )) + })?; + let mut sub = EventSubscription::new(events_sub, &decoder); + sub.filter_extrinsic(block_hash, ext_index); + let mut events = vec![]; + while let Some(event) = sub.next().await { + events.push(event?); + } + Ok(ExtrinsicSuccess { + block: block_hash, + extrinsic: ext_hash, + events, + }) + } else { + Err(format!("Failed to find block {:?}", block_hash).into()) + } + } + /// Insert a key into the keystore. pub async fn insert_key( &self, diff --git a/src/subscription.rs b/src/subscription.rs index 4fc48dd72a..485c614340 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -16,7 +16,14 @@ use jsonrpsee_types::error::Error as RpcError; use jsonrpsee_ws_client::WsSubscription as Subscription; -use sp_core::storage::StorageChangeSet; +use sp_core::{ + storage::{ + StorageChangeSet, + StorageKey, + }, + twox_128, +}; +use sp_runtime::traits::Header; use std::collections::VecDeque; use crate::{ @@ -30,13 +37,14 @@ use crate::{ system::Phase, Event, }, + rpc::Rpc, runtimes::Runtime, }; /// Event subscription simplifies filtering a storage change set stream for /// events of interest. pub struct EventSubscription<'a, T: Runtime> { - subscription: Subscription>, + subscription: EventStorageSubscription, decoder: &'a EventsDecoder, block: Option, extrinsic: Option, @@ -48,7 +56,7 @@ pub struct EventSubscription<'a, T: Runtime> { impl<'a, T: Runtime> EventSubscription<'a, T> { /// Creates a new event subscription. pub fn new( - subscription: Subscription>, + subscription: EventStorageSubscription, decoder: &'a EventsDecoder, ) -> Self { Self { @@ -132,3 +140,75 @@ impl<'a, T: Runtime> EventSubscription<'a, T> { } } } + +pub(crate) struct SystemEvents(StorageKey); + +impl SystemEvents { + pub(crate) fn new() -> Self { + let mut storage_key = twox_128(b"System").to_vec(); + storage_key.extend(twox_128(b"Events").to_vec()); + log::debug!("Events storage key {:?}", hex::encode(&storage_key)); + Self(StorageKey(storage_key)) + } +} + +impl From for StorageKey { + fn from(key: SystemEvents) -> Self { + key.0 + } +} + +/// Event subscription to only fetch finalized storage changes. +pub struct FinalizedEventStorageSubscription { + rpc: Rpc, + subscription: Subscription, + storage_changes: VecDeque>, + storage_key: StorageKey, +} + +impl FinalizedEventStorageSubscription { + /// Creates a new finalized event storage subscription. + pub fn new(rpc: Rpc, subscription: Subscription) -> Self { + Self { + rpc, + subscription, + storage_changes: Default::default(), + storage_key: SystemEvents::new().into(), + } + } + + /// Gets the next change_set. + pub async fn next(&mut self) -> Option> { + loop { + if let Some(storage_change) = self.storage_changes.pop_front() { + return Some(storage_change) + } + let header: T::Header = self.subscription.next().await?; + if let Ok(storage_changes) = self + .rpc + .query_storage_at(&[self.storage_key.clone()], Some(header.hash())) + .await + { + self.storage_changes.extend(storage_changes); + } + } + } +} + +/// Wrapper over imported and finalized event subscriptions. +pub enum EventStorageSubscription { + /// Events that are InBlock + Imported(Subscription>), + /// Events that are Finalized + Finalized(FinalizedEventStorageSubscription), +} + +impl EventStorageSubscription { + /// Gets the next change_set from the subscription. + pub async fn next(&mut self) -> Option> { + match self { + Self::Imported(event_sub) => event_sub.next().await, + Self::Finalized(event_sub) => event_sub.next().await, + } + } +}