Skip to content

Commit

Permalink
refactor: storage override (#1425)
Browse files Browse the repository at this point in the history
# Description

This PR provides a new `StorageOverrideHandler` type, like the old `OverrideHandle`, to support the different ethereum schemas with a fallback (runtime API) implementation.
Unlike the old handle, it automatically determines the ethereum schema version of the runtime based on `block_hash`, and then calls the methods of the `StorageOverride` implementation corresponding to that schema version.

Before:

```rust
let schema = fc_storage::onchain_storage_schema(client.as_ref(), block_hash);
let handler = overrides
	.schemas
	.get(&schema)
	.unwrap_or(&overrides.fallback);
let block = handler.current_block(block_hash);
let receipts = handler.current_receipts(block_hash);
```

After:

```rust
let block = storage_override.current_block(hash);
let receipts = storage_override.current_receipts(hash);
```

## Addition

This PR uses `Option::<EthereumStorageSchema>::None` instead of `EthereumStorageSchema::Undefined`.

```diff
/// The schema version for Pallet Ethereum's storage
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthereumStorageSchema {
-	// Undefined,
+	// deprecated
+	// #[codec(index = 0)]
+	// Undefined,
+	#[codec(index = 1)]
	V1,
+	#[codec(index = 2)]
	V2,
+	#[codec(index = 3)]
	V3,
}
```

IMO, this is more consistent with the current semantics: prefer the storage override corresponding to the schema version, and if there is no ethereum schema in the state, fall back to the runtime API implementation, like:

```rust
fn current_block(&self, at: B::Hash) -> Option<BlockV2> {
	match self.querier.storage_schema(at) {
		Some(EthereumStorageSchema::V1) => {
			SchemaV1StorageOverrideRef::new(&self.querier).current_block(at)
		}
		Some(EthereumStorageSchema::V2) => {
			SchemaV2StorageOverrideRef::new(&self.querier).current_block(at)
		}
		Some(EthereumStorageSchema::V3) => {
			SchemaV3StorageOverrideRef::new(&self.querier).current_block(at)
		}
		None => self.fallback.current_block(at),
	}
}
```
  • Loading branch information
koushiro authored May 15, 2024
1 parent f060da6 commit 77d7157
Show file tree
Hide file tree
Showing 32 changed files with 978 additions and 1,105 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions client/cli/src/frontier_db_cmd/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use substrate_test_runtime_client::{
TestClientBuilder,
};
// Frontier
use fp_storage::{EthereumStorageSchema, ETHEREUM_CURRENT_TRANSACTION_STATUS, PALLET_ETHEREUM};
use fp_storage::{constants::*, EthereumStorageSchema};
use frontier_template_runtime::RuntimeApi;

use crate::frontier_db_cmd::{Column, FrontierDbCmd, Operation};
Expand Down Expand Up @@ -545,7 +545,7 @@ fn commitment_create() {
let statuses = vec![t1];

// Build a block and fill the pallet-ethereum status.
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);
let chain = client.chain_info();
let mut builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(chain.best_hash)
Expand Down Expand Up @@ -628,7 +628,7 @@ fn commitment_update() {
let statuses_a1 = vec![t1.clone()];
let statuses_a2 = vec![t1, t2];

let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);

// First we create block and insert data in the offchain db.

Expand Down Expand Up @@ -756,7 +756,7 @@ fn mapping_read_works() {
let statuses = vec![t1];

// Build a block and fill the pallet-ethereum status.
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUS);
let key = storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_TRANSACTION_STATUSES);
let chain = client.chain_info();
let mut builder = BlockBuilderBuilder::new(&*client)
.on_parent_block(chain.best_hash)
Expand Down
2 changes: 0 additions & 2 deletions client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ sp-blockchain = { workspace = true }
sp-core = { workspace = true }
sp-database = { workspace = true }
sp-runtime = { workspace = true }
sp-storage = { workspace = true, optional = true }
# Frontier
fc-api = { workspace = true }
fc-storage = { workspace = true, optional = true }
Expand Down Expand Up @@ -61,7 +60,6 @@ sql = [
"tokio",
"sc-client-api",
"sp-api",
"sp-storage",
"fc-storage",
"fp-consensus",
"fp-rpc",
Expand Down
94 changes: 27 additions & 67 deletions client/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ use sp_runtime::{
};
// Frontier
use fc_api::{FilteredLog, TransactionMetadata};
use fc_storage::OverrideHandle;
use fc_storage::{StorageOverride, StorageQuerier};
use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
use fp_storage::EthereumStorageSchema;

/// Maximum number to topics allowed to be filtered upon
const MAX_TOPIC_COUNT: u16 = 4;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct Backend<Block: BlockT> {
pool: SqlitePool,

/// The additional overrides for the logs handler.
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
Expand All @@ -114,7 +114,7 @@ where
config: BackendConfig<'_>,
pool_size: u32,
num_ops_timeout: Option<NonZeroU32>,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
) -> Result<Self, Error> {
let any_pool = SqlitePoolOptions::new()
.max_connections(pool_size)
Expand All @@ -123,7 +123,7 @@ where
let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
Ok(Self {
pool: any_pool,
overrides,
storage_override,
num_ops_timeout: num_ops_timeout
.map(|n| n.get())
.unwrap_or(0)
Expand Down Expand Up @@ -232,8 +232,10 @@ where
.expect("runtime api reachable")
.expect("ethereum genesis block");

let schema =
Self::onchain_storage_schema(client.as_ref(), substrate_genesis_hash).encode();
let schema = StorageQuerier::new(client)
.storage_schema(substrate_genesis_hash)
.unwrap_or(EthereumStorageSchema::V3)
.encode();
let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
let substrate_block_hash = substrate_genesis_hash.as_bytes();
let block_number = 0i32;
Expand Down Expand Up @@ -266,7 +268,7 @@ where
fn insert_block_metadata_inner<Client, BE>(
client: Arc<Client>,
hash: H256,
overrides: Arc<OverrideHandle<Block>>,
storage_override: &dyn StorageOverride<Block>,
) -> Result<BlockMetadata, Error>
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
Expand All @@ -277,16 +279,14 @@ where
if let Ok(Some(header)) = client.header(hash) {
match fp_consensus::find_log(header.digest()) {
Ok(log) => {
let schema = Self::onchain_storage_schema(client.as_ref(), hash);
let schema = StorageQuerier::new(client.clone())
.storage_schema(hash)
.unwrap_or(EthereumStorageSchema::V3);
let log_hashes = match log {
ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
let ethereum_block = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback)
.current_block(hash);
let ethereum_block = storage_override.current_block(hash);
match ethereum_block {
Some(block) => {
let got_eth_block_hash = block.header.hash();
Expand Down Expand Up @@ -366,9 +366,9 @@ where
BE::State: StateBackend<BlakeTwo256>,
{
// Spawn a blocking task to get block metadata from substrate backend.
let overrides = self.overrides.clone();
let storage_override = self.storage_override.clone();
let metadata = tokio::task::spawn_blocking(move || {
Self::insert_block_metadata_inner(client.clone(), hash, overrides)
Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
})
.await
.map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;
Expand Down Expand Up @@ -435,14 +435,9 @@ where
}

/// Index the logs for the newly indexed blocks upto a `max_pending_blocks` value.
pub async fn index_block_logs<Client, BE>(&self, client: Arc<Client>, block_hash: Block::Hash)
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
pub async fn index_block_logs(&self, block_hash: Block::Hash) {
let pool = self.pool().clone();
let overrides = self.overrides.clone();
let storage_override = self.storage_override.clone();
let _ = async {
// The overarching db transaction for the task.
// Due to the async nature of this task, the same work is likely to happen
Expand All @@ -469,7 +464,7 @@ where
Ok(_) => {
// Spawn a blocking task to get log data from substrate backend.
let logs = tokio::task::spawn_blocking(move || {
Self::get_logs(client.clone(), overrides, block_hash)
Self::get_logs(storage_override, block_hash)
})
.await
.map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;
Expand Down Expand Up @@ -512,26 +507,14 @@ where
log::debug!(target: "frontier-sql", "Batch committed");
}

fn get_logs<Client, BE>(
client: Arc<Client>,
overrides: Arc<OverrideHandle<Block>>,
fn get_logs(
storage_override: Arc<dyn StorageOverride<Block>>,
substrate_block_hash: H256,
) -> Vec<Log>
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
) -> Vec<Log> {
let mut logs: Vec<Log> = vec![];
let mut transaction_count: usize = 0;
let mut log_count: usize = 0;
let schema = Self::onchain_storage_schema(client.as_ref(), substrate_block_hash);
let handler = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback);

let receipts = handler
let receipts = storage_override
.current_receipts(substrate_block_hash)
.unwrap_or_default();

Expand Down Expand Up @@ -565,20 +548,6 @@ where
logs
}

fn onchain_storage_schema<Client, BE>(client: &Client, at: Block::Hash) -> EthereumStorageSchema
where
Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
BE: BackendT<Block> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
match client.storage(at, &sp_storage::StorageKey(PALLET_ETHEREUM_SCHEMA.to_vec())) {
Ok(Some(bytes)) => Decode::decode(&mut &bytes.0[..])
.ok()
.unwrap_or(EthereumStorageSchema::Undefined),
_ => EthereumStorageSchema::Undefined,
}
}

/// Retrieves the status if a block has been already indexed.
pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
Expand Down Expand Up @@ -1034,7 +1003,7 @@ LIMIT 10001",
mod test {
use super::*;

use std::{collections::BTreeMap, path::Path};
use std::path::Path;

use maplit::hashset;
use scale_codec::Encode;
Expand All @@ -1051,7 +1020,7 @@ mod test {
};
// Frontier
use fc_api::Backend as BackendT;
use fc_storage::{OverrideHandle, SchemaV3Override, StorageOverride};
use fc_storage::SchemaV3StorageOverride;
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};

type OpaqueBlock =
Expand Down Expand Up @@ -1129,16 +1098,7 @@ mod test {
);
let client = Arc::new(client);
// Overrides
let mut overrides_map = BTreeMap::new();
overrides_map.insert(
EthereumStorageSchema::V3,
Box::new(SchemaV3Override::new(client.clone())) as Box<dyn StorageOverride<_>>,
);
let overrides = Arc::new(OverrideHandle {
schemas: overrides_map,
fallback: Box::new(SchemaV3Override::new(client.clone())),
});

let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
// Indexer backend
let indexer_backend = Backend::new(
BackendConfig::Sqlite(SqliteBackendConfig {
Expand All @@ -1153,7 +1113,7 @@ mod test {
}),
1,
None,
overrides.clone(),
storage_override.clone(),
)
.await
.expect("indexer pool to be created");
Expand Down
29 changes: 9 additions & 20 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,17 @@ use sp_blockchain::{Backend as _, HeaderBackend};
use sp_consensus::SyncOracle;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
// Frontier
use fc_storage::OverrideHandle;
use fc_storage::StorageOverride;
use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;

use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};

pub fn sync_block<Block: BlockT, C, BE>(
client: &C,
overrides: Arc<OverrideHandle<Block>>,
pub fn sync_block<Block: BlockT>(
storage_override: Arc<dyn StorageOverride<Block>>,
backend: &fc_db::kv::Backend<Block>,
header: &Block::Header,
) -> Result<(), String>
where
C: HeaderBackend<Block> + StorageProvider<Block, BE>,
BE: Backend<Block>,
{
) -> Result<(), String> {
let substrate_block_hash = header.hash();
match fp_consensus::find_log(header.digest()) {
Ok(log) => {
Expand Down Expand Up @@ -77,13 +72,7 @@ where
backend.mapping().write_hashes(mapping_commitment)
}
PostLog::BlockHash(expect_eth_block_hash) => {
let schema =
fc_storage::onchain_storage_schema(client, substrate_block_hash);
let ethereum_block = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback)
.current_block(substrate_block_hash);
let ethereum_block = storage_override.current_block(substrate_block_hash);
match ethereum_block {
Some(block) => {
let got_eth_block_hash = block.header.hash();
Expand Down Expand Up @@ -158,7 +147,7 @@ where
pub fn sync_one_block<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
Expand Down Expand Up @@ -220,7 +209,7 @@ where
{
return Ok(false);
}
sync_block(client, overrides, frontier_backend, &operating_header)?;
sync_block(storage_override, frontier_backend, &operating_header)?;

current_syncing_tips.push(*operating_header.parent_hash());
frontier_backend
Expand All @@ -247,7 +236,7 @@ where
pub fn sync_blocks<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
storage_override: Arc<dyn StorageOverride<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
Expand All @@ -270,7 +259,7 @@ where
|| sync_one_block(
client,
substrate_backend,
overrides.clone(),
storage_override.clone(),
frontier_backend,
sync_from,
strategy,
Expand Down
Loading

0 comments on commit 77d7157

Please sign in to comment.