diff --git a/examples/transfer_subscribe.rs b/examples/transfer_subscribe.rs index 11487133ac4..bc9666e876a 100644 --- a/examples/transfer_subscribe.rs +++ b/examples/transfer_subscribe.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { let client = ClientBuilder::::new().build().await?; let sub = client.subscribe_events().await?; let decoder = client.events_decoder(); - let mut sub = EventSubscription::::new(sub, decoder); + let mut sub = EventSubscription::::new(sub, decoder); sub.filter_event::>(); client.transfer(&signer, &dest, 10_000).await?; let raw = sub.next().await.unwrap().unwrap(); diff --git a/src/frame/balances.rs b/src/frame/balances.rs index 501eb8a4b8e..d054670b6b3 100644 --- a/src/frame/balances.rs +++ b/src/frame/balances.rs @@ -298,7 +298,7 @@ mod tests { let (client, _) = test_client().await; let sub = client.subscribe_events().await.unwrap(); let decoder = client.events_decoder(); - let mut sub = EventSubscription::::new(sub, &decoder); + let mut sub = EventSubscription::::new(sub, &decoder); sub.filter_event::>(); client.transfer(&alice, &bob_addr, 10_000).await.unwrap(); let raw = sub.next().await.unwrap().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index b1b3c9c0a88..0717ab41b98 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,7 +110,11 @@ pub use crate::{ SystemProperties, }, runtimes::*, - subscription::*, + subscription::{ + EventStorageSubscription, + EventSubscription, + FinalizedEventStorageSubscription, + }, substrate_subxt_proc_macro::*, }; use crate::{ @@ -133,6 +137,7 @@ pub struct ClientBuilder { page_size: Option, event_type_registry: EventTypeRegistry, skip_type_sizes_check: bool, + accept_weak_inclusion: bool, } impl ClientBuilder { @@ -144,6 +149,7 @@ impl ClientBuilder { page_size: None, event_type_registry: EventTypeRegistry::new(), skip_type_sizes_check: false, + accept_weak_inclusion: false, } } @@ -187,6 +193,12 @@ impl ClientBuilder { self } + /// Only check that transactions are InBlock on submit. + pub fn accept_weak_inclusion(mut self) -> Self { + self.accept_weak_inclusion = true; + self + } + /// Creates a new Client. pub async fn build<'a>(self) -> Result, Error> { let client = if let Some(client) = self.client { @@ -203,7 +215,10 @@ impl ClientBuilder { RpcClient::Http(Arc::new(client)) } }; - let rpc = Rpc::new(client); + let mut rpc = Rpc::new(client); + if self.accept_weak_inclusion { + rpc.accept_weak_inclusion(); + } let (metadata, genesis_hash, runtime_version, properties) = future::join4( rpc.metadata(), rpc.genesis_hash(), @@ -467,13 +482,19 @@ impl Client { } /// Subscribe to events. - pub async fn subscribe_events( - &self, - ) -> Result>, Error> { + pub async fn subscribe_events(&self) -> Result, Error> { let events = self.rpc.subscribe_events().await?; Ok(events) } + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + let events = self.rpc.subscribe_finalized_events().await?; + Ok(events) + } + /// Subscribe to new blocks. pub async fn subscribe_blocks(&self) -> Result, Error> { let headers = self.rpc.subscribe_blocks().await?; diff --git a/src/rpc.rs b/src/rpc.rs index c1a0ef173a4..5d85486fb96 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -58,7 +58,6 @@ use sp_core::{ StorageData, StorageKey, }, - twox_128, Bytes, }; use sp_rpc::{ @@ -86,7 +85,12 @@ use crate::{ }, metadata::Metadata, runtimes::Runtime, - subscription::EventSubscription, + subscription::{ + EventStorageSubscription, + EventSubscription, + FinalizedEventStorageSubscription, + SystemEvents, + }, }; pub type ChainBlock = @@ -171,6 +175,7 @@ pub enum RpcClient { } impl RpcClient { + /// Perform a request towards the server. pub async fn request( &self, method: &str, @@ -186,6 +191,7 @@ impl RpcClient { } } + /// Send a subscription request to the server. pub async fn subscribe( &self, subscribe_method: &str, @@ -254,6 +260,7 @@ pub struct ReadProof { pub struct Rpc { client: RpcClient, marker: PhantomData, + weak_inclusion: bool, } impl Clone for Rpc { @@ -261,6 +268,7 @@ impl Clone for Rpc { Self { client: self.client.clone(), marker: PhantomData, + weak_inclusion: self.weak_inclusion, } } } @@ -270,9 +278,14 @@ impl Rpc { Self { client, marker: PhantomData, + weak_inclusion: false, } } + pub fn accept_weak_inclusion(&mut self) { + self.weak_inclusion = true; + } + /// Fetch a storage key pub async fn storage( &self, @@ -438,21 +451,27 @@ impl Rpc { } /// Subscribe to substrate System Events - pub async fn subscribe_events( - &self, - ) -> Result>, Error> { - let mut storage_key = twox_128(b"System").to_vec(); - storage_key.extend(twox_128(b"Events").to_vec()); - log::debug!("Events storage key {:?}", hex::encode(&storage_key)); - - let keys = Some(vec![StorageKey(storage_key)]); + pub async fn subscribe_events(&self) -> Result, Error> { + let keys = Some(vec![StorageKey::from(SystemEvents::new())]); let params = Params::Array(vec![to_json_value(keys)?]); let subscription = self .client .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage") .await?; - Ok(subscription) + Ok(EventStorageSubscription::Imported(subscription)) + } + + /// Subscribe to finalized events. + pub async fn subscribe_finalized_events( + &self, + ) -> Result, Error> { + Ok(EventStorageSubscription::Finalized( + FinalizedEventStorageSubscription::new( + self.clone(), + self.subscribe_finalized_blocks().await?, + ), + )) } /// Subscribe to blocks. @@ -462,7 +481,7 @@ impl Rpc { .subscribe( "chain_subscribeNewHeads", Params::None, - "chain_subscribeNewHeads", + "chain_unsubscribeNewHeads", ) .await?; @@ -478,7 +497,7 @@ impl Rpc { .subscribe( "chain_subscribeFinalizedHeads", Params::None, - "chain_subscribeFinalizedHeads", + "chain_unsubscribeFinalizedHeads", ) .await?; Ok(subscription) @@ -524,56 +543,27 @@ impl Rpc { let ext_hash = T::Hashing::hash_of(&extrinsic); log::info!("Submitting Extrinsic `{:?}`", ext_hash); - let events_sub = self.subscribe_events().await?; + let events_sub = if self.weak_inclusion { + self.subscribe_events().await + } else { + self.subscribe_finalized_events().await + }?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; while let Some(status) = xt_sub.next().await { - // log::info!("received status {:?}", status); + log::info!("received status {:?}", status); match status { // ignore in progress extrinsic for now TransactionStatus::Future | TransactionStatus::Ready | TransactionStatus::Broadcast(_) => continue, TransactionStatus::InBlock(block_hash) => { - log::info!("Fetching block {:?}", block_hash); - let block = self.block(Some(block_hash)).await?; - return match block { - Some(signed_block) => { - log::info!( - "Found block {:?}, with {} extrinsics", - block_hash, - signed_block.block.extrinsics.len() - ); - let ext_index = signed_block - .block - .extrinsics - .iter() - .position(|ext| { - let hash = T::Hashing::hash_of(ext); - hash == ext_hash - }) - .ok_or_else(|| { - Error::Other(format!( - "Failed to find Extrinsic with hash {:?}", - ext_hash, - )) - })?; - let mut sub = EventSubscription::new(events_sub, &decoder); - sub.filter_extrinsic(block_hash, ext_index); - let mut events = vec![]; - while let Some(event) = sub.next().await { - events.push(event?); - } - Ok(ExtrinsicSuccess { - block: block_hash, - extrinsic: ext_hash, - events, - }) - } - None => { - Err(format!("Failed to find block {:?}", block_hash).into()) - } + if self.weak_inclusion { + return self + .process_block(events_sub, decoder, block_hash, ext_hash) + .await } + continue } TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()), TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()), @@ -581,9 +571,11 @@ impl Rpc { TransactionStatus::Retracted(_) => { return Err("Extrinsic Retracted".into()) } - // should have made it `InBlock` before either of these - TransactionStatus::Finalized(_) => { - return Err("Extrinsic Finalized".into()) + TransactionStatus::Finalized(block_hash) => { + // read finalized blocks by default + return self + .process_block(events_sub, decoder, block_hash, ext_hash) + .await } TransactionStatus::FinalityTimeout(_) => { return Err("Extrinsic FinalityTimeout".into()) @@ -593,6 +585,50 @@ impl Rpc { Err(RpcError::Custom("RPC subscription dropped".into()).into()) } + async fn process_block<'a>( + &self, + events_sub: EventStorageSubscription, + decoder: &'a EventsDecoder, + block_hash: T::Hash, + ext_hash: T::Hash, + ) -> Result, Error> { + log::info!("Fetching block {:?}", block_hash); + if let Some(signed_block) = self.block(Some(block_hash)).await? { + log::info!( + "Found block {:?}, with {} extrinsics", + block_hash, + signed_block.block.extrinsics.len() + ); + let ext_index = signed_block + .block + .extrinsics + .iter() + .position(|ext| { + let hash = T::Hashing::hash_of(ext); + hash == ext_hash + }) + .ok_or_else(|| { + Error::Other(format!( + "Failed to find Extrinsic with hash {:?}", + ext_hash, + )) + })?; + let mut sub = EventSubscription::new(events_sub, &decoder); + sub.filter_extrinsic(block_hash, ext_index); + let mut events = vec![]; + while let Some(event) = sub.next().await { + events.push(event?); + } + Ok(ExtrinsicSuccess { + block: block_hash, + extrinsic: ext_hash, + events, + }) + } else { + Err(format!("Failed to find block {:?}", block_hash).into()) + } + } + /// Insert a key into the keystore. pub async fn insert_key( &self, diff --git a/src/subscription.rs b/src/subscription.rs index 4fc48dd72a7..4835617540d 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -14,9 +14,17 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . +use crate::rpc::Rpc; use jsonrpsee_types::error::Error as RpcError; use jsonrpsee_ws_client::WsSubscription as Subscription; -use sp_core::storage::StorageChangeSet; +use sp_core::{ + storage::{ + StorageChangeSet, + StorageKey, + }, + twox_128, +}; +use sp_runtime::traits::Header; use std::collections::VecDeque; use crate::{ @@ -36,7 +44,7 @@ use crate::{ /// Event subscription simplifies filtering a storage change set stream for /// events of interest. pub struct EventSubscription<'a, T: Runtime> { - subscription: Subscription>, + subscription: EventStorageSubscription, decoder: &'a EventsDecoder, block: Option, extrinsic: Option, @@ -48,7 +56,7 @@ pub struct EventSubscription<'a, T: Runtime> { impl<'a, T: Runtime> EventSubscription<'a, T> { /// Creates a new event subscription. pub fn new( - subscription: Subscription>, + subscription: EventStorageSubscription, decoder: &'a EventsDecoder, ) -> Self { Self { @@ -132,3 +140,74 @@ impl<'a, T: Runtime> EventSubscription<'a, T> { } } } + +pub(crate) struct SystemEvents(StorageKey); + +impl SystemEvents { + pub(crate) fn new() -> Self { + let mut storage_key = twox_128(b"System").to_vec(); + storage_key.extend(twox_128(b"Events").to_vec()); + log::debug!("Events storage key {:?}", hex::encode(&storage_key)); + Self(StorageKey(storage_key)) + } +} + +impl From for StorageKey { + fn from(key: SystemEvents) -> Self { + key.0 + } +} + +/// Event subscription to only fetch finalized storage changes. +pub struct FinalizedEventStorageSubscription { + rpc: Rpc, + subscription: Subscription, + storage_changes: VecDeque>, + storage_key: StorageKey, +} + +impl FinalizedEventStorageSubscription { + /// Creates a new finalized event storage subscription. + pub fn new(rpc: Rpc, subscription: Subscription) -> Self { + Self { + rpc, + subscription, + storage_changes: Default::default(), + storage_key: SystemEvents::new().into(), + } + } + + /// Gets the next change_set. + pub async fn next(&mut self) -> Option> { + loop { + if let Some(storage_change) = self.storage_changes.pop_front() { + return Some(storage_change) + } + let header: T::Header = self.subscription.next().await?; + if let Ok(storage_changes) = self + .rpc + .query_storage_at(&[self.storage_key.clone()], Some(header.hash())) + .await + { + self.storage_changes.extend(storage_changes); + } + } + } +} + +/// Wrapper over imported and finalized event subscriptions. +pub enum EventStorageSubscription { + /// Events that are InBlock + Imported(Subscription>), + /// Events that are Finalized + Finalized(FinalizedEventStorageSubscription), +} + +impl EventStorageSubscription { + pub async fn next(&mut self) -> Option> { + match self { + Self::Imported(event_sub) => event_sub.next().await, + Self::Finalized(event_sub) => event_sub.next().await, + } + } +}