Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): enable eth_getProof #5071

Merged
merged 4 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, PayloadAttributes, PayloadError, PayloadStatus,
PayloadStatusEnum, PayloadValidationError,
};
use reth_rpc_types_compat::payload::{try_into_block, validate_block_hash};
use reth_rpc_types_compat::engine::payload::{try_into_block, validate_block_hash};
use reth_stages::{ControlFlow, Pipeline, PipelineError};
use reth_tasks::TaskSpawner;
use std::{
Expand Down
12 changes: 5 additions & 7 deletions crates/revm/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ mod tests {
use reth_primitives::{
bytes,
constants::{BEACON_ROOTS_ADDRESS, SYSTEM_ADDRESS},
keccak256, Account, Bytecode, Bytes, ChainSpecBuilder, ForkCondition, StorageKey, MAINNET,
keccak256,
trie::AccountProof,
Account, Bytecode, Bytes, ChainSpecBuilder, ForkCondition, StorageKey, MAINNET,
};
use reth_provider::{AccountReader, BlockHashReader, StateRootProvider};
use revm::{Database, TransitionState};
Expand Down Expand Up @@ -634,12 +636,8 @@ mod tests {
Ok(self.contracts.get(&code_hash).cloned())
}

fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
todo!()
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
unimplemented!("proof generation is not supported")
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use reth_provider::{
};
use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider,
JwtAuthValidator, JwtSecret, TracingCallPool,
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::TaskSpawner;
Expand Down Expand Up @@ -66,7 +66,7 @@ where
gas_oracle,
EthConfig::default().rpc_gas_cap,
Box::new(executor.clone()),
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
);
let eth_filter = EthFilter::new(
provider,
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use reth_rpc::{
gas_oracle::GasPriceOracleConfig,
RPC_DEFAULT_GAS_CAP,
},
EthApi, EthFilter, EthPubSub, TracingCallPool,
BlockingTaskPool, EthApi, EthFilter, EthPubSub,
};
use serde::{Deserialize, Serialize};

Expand All @@ -21,7 +21,7 @@ pub struct EthHandlers<Provider, Pool, Network, Events> {
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<Provider, Pool, Events, Network>,
/// The configured tracing call pool
pub tracing_call_pool: TracingCallPool,
pub blocking_task_pool: BlockingTaskPool,
}

/// Additional config values for the eth namespace
Expand Down
27 changes: 14 additions & 13 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ use reth_rpc::{
cache::{cache_new_blocks_task, EthStateCache},
gas_oracle::GasPriceOracle,
},
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TracingCallPool, TxPoolApi,
Web3Api,
AdminApi, BlockingTaskGuard, BlockingTaskPool, DebugApi, EngineEthApi, EthApi, EthFilter,
EthPubSub, EthSubscriptionIdProvider, NetApi, OtterscanApi, RPCApi, RethApi, TraceApi,
TxPoolApi, Web3Api,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
Expand Down Expand Up @@ -719,7 +719,7 @@ pub struct RethModuleRegistry<Provider, Pool, Network, Tasks, Events> {
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Provider, Pool, Network, Events>>,
/// to put trace calls behind semaphore
tracing_call_guard: TracingCallGuard,
blocking_pool_guard: BlockingTaskGuard,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
Expand All @@ -745,7 +745,7 @@ impl<Provider, Pool, Network, Tasks, Events>
eth: None,
executor,
modules: Default::default(),
tracing_call_guard: TracingCallGuard::new(config.eth.max_tracing_requests),
blocking_pool_guard: BlockingTaskGuard::new(config.eth.max_tracing_requests),
config,
events,
}
Expand Down Expand Up @@ -927,7 +927,7 @@ where
filter: eth_filter,
pubsub: eth_pubsub,
cache: _,
tracing_call_pool: _,
blocking_task_pool: _,
} = self.with_eth(|eth| eth.clone());

// Create a copy, so we can list out all the methods for rpc_ api
Expand All @@ -946,7 +946,7 @@ where
self.provider.clone(),
eth_api.clone(),
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
.into_rpc()
.into(),
Expand All @@ -964,7 +964,7 @@ where
RethRpcModule::Trace => TraceApi::new(
self.provider.clone(),
eth_api.clone(),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
.into_rpc()
.into(),
Expand Down Expand Up @@ -1026,7 +1026,8 @@ where
);

