Skip to content

Commit

Permalink
archive: Refactor archive_storage method into subscription (parityt…
Browse files Browse the repository at this point in the history
…ech#6483)

This PR adapts the `archive_storage` implementation from a method to a
subscription.

This keeps the archive APIs uniform and consistent.

Builds on: paritytech#5997

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
  • Loading branch information
2 people authored and Krayt78 committed Dec 18, 2024
1 parent c3d5de4 commit 0e673c3
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 579 deletions.
13 changes: 8 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
use crate::{
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
MethodResult,
};
Expand Down Expand Up @@ -100,13 +99,17 @@ pub trait ArchiveApi<Hash> {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "archive_unstable_storage", blocking)]
#[subscription(
name = "archive_unstable_storage" => "archive_unstable_storageEvent",
unsubscribe = "archive_unstable_stopStorage",
item = ArchiveStorageEvent,
)]
fn archive_unstable_storage(
&self,
hash: Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;
);

/// Returns the storage difference between two blocks.
///
Expand Down
202 changes: 107 additions & 95 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
use crate::{
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
archive_storage::ArchiveStorageDiff, error::Error as ArchiveError, ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
common::{
events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
storage::{QueryResult, StorageSubscriptionClient},
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};
Expand Down Expand Up @@ -57,42 +57,12 @@ use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
pub max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
pub max_queried_items: usize,
}

/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
///
/// Note: this is identical to the `chainHead` value.
const MAX_DESCENDANT_RESPONSES: usize = 5;

/// The maximum number of queried items allowed for the `archive_storage` at a time.
///
/// Note: A queried item can also be a descendant query which can return up to
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
max_descendant_responses: MAX_DESCENDANT_RESPONSES,
max_queried_items: MAX_QUERIED_ITEMS,
}
}
}

/// An API for archive RPC calls.
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -103,11 +73,6 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
storage_max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
storage_max_queried_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -119,18 +84,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
_phantom: PhantomData,
}
Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
}
}

Expand Down Expand Up @@ -260,47 +216,53 @@ where

fn archive_unstable_storage(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult> {
let items = items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
let pagination_start_key = query
.pagination_start_key
.map(|key| parse_hex_param(key).map(|key| StorageKey(key)))
.transpose()?;

// Paginated start key is only supported
if pagination_start_key.is_some() && !query.query_type.is_descendant_query() {
return Err(ArchiveError::InvalidParam(
"Pagination start key is only supported for descendants queries"
.to_string(),
))
}
) {
let mut storage_client =
StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

Ok(PaginatedStorageQuery {
key,
query_type: query.query_type,
pagination_start_key,
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
})
.collect::<Result<Vec<_>, ArchiveError>>()?;
.collect::<Result<Vec<_>, ArchiveError>>()
{
Ok(items) => items,
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let child_trie = child_trie
.map(|child_trie| parse_hex_param(child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
let child_trie = match child_trie {
Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let storage_client = ArchiveStorage::new(
self.client.clone(),
self.storage_max_descendant_responses,
self.storage_max_queried_items,
);
let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);

Ok(storage_client.handle_query(hash, items, child_trie))
// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}

fn archive_unstable_storage_diff(
Expand Down Expand Up @@ -337,24 +299,74 @@ where
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
let _ =
futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
/// Sends all the events of the storage_diff method to the sink.
async fn process_storage_diff_events(
rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
sink: &mut Subscription,
) {
loop {
tokio::select! {
_ = sink.closed() => {
return
},

maybe_event = rx.recv() => {
let Some(event) = maybe_event else {
break;
};

if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
}
}

/// Sends all the events of the storage method to the sink.
async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
loop {
tokio::select! {
_ = sink.closed() => {
break
}

maybe_storage = rx.recv() => {
let Some(event) = maybe_storage else {
break;
};

match event {
Ok(None) => continue,

Ok(Some(event)) =>
if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
return
},

if sink.send(&event).await.is_err() {
return
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
return
}
}
}
}
}

let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
}
Loading

0 comments on commit 0e673c3

Please sign in to comment.