Skip to content

Commit

Permalink
feat: add a new post log type into header digest and provide a compat…
Browse files Browse the repository at this point in the history
…ible mapping-sync worker (#1011)
  • Loading branch information
koushiro authored Mar 3, 2023
1 parent aa4db07 commit 0535842
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/mapping-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ sp-blockchain = { workspace = true }
sp-runtime = { workspace = true }
# Frontier
fc-db = { workspace = true }
fc-storage = { workspace = true }
fp-consensus = { workspace = true, features = ["default"] }
fp-rpc = { workspace = true, features = ["default"] }
92 changes: 72 additions & 20 deletions client/mapping-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,89 @@ mod worker;

pub use worker::{MappingSyncWorker, SyncStrategy};

use std::sync::Arc;

// Substrate
use sc_client_api::backend::Backend;
use sc_client_api::backend::{Backend, StorageProvider};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::{Backend as _, HeaderBackend};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, Zero},
};
// Frontier
use fp_consensus::FindLogError;
use fc_storage::OverrideHandle;
use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;

pub fn sync_block<Block: BlockT>(
pub fn sync_block<Block: BlockT, C, BE>(
client: &C,
overrides: Arc<OverrideHandle<Block>>,
backend: &fc_db::Backend<Block>,
header: &Block::Header,
) -> Result<(), String> {
) -> Result<(), String>
where
C: StorageProvider<Block, BE> + HeaderBackend<Block>,
BE: Backend<Block>,
{
let substrate_block_hash = header.hash();
match fp_consensus::find_log(header.digest()) {
Ok(log) => {
let post_hashes = log.into_hashes();

let mapping_commitment = fc_db::MappingCommitment {
block_hash: header.hash(),
ethereum_block_hash: post_hashes.block_hash,
ethereum_transaction_hashes: post_hashes.transaction_hashes,
let gen_from_hashes = |hashes: Hashes| -> fc_db::MappingCommitment<Block> {
fc_db::MappingCommitment {
block_hash: substrate_block_hash,
ethereum_block_hash: hashes.block_hash,
ethereum_transaction_hashes: hashes.transaction_hashes,
}
};
let gen_from_block = |block| -> fc_db::MappingCommitment<Block> {
let hashes = Hashes::from_block(block);
gen_from_hashes(hashes)
};
backend.mapping().write_hashes(mapping_commitment)?;

Ok(())
}
Err(FindLogError::NotFound) => {
backend.mapping().write_none(header.hash())?;

Ok(())
match log {
Log::Pre(PreLog::Block(block)) => {
let mapping_commitment = gen_from_block(block);
backend.mapping().write_hashes(mapping_commitment)
}
Log::Post(post_log) => match post_log {
PostLog::Hashes(hashes) => {
let mapping_commitment = gen_from_hashes(hashes);
backend.mapping().write_hashes(mapping_commitment)
}
PostLog::Block(block) => {
let mapping_commitment = gen_from_block(block);
backend.mapping().write_hashes(mapping_commitment)
}
PostLog::BlockHash(expect_eth_block_hash) => {
let schema =
fc_storage::onchain_storage_schema(client, substrate_block_hash);
let ethereum_block = overrides
.schemas
.get(&schema)
.unwrap_or(&overrides.fallback)
.current_block(substrate_block_hash);
match ethereum_block {
Some(block) => {
let got_eth_block_hash = block.header.hash();
if got_eth_block_hash != expect_eth_block_hash {
Err(format!(
"Ethereum block hash mismatch: \
frontier consensus digest ({expect_eth_block_hash:?}), \
db state ({got_eth_block_hash:?})"
))
} else {
let mapping_commitment = gen_from_block(block);
backend.mapping().write_hashes(mapping_commitment)
}
}
None => backend.mapping().write_none(substrate_block_hash),
}
}
},
}
}
Err(FindLogError::NotFound) => backend.mapping().write_none(substrate_block_hash),
Err(FindLogError::MultipleLogs) => Err("Multiple logs found".to_string()),
}
}
Expand Down Expand Up @@ -109,14 +158,15 @@ where
pub fn sync_one_block<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::Backend<Block>,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
) -> Result<bool, String>
where
C: ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
C: HeaderBackend<Block>,
C: HeaderBackend<Block> + StorageProvider<Block, BE>,
BE: Backend<Block>,
{
let mut current_syncing_tips = frontier_backend.meta().current_syncing_tips()?;
Expand Down Expand Up @@ -167,7 +217,7 @@ where
{
return Ok(false);
}
sync_block(frontier_backend, &operating_header)?;
sync_block(client, overrides, frontier_backend, &operating_header)?;

current_syncing_tips.push(*operating_header.parent_hash());
frontier_backend
Expand All @@ -180,6 +230,7 @@ where
pub fn sync_blocks<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::Backend<Block>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
Expand All @@ -188,7 +239,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
where
C: ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
C: HeaderBackend<Block>,
C: HeaderBackend<Block> + StorageProvider<Block, BE>,
BE: Backend<Block>,
{
let mut synced_any = false;
Expand All @@ -198,6 +249,7 @@ where
|| sync_one_block(
client,
substrate_backend,
overrides.clone(),
frontier_backend,
sync_from,
strategy,
Expand Down
12 changes: 10 additions & 2 deletions client/mapping-sync/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ use futures::{
use futures_timer::Delay;
use log::debug;
// Substrate
use sc_client_api::{backend::Backend, client::ImportNotifications};
use sc_client_api::{
backend::{Backend, StorageProvider},
client::ImportNotifications,
};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
// Frontier
use fc_storage::OverrideHandle;
use fp_rpc::EthereumRuntimeRPCApi;

#[derive(Copy, Clone, Eq, PartialEq)]
Expand All @@ -45,6 +49,7 @@ pub struct MappingSyncWorker<Block: BlockT, C, BE> {

client: Arc<C>,
substrate_backend: Arc<BE>,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: Arc<fc_db::Backend<Block>>,

have_next: bool,
Expand All @@ -61,6 +66,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
timeout: Duration,
client: Arc<C>,
substrate_backend: Arc<BE>,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: Arc<fc_db::Backend<Block>>,
retry_times: usize,
sync_from: <Block::Header as HeaderT>::Number,
Expand All @@ -73,6 +79,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {

client,
substrate_backend,
overrides,
frontier_backend,

have_next: true,
Expand All @@ -87,7 +94,7 @@ impl<Block: BlockT, C, BE> Stream for MappingSyncWorker<Block, C, BE>
where
C: ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
C: HeaderBackend<Block>,
C: HeaderBackend<Block> + StorageProvider<Block, BE>,
BE: Backend<Block>,
{
type Item = ();
Expand Down Expand Up @@ -125,6 +132,7 @@ where
match crate::sync_blocks(
self.client.as_ref(),
self.substrate_backend.as_ref(),
self.overrides.clone(),
self.frontier_backend.as_ref(),
self.retry_times,
self.sync_from,
Expand Down
31 changes: 11 additions & 20 deletions primitives/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ pub enum Log {
Post(PostLog),
}

impl Log {
pub fn into_hashes(self) -> Hashes {
match self {
Log::Post(PostLog::Hashes(post_hashes)) => post_hashes,
Log::Post(PostLog::Block(block)) => Hashes::from_block(block),
Log::Pre(PreLog::Block(block)) => Hashes::from_block(block),
}
}
}

#[derive(Decode, Encode, Clone, PartialEq, Eq)]
pub enum PreLog {
#[codec(index = 3)]
Expand All @@ -52,10 +42,15 @@ pub enum PreLog {

#[derive(Decode, Encode, Clone, PartialEq, Eq)]
pub enum PostLog {
/// Ethereum block hash and txn hashes.
#[codec(index = 1)]
Hashes(Hashes),
/// Ethereum block.
#[codec(index = 2)]
Block(ethereum::BlockV2),
/// Ethereum block hash.
#[codec(index = 3)]
BlockHash(H256),
}

#[derive(Decode, Encode, Clone, PartialEq, Eq)]
Expand All @@ -68,17 +63,13 @@ pub struct Hashes {

impl Hashes {
pub fn from_block(block: ethereum::BlockV2) -> Self {
let mut transaction_hashes = Vec::new();

for t in &block.transactions {
transaction_hashes.push(t.hash());
}

let block_hash = block.header.hash();

Hashes {
transaction_hashes,
block_hash,
block_hash: block.header.hash(),
transaction_hashes: block
.transactions
.into_iter()
.map(|txn| txn.hash())
.collect(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions template/node/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub fn spawn_frontier_tasks<RuntimeApi, Executor>(
Duration::new(6, 0),
client.clone(),
backend,
overrides.clone(),
frontier_backend,
3,
0,
Expand Down

0 comments on commit 0535842

Please sign in to comment.