Skip to content

Commit

Permalink
feat(storage): make snapshots non-optional in provider factory (#6344)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Feb 2, 2024
1 parent 333bc2c commit af32e85
Show file tree
Hide file tree
Showing 42 changed files with 513 additions and 425 deletions.
4 changes: 1 addition & 3 deletions bin/reth/src/commands/db/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl Command {
table.view(&ClearViewer { db: provider_factory.db_ref() })?
}
Subcommands::Snapshot { segment } => {
let snapshot_provider = provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory");
let snapshot_provider = provider_factory.snapshot_provider();
let snapshots = iter_snapshots(snapshot_provider.directory())?;

if let Some(segment_snapshots) = snapshots.get(&segment) {
Expand Down
32 changes: 19 additions & 13 deletions bin/reth/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(data_dir, &tool)?;
}
Expand All @@ -114,8 +115,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -124,8 +126,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -134,8 +137,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -157,20 +161,22 @@ impl Command {

let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let mut tool = DbTool::new(provider_factory, self.chain.clone())?;
tool.drop(db_path)?;
}
Subcommands::Clear(command) => {
let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

command.execute(provider_factory)?;
}
Subcommands::Snapshot(command) => {
command.execute(&db_path, self.db.log_level, self.chain.clone())?;
command.execute(data_dir, self.db.log_level, self.chain.clone())?;
}
Subcommands::Version => {
let local_db_version = match get_db_version(&db_path) {
Expand Down
9 changes: 3 additions & 6 deletions bin/reth/src/commands/db/snapshots/bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use reth_db::DatabaseEnv;
use reth_primitives::{
snapshot::{Compression, Filters},
ChainSpec, SnapshotSegment,
SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{fmt::Debug, sync::Arc, time::Instant};
Expand All @@ -16,7 +16,7 @@ pub(crate) enum BenchKind {

pub(crate) fn bench<F1, F2, R>(
bench_kind: BenchKind,
db: (DatabaseEnv, Arc<ChainSpec>),
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
Expand All @@ -28,8 +28,6 @@ where
F2: Fn(DatabaseProviderRO<DatabaseEnv>) -> eyre::Result<R>,
R: Debug + PartialEq,
{
let (db, chain) = db;

println!();
println!("############");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
Expand All @@ -42,8 +40,7 @@ where
};

let db_result = {
let factory = ProviderFactory::new(db, chain);
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let start = Instant::now();
let result = database_method(provider)?;
let end = start.elapsed().as_micros();
Expand Down
34 changes: 12 additions & 22 deletions bin/reth/src/commands/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,25 @@ use super::{
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{mdbx::DatabaseArguments, open_db_read_only, snapshot::HeaderMask};
use reth_interfaces::db::LogLevel;
use reth_db::{snapshot::HeaderMask, DatabaseEnv};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockHash, ChainSpec, Header, SnapshotSegment,
BlockHash, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::{path::PathBuf, sync::Arc};

impl Command {
pub(crate) fn bench_headers_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let db_args = DatabaseArguments::default().log_level(log_level);

let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone());
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
Expand Down Expand Up @@ -58,7 +49,7 @@ impl Command {
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand Down Expand Up @@ -89,7 +80,7 @@ impl Command {
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand All @@ -109,15 +100,14 @@ impl Command {
// BENCHMARK QUERYING A RANDOM HEADER BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let header_hash =
ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();
let header_hash = provider_factory
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();

bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand Down
37 changes: 17 additions & 20 deletions bin/reth/src/commands/db/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reth_db::{
};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_primitives::{
snapshot::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
Expand Down Expand Up @@ -81,7 +82,7 @@ impl Command {
/// Execute `db snapshot` command
pub fn execute(
self,
db_path: &Path,
data_dir: ChainPath<DataDirPath>,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
Expand All @@ -95,14 +96,16 @@ impl Command {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
});

{
let db = open_db_read_only(
db_path,
DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)?;
let factory = Arc::new(ProviderFactory::new(db, chain.clone()));
let db = open_db_read_only(
data_dir.db_path().as_path(),
DatabaseArguments::default()
.log_level(log_level)
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)?;
let provider_factory =
Arc::new(ProviderFactory::new(db, chain.clone(), data_dir.snapshots_path())?);

{
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Expand All @@ -113,17 +116,17 @@ impl Command {

match mode {
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Headers,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Transactions,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Receipts,
SegmentConfig { filters, compression },
)?,
Expand All @@ -136,25 +139,19 @@ impl Command {
for ((mode, compression), phf) in all_combinations {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Transactions => self.bench_transactions_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Receipts => self.bench_receipts_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
Expand Down
33 changes: 11 additions & 22 deletions bin/reth/src/commands/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,27 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{open_db_read_only, snapshot::ReceiptMask};
use reth_interfaces::db::LogLevel;
use reth_db::{snapshot::ReceiptMask, DatabaseEnv};
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
Receipt, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider,
TransactionsProvider, TransactionsProviderExt,
};

use reth_db::mdbx::DatabaseArguments;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::{path::PathBuf, sync::Arc};

impl Command {
pub(crate) fn bench_receipts_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let db_args = DatabaseArguments::default().log_level(log_level);

let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone());
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
Expand All @@ -46,9 +36,8 @@ impl Command {

let mut rng = rand::thread_rng();

let tx_range = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range.clone())?;
let tx_range =
provider_factory.provider()?.transaction_range_by_block_range(block_range.clone())?;

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

Expand All @@ -67,7 +56,7 @@ impl Command {
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Receipts,
filters,
compression,
Expand Down Expand Up @@ -98,7 +87,7 @@ impl Command {
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Receipts,
filters,
compression,
Expand All @@ -118,14 +107,14 @@ impl Command {
// BENCHMARK QUERYING A RANDOM RECEIPT BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let tx_hash = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
let tx_hash = provider_factory
.transaction_by_id(num)?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?
.hash();

bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory,
SnapshotSegment::Receipts,
filters,
compression,
Expand Down
Loading

0 comments on commit af32e85

Please sign in to comment.