Skip to content

Commit

Permalink
Mapping sync worker (paritytech#309)
Browse files Browse the repository at this point in the history
* Simple sync_block function for mapping_sync

* MetaDb operating functions

* Implement sync_one_block

* Implement sync_blocks

* Implement mapping sync worker

* Implement mapping sync worker

* Remove consensus-layer import code

* Fix genesis block syncing logic

* Fix importing logic
  • Loading branch information
sorpaas authored Mar 8, 2021
1 parent 42de772 commit 4cfe019
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 55 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"client/rpc-core",
"client/rpc",
"client/db",
"client/mapping-sync",
"primitives/consensus",
"primitives/evm",
"primitives/rpc",
Expand Down
54 changes: 8 additions & 46 deletions client/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use sc_client_api::{BlockOf, backend::AuxStore};
use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_runtime::generic::OpaqueDigestItemId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, One, Zero};
use sp_api::{ProvideRuntimeApi, BlockId};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_api::ProvideRuntimeApi;
use sp_consensus::{
BlockImportParams, Error as ConsensusError, BlockImport,
BlockCheckParams, ImportResult,
Expand Down Expand Up @@ -120,59 +120,21 @@ impl<B, I, C> BlockImport<B> for FrontierBlockImport<B, I, C> where

fn import_block(
&mut self,
mut block: BlockImportParams<B, Self::Transaction>,
block: BlockImportParams<B, Self::Transaction>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
macro_rules! insert_closure {
() => (
|insert| block.auxiliary.extend(
insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
)
)
}

let client = self.client.clone();

if self.enabled {
let log = find_frontier_log::<B>(&block.header)?;
let hash = block.post_hash();
let post_hashes = match log {
ConsensusLog::PostHashes(post_hashes) => post_hashes,
ConsensusLog::PreBlock(block) => fp_consensus::PostHashes::from_block(block),
ConsensusLog::PostBlock(block) => fp_consensus::PostHashes::from_block(block),
};

let mapping_commitment = fc_db::MappingCommitment {
block_hash: hash,
ethereum_block_hash: post_hashes.block_hash,
ethereum_transaction_hashes: post_hashes.transaction_hashes,
};
let res = self.backend.mapping_db().write_hashes(mapping_commitment);
if res.is_err() { trace!(target: "frontier-consensus", "{:?}", res); }

// On importing block 1 we also map the genesis block in the auxiliary.
if block.header.number().clone() == One::one() {
let id = BlockId::Number(Zero::zero());
if let Ok(Some(header)) = client.header(id) {
let block = self.client.runtime_api().current_block(&id)
.map_err(|_| Error::RuntimeApiCallFailed)?;
let block_hash = block.unwrap().header.hash(); // TODO: shouldn't use unwrap
let mapping_commitment = fc_db::MappingCommitment::<B> {
block_hash: header.hash(),
ethereum_block_hash: block_hash,
ethereum_transaction_hashes: Vec::new(),
};
let res = self.backend.mapping_db().write_hashes(mapping_commitment);
if res.is_err() { trace!(target: "frontier-consensus", "{:?}", res); }
}
}
// We validate that there are only one frontier log. No other
// actions are needed and mapping syncing is delegated to a separate
// worker.
let _ = find_frontier_log::<B>(&block.header)?;
}

self.inner.import_block(block, new_cache).map_err(Into::into)
}
}

