Skip to content

Commit

Permalink
implement variant of subscription that returns finalized storage changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <gregorydhill@outlook.com>
  • Loading branch information
gregdhill committed Mar 5, 2021
1 parent c4405c4 commit 5ab0ed3
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/transfer_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = ClientBuilder::<DefaultNodeRuntime>::new().build().await?;
let sub = client.subscribe_events().await?;
let decoder = client.events_decoder();
let mut sub = EventSubscription::<DefaultNodeRuntime>::new(sub, decoder);
let mut sub = EventSubscription::<DefaultNodeRuntime, _>::new(sub, decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&signer, &dest, 10_000).await?;
let raw = sub.next().await.unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frame/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ mod tests {
let (client, _) = test_client().await;
let sub = client.subscribe_events().await.unwrap();
let decoder = client.events_decoder();
let mut sub = EventSubscription::<TestRuntime>::new(sub, &decoder);
let mut sub = EventSubscription::<TestRuntime, _>::new(sub, &decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&alice, &bob_addr, 10_000).await.unwrap();
let raw = sub.next().await.unwrap().unwrap();
Expand Down
12 changes: 9 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,16 @@ impl<T: Runtime> Client<T> {
}

/// Subscribe to events.
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
pub async fn subscribe_events(&self) -> Result<StorageSubscription<T>, Error> {
let events = self.rpc.subscribe_events().await?;
Ok(StorageSubscription(events))
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<FinalizedStorageChanges<T>, Error> {
let events = self.rpc.subscribe_finalized_events().await?;
Ok(events)
}

Expand Down
14 changes: 12 additions & 2 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use crate::{
},
metadata::Metadata,
runtimes::Runtime,
subscription::EventSubscription,
subscription::*,
};

pub type ChainBlock<T> =
Expand Down Expand Up @@ -362,6 +362,16 @@ impl<T: Runtime> Rpc<T> {
Ok(subscription)
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<FinalizedStorageChanges<T>, Error> {
Ok(FinalizedStorageChanges::new(
self.clone(),
self.subscribe_finalized_blocks().await?,
))
}

/// Subscribe to blocks.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
let subscription = self
Expand Down Expand Up @@ -431,7 +441,7 @@ impl<T: Runtime> Rpc<T> {
let ext_hash = T::Hashing::hash_of(&extrinsic);
log::info!("Submitting Extrinsic `{:?}`", ext_hash);

let events_sub = self.subscribe_events().await?;
let events_sub = self.subscribe_finalized_events().await?;
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;

while let status = xt_sub.next().await {
Expand Down
106 changes: 98 additions & 8 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.

use crate::rpc::Rpc;
use core::{
future::Future,
pin::Pin,
};
use jsonrpsee::client::Subscription;
use sp_core::storage::StorageChangeSet;
use sp_core::{
storage::{
StorageChangeSet,
StorageKey,
},
twox_128,
};
use sp_runtime::traits::Header;
use std::collections::VecDeque;

use crate::{
Expand All @@ -32,10 +44,38 @@ use crate::{
runtimes::Runtime,
};

/// Stream storage set changes
pub trait StorageChangeStream {
/// Type to return from next.
type Item;

/// Fetch the next storage item.
fn next<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + 'a>>;
}

/// Wrapper over storage subscription
pub struct StorageSubscription<T: Runtime>(
pub(crate) Subscription<StorageChangeSet<T::Hash>>,
);

impl<T: Runtime> StorageChangeStream for StorageSubscription<T> {
type Item = StorageChangeSet<T::Hash>;

fn next<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + 'a>> {
async fn run<T: Runtime>(
_self: &mut StorageSubscription<T>,
) -> StorageChangeSet<T::Hash> {
_self.0.next().await
}

Box::pin(run(self))
}
}

/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Runtime> {
subscription: Subscription<StorageChangeSet<T::Hash>>,
pub struct EventSubscription<'a, T: Runtime, S: StorageChangeStream> {
subscription: S,
decoder: &'a EventsDecoder<T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
Expand All @@ -44,12 +84,11 @@ pub struct EventSubscription<'a, T: Runtime> {
finished: bool,
}

impl<'a, T: Runtime> EventSubscription<'a, T> {
impl<'a, T: Runtime, S: StorageChangeStream<Item = StorageChangeSet<T::Hash>>>
EventSubscription<'a, T, S>
{
/// Creates a new event subscription.
pub fn new(
subscription: Subscription<StorageChangeSet<T::Hash>>,
decoder: &'a EventsDecoder<T>,
) -> Self {
pub fn new(subscription: S, decoder: &'a EventsDecoder<T>) -> Self {
Self {
subscription,
decoder,
Expand Down Expand Up @@ -124,3 +163,54 @@ impl<'a, T: Runtime> EventSubscription<'a, T> {
}
}
}

/// Event subscription to only fetch finalized sotrage changes.
pub struct FinalizedStorageChanges<T: Runtime> {
rpc: Rpc<T>,
subscription: Subscription<T::Header>,
storage_changes: VecDeque<StorageChangeSet<T::Hash>>,
storage_key: StorageKey,
}

impl<T: Runtime> FinalizedStorageChanges<T> {
/// Creates a new event subscription.
pub fn new(rpc: Rpc<T>, subscription: Subscription<T::Header>) -> Self {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));

Self {
rpc,
subscription,
storage_changes: Default::default(),
storage_key: StorageKey(storage_key),
}
}
}

impl<T: Runtime> StorageChangeStream for FinalizedStorageChanges<T> {
type Item = StorageChangeSet<T::Hash>;

/// Gets the next event.
fn next<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Self::Item> + Send + 'a>> {
async fn run<T: Runtime>(
_self: &mut FinalizedStorageChanges<T>,
) -> StorageChangeSet<T::Hash> {
loop {
if let Some(storage_change) = _self.storage_changes.pop_front() {
return storage_change
}
let header: T::Header = _self.subscription.next().await;
if let Ok(storage_changes) = _self
.rpc
.query_storage_at(&[_self.storage_key.clone()], Some(header.hash()))
.await
{
_self.storage_changes.extend(storage_changes);
}
}
}

Box::pin(run(self))
}
}

0 comments on commit 5ab0ed3

Please sign in to comment.