Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage map iter #148

Merged
merged 7 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions examples/fetch_all_accounts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of substrate-subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.

use substrate_subxt::{
system::AccountStoreExt,
ClientBuilder,
DefaultNodeRuntime,
};

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();

let client = ClientBuilder::<DefaultNodeRuntime>::new().build().await?;
let mut iter = client.account_iter(None).await?;
while let Some((key, account)) = iter.next().await? {
println!("{:?}: {}", key, account.data.free);
}
Ok(())
}
35 changes: 31 additions & 4 deletions proc-macro/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub fn store(s: Structure) -> TokenStream {
let module = utils::module_name(generics);
let store_name = utils::ident_to_name(ident, "Store").to_camel_case();
let store = format_ident!("{}", store_name.to_snake_case());
let store_iter = format_ident!("{}_iter", store_name.to_snake_case());
let store_trait = format_ident!("{}StoreExt", store_name);
let bindings = utils::bindings(&s);
let fields = utils::fields(&bindings);
Expand Down Expand Up @@ -110,6 +111,7 @@ pub fn store(s: Structure) -> TokenStream {
let keys = filtered_fields
.iter()
.map(|(field, _)| quote!(&self.#field));
let key_iter = quote!(#subxt::KeyIter<T, #ident<#(#params),*>>);

quote! {
impl#generics #subxt::Store<T> for #ident<#(#params),*> {
Expand Down Expand Up @@ -139,13 +141,19 @@ pub fn store(s: Structure) -> TokenStream {
}

/// Store extension trait.
pub trait #store_trait<T: #module> {
pub trait #store_trait<T: #subxt::Runtime + #module> {
/// Retrieve the store element.
fn #store<'a>(
&'a self,
#args
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#ret, #subxt::Error>> + Send + 'a>>;

/// Iterate over the store element.
fn #store_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#key_iter, #subxt::Error>> + Send + 'a>>;
}

impl<T: #subxt::Runtime + #module> #store_trait<T> for #subxt::Client<T> {
Expand All @@ -155,7 +163,14 @@ pub fn store(s: Structure) -> TokenStream {
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#ret, #subxt::Error>> + Send + 'a>> {
let #marker = core::marker::PhantomData::<T>;
Box::pin(self.#fetch(#build_struct, hash))
Box::pin(async move { self.#fetch(&#build_struct, hash).await })
}

fn #store_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<#key_iter, #subxt::Error>> + Send + 'a>> {
Box::pin(self.iter(hash))
}
}
}
Expand Down Expand Up @@ -202,13 +217,18 @@ mod tests {
}

/// Store extension trait.
pub trait AccountStoreExt<T: Balances> {
pub trait AccountStoreExt<T: substrate_subxt::Runtime + Balances> {
/// Retrieve the store element.
fn account<'a>(
&'a self,
account_id: &'a <T as System>::AccountId,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<AccountData<T::Balance>, substrate_subxt::Error>> + Send + 'a>>;
/// Iterate over the store element.
fn account_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<substrate_subxt::KeyIter<T, AccountStore<'a, T>>, substrate_subxt::Error>> + Send + 'a>>;
}

impl<T: substrate_subxt::Runtime + Balances> AccountStoreExt<T> for substrate_subxt::Client<T> {
Expand All @@ -219,7 +239,14 @@ mod tests {
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<AccountData<T::Balance>, substrate_subxt::Error>> + Send + 'a>>
{
let _ = core::marker::PhantomData::<T>;
Box::pin(self.fetch_or_default(AccountStore { account_id, }, hash))
Box::pin(async move { self.fetch_or_default(&AccountStore { account_id, }, hash).await })
}

fn account_iter<'a>(
&'a self,
hash: Option<T::Hash>,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<substrate_subxt::KeyIter<T, AccountStore<'a, T>>, substrate_subxt::Error>> + Send + 'a>> {
Box::pin(self.iter(hash))
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions proc-macro/tests/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ subxt_test!({
account: Alice,
step: {
state: {
alice: AccountStore { account_id: &alice },
bob: AccountStore { account_id: &bob },
alice: &AccountStore { account_id: &alice },
bob: &AccountStore { account_id: &bob },
},
call: TransferCall {
to: &bob,
Expand Down
135 changes: 116 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use sc_rpc_api::state::ReadProof;
use sp_core::{
storage::{
StorageChangeSet,
StorageData,
StorageKey,
},
Bytes,
Expand Down Expand Up @@ -115,6 +116,7 @@ pub struct ClientBuilder<T: Runtime> {
_marker: std::marker::PhantomData<T>,
url: Option<String>,
client: Option<jsonrpsee::Client>,
page_size: Option<u32>,
}

impl<T: Runtime> ClientBuilder<T> {
Expand All @@ -124,6 +126,7 @@ impl<T: Runtime> ClientBuilder<T> {
_marker: std::marker::PhantomData,
url: None,
client: None,
page_size: None,
}
}

Expand All @@ -139,6 +142,12 @@ impl<T: Runtime> ClientBuilder<T> {
self
}

/// Set the page size.
pub fn set_page_size(mut self, size: u32) -> Self {
self.page_size = Some(size);
self
}

/// Creates a new Client.
pub async fn build(self) -> Result<Client<T>, Error> {
let client = if let Some(client) = self.client {
Expand All @@ -164,6 +173,7 @@ impl<T: Runtime> ClientBuilder<T> {
metadata: metadata?,
runtime_version: runtime_version?,
_marker: PhantomData,
page_size: self.page_size.unwrap_or(10),
})
}
}
Expand All @@ -175,6 +185,7 @@ pub struct Client<T: Runtime> {
metadata: Metadata,
runtime_version: RuntimeVersion,
_marker: PhantomData<(fn() -> T::Signature, T::Extra)>,
page_size: u32,
}

impl<T: Runtime> Clone for Client<T> {
Expand All @@ -185,6 +196,53 @@ impl<T: Runtime> Clone for Client<T> {
metadata: self.metadata.clone(),
runtime_version: self.runtime_version.clone(),
_marker: PhantomData,
page_size: self.page_size,
}
}
}

/// Iterates over key value pairs in a map.
pub struct KeyIter<T: Runtime, F: Store<T>> {
client: Client<T>,
_marker: PhantomData<F>,
count: u32,
hash: T::Hash,
start_key: Option<StorageKey>,
buffer: Vec<(StorageKey, StorageData)>,
}

impl<T: Runtime, F: Store<T>> KeyIter<T, F> {
/// Returns the next key value pair from a map.
pub async fn next(&mut self) -> Result<Option<(StorageKey, F::Returns)>, Error> {
ascjones marked this conversation as resolved.
Show resolved Hide resolved
loop {
if let Some((k, v)) = self.buffer.pop() {
return Ok(Some((k, Decode::decode(&mut &v.0[..])?)))
} else {
let keys = self
.client
.fetch_keys::<F>(self.count, self.start_key.take(), Some(self.hash))
.await?;

if keys.is_empty() {
return Ok(None)
}

self.start_key = keys.last().cloned();

let change_sets = self
.client
.rpc
.query_storage_at(&keys, Some(self.hash))
.await?;
for change_set in change_sets {
for (k, v) in change_set.changes {
if let Some(v) = v {
self.buffer.push((k, v));
}
}
}
debug_assert_eq!(self.buffer.len(), self.count as usize);
}
}
}
}
Expand All @@ -200,45 +258,69 @@ impl<T: Runtime> Client<T> {
&self.metadata
}

/// Fetch a StorageKey with default value.
/// Fetch a StorageKey with an optional block hash.
pub async fn fetch<F: Store<T>>(
&self,
store: &F,
hash: Option<T::Hash>,
) -> Result<Option<F::Returns>, Error> {
let key = store.key(&self.metadata)?;
if let Some(data) = self.rpc.storage(&key, hash).await? {
Ok(Some(Decode::decode(&mut &data.0[..])?))
} else {
Ok(None)
}
}

/// Fetch a StorageKey that has a default value with an optional block hash.
pub async fn fetch_or_default<F: Store<T>>(
&self,
store: F,
store: &F,
hash: Option<T::Hash>,
) -> Result<F::Returns, Error> {
let key = store.key(&self.metadata)?;
if let Some(data) = self.rpc.storage(key, hash).await? {
Ok(Decode::decode(&mut &data.0[..])?)
if let Some(data) = self.fetch(store, hash).await? {
Ok(data)
} else {
Ok(store.default(&self.metadata)?)
}
}

/// Fetch a StorageKey an optional storage key.
pub async fn fetch<F: Store<T>>(
/// Returns an iterator of key value pairs.
pub async fn iter<F: Store<T>>(
&self,
store: F,
hash: Option<T::Hash>,
) -> Result<Option<F::Returns>, Error> {
let key = store.key(&self.metadata)?;
if let Some(data) = self.rpc.storage(key, hash).await? {
Ok(Some(Decode::decode(&mut &data.0[..])?))
) -> Result<KeyIter<T, F>, Error> {
let hash = if let Some(hash) = hash {
hash
} else {
Ok(None)
}
self.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
};
Ok(KeyIter {
client: self.clone(),
hash,
count: self.page_size,
start_key: None,
buffer: Default::default(),
_marker: PhantomData,
})
}

/// Fetch up to `count` keys for a storage map in lexicographic order.
///
/// Supports pagination by passing a value to `start_key`.
/// Fetch up to `count` keys for a storage map in lexicographic order.
///
/// Supports pagination by passing a value to `start_key`.
pub async fn fetch_keys<F: Store<T>>(
&self,
count: u32,
start_key: Option<StorageKey>,
hash: Option<T::Hash>,
) -> Result<Vec<StorageKey>, Error> {
let prefix = <F as Store<T>>::prefix(&self.metadata)?;
let keys = self.rpc.storage_keys_paged(Some(prefix), count, start_key, hash).await?;
let keys = self
.rpc
.storage_keys_paged(Some(prefix), count, start_key, hash)
.await?;
Ok(keys)
}

Expand Down Expand Up @@ -513,6 +595,7 @@ mod tests {
SubxtClient::from_config(config, test_node::service::new_full)
.expect("Error creating subxt client"),
)
.set_page_size(2)
.build()
.await
.expect("Error creating client");
Expand Down Expand Up @@ -630,7 +713,21 @@ mod tests {
#[async_std::test]
async fn test_fetch_keys() {
let (client, _) = test_client().await;
let keys = client.fetch_keys::<system::AccountStore<_>>(4, None, None).await.unwrap();
let keys = client
.fetch_keys::<system::AccountStore<_>>(4, None, None)
.await
.unwrap();
assert_eq!(keys.len(), 4)
}

#[async_std::test]
async fn test_iter() {
let (client, _) = test_client().await;
let mut iter = client.iter::<system::AccountStore<_>>(None).await.unwrap();
let mut i = 0;
while let Some(_) = iter.next().await.unwrap() {
i += 1;
}
assert_eq!(i, 4);
}
}
19 changes: 16 additions & 3 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<T: Runtime> Rpc<T> {
/// Fetch a storage key
pub async fn storage(
&self,
key: StorageKey,
key: &StorageKey,
hash: Option<T::Hash>,
) -> Result<Option<StorageData>, Error> {
let params = Params::Array(vec![to_json_value(key)?, to_json_value(hash)?]);
Expand All @@ -132,8 +132,8 @@ impl<T: Runtime> Rpc<T> {
}

/// Returns the keys with prefix with pagination support.
/// Up to `count` keys will be returned.
/// If `start_key` is passed, return next keys in storage in lexicographic order.
/// Up to `count` keys will be returned.
/// If `start_key` is passed, return next keys in storage in lexicographic order.
pub async fn storage_keys_paged(
&self,
prefix: Option<StorageKey>,
Expand Down Expand Up @@ -170,6 +170,19 @@ impl<T: Runtime> Rpc<T> {
.map_err(Into::into)
}

/// Query historical storage entries
pub async fn query_storage_at(
&self,
keys: &[StorageKey],
at: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<<T as System>::Hash>>, Error> {
let params = Params::Array(vec![to_json_value(keys)?, to_json_value(at)?]);
self.client
.request("state_queryStorage", params)
.await
.map_err(Into::into)
}

/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
Expand Down