Skip to content

Commit

Permalink
Schema override TODOs (paritytech#345)
Browse files Browse the repository at this point in the history
* Use override in eth_getLogs

* Use override in EthFilterApi

* Wrap overrides in an Arc

* Fix checker

* Use override in EthPubSubApi

* Fix checker

* Add OverrideHandle

* Fix checker

* Update test-web3api

* Bump fc-rpc version 2.0.0-dev

* CHANGELOG unreleased notes

* Update Cargo.lock
  • Loading branch information
tgmichel authored Apr 8, 2021
1 parent 6d93a2e commit f00852a
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## [Unreleased]
### Changed
- `EthPubSubApi::new` takes an additional `overrides` parameter.
2 changes: 1 addition & 1 deletion client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fc-rpc"
version = "1.0.0"
version = "2.0.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Ethereum RPC (web3) compatibility layer for Substrate."
Expand Down
113 changes: 64 additions & 49 deletions client/rpc/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ use crate::{frontier_backend_client, internal_err, error_on_execution_failure, E

pub use fc_rpc_core::{EthApiServer, NetApiServer, Web3ApiServer, EthFilterApiServer};
use codec::{self, Encode};
use pallet_ethereum::EthereumStorageSchema;
use crate::overrides::{StorageOverride, RuntimeApiStorageOverride};
use crate::overrides::OverrideHandle;

pub struct EthApi<B: BlockT, C, P, CT, BE, H: ExHashT> {
pool: Arc<P>,
Expand All @@ -56,8 +55,7 @@ pub struct EthApi<B: BlockT, C, P, CT, BE, H: ExHashT> {
network: Arc<NetworkService<B, H>>,
is_authority: bool,
signers: Vec<Box<dyn EthSigner>>,
overrides: BTreeMap<EthereumStorageSchema, Box<dyn StorageOverride<B> + Send + Sync>>,
fallback: Box<dyn StorageOverride<B> + Send + Sync>,
overrides: Arc<OverrideHandle<B>>,
pending_transactions: PendingTransactions,
backend: Arc<fc_db::Backend<B>>,
_marker: PhantomData<(B, BE)>,
Expand All @@ -76,7 +74,7 @@ impl<B: BlockT, C, P, CT, BE, H: ExHashT> EthApi<B, C, P, CT, BE, H> where
network: Arc<NetworkService<B, H>>,
pending_transactions: PendingTransactions,
signers: Vec<Box<dyn EthSigner>>,
overrides: BTreeMap<EthereumStorageSchema, Box<dyn StorageOverride<B> + Send + Sync>>,
overrides: Arc<OverrideHandle<B>>,
backend: Arc<fc_db::Backend<B>>,
is_authority: bool,
) -> Self {
Expand All @@ -88,7 +86,6 @@ impl<B: BlockT, C, P, CT, BE, H: ExHashT> EthApi<B, C, P, CT, BE, H> where
is_authority,
signers,
overrides,
fallback: Box::new(RuntimeApiStorageOverride::new(client)),
pending_transactions,
backend,
_marker: PhantomData,
Expand Down Expand Up @@ -322,9 +319,9 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), block);

Ok(
self.overrides
self.overrides.schemas
.get(&schema)
.unwrap_or(&self.fallback)
.unwrap_or(&self.overrides.fallback)
.current_block(&block)
.ok_or(internal_err("fetching author through override failed"))?
.header.beneficiary
Expand Down Expand Up @@ -382,9 +379,9 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
if let Ok(Some(id)) = frontier_backend_client::native_block_id::<B, C>(self.client.as_ref(), self.backend.as_ref(), number) {
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
return Ok(
self.overrides
self.overrides.schemas
.get(&schema)
.unwrap_or(&self.fallback)
.unwrap_or(&self.overrides.fallback)
.storage_at(&id, address, index)
.unwrap_or_default()
)
Expand All @@ -400,7 +397,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
_ => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand All @@ -426,7 +423,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
None => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand Down Expand Up @@ -494,7 +491,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
_ => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let block = self.overrides.get(&schema).unwrap_or(&self.fallback).current_block(&id);
let block = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback).current_block(&id);

match block {
Some(block) => Ok(Some(U256::from(block.transactions.len()))),
Expand All @@ -508,7 +505,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
None => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let block = self.overrides.get(&schema).unwrap_or(&self.fallback).current_block(&id);
let block = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback).current_block(&id);

match block {
Some(block) => Ok(Some(U256::from(block.transactions.len()))),
Expand All @@ -529,9 +526,9 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);

return Ok(
self.overrides
self.overrides.schemas
.get(&schema)
.unwrap_or(&self.fallback)
.unwrap_or(&self.overrides.fallback)
.account_code_at(&id, address)
.unwrap_or(vec![])
.into()
Expand Down Expand Up @@ -868,7 +865,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
_ => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand Down Expand Up @@ -899,7 +896,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
let index = index.value();

let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand Down Expand Up @@ -927,7 +924,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
};
let index = index.value();
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand Down Expand Up @@ -958,7 +955,7 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
_ => return Ok(None),
};
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.get(&schema).unwrap_or(&self.fallback);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);
Expand Down Expand Up @@ -1045,9 +1042,11 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
_ => return Ok(Vec::new()),
};

