Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot, prune): headers #6230

Merged
merged 20 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ impl Command {
fs::create_dir_all(&db_path)?;
let db =
Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?);
let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone());
let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis(db.clone(), self.chain.clone())?;
init_genesis(provider_factory.clone(), self.chain.clone())?;

let consensus: Arc<dyn Consensus> = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)));

Expand Down
5 changes: 3 additions & 2 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ impl ImportCommand {
let db =
Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?);
info!(target: "reth::cli", "Database opened");
let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone());
let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");

init_genesis(db.clone(), self.chain.clone())?;
init_genesis(provider_factory.clone(), self.chain.clone())?;

let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
info!(target: "reth::cli", "Consensus engine initialized");
Expand Down
6 changes: 5 additions & 1 deletion bin/reth/src/commands/init_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
use clap::Parser;
use reth_db::{init_db, mdbx::DatabaseArguments};
use reth_primitives::ChainSpec;
use reth_provider::ProviderFactory;
use std::sync::Arc;
use tracing::info;

Expand Down Expand Up @@ -56,8 +57,11 @@ impl InitCommand {
Arc::new(init_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?);
info!(target: "reth::cli", "Database opened");

let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;

info!(target: "reth::cli", "Writing genesis block");
let hash = init_genesis(db, self.chain)?;
let hash = init_genesis(provider_factory, self.chain)?;

info!(target: "reth::cli", hash = ?hash, "Genesis block written");
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions bin/reth/src/commands/recover/storage_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ impl Command {
fs::create_dir_all(&db_path)?;
let db = Arc::new(init_db(db_path, Default::default())?);

let factory = ProviderFactory::new(&db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis(db.clone(), self.chain.clone())?;
init_genesis(factory.clone(), self.chain.clone())?;

let factory = ProviderFactory::new(&db, self.chain);
let mut provider = factory.provider_rw()?;
let best_block = provider.best_block_number()?;
let best_header = provider
Expand Down
77 changes: 38 additions & 39 deletions crates/node-core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
//! Reth genesis initialization utility functions.

use reth_db::{
cursor::DbCursorRO,
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_db::{database::Database, tables, transaction::DbTxMut};
use reth_interfaces::{db::DatabaseError, provider::ProviderResult};
use reth_primitives::{
stage::StageId, Account, Bytecode, ChainSpec, Receipts, StorageEntry, B256, U256,
};
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
BundleStateWithReceipts, DatabaseProviderRW, HashingWriter, HistoryWriter, OriginalValuesKnown,
ProviderError, ProviderFactory,
BlockHashReader, BundleStateWithReceipts, DatabaseProviderRW, HashingWriter, HistoryWriter,
OriginalValuesKnown, ProviderError, ProviderFactory,
};
use std::{
collections::{BTreeMap, HashMap},
Expand Down Expand Up @@ -47,15 +42,15 @@ impl From<DatabaseError> for InitDatabaseError {

/// Write the genesis block if it has not already been written
pub fn init_genesis<DB: Database>(
db: Arc<DB>,
provider_factory: ProviderFactory<DB>,
chain: Arc<ChainSpec>,
) -> Result<B256, InitDatabaseError> {
let genesis = chain.genesis();

let hash = chain.genesis_hash();

let tx = db.tx()?;
if let Some((_, db_hash)) = tx.cursor_read::<tables::CanonicalHeaders>()?.first()? {
let provider = provider_factory.provider()?;
if let Some(db_hash) = provider.block_hash(0)? {
if db_hash == hash {
debug!("Genesis already written, skipping.");
return Ok(hash)
Expand All @@ -67,28 +62,29 @@ pub fn init_genesis<DB: Database>(
})
}

drop(tx);
drop(provider);
debug!("Writing genesis block.");

// use transaction to insert genesis header
let factory = ProviderFactory::new(&db, chain.clone());
let provider_rw = factory.provider_rw()?;
let mut provider_rw = provider_factory.provider_rw()?;
insert_genesis_hashes(&provider_rw, genesis)?;
insert_genesis_history(&provider_rw, genesis)?;
provider_rw.commit()?;

// Insert header
let tx = db.tx_mut()?;
insert_genesis_header::<DB>(&tx, chain.clone())?;
{
let tx = provider_rw.tx_mut();
insert_genesis_header::<DB>(tx, chain.clone())?;

insert_genesis_state::<DB>(&tx, genesis)?;
insert_genesis_state::<DB>(tx, genesis)?;

// insert sync stage
for stage in StageId::ALL.iter() {
tx.put::<tables::SyncStage>(stage.to_string(), Default::default())?;
// insert sync stage
for stage in StageId::ALL.iter() {
tx.put::<tables::SyncStage>(stage.to_string(), Default::default())?;
}
}

tx.commit()?;
provider_rw.commit()?;

Ok(hash)
}

Expand Down Expand Up @@ -160,7 +156,7 @@ pub fn insert_genesis_state<DB: Database>(

/// Inserts hashes for the genesis state.
pub fn insert_genesis_hashes<DB: Database>(
provider: &DatabaseProviderRW<&DB>,
provider: &DatabaseProviderRW<DB>,
genesis: &reth_primitives::Genesis,
) -> ProviderResult<()> {
// insert and hash accounts to hashing table
Expand All @@ -187,7 +183,7 @@ pub fn insert_genesis_hashes<DB: Database>(

/// Inserts history indices for genesis accounts and storage.
pub fn insert_genesis_history<DB: Database>(
provider: &DatabaseProviderRW<&DB>,
provider: &DatabaseProviderRW<DB>,
genesis: &reth_primitives::Genesis,
) -> ProviderResult<()> {
let account_transitions =
Expand Down Expand Up @@ -226,15 +222,17 @@ mod tests {
use super::*;

use reth_db::{
cursor::DbCursorRO,
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::{Table, TableRow},
test_utils::create_test_rw_db,
transaction::DbTx,
DatabaseEnv,
};
use reth_primitives::{
Address, Chain, ForkTimestamps, Genesis, GenesisAccount, IntegerList, GOERLI,
GOERLI_GENESIS_HASH, MAINNET, MAINNET_GENESIS_HASH, SEPOLIA, SEPOLIA_GENESIS_HASH,
};
use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;
use std::collections::HashMap;

fn collect_table_entries<DB, T>(
Expand All @@ -249,38 +247,38 @@ mod tests {

#[test]
fn success_init_genesis_mainnet() {
let db = create_test_rw_db();
let genesis_hash = init_genesis(db, MAINNET.clone()).unwrap();
let factory = create_test_provider_factory_with_chain_spec(MAINNET.clone());
let genesis_hash = init_genesis(factory, MAINNET.clone()).unwrap();

// actual, expected
assert_eq!(genesis_hash, MAINNET_GENESIS_HASH);
}

#[test]
fn success_init_genesis_goerli() {
let db = create_test_rw_db();
let genesis_hash = init_genesis(db, GOERLI.clone()).unwrap();
let factory = create_test_provider_factory_with_chain_spec(GOERLI.clone());
let genesis_hash = init_genesis(factory, GOERLI.clone()).unwrap();

// actual, expected
assert_eq!(genesis_hash, GOERLI_GENESIS_HASH);
}

#[test]
fn success_init_genesis_sepolia() {
let db = create_test_rw_db();
let genesis_hash = init_genesis(db, SEPOLIA.clone()).unwrap();
let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
let genesis_hash = init_genesis(factory, SEPOLIA.clone()).unwrap();

// actual, expected
assert_eq!(genesis_hash, SEPOLIA_GENESIS_HASH);
}

#[test]
fn fail_init_inconsistent_db() {
let db = create_test_rw_db();
init_genesis(db.clone(), SEPOLIA.clone()).unwrap();
let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
init_genesis(factory.clone(), SEPOLIA.clone()).unwrap();

// Try to init db with a different genesis block
let genesis_hash = init_genesis(db, MAINNET.clone());
let genesis_hash = init_genesis(factory, MAINNET.clone());

assert_eq!(
genesis_hash.unwrap_err(),
Expand Down Expand Up @@ -322,13 +320,14 @@ mod tests {
..Default::default()
});

let db = create_test_rw_db();
init_genesis(db.clone(), chain_spec).unwrap();
let factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(factory.clone(), chain_spec).unwrap();

let tx = db.tx().expect("failed to init tx");
let provider = factory.provider().expect("failed to init provider");
let tx = provider.tx_ref();

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountHistory>(&tx)
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountHistory>(tx)
.expect("failed to collect"),
vec![
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
Expand All @@ -337,7 +336,7 @@ mod tests {
);

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StorageHistory>(&tx)
collect_table_entries::<Arc<DatabaseEnv>, tables::StorageHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
Expand Down
2 changes: 1 addition & 1 deletion crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit

debug!(target: "reth::cli", chain=%self.config.chain.chain, genesis=?self.config.chain.genesis_hash(), "Initializing genesis");

let genesis_hash = init_genesis(Arc::clone(&self.db), self.config.chain.clone())?;
let genesis_hash = init_genesis(provider_factory.clone(), self.config.chain.clone())?;

info!(target: "reth::cli", "{}", DisplayHardforks::new(self.config.chain.hardforks()));

Expand Down
8 changes: 2 additions & 6 deletions crates/prune/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,9 @@ impl<DB: Database> Segment<DB> for Headers {
}

let results = [
self.prune_table::<DB, tables::CanonicalHeaders>(
provider,
block_range.clone(),
delete_limit,
)?,
self.prune_table::<DB, tables::Headers>(provider, block_range.clone(), delete_limit)?,
self.prune_table::<DB, tables::HeaderTD>(provider, block_range, delete_limit)?,
self.prune_table::<DB, tables::HeaderTD>(provider, block_range.clone(), delete_limit)?,
self.prune_table::<DB, tables::CanonicalHeaders>(provider, block_range, delete_limit)?,
];

if !results.iter().map(|(_, _, last_pruned_block)| last_pruned_block).all_equal() {
Expand Down
44 changes: 39 additions & 5 deletions crates/snapshot/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use reth_db::{
};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{snapshot::SegmentConfig, BlockNumber, SnapshotSegment};
use reth_provider::{providers::SnapshotProvider, DatabaseProviderRO};
use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};

/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
Expand All @@ -19,11 +22,42 @@ impl<DB: Database> Segment<DB> for Headers {

fn snapshot(
&self,
_provider: DatabaseProviderRO<DB>,
_snapshot_provider: Arc<SnapshotProvider>,
_block_range: RangeInclusive<BlockNumber>,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
unimplemented!()
let mut snapshot_writer =
snapshot_provider.get_writer(*block_range.start(), SnapshotSegment::Headers)?;

let mut headers_cursor = provider.tx_ref().cursor_read::<tables::Headers>()?;
let headers_walker = headers_cursor.walk_range(block_range.clone())?;

let mut header_td_cursor = provider.tx_ref().cursor_read::<tables::HeaderTD>()?;
let header_td_walker = header_td_cursor.walk_range(block_range.clone())?;

let mut canonical_headers_cursor =
provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?;
let canonical_headers_walker = canonical_headers_cursor.walk_range(block_range)?;

let mut headers =
headers_walker.zip(header_td_walker).zip(canonical_headers_walker).peekable();
while let Some(((header_entry, header_td_entry), canonical_header_entry)) = headers.next() {
let (header_block, header) = header_entry?;
let (header_td_block, header_td) = header_td_entry?;
let (canonical_header_block, canonical_header) = canonical_header_entry?;

debug_assert_eq!(header_block, header_td_block);
debug_assert_eq!(header_td_block, canonical_header_block);

snapshot_writer.append_header(header, header_td.0, canonical_header)?;

if headers.peek().is_some() {
let _snapshot_block = snapshot_writer.increment_block(SnapshotSegment::Headers)?;
debug_assert_eq!(_snapshot_block, header_block);
}
}

Ok(())
}

fn create_snapshot_file(
Expand Down
Loading
Loading