Skip to content

Commit

Permalink
implement variant of subscription that returns finalized storage changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <gregorydhill@outlook.com>
  • Loading branch information
gregdhill committed Mar 9, 2021
1 parent 9959f0d commit d8ac8ff
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 66 deletions.
2 changes: 1 addition & 1 deletion examples/transfer_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = ClientBuilder::<DefaultNodeRuntime>::new().build().await?;
let sub = client.subscribe_events().await?;
let decoder = client.events_decoder();
let mut sub = EventSubscription::<DefaultNodeRuntime>::new(sub, decoder);
let mut sub = EventSubscription::<DefaultNodeRuntime, _>::new(sub, decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&signer, &dest, 10_000).await?;
let raw = sub.next().await.unwrap().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frame/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ mod tests {
let (client, _) = test_client().await;
let sub = client.subscribe_events().await.unwrap();
let decoder = client.events_decoder();
let mut sub = EventSubscription::<TestRuntime>::new(sub, &decoder);
let mut sub = EventSubscription::<TestRuntime, _>::new(sub, &decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&alice, &bob_addr, 10_000).await.unwrap();
let raw = sub.next().await.unwrap().unwrap();
Expand Down
31 changes: 26 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ pub use crate::{
SystemProperties,
},
runtimes::*,
subscription::*,
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
},
substrate_subxt_proc_macro::*,
};
use crate::{
Expand All @@ -133,6 +137,7 @@ pub struct ClientBuilder<T: Runtime> {
page_size: Option<u32>,
event_type_registry: EventTypeRegistry<T>,
skip_type_sizes_check: bool,
accept_weak_inclusion: bool,
}

impl<T: Runtime> ClientBuilder<T> {
Expand All @@ -144,6 +149,7 @@ impl<T: Runtime> ClientBuilder<T> {
page_size: None,
event_type_registry: EventTypeRegistry::new(),
skip_type_sizes_check: false,
accept_weak_inclusion: false,
}
}

Expand Down Expand Up @@ -187,6 +193,12 @@ impl<T: Runtime> ClientBuilder<T> {
self
}

/// Only check that transactions are InBlock on submit.
pub fn accept_weak_inclusion(mut self) -> Self {
self.accept_weak_inclusion = true;
self
}

/// Creates a new Client.
pub async fn build<'a>(self) -> Result<Client<T>, Error> {
let client = if let Some(client) = self.client {
Expand All @@ -203,7 +215,10 @@ impl<T: Runtime> ClientBuilder<T> {
RpcClient::Http(Arc::new(client))
}
};
let rpc = Rpc::new(client);
let mut rpc = Rpc::new(client);
if self.accept_weak_inclusion {
rpc.accept_weak_inclusion();
}
let (metadata, genesis_hash, runtime_version, properties) = future::join4(
rpc.metadata(),
rpc.genesis_hash(),
Expand Down Expand Up @@ -467,13 +482,19 @@ impl<T: Runtime> Client<T> {
}

/// Subscribe to events.
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_events().await?;
Ok(events)
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_finalized_events().await?;
Ok(events)
}

/// Subscribe to new blocks.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
let headers = self.rpc.subscribe_blocks().await?;
Expand Down
148 changes: 92 additions & 56 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use sp_core::{
StorageData,
StorageKey,
},
twox_128,
Bytes,
};
use sp_rpc::{
Expand Down Expand Up @@ -86,7 +85,12 @@ use crate::{
},
metadata::Metadata,
runtimes::Runtime,
subscription::EventSubscription,
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
SystemEvents,
},
};

pub type ChainBlock<T> =
Expand Down Expand Up @@ -171,6 +175,7 @@ pub enum RpcClient {
}

impl RpcClient {
/// Perform a request towards the server.
pub async fn request<T: DeserializeOwned>(
&self,
method: &str,
Expand All @@ -186,6 +191,7 @@ impl RpcClient {
}
}

/// Send a subscription request to the server.
pub async fn subscribe<T: DeserializeOwned>(
&self,
subscribe_method: &str,
Expand Down Expand Up @@ -254,13 +260,15 @@ pub struct ReadProof<Hash> {
pub struct Rpc<T: Runtime> {
client: RpcClient,
marker: PhantomData<T>,
weak_inclusion: bool,
}

impl<T: Runtime> Clone for Rpc<T> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
marker: PhantomData,
weak_inclusion: self.weak_inclusion,
}
}
}
Expand All @@ -270,9 +278,14 @@ impl<T: Runtime> Rpc<T> {
Self {
client,
marker: PhantomData,
weak_inclusion: false,
}
}

