diff --git a/Cargo.toml b/Cargo.toml index 1e1c0241fd766..2d2e05f6fd8fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,8 @@ log = "0.4" futures = "0.1.28" jsonrpc-core-client = { version = "12.1.0", features = ["ws"] } num-traits = { version = "0.2", default-features = false } -parity-codec = { version = "4.1", default-features = false, features = ["derive", "full"] } +parity-scale-codec = { version = "1.0", default-features = false, features = ["derive", "full"] } +runtime_metadata = { git = "https://github.com/paritytech/substrate/", package = "srml-metadata" } runtime_support = { git = "https://github.com/paritytech/substrate/", package = "srml-support" } runtime_primitives = { git = "https://github.com/paritytech/substrate/", package = "sr-primitives" } serde = { version = "1.0", features = ["derive"] } @@ -30,7 +31,7 @@ url = "1.7" [dev-dependencies] env_logger = "0.6" -node_runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime" } -srml_balances = { git = "https://github.com/paritytech/substrate/", package = "srml-balances" } +node-runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime" } +srml-balances = { git = "https://github.com/paritytech/substrate/", package = "srml-balances" } substrate-keyring = { git = "https://github.com/paritytech/substrate/", package = "substrate-keyring" } tokio = "0.1" diff --git a/README.md b/README.md index 9d18ccc0dccab..290ae889571fe 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# subxt +# subxt A library to **sub**mit e**xt**rinsics to a [substrate](https://github.com/paritytech/substrate) node via RPC. @@ -7,4 +7,3 @@ A library to **sub**mit e**xt**rinsics to a [substrate](https://github.com/parit ## License The entire code within this repository is licensed under the [GPLv3](LICENSE). Please [contact us](https://www.parity.io/contact/) if you have questions about the licensing of our products. - of our products. diff --git a/src/error.rs b/src/error.rs index 95b9adfeaee3a..3f6165c8578ba 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,12 +1,12 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of substrate-subxt. // -// ink! is free software: you can redistribute it and/or modify +// subxt is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // -// ink! is distributed in the hope that it will be useful, +// subxt is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. @@ -15,11 +15,13 @@ // along with substrate-subxt. If not, see . use jsonrpc_core_client::RpcError; +use parity_scale_codec::Error as CodecError; use std::io::Error as IoError; use substrate_primitives::crypto::SecretStringError; #[derive(Debug, derive_more::From)] pub enum Error { + Codec(CodecError), Io(IoError), Rpc(RpcError), SecretString(SecretStringError), diff --git a/src/lib.rs b/src/lib.rs index 913821cc371c3..7ba55479554b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,12 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of substrate-subxt. // -// ink! is free software: you can redistribute it and/or modify +// subxt is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // -// ink! is distributed in the hope that it will be useful, +// subxt is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. @@ -16,16 +16,22 @@ use futures::future::Future; use jsonrpc_core_client::transports::ws; -use parity_codec::{ +use metadata::Metadata; +use parity_scale_codec::{ Codec, Decode, }; - use runtime_primitives::traits::SignedExtension; -use substrate_primitives::Pair; +use substrate_primitives::{ + storage::StorageKey, + Pair, +}; use url::Url; +pub use error::Error; + mod error; +mod metadata; mod rpc; /// Captures data for when an extrinsic is successfully included in a block @@ -36,87 +42,227 @@ pub struct ExtrinsicSuccess { pub events: Vec, } -/// Creates, signs and submits an Extrinsic with the given `Call` to a substrate node. -pub fn submit( +fn connect( url: &Url, - signer: P, - call: C, - extra: E, -) -> impl Future, Error = error::Error> -where - T: srml_system::Trait, - P: Pair, - P::Signature: Codec, - P::Public: Into, - C: Codec + Send + 'static, - E: Fn(T::Index) -> SE + Send + 'static, - SE: SignedExtension + 'static, -{ +) -> impl Future, Error = error::Error> { ws::connect(url.as_str()) .expect("Url is a valid url; qed") .map_err(Into::into) - .and_then(|rpc: rpc::Rpc| { - rpc.create_and_submit_extrinsic(signer, call, extra) +} + +pub struct ClientBuilder { + _marker: std::marker::PhantomData<(T, SE)>, + url: Option, +} + +impl ClientBuilder { + pub fn new() -> Self { + Self { + _marker: std::marker::PhantomData, + url: None, + } + } + + pub fn set_url(mut self, url: Url) -> Self { + self.url = Some(url); + self + } + + pub fn build(self) -> impl Future, Error = error::Error> { + let url = self.url.unwrap_or_else(|| { + Url::parse("ws://127.0.0.1:9944").expect("Is valid url; qed") + }); + connect::(&url).and_then(|rpc| { + rpc.metadata() + .join(rpc.genesis_hash()) + .map(|(metadata, genesis_hash)| { + Client { + _marker: std::marker::PhantomData, + url, + genesis_hash, + metadata, + } + }) }) + } } -/// Fetches a storage key from a substrate node. -pub fn fetch( - url: &Url, - key: Vec, -) -> impl Future, Error = error::Error> { - ws::connect(url.as_str()) - .expect("Url is a valid url; qed") - .map_err(Into::into) - .and_then(|rpc: rpc::Rpc| rpc.fetch::(key)) - .map_err(Into::into) +#[derive(Clone)] +pub struct Client { + _marker: std::marker::PhantomData, + url: Url, + genesis_hash: T::Hash, + metadata: Metadata, } -/// Fetches a storage key from a substrate node -pub fn fetch_or( - url: &Url, - key: Vec, - default: V, -) -> impl Future { - fetch::(url, key).map(|value| value.unwrap_or(default)) +impl Client { + fn connect(&self) -> impl Future, Error = error::Error> { + connect(&self.url) + } + + pub fn metadata(&self) -> &metadata::Metadata { + &self.metadata + } + + pub fn fetch( + &self, + key: StorageKey, + ) -> impl Future, Error = error::Error> { + self.connect().and_then(|rpc| rpc.storage::(key)) + } + + pub fn fetch_or( + &self, + key: StorageKey, + default: V, + ) -> impl Future { + self.fetch(key).map(|value| value.unwrap_or(default)) + } + + pub fn fetch_or_default( + &self, + key: StorageKey, + ) -> impl Future { + self.fetch(key).map(|value| value.unwrap_or_default()) + } + + pub fn xt( + &self, + signer: P, + extra: E, + ) -> impl Future, Error = error::Error> + where + P: Pair, + P::Public: Into, + P::Signature: Codec, + E: Fn(T::Index) -> SE, + { + let account_id: T::AccountId = signer.public().into(); + let account_nonce_key = self + .metadata + .module("System") + .expect("srml_system is present") + .storage("AccountNonce") + .expect("srml_system has account nonce") + .map() + .expect("account nonce is a map") + .key(&account_id); + let client = (*self).clone(); + self.fetch_or_default(account_nonce_key).map(|nonce| { + XtBuilder { + client, + nonce, + signer, + extra, + } + }) + } } -/// Fetches a storage key from a substrate node. -pub fn fetch_or_default( - url: &Url, - key: Vec, -) -> impl Future { - fetch::(url, key).map(|value| value.unwrap_or_default()) +pub struct XtBuilder { + client: Client, + nonce: T::Index, + signer: P, + extra: E, +} + +impl XtBuilder +where + P: Pair, + P::Public: Into, + P::Signature: Codec, + E: Fn(T::Index) -> SE, +{ + pub fn submit( + &self, + call: C, + ) -> impl Future, Error = error::Error> { + let signer = self.signer.clone(); + let nonce = self.nonce.clone(); + let extra = (self.extra)(nonce.clone()); + let genesis_hash = self.client.genesis_hash.clone(); + self.client.connect().and_then(move |rpc| { + rpc.create_and_submit_extrinsic(signer, call, extra, nonce, genesis_hash) + }) + } } #[cfg(test)] -pub mod tests { - use node_runtime::Runtime; +mod tests { + use super::*; + use parity_scale_codec::Encode; use runtime_primitives::generic::Era; use runtime_support::StorageMap; - use substrate_primitives::crypto::Pair as _; + use substrate_primitives::{ + blake2_256, + storage::StorageKey, + Pair, + }; - fn run(f: F) -> Result - where - F: futures::Future + Send + 'static, - F::Item: Send + 'static, - F::Error: Send + 'static, - { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(f) + #[derive(Clone, PartialEq, Eq)] + struct Runtime; + + impl srml_system::Trait for Runtime { + type Call = ::Call; + type Origin = ::Origin; + type Index = ::Index; + type BlockNumber = ::BlockNumber; + type Hash = ::Hash; + type Hashing = ::Hashing; + type AccountId = ::AccountId; + type Lookup = ::Lookup; + type WeightMultiplierUpdate = + ::WeightMultiplierUpdate; + type Header = ::Header; + type Event = ::Event; + type BlockHashCount = + ::BlockHashCount; + type MaximumBlockWeight = + ::MaximumBlockWeight; + type MaximumBlockLength = + ::MaximumBlockLength; + type AvailableBlockRatio = + ::AvailableBlockRatio; + } + + impl srml_balances::Trait for Runtime { + type Balance = ::Balance; + type OnFreeBalanceZero = (); + type OnNewAccount = (); + type TransactionPayment = (); + type TransferPayment = + ::TransferPayment; + type DustRemoval = ::DustRemoval; + type Event = ::Event; + type ExistentialDeposit = + ::ExistentialDeposit; + type TransferFee = ::TransferFee; + type CreationFee = ::CreationFee; + type TransactionBaseFee = + ::TransactionBaseFee; + type TransactionByteFee = + ::TransactionByteFee; + type WeightToFee = ::WeightToFee; } + type SignedExtra = ( + srml_system::CheckGenesis, + srml_system::CheckEra, + srml_system::CheckNonce, + srml_system::CheckWeight, + srml_balances::TakeFees, + ); + #[test] #[ignore] // requires locally running substrate node fn node_runtime_balance_transfer() { env_logger::try_init().ok(); - let url = url::Url::parse("ws://127.0.0.1:9944").unwrap(); - let signer = substrate_keyring::AccountKeyring::Alice.pair(); - - let dest = substrate_keyring::AccountKeyring::Bob.pair().public(); - let transfer = srml_balances::Call::transfer(dest.into(), 10_000); - let call = node_runtime::Call::Balances(transfer); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let client = rt + .block_on(ClientBuilder::::new().build()) + .unwrap(); + let signer = substrate_keyring::AccountKeyring::Alice.pair(); let extra = |nonce| { ( srml_system::CheckGenesis::::new(), @@ -126,19 +272,81 @@ pub mod tests { srml_balances::TakeFees::::from(0), ) }; - let future = super::submit::(&url, signer, call, extra); - run(future).unwrap(); + let xt = rt.block_on(client.xt(signer, extra)).unwrap(); + + let dest = substrate_keyring::AccountKeyring::Bob.pair().public(); + let transfer = srml_balances::Call::transfer::(dest.into(), 10_000); + let call = client.metadata().module("Balances").unwrap().call(transfer); + rt.block_on(xt.submit(call)).unwrap(); } #[test] #[ignore] // requires locally running substrate node fn node_runtime_fetch_account_balance() { env_logger::try_init().ok(); - let url = url::Url::parse("ws://127.0.0.1:9944").unwrap(); - let account = substrate_keyring::AccountKeyring::Alice.pair().public(); - let key = >::key_for(&account); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let client = rt + .block_on(ClientBuilder::::new().build()) + .unwrap(); + + let account: ::AccountId = + substrate_keyring::AccountKeyring::Alice + .pair() + .public() + .into(); + let key = client + .metadata() + .module("Balances") + .unwrap() + .storage("FreeBalance") + .unwrap() + .map() + .unwrap() + .key(&account); type Balance = ::Balance; - let future = super::fetch::(&url, key); - run(future).unwrap(); + rt.block_on(client.fetch::(key)).unwrap(); + } + + #[test] + #[ignore] // requires locally running substrate node + fn node_runtime_fetch_metadata() { + env_logger::try_init().ok(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let client = rt + .block_on(ClientBuilder::::new().build()) + .unwrap(); + + let balances = client.metadata().module("Balances").unwrap(); + + let dest = substrate_keyring::AccountKeyring::Bob.pair().public(); + let transfer = srml_balances::Call::transfer(dest.clone().into(), 10_000); + let call = node_runtime::Call::Balances(transfer.clone()) + .encode() + .to_vec(); + let call2 = balances.call(transfer); + assert_eq!(call, call2); + + let free_balance = >::key_for(&dest); + let free_balance_key = StorageKey(blake2_256(&free_balance).to_vec()); + let free_balance_key2 = balances + .storage("FreeBalance") + .unwrap() + .map() + .unwrap() + .key(&dest); + assert_eq!(free_balance_key, free_balance_key2); + + let account_nonce = >::key_for(&dest); + let account_nonce_key = StorageKey(blake2_256(&account_nonce).to_vec()); + let account_nonce_key2 = client + .metadata() + .module("System") + .unwrap() + .storage("AccountNonce") + .unwrap() + .map() + .unwrap() + .key(&dest); + assert_eq!(account_nonce_key, account_nonce_key2); } } diff --git a/src/metadata.rs b/src/metadata.rs new file mode 100644 index 0000000000000..6ae6b20b98b28 --- /dev/null +++ b/src/metadata.rs @@ -0,0 +1,162 @@ +use parity_scale_codec::Encode; +use runtime_metadata::{ + DecodeDifferent, + RuntimeMetadata, + RuntimeMetadataPrefixed, + StorageEntryModifier, + StorageEntryType, + StorageHasher, + META_RESERVED, +}; +use std::{ + collections::HashMap, + convert::TryFrom, +}; +use substrate_primitives::storage::StorageKey; + +#[derive(Clone, Debug)] +pub struct Metadata { + modules: HashMap, +} + +impl Metadata { + pub fn module(&self, name: &str) -> Option<&ModuleMetadata> { + self.modules.get(name) + } +} + +#[derive(Clone, Debug)] +pub struct ModuleMetadata { + index: u8, + storage: HashMap, + // calls, event, constants +} + +impl ModuleMetadata { + pub fn call(&self, call: T) -> Vec { + let mut bytes = vec![self.index]; + bytes.extend(call.encode()); + bytes + } + + pub fn storage(&self, key: &str) -> Option<&StorageMetadata> { + self.storage.get(key) + } +} + +#[derive(Clone, Debug)] +pub struct StorageMetadata { + prefix: String, + modifier: StorageEntryModifier, + ty: StorageEntryType, + default: Vec, +} + +impl StorageMetadata { + pub fn map(&self) -> Option { + match &self.ty { + StorageEntryType::Map { hasher, .. } => { + let prefix = self.prefix.as_bytes().to_vec(); + let hasher = hasher.to_owned(); + Some(StorageMap { prefix, hasher }) + } + _ => None, + } + } +} + +#[derive(Clone, Debug)] +pub struct StorageMap { + prefix: Vec, + hasher: StorageHasher, +} + +impl StorageMap { + pub fn key(&self, key: K) -> StorageKey { + let mut bytes = self.prefix.clone(); + bytes.extend(key.encode()); + let hash = match self.hasher { + StorageHasher::Blake2_128 => { + substrate_primitives::blake2_128(&bytes).to_vec() + } + StorageHasher::Blake2_256 => { + substrate_primitives::blake2_256(&bytes).to_vec() + } + StorageHasher::Twox128 => substrate_primitives::twox_128(&bytes).to_vec(), + StorageHasher::Twox256 => substrate_primitives::twox_256(&bytes).to_vec(), + StorageHasher::Twox64Concat => substrate_primitives::twox_64(&bytes).to_vec(), + }; + StorageKey(hash) + } +} + +#[derive(Debug)] +pub enum Error { + InvalidPrefix, + InvalidVersion, + ExpectedDecoded, +} + +impl TryFrom for Metadata { + type Error = Error; + + fn try_from(metadata: RuntimeMetadataPrefixed) -> Result { + if metadata.0 != META_RESERVED { + Err(Error::InvalidPrefix)?; + } + let meta = match metadata.1 { + RuntimeMetadata::V7(meta) => meta, + _ => Err(Error::InvalidVersion)?, + }; + let mut modules = HashMap::new(); + for (i, module) in convert(meta.modules)?.into_iter().enumerate() { + modules.insert( + convert(module.name.clone())?, + convert_module(i as u8, module)?, + ); + } + Ok(Metadata { modules }) + } +} + +fn convert(dd: DecodeDifferent) -> Result { + match dd { + DecodeDifferent::Decoded(value) => Ok(value), + _ => Err(Error::ExpectedDecoded), + } +} + +fn convert_module( + index: u8, + module: runtime_metadata::ModuleMetadata, +) -> Result { + let mut entries = HashMap::new(); + if let Some(storage) = module.storage { + let storage = convert(storage)?; + let prefix = convert(storage.prefix)?; + for entry in convert(storage.entries)?.into_iter() { + let entry_name = convert(entry.name.clone())?; + let entry_prefix = format!("{} {}", prefix, entry_name); + let entry = convert_entry(entry_prefix, entry)?; + entries.insert(entry_name, entry); + } + } + + Ok(ModuleMetadata { + index, + storage: entries, + }) +} + +fn convert_entry( + prefix: String, + entry: runtime_metadata::StorageEntryMetadata, +) -> Result { + let default = convert(entry.default)?; + Ok(StorageMetadata { + prefix, + modifier: entry.modifier, + ty: entry.ty, + default, + }) +} diff --git a/src/rpc.rs b/src/rpc.rs index 0c9920a1e65de..b1a6843a27d83 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,12 +1,12 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of substrate-subxt. // -// ink! is free software: you can redistribute it and/or modify +// subxt is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // -// ink! is distributed in the hope that it will be useful, +// subxt is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. @@ -16,6 +16,7 @@ use crate::{ error::Error, + metadata::Metadata, ExtrinsicSuccess, }; use futures::{ @@ -33,12 +34,13 @@ use jsonrpc_core_client::{ }; use log; use num_traits::bounds::Bounded; -use parity_codec::{ +use parity_scale_codec::{ Codec, Decode, Encode, }; +use runtime_metadata::RuntimeMetadataPrefixed; use runtime_primitives::{ generic::UncheckedExtrinsic, traits::{ @@ -46,13 +48,13 @@ use runtime_primitives::{ SignedExtension, }, }; -use runtime_support::StorageMap; use serde::{ self, de::Error as DeError, Deserialize, }; -use std::marker::PhantomData; +use srml_system::EventRecord; +use std::convert::TryInto; use substrate_primitives::{ blake2_256, storage::{ @@ -93,7 +95,7 @@ impl<'a> serde::Deserialize<'a> for OpaqueExtrinsic { { let r = substrate_primitives::bytes::deserialize(de)?; Decode::decode(&mut &r[..]) - .ok_or(DeError::custom("Invalid value passed into decode")) + .map_err(|e| DeError::custom(format!("Decode error: {}", e))) } } @@ -113,75 +115,63 @@ pub struct SignedBlock { } /// Client for substrate rpc interfaces -pub struct Rpc { +pub struct Rpc { + _marker: std::marker::PhantomData, state: StateClient, chain: ChainClient, author: AuthorClient, - _phantom: PhantomData<(C, P, E, SE)>, } /// Allows connecting to all inner interfaces on the same RpcChannel -impl From for Rpc -where - T: srml_system::Trait, -{ - fn from(channel: RpcChannel) -> Rpc { - Rpc { +impl From for Rpc { + fn from(channel: RpcChannel) -> Self { + Self { + _marker: std::marker::PhantomData, state: channel.clone().into(), chain: channel.clone().into(), author: channel.into(), - _phantom: PhantomData, } } } -impl Rpc -where - T: srml_system::Trait, -{ +impl Rpc { /// Fetch a storage key - pub fn fetch( + pub fn storage( &self, - key: Vec, - ) -> impl Future, Error = RpcError> { - let storage_key = StorageKey(blake2_256(&key).to_vec()); + key: StorageKey, + ) -> impl Future, Error = Error> { self.state - .storage(storage_key, None) + .storage(key, None) .map(|data| { data.map(|d| Decode::decode(&mut &d.0[..]).expect("Valid storage key")) }) .map_err(Into::into) } -} - -impl Rpc -where - T: srml_system::Trait, - C: Codec + Send, - P: Pair, - P::Signature: Codec, - P::Public: Into, - E: Fn(T::Index) -> SE + Send, - SE: SignedExtension + Encode, -{ - /// Fetch the latest nonce for the given `AccountId` - fn fetch_nonce( - &self, - account: &T::AccountId, - ) -> impl Future::Index, Error = RpcError> { - let account_nonce_key = >::key_for(account); - self.fetch::<::Index>(account_nonce_key) - .map(|value| value.unwrap_or_default()) - } /// Fetch the genesis hash - fn fetch_genesis_hash( - &self, - ) -> impl Future, Error = RpcError> { + pub fn genesis_hash(&self) -> impl Future { let block_zero = T::BlockNumber::min_value(); - self.chain.block_hash(Some(NumberOrHex::Number(block_zero))) + self.chain + .block_hash(Some(NumberOrHex::Number(block_zero))) + .map_err(Into::into) + .and_then(|genesis_hash| { + future::result(genesis_hash.ok_or("Genesis hash not found".into())) + }) + } + + /// Fetch the metadata + pub fn metadata(&self) -> impl Future { + self.state + .metadata(None) + .map(|bytes| Decode::decode(&mut &bytes[..]).unwrap()) + .map_err(Into::into) + .and_then(|meta: RuntimeMetadataPrefixed| { + future::result(meta.try_into().map_err(|err| format!("{:?}", err).into())) + }) } +} +impl Rpc { /// Subscribe to substrate System Events fn subscribe_events( &self, @@ -197,10 +187,16 @@ where /// Submit an extrinsic, waiting for it to be finalized. /// If successful, returns the block hash. - fn submit_and_watch( + fn submit_and_watch( self, extrinsic: UncheckedExtrinsic, - ) -> impl Future { + ) -> impl Future + where + C: Codec + Send, + P: Pair, + P::Public: Into, + P::Signature: Codec, + { self.author .watch_extrinsic(extrinsic.encode().into()) .map_err(Into::into) @@ -227,77 +223,79 @@ where } /// Create and submit an extrinsic and return corresponding Event if successful - pub fn create_and_submit_extrinsic( + pub fn create_and_submit_extrinsic( self, signer: P, call: C, - extra: E, - ) -> impl Future, Error = Error> { - let account_nonce = self - .fetch_nonce(&signer.public().into()) - .map_err(Into::into); - let genesis_hash = - self.fetch_genesis_hash() - .map_err(Into::into) - .and_then(|genesis_hash| { - future::result(genesis_hash.ok_or("Genesis hash not found".into())) - }); + extra: SE, + account_nonce: T::Index, + genesis_hash: T::Hash, + ) -> impl Future, Error = Error> + where + C: Codec + Send, + P: Pair, + P::Public: Into, + P::Signature: Codec, + { let events = self.subscribe_events().map_err(Into::into); + events.and_then(move |events| { + let extrinsic = Self::create_and_sign_extrinsic( + account_nonce, + call, + genesis_hash, + &signer, + extra, + ); + let ext_hash = T::Hashing::hash_of(&extrinsic); - account_nonce.join3(genesis_hash, events).and_then( - move |(index, genesis_hash, events)| { - let extrinsic = Self::create_and_sign_extrinsic( - index, - call, - genesis_hash, - &signer, - extra, - ); - let ext_hash = T::Hashing::hash_of(&extrinsic); + log::info!("Submitting Extrinsic `{:?}`", ext_hash); - log::info!("Submitting Extrinsic `{:?}`", ext_hash); - - let chain = self.chain.clone(); - self.submit_and_watch(extrinsic) - .and_then(move |bh| { - log::info!("Fetching block {:?}", bh); - chain - .block(Some(bh)) - .map(move |b| (bh, b)) - .map_err(Into::into) - }) - .and_then(|(h, b)| { - b.ok_or(format!("Failed to find block {:?}", h).into()) - .map(|b| (h, b)) - .into_future() - }) - .and_then(move |(bh, sb)| { - log::info!( - "Found block {:?}, with {} extrinsics", - bh, - sb.block.extrinsics.len() - ); - wait_for_block_events::(ext_hash, &sb, bh, events) - }) - }, - ) + let chain = self.chain.clone(); + self.submit_and_watch::(extrinsic) + .and_then(move |bh| { + log::info!("Fetching block {:?}", bh); + chain + .block(Some(bh)) + .map(move |b| (bh, b)) + .map_err(Into::into) + }) + .and_then(|(h, b)| { + b.ok_or(format!("Failed to find block {:?}", h).into()) + .map(|b| (h, b)) + .into_future() + }) + .and_then(move |(bh, sb)| { + log::info!( + "Found block {:?}, with {} extrinsics", + bh, + sb.block.extrinsics.len() + ); + wait_for_block_events::(ext_hash, &sb, bh, events) + }) + }) } /// Creates and signs an Extrinsic for the supplied `Call` - fn create_and_sign_extrinsic( + fn create_and_sign_extrinsic( index: T::Index, function: C, genesis_hash: T::Hash, signer: &P, - extra: E, - ) -> UncheckedExtrinsic { + extra: SE, + ) -> UncheckedExtrinsic + where + C: Encode + Send, + P: Pair, + P::Public: Into, + P::Signature: Codec, + { log::info!( "Creating Extrinsic with genesis hash {:?} and account nonce {:?}", genesis_hash, index ); - let raw_payload = (function, extra(index), genesis_hash); + let raw_payload = (function, extra.clone(), genesis_hash); let signature = raw_payload.using_encoded(|payload| { if payload.len() > 256 { signer.sign(&blake2_256(payload)[..]) @@ -310,21 +308,18 @@ where raw_payload.0, signer.public().into(), signature.into(), - extra(index), + extra, ) } } /// Waits for events for the block triggered by the extrinsic -fn wait_for_block_events( +fn wait_for_block_events( ext_hash: T::Hash, signed_block: &SignedBlock, block_hash: T::Hash, - events: TypedSubscriptionStream>, -) -> impl Future, Error = Error> -where - T: srml_system::Trait, -{ + events_stream: TypedSubscriptionStream>, +) -> impl Future, Error = Error> { let ext_index = signed_block .block .extrinsics @@ -337,33 +332,33 @@ where .into_future(); let block_hash = block_hash.clone(); - let block_events = events - .map(|event| { - let records = event - .changes - .iter() - .filter_map(|(_key, data)| { - data.as_ref() - .and_then(|data| Decode::decode(&mut &data.0[..])) - }) - .flat_map(|events: Vec>| { - events - }) - .collect::>(); - log::debug!("Block {:?}, Events {:?}", event.block, records.len()); - (event.block, records) - }) - .filter(move |(event_block, _)| *event_block == block_hash) + let block_events = events_stream + .filter(move |event| event.block == block_hash) .into_future() .map_err(|(e, _)| e.into()) - .map(|(events, _)| events); + .and_then(|(change_set, _)| { + match change_set { + None => future::ok(Vec::new()), + Some(change_set) => { + let events = change_set + .changes + .iter() + .filter_map(|(_key, data)| { + data.as_ref().map(|data| Decode::decode(&mut &data.0[..])) + }) + .collect::>>, _>>() + .map(|events| events.into_iter().flat_map(|es| es).collect()) + .map_err(Into::into); + future::result(events) + } + } + }); block_events .join(ext_index) .map(move |(events, ext_index)| { let events: Vec = events .iter() - .flat_map(|(_, events)| events) .filter_map(|e| { if let srml_system::Phase::ApplyExtrinsic(i) = e.phase { if i as usize == ext_index {