let executor = Box::new(self.executor.clone());
let tracing_call_pool = TracingCallPool::build().expect("failed to build tracing pool");
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build tracing pool");
let api = EthApi::with_spawner(
self.provider.clone(),
self.pool.clone(),
Expand All @@ -1035,7 +1036,7 @@ where
gas_oracle,
self.config.eth.rpc_gas_cap,
executor.clone(),
tracing_call_pool.clone(),
blocking_task_pool.clone(),
);
let filter = EthFilter::new(
self.provider.clone(),
Expand All @@ -1053,7 +1054,7 @@ where
executor,
);

let eth = EthHandlers { api, cache, filter, pubsub, tracing_call_pool };
let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))
Expand All @@ -1071,7 +1072,7 @@ where
/// Instantiates TraceApi
pub fn trace_api(&mut self) -> TraceApi<Provider, EthApi<Provider, Pool, Network>> {
let eth = self.eth_handlers();
TraceApi::new(self.provider.clone(), eth.api, self.tracing_call_guard.clone())
TraceApi::new(self.provider.clone(), eth.api, self.blocking_pool_guard.clone())
}

/// Instantiates OtterscanApi
Expand All @@ -1087,7 +1088,7 @@ where
self.provider.clone(),
eth_api,
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
}

Expand Down
4 changes: 1 addition & 3 deletions crates/rpc/rpc-builder/tests/it/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@ where
EthApiClient::submit_hashrate(client, U256::default(), B256::default()).await.unwrap();
EthApiClient::gas_price(client).await.unwrap_err();
EthApiClient::max_priority_fee_per_gas(client).await.unwrap_err();
EthApiClient::get_proof(client, address, vec![], None).await.unwrap();

// Unimplemented
assert!(is_unimplemented(
EthApiClient::get_proof(client, address, vec![], None).await.err().unwrap()
));
assert!(is_unimplemented(EthApiClient::author(client).await.err().unwrap()));
assert!(is_unimplemented(EthApiClient::is_mining(client).await.err().unwrap()));
assert!(is_unimplemented(EthApiClient::get_work(client).await.err().unwrap()));
Expand Down
7 changes: 2 additions & 5 deletions crates/rpc/rpc-types-compat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub mod block;
pub use block::*;
pub mod transaction;
pub use transaction::*;
pub mod engine;
pub use engine::*;
pub mod log;
pub use log::*;
pub mod proof;
pub mod transaction;
27 changes: 27 additions & 0 deletions crates/rpc/rpc-types-compat/src/proof.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//! Compatibility functions for rpc proof related types.

use reth_primitives::{
serde_helper::JsonStorageKey,
trie::{AccountProof, StorageProof},
U64,
};
use reth_rpc_types::{EIP1186AccountProofResponse, EIP1186StorageProof};

/// Creates a new rpc storage proof from a primitive storage proof type.
pub fn from_primitive_storage_proof(proof: StorageProof) -> EIP1186StorageProof {
EIP1186StorageProof { key: JsonStorageKey(proof.key), value: proof.value, proof: proof.proof }
}

