rpc-v2: Implement archive_unstable_storageDiff #5997

Open: lexnv wants to merge 38 commits into master.

Commits (38):
e17a555
archive/api: Add storage diff method
lexnv Oct 9, 2024
4101ee6
storage/events: Add storage events and test serialization
lexnv Oct 9, 2024
d781d67
archive: Add initial implementation for storage diff
lexnv Oct 9, 2024
60e1a93
archive: Handle modified and delete cases
lexnv Oct 9, 2024
63da031
archive/events: Derive more impl for diff items
lexnv Oct 9, 2024
25b9e5a
archive: Deduplicate input keys
lexnv Oct 9, 2024
52cfaf7
archive: Move code to different module
lexnv Oct 9, 2024
ee6bed7
archive: Fetch value / hash or both
lexnv Oct 9, 2024
8948e74
events: Add subscription events for ArchiveStorageDiffEvent
lexnv Nov 4, 2024
fbfb0dc
archive: Make archive_storageDiff a subscription
lexnv Nov 4, 2024
f7b33ed
archive: Modify storageDiff to leverage subscription backpressure
lexnv Nov 4, 2024
f47db36
archive: Deduplicate items to a separate function
lexnv Nov 5, 2024
97546a4
archive/tests: Check deduplication
lexnv Nov 5, 2024
baa3dd0
archive: Fix deduplication shortest key
lexnv Nov 5, 2024
4683f6b
archive/tests: Check complex deduplication
lexnv Nov 5, 2024
36f59d2
archive: Simplify logic of trie iteration
lexnv Nov 5, 2024
a084f47
archive: Improve documentation
lexnv Nov 5, 2024
77cac50
archive/events: Add derive(Eq) to archive storage diff events
lexnv Nov 5, 2024
e2324a4
archive/tests: Check query for main trie under prefix
lexnv Nov 5, 2024
4b05913
archive: Add trace and debug logs for storage diff
lexnv Nov 5, 2024
83cb9b4
archive: Send StorageDiffDone for partial trie queries
lexnv Nov 5, 2024
85c810e
archive/tests: Check no changes between blocks
lexnv Nov 5, 2024
152d13b
archive/tests: Ensure added key
lexnv Nov 5, 2024
4f549bf
archive/tests: Extend test with interleaved values and hashes
lexnv Nov 5, 2024
d208068
rpc-v2: Use indexmap crate
lexnv Nov 5, 2024
369a4d2
archive: Move prevHash as last parameter to archive diff
lexnv Nov 5, 2024
a02cc84
archive: Preserve order of elements with indexMap
lexnv Nov 5, 2024
637b446
archive/tests: Check deleted keys
lexnv Nov 5, 2024
76efb18
events: Add common wrappers for construction
lexnv Nov 5, 2024
ca1fa20
archive: Propagate errors via events
lexnv Nov 5, 2024
03eefd7
archive: Check invalid parameters and events
lexnv Nov 5, 2024
9a1e792
archive: Fix clippy
lexnv Nov 5, 2024
dbbfce1
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 5, 2024
971026d
Add prdoc
lexnv Nov 5, 2024
2880058
Merge branch 'master' into lexnv/storage-diff
lexnv Nov 6, 2024
b774855
archive/storage: Optimize space used for saved keys
lexnv Nov 6, 2024
838504b
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 6, 2024
60abd9b
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 6, 2024

Files changed:
2 changes: 2 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

16 changes: 16 additions & 0 deletions prdoc/pr_5997.prdoc
@@ -0,0 +1,16 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Implement archive_unstable_storageDiff method

doc:
- audience: Node Dev
description: |
This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
Developers can use this method to fetch the storage differences
between two blocks. This is useful for oracles and archive nodes.
For more details see: https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.

crates:
- name: sc-rpc-spec-v2
bump: minor
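
To make the prdoc above concrete, here is a minimal client-side sketch (not part of this PR) that subscribes to the new method with jsonrpsee's WebSocket client. The method and unsubscribe names come from the `archive_unstable_storage_diff` declaration added in `api.rs` below; the item field names (`key`, `returnType`), the `0x` whole-trie prefix, and the node endpoint are assumptions based on the linked spec.

```rust
// Hypothetical usage sketch, not taken from this PR. Assumes a node exposing
// the unstable archive RPCs on ws://127.0.0.1:9944 and that diff items follow
// the spec's shape (`key`, `returnType`); adjust if they differ.
use futures::StreamExt;
use jsonrpsee::{core::client::SubscriptionClientT, rpc_params, ws_client::WsClientBuilder};
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = WsClientBuilder::default().build("ws://127.0.0.1:9944").await?;

    // Diff the whole main trie between `block_hash` and its parent: passing
    // null for the optional previousHash lets the server fall back to the
    // parent block.
    let block_hash = "0x…"; // hex-encoded block hash (placeholder)
    let items = json!([{ "key": "0x", "returnType": "value" }]);
    let previous_hash: Option<String> = None;

    let mut sub = client
        .subscribe::<Value, _>(
            "archive_unstable_storageDiff",
            rpc_params![block_hash, items, previous_hash],
            "archive_unstable_storageDiff_stopStorageDiff",
        )
        .await?;

    // Notifications are streamed until the server reports completion or an error.
    while let Some(event) = sub.next().await {
        println!("{}", event?);
    }
    Ok(())
}
```

The trailing `previous_hash` parameter is optional; when it is null the server diffs against the parent block, as implemented in `archive.rs` below.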
2 changes: 2 additions & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
@@ -42,6 +42,8 @@ log = { workspace = true, default-features = true }
futures-util = { workspace = true }
rand = { workspace = true, default-features = true }
schnellru = { workspace = true }
itertools = { workspace = true }
indexmap = { workspace = true }

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["server", "ws-client"] }
22 changes: 21 additions & 1 deletion substrate/client/rpc-spec-v2/src/archive/api.rs
@@ -19,7 +19,10 @@
//! API trait of the archive methods.

use crate::{
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
MethodResult,
};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
@@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
items: Vec<PaginatedStorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;

/// Returns the storage difference between two blocks.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
item = ArchiveStorageDiffEvent,
)]
fn archive_unstable_storage_diff(
&self,
hash: Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Hash>,
);
}
122 changes: 117 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
@@ -19,17 +19,29 @@
//! API implementation for `archive`.

use crate::{
archive::{error::Error as ArchiveError, ArchiveApiServer},
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
hex_string, MethodResult,
archive::{
archive_storage::{deduplicate_storage_diff_items, ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};

use codec::Encode;
use jsonrpsee::core::{async_trait, RpcResult};
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
PendingSubscriptionSink,
};
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::{CallApiAt, CallContext};
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
@@ -41,7 +53,9 @@ use sp_runtime::{
};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};

use super::archive_storage::ArchiveStorage;
use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
@@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its own buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
Expand All @@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
@@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
client: Arc<Client>,
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
@@ -278,4 +302,92 @@

Ok(storage_client.handle_query(hash, items, child_trie))
}

fn archive_unstable_storage_diff(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Block::Hash>,
) {
let storage_client = ArchiveStorageDiff::new(self.client.clone());
let client = self.client.clone();

log::trace!(target: LOG_TARGET, "Storage diff subscription started");

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

// Deduplicate the items.
let mut trie_items = match deduplicate_storage_diff_items(items) {
Ok(items) => items,
Err(error) => {
let _ = sink.send(&ArchiveStorageDiffEvent::err(error.to_string())).await;
return
},
};
// Default to using the main storage trie if no items are provided.
if trie_items.is_empty() {
trie_items.push(Vec::new());
}
log::trace!(target: LOG_TARGET, "Storage diff deduplicated items: {:?}", trie_items);

let previous_hash = if let Some(previous_hash) = previous_hash {
previous_hash
} else {
let Ok(Some(current_header)) = client.header(hash) else {
let message = format!("Block header is not present: {hash}");
let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
return
};
*current_header.parent_hash()
};

let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
for trie_queries in trie_items {
niklasad1 (Member) commented on Nov 7, 2024:

Any reason why not pass trie_items directly to storage_client.handle_trie_queries? Would be neat not to spawn a task for each trie query...

I reckon it would be quite neat to move the sender and rely on it closing down the stream instead of checking whether event.is_done()...

Then also send ArchiveStorageDiffEvent::StorageDiffDone in process_events...

let storage_fut = storage_client.handle_trie_queries(
hash,
previous_hash,
trie_queries,
tx.clone(),
);
let result =
futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
if !result.1 {
log::debug!(target: LOG_TARGET, "Error processing trie queries");
return;
}
}

let _ = sink.send(&ArchiveStorageDiffEvent::StorageDiffDone).await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Returns true if the events were processed successfully, false otherwise.
async fn process_events(
rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
sink: &mut Subscription,
) -> bool {
niklasad1 (Member) commented:

See my comment above: it would be neat to remove the bool flag, process all trie queries in handle_trie_queries, and have this function just deal with logging and sending out all messages.

while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
break
}

let is_error_event = event.is_err();
if let Err(_) = sink.send(&event).await {
return false
}

if is_error_event {
log::debug!(target: LOG_TARGET, "Error event encountered while processing partial trie query");
// Stop further processing if an error event is received.
return false
}
}

true
}
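
To illustrate the refactor suggested in the review comments above, here is a rough sketch (an assumption, not code from this PR) in which `handle_trie_queries` owns the only sender, the closed channel signals completion, and `process_events` emits `StorageDiffDone` itself, so the boolean return value and the `is_done()` check disappear. It reuses the `ArchiveStorageDiffEvent`, `Subscription`, and `mpsc` types already imported in `archive.rs` above.

```rust
// Sketch of the suggested shape (hypothetical). `handle_trie_queries` holds
// the only `Sender`; once every trie query has been processed it drops the
// sender, the channel closes, and this loop ends naturally.
async fn process_events_sketch(
    rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
    sink: &mut Subscription,
) {
    while let Some(event) = rx.recv().await {
        let stop_after_send = event.is_err();

        if sink.send(&event).await.is_err() {
            // The subscriber disconnected; nothing more to forward.
            return;
        }
        if stop_after_send {
            // An error event terminates the subscription; no "done" event follows.
            return;
        }
    }

    // The channel closed because every trie query finished, so completion is
    // signalled here instead of in the subscription future.
    let _ = sink.send(&ArchiveStorageDiffEvent::StorageDiffDone).await;
}
```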