let (block, _, statuses) = self.client.runtime_api()
.current_all(&id)
.map_err(|err| internal_err(format!("fetch runtime account basic failed: {:?}", err)))?;
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
Expand All @@ -1073,9 +1072,11 @@ impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H> where
while current_number >= from_number {
let id = BlockId::Number(current_number);

let (block, _, statuses) = self.client.runtime_api()
.current_all(&id)
.map_err(|err| internal_err(format!("fetch runtime account basic failed: {:?}", err)))?;
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
Expand Down Expand Up @@ -1197,34 +1198,46 @@ impl<B, C> Web3ApiT for Web3Api<B, C> where
}
}

pub struct EthFilterApi<B, C> {
pub struct EthFilterApi<B: BlockT, C, BE> {
client: Arc<C>,
filter_pool: FilterPool,
max_stored_filters: usize,
_marker: PhantomData<B>,
overrides: Arc<OverrideHandle<B>>,
_marker: PhantomData<(B, BE)>,
}

impl<B, C> EthFilterApi<B, C> {
impl<B: BlockT, C, BE> EthFilterApi<B, C, BE> where
C: ProvideRuntimeApi<B> + StorageProvider<B, BE>,
C::Api: EthereumRuntimeRPCApi<B>,
B: BlockT<Hash=H256> + Send + Sync + 'static,
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
C: Send + Sync + 'static,
{
pub fn new(
client: Arc<C>,
filter_pool: FilterPool,
max_stored_filters: usize,
overrides: Arc<OverrideHandle<B>>,
) -> Self {
Self {
client,
client: client.clone(),
filter_pool,
max_stored_filters,
overrides,
_marker: PhantomData,
}
}
}

impl<B, C> EthFilterApi<B, C> where
C: ProvideRuntimeApi<B> + AuxStore,
impl<B, C, BE> EthFilterApi<B, C, BE> where
C: ProvideRuntimeApi<B> + StorageProvider<B, BE>,
C::Api: EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B> + HeaderMetadata<B, Error=BlockChainError> + 'static,
C: Send + Sync + 'static,
B: BlockT<Hash=H256> + Send + Sync + 'static,
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
fn create_filter(&self, filter_type: FilterType) -> Result<U256> {
let block_number = UniqueSaturatedInto::<u64>::unique_saturated_into(
Expand Down Expand Up @@ -1259,12 +1272,14 @@ impl<B, C> EthFilterApi<B, C> where
}
}

impl<B, C> EthFilterApiT for EthFilterApi<B, C> where
C: ProvideRuntimeApi<B> + AuxStore,
impl<B, C, BE> EthFilterApiT for EthFilterApi<B, C, BE> where
C: ProvideRuntimeApi<B> + StorageProvider<B, BE>,
C::Api: EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B> + HeaderMetadata<B, Error=BlockChainError> + 'static,
C: Send + Sync + 'static,
B: BlockT<Hash=H256> + Send + Sync + 'static,
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
fn new_filter(&self, filter: Filter) -> Result<U256> {
self.create_filter(FilterType::Log(filter))
Expand Down Expand Up @@ -1296,11 +1311,11 @@ impl<B, C> EthFilterApiT for EthFilterApi<B, C> where
let mut ethereum_hashes: Vec<H256> = Vec::new();
for n in last..next {
let id = BlockId::Number(n.unique_saturated_into());
let block = self.client.runtime_api()
.current_block(&id)
.map_err(|err| internal_err(
format!("fetch runtime block failed: {:?}", err)
))?;

let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
if let Some(block) = block {
ethereum_hashes.push(block.header.hash())
}
Expand Down Expand Up @@ -1349,11 +1364,11 @@ impl<B, C> EthFilterApiT for EthFilterApi<B, C> where
while current_number >= from_number {
let id = BlockId::Number(current_number);

let (block, _, statuses) = self.client.runtime_api()
.current_all(&id)
.map_err(|err| internal_err(
format!("fetch runtime account basic failed: {:?}", err)
))?;
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
Expand Down Expand Up @@ -1429,11 +1444,11 @@ impl<B, C> EthFilterApiT for EthFilterApi<B, C> where
while current_number >= from_number {
let id = BlockId::Number(current_number);

let (block, _, statuses) = self.client.runtime_api()
.current_all(&id)
.map_err(|err| internal_err(
format!("fetch runtime account basic failed: {:?}", err)
))?;
let schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(self.client.as_ref(), id);
let handler = self.overrides.schemas.get(&schema).unwrap_or(&self.overrides.fallback);

let block = handler.current_block(&id);
let statuses = handler.current_transaction_statuses(&id);

if let (Some(block), Some(statuses)) = (block, statuses) {
blocks_and_statuses.push((block, statuses));
Expand Down
Loading

0 comments on commit f00852a

Please sign in to comment.