fn find_frontier_log<B: BlockT>(
pub fn find_frontier_log<B: BlockT>(
header: &B::Header,
) -> Result<ConsensusLog, Error> {
let mut frontier_log: Option<_> = None;
Expand Down
51 changes: 46 additions & 5 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,66 @@ pub(crate) mod columns {
pub const TRANSACTION_MAPPING: u32 = 2;
}

/// Fixed, well-known keys stored in the META column.
pub(crate) mod static_keys {
	/// Key holding the SCALE-encoded `Vec<Block::Hash>` of the mapping sync
	/// worker's current syncing tips (the frontier of blocks synced so far).
	pub const CURRENT_SYNCING_TIPS: &[u8] = b"CURRENT_SYNCING_TIPS";
}

pub struct Backend<Block: BlockT> {
mapping_db: Arc<MappingDb<Block>>,
meta: Arc<MetaDb<Block>>,
mapping: Arc<MappingDb<Block>>,
}

impl<Block: BlockT> Backend<Block> {
pub fn new(config: &DatabaseSettings) -> Result<Self, String> {
let db = utils::open_database(config)?;

Ok(Self {
mapping_db: Arc::new(MappingDb {
mapping: Arc::new(MappingDb {
db: db.clone(),
write_lock: Arc::new(Mutex::new(())),
_marker: PhantomData,
})
}),
meta: Arc::new(MetaDb {
db: db.clone(),
_marker: PhantomData,
}),
})
}

pub fn mapping_db(&self) -> &Arc<MappingDb<Block>> {
&self.mapping_db
pub fn mapping(&self) -> &Arc<MappingDb<Block>> {
&self.mapping
}

pub fn meta(&self) -> &Arc<MetaDb<Block>> {
&self.meta
}
}

/// Accessor for the META column of the Frontier database, holding
/// bookkeeping state for the mapping sync worker.
pub struct MetaDb<Block: BlockT> {
	db: Arc<dyn Database<DbHash>>,
	_marker: PhantomData<Block>,
}

impl<Block: BlockT> MetaDb<Block> {
	/// Read the block hashes the mapping sync worker is currently working
	/// from. A missing key is treated as an empty tip set.
	pub fn current_syncing_tips(&self) -> Result<Vec<Block::Hash>, String> {
		let raw = self.db.get(crate::columns::META, &crate::static_keys::CURRENT_SYNCING_TIPS);
		match raw {
			Some(bytes) => Vec::<Block::Hash>::decode(&mut &bytes[..])
				.map_err(|e| format!("{:?}", e)),
			None => Ok(Vec::new()),
		}
	}

	/// Persist the current syncing tips, overwriting any previous value.
	pub fn write_current_syncing_tips(&self, tips: Vec<Block::Hash>) -> Result<(), String> {
		let mut tx = sp_database::Transaction::new();
		tx.set(
			crate::columns::META,
			crate::static_keys::CURRENT_SYNCING_TIPS,
			&tips.encode(),
		);
		self.db.commit(tx).map_err(|e| format!("{:?}", e))
	}
}

Expand Down
20 changes: 20 additions & 0 deletions client/mapping-sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "fc-mapping-sync"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Mapping sync logic for Frontier."
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"

[dependencies]
sp-runtime = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-blockchain = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sc-client-api = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-api = { version = "2.0.0", git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
fp-consensus = { path = "../../primitives/consensus" }
fc-consensus = { path = "../consensus" }
fc-db = { path = "../db" }
fp-rpc = { path = "../../primitives/rpc" }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.1"
log = "0.4.8"
147 changes: 147 additions & 0 deletions client/mapping-sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This file is part of Frontier.
//
// Copyright (c) 2020 Parity Technologies (UK) Ltd.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod worker;

pub use worker::MappingSyncWorker;

use sp_runtime::{generic::BlockId, traits::{Block as BlockT, Header as HeaderT, Zero}};
use sp_api::ProvideRuntimeApi;
use sc_client_api::BlockOf;
use sp_blockchain::HeaderBackend;
use fp_consensus::ConsensusLog;
use fp_rpc::EthereumRuntimeRPCApi;

/// Map one Substrate block into the Frontier backend.
///
/// Extracts the Frontier consensus log from `header`, derives the Ethereum
/// block hash and transaction hashes from it, and writes the resulting
/// mapping commitment to `backend`.
pub fn sync_block<Block: BlockT>(
	backend: &fc_db::Backend<Block>,
	header: &Block::Header,
) -> Result<(), String> {
	// Whatever form the digest took (pre-block, post-block or bare hashes),
	// reduce it to the post hashes we need for the mapping.
	let post_hashes = match fc_consensus::find_frontier_log::<Block>(header)? {
		ConsensusLog::PostHashes(hashes) => hashes,
		ConsensusLog::PreBlock(block) => fp_consensus::PostHashes::from_block(block),
		ConsensusLog::PostBlock(block) => fp_consensus::PostHashes::from_block(block),
	};

	let commitment = fc_db::MappingCommitment {
		block_hash: header.hash(),
		ethereum_block_hash: post_hashes.block_hash,
		ethereum_transaction_hashes: post_hashes.transaction_hashes,
	};

	backend.mapping().write_hashes(commitment)
}

/// Map the Ethereum genesis block for the Substrate block `header`.
///
/// The genesis block carries no Frontier digest, so the Ethereum block is
/// fetched through the runtime API instead; it has no transactions, hence
/// the empty transaction-hash list in the commitment.
pub fn sync_genesis_block<Block: BlockT, C>(
	client: &C,
	backend: &fc_db::Backend<Block>,
	header: &Block::Header,
) -> Result<(), String> where
	C: ProvideRuntimeApi<Block> + Send + Sync + HeaderBackend<Block> + BlockOf,
	C::Api: EthereumRuntimeRPCApi<Block>,
{
	let substrate_hash = header.hash();
	let id = BlockId::Hash(substrate_hash);

	let ethereum_block = client.runtime_api()
		.current_block(&id)
		.map_err(|e| format!("{:?}", e))?
		.ok_or("Ethereum genesis block not found".to_string())?;

	backend.mapping().write_hashes(fc_db::MappingCommitment::<Block> {
		block_hash: substrate_hash,
		ethereum_block_hash: ethereum_block.header.hash(),
		ethereum_transaction_hashes: Vec::new(),
	})?;

	Ok(())
}

/// Advance the mapping sync by one level of the Substrate block tree.
///
/// The worker keeps a set of "syncing tips" — the frontier of blocks whose
/// mappings are already written. One call either:
/// * bootstraps by syncing the genesis block (when no tips exist yet), or
/// * picks the first tip that has children, syncs every child, and replaces
///   that tip with its children in the tip set.
///
/// Returns `Ok(true)` if any progress was made, `Ok(false)` if every tip is
/// currently childless (nothing to do).
pub fn sync_one_level<Block: BlockT, C, B>(
	client: &C,
	substrate_backend: &B,
	frontier_backend: &fc_db::Backend<Block>,
) -> Result<bool, String> where
	C: ProvideRuntimeApi<Block> + Send + Sync + HeaderBackend<Block> + BlockOf,
	C::Api: EthereumRuntimeRPCApi<Block>,
	B: sp_blockchain::HeaderBackend<Block> + sp_blockchain::Backend<Block>,
{
	let mut current_syncing_tips = frontier_backend.meta().current_syncing_tips()?;

	if current_syncing_tips.is_empty() {
		// No tips yet: bootstrap by syncing the genesis block, which becomes
		// the first tip.
		let header = substrate_backend.header(BlockId::Number(Zero::zero()))
			.map_err(|e| format!("{:?}", e))?
			.ok_or("Genesis header not found".to_string())?;
		sync_genesis_block(client, frontier_backend, &header)?;

		current_syncing_tips.push(header.hash());
		frontier_backend.meta().write_current_syncing_tips(current_syncing_tips)?;

		Ok(true)
	} else {
		// Find the first tip that has children to sync.
		let mut syncing_tip_and_children = None;

		for tip in &current_syncing_tips {
			let children = substrate_backend.children(*tip)
				.map_err(|e| format!("{:?}", e))?;

			if !children.is_empty() {
				syncing_tip_and_children = Some((*tip, children));
				break
			}
		}

		if let Some((syncing_tip, children)) = syncing_tip_and_children {
			// The tip is fully handled once its children are synced; the
			// children take its place in the tip set.
			current_syncing_tips.retain(|tip| tip != &syncing_tip);

			for child in children {
				let header = substrate_backend.header(BlockId::Hash(child))
					.map_err(|e| format!("{:?}", e))?
					.ok_or("Header not found".to_string())?;

				sync_block(frontier_backend, &header)?;
				current_syncing_tips.push(child);
			}
			frontier_backend.meta().write_current_syncing_tips(current_syncing_tips)?;

			Ok(true)
		} else {
			Ok(false)
		}
	}
}

/// Run up to `limit` iterations of [`sync_one_level`].
///
/// Returns `Ok(true)` if at least one iteration made progress, so the caller
/// (the worker loop) knows whether to poll again immediately or back off.
pub fn sync_blocks<Block: BlockT, C, B>(
	client: &C,
	substrate_backend: &B,
	frontier_backend: &fc_db::Backend<Block>,
	limit: usize,
) -> Result<bool, String> where
	C: ProvideRuntimeApi<Block> + Send + Sync + HeaderBackend<Block> + BlockOf,
	C::Api: EthereumRuntimeRPCApi<Block>,
	B: sp_blockchain::HeaderBackend<Block> + sp_blockchain::Backend<Block>,
{
	let mut synced_any = false;

	for _ in 0..limit {
		// Evaluate the sync step BEFORE or-ing with the accumulator:
		// `synced_any || sync_one_level(..)` would short-circuit after the
		// first successful level and skip the remaining `limit - 1` steps.
		synced_any = sync_one_level(client, substrate_backend, frontier_backend)? || synced_any;
	}

	Ok(synced_any)
}
Loading

0 comments on commit 4cfe019

Please sign in to comment.