/// Creates a new rpc account proof from a primitive account proof type.
pub fn from_primitive_account_proof(proof: AccountProof) -> EIP1186AccountProofResponse {
let info = proof.info.unwrap_or_default();
EIP1186AccountProofResponse {
address: proof.address,
balance: info.balance,
code_hash: info.get_bytecode_hash(),
nonce: U64::from(info.nonce),
storage_hash: proof.storage_root,
account_proof: proof.proof,
storage_proof: proof.storage_proofs.into_iter().map(from_primitive_storage_proof).collect(),
}
}
4 changes: 2 additions & 2 deletions crates/rpc/rpc-types/src/eth/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct AccountInfo {
/// Data structure with proof for one single storage-entry
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageProof {
pub struct EIP1186StorageProof {
/// Storage key.
pub key: JsonStorageKey,
/// Value that the key holds
Expand All @@ -31,7 +31,7 @@ pub struct EIP1186AccountProofResponse {
pub nonce: U64,
pub storage_hash: B256,
pub account_proof: Vec<Bytes>,
pub storage_proof: Vec<StorageProof>,
pub storage_proof: Vec<EIP1186StorageProof>,
}

/// Extended account information (used by `parity_allAccountInfo`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
/// RPC Tracing call guard semaphore.
///
/// This is used to restrict the number of concurrent RPC requests to tracing methods like
/// `debug_traceTransaction` because they can consume a lot of memory and CPU.
/// `debug_traceTransaction` as well as `eth_getProof` because they can consume a lot of
/// memory and CPU.
///
/// This types serves as an entry guard for the [TracingCallPool] and is used to rate limit parallel
/// tracing calls on the pool.
/// This types serves as an entry guard for the [BlockingTaskPool] and is used to rate limit
/// parallel blocking tasks in the pool.
#[derive(Clone, Debug)]
pub struct TracingCallGuard(Arc<Semaphore>);
pub struct BlockingTaskGuard(Arc<Semaphore>);

impl TracingCallGuard {
/// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel.
pub fn new(max_tracing_requests: u32) -> Self {
Self(Arc::new(Semaphore::new(max_tracing_requests as usize)))
impl BlockingTaskGuard {
/// Create a new `BlockingTaskGuard` with the given maximum number of blocking tasks in
/// parallel.
pub fn new(max_blocking_tasks: u32) -> Self {
Self(Arc::new(Semaphore::new(max_blocking_tasks as usize)))
}

/// See also [Semaphore::acquire_owned]
Expand All @@ -37,24 +39,24 @@ impl TracingCallGuard {
}
}

/// Used to execute tracing calls on a rayon threadpool from within a tokio runtime.
/// Used to execute blocking tasks on a rayon threadpool from within a tokio runtime.
///
/// This is a dedicated threadpool for tracing calls which are CPU bound.
/// This is a dedicated threadpool for blocking tasks which are CPU bound.
/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio
/// runtime's blocking pool, which performs poorly with CPU bound tasks. Once the tokio blocking
/// pool is saturated it is converted into a queue, tracing calls could then interfere with the
/// pool is saturated it is converted into a queue, blocking tasks could then interfere with the
/// queue and block other RPC calls.
///
/// See also [tokio-docs] for more information.
///
/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
#[derive(Clone, Debug)]
pub struct TracingCallPool {
pub struct BlockingTaskPool {
pool: Arc<rayon::ThreadPool>,
}

impl TracingCallPool {
/// Create a new `TracingCallPool` with the given threadpool.
impl BlockingTaskPool {
/// Create a new `BlockingTaskPool` with the given threadpool.
pub fn new(pool: rayon::ThreadPool) -> Self {
Self { pool: Arc::new(pool) }
}
Expand Down Expand Up @@ -83,7 +85,7 @@ impl TracingCallPool {
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn<F, R>(&self, func: F) -> TracingCallHandle<R>
pub fn spawn<F, R>(&self, func: F) -> BlockingTaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -94,7 +96,7 @@ impl TracingCallPool {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});

TracingCallHandle { rx }
BlockingTaskHandle { rx }
}

/// Asynchronous wrapper around Rayon's
Expand All @@ -104,7 +106,7 @@ impl TracingCallPool {
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn_fifo<F, R>(&self, func: F) -> TracingCallHandle<R>
pub fn spawn_fifo<F, R>(&self, func: F) -> BlockingTaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -115,30 +117,30 @@ impl TracingCallPool {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});

TracingCallHandle { rx }
BlockingTaskHandle { rx }
}
}

/// Async handle for a blocking tracing task running in a Rayon thread pool.
/// Async handle for a blocking task running in a Rayon thread pool.
///
/// ## Panics
///
/// If polled from outside a tokio runtime.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct TracingCallHandle<T> {
pub struct BlockingTaskHandle<T> {
#[pin]
pub(crate) rx: oneshot::Receiver<thread::Result<T>>,
}

impl<T> Future for TracingCallHandle<T> {
impl<T> Future for BlockingTaskHandle<T> {
type Output = thread::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(self.project().rx.poll(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(Box::<TokioTracingCallError>::default())),
Err(_) => Poll::Ready(Err(Box::<TokioBlockingTaskError>::default())),
}
}
}
Expand All @@ -149,23 +151,23 @@ impl<T> Future for TracingCallHandle<T> {
#[derive(Debug, Default, thiserror::Error)]
#[error("Tokio channel dropped while awaiting result")]
#[non_exhaustive]
pub struct TokioTracingCallError;
pub struct TokioBlockingTaskError;

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn tracing_pool() {
let pool = TracingCallPool::build().unwrap();
async fn blocking_pool() {
let pool = BlockingTaskPool::build().unwrap();
let res = pool.spawn(move || 5);
let res = res.await.unwrap();
assert_eq!(res, 5);
}

#[tokio::test]
async fn tracing_pool_panic() {
let pool = TracingCallPool::build().unwrap();
async fn blocking_pool_panic() {
let pool = BlockingTaskPool::build().unwrap();
let res = pool.spawn(move || -> i32 {
panic!();
});
Expand Down
Loading
Loading