pub fn accept_weak_inclusion(&mut self) {
self.weak_inclusion = true;
}

/// Fetch a storage key
pub async fn storage(
&self,
Expand Down Expand Up @@ -438,21 +451,27 @@ impl<T: Runtime> Rpc<T> {
}

/// Subscribe to substrate System Events
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));

let keys = Some(vec![StorageKey(storage_key)]);
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let keys = Some(vec![StorageKey::from(SystemEvents::new())]);
let params = Params::Array(vec![to_json_value(keys)?]);

let subscription = self
.client
.subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
.await?;
Ok(subscription)
Ok(EventStorageSubscription::Imported(subscription))
}

/// Subscribe to finalized events.
pub async fn subscribe_finalized_events(
&self,
) -> Result<EventStorageSubscription<T>, Error> {
Ok(EventStorageSubscription::Finalized(
FinalizedEventStorageSubscription::new(
self.clone(),
self.subscribe_finalized_blocks().await?,
),
))
}

/// Subscribe to blocks.
Expand All @@ -462,7 +481,7 @@ impl<T: Runtime> Rpc<T> {
.subscribe(
"chain_subscribeNewHeads",
Params::None,
"chain_subscribeNewHeads",
"chain_unsubscribeNewHeads",
)
.await?;

Expand All @@ -478,7 +497,7 @@ impl<T: Runtime> Rpc<T> {
.subscribe(
"chain_subscribeFinalizedHeads",
Params::None,
"chain_subscribeFinalizedHeads",
"chain_unsubscribeFinalizedHeads",
)
.await?;
Ok(subscription)
Expand Down Expand Up @@ -524,66 +543,39 @@ impl<T: Runtime> Rpc<T> {
let ext_hash = T::Hashing::hash_of(&extrinsic);
log::info!("Submitting Extrinsic `{:?}`", ext_hash);

let events_sub = self.subscribe_events().await?;
let events_sub = if self.weak_inclusion {
self.subscribe_events().await
} else {
self.subscribe_finalized_events().await
}?;
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;

while let Some(status) = xt_sub.next().await {
// log::info!("received status {:?}", status);
log::info!("received status {:?}", status);
match status {
// ignore in progress extrinsic for now
TransactionStatus::Future
| TransactionStatus::Ready
| TransactionStatus::Broadcast(_) => continue,
TransactionStatus::InBlock(block_hash) => {
log::info!("Fetching block {:?}", block_hash);
let block = self.block(Some(block_hash)).await?;
return match block {
Some(signed_block) => {
log::info!(
"Found block {:?}, with {} extrinsics",
block_hash,
signed_block.block.extrinsics.len()
);
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, &decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
}
None => {
Err(format!("Failed to find block {:?}", block_hash).into())
}
if self.weak_inclusion {
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
continue
}
TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()),
TransactionStatus::Retracted(_) => {
return Err("Extrinsic Retracted".into())
}
// should have made it `InBlock` before either of these
TransactionStatus::Finalized(_) => {
return Err("Extrinsic Finalized".into())
TransactionStatus::Finalized(block_hash) => {
// read finalized blocks by default
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
TransactionStatus::FinalityTimeout(_) => {
return Err("Extrinsic FinalityTimeout".into())
Expand All @@ -593,6 +585,50 @@ impl<T: Runtime> Rpc<T> {
Err(RpcError::Custom("RPC subscription dropped".into()).into())
}

async fn process_block<'a>(
&self,
events_sub: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
block_hash: T::Hash,
ext_hash: T::Hash,
) -> Result<ExtrinsicSuccess<T>, Error> {
log::info!("Fetching block {:?}", block_hash);
if let Some(signed_block) = self.block(Some(block_hash)).await? {
log::info!(
"Found block {:?}, with {} extrinsics",
block_hash,
signed_block.block.extrinsics.len()
);
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, &decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("Failed to find block {:?}", block_hash).into())
}
}

/// Insert a key into the keystore.
pub async fn insert_key(
&self,
Expand Down
Loading

0 comments on commit d8ac8ff

Please sign in to comment.