Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): make snapshots non-optional in provider factory #6344

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions bin/reth/src/commands/db/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl Command {
table.view(&ClearViewer { db: provider_factory.db_ref() })?
}
Subcommands::Snapshot { segment } => {
let snapshot_provider = provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory");
let snapshot_provider = provider_factory.snapshot_provider();
let snapshots = iter_snapshots(snapshot_provider.directory())?;

if let Some(segment_snapshots) = snapshots.get(&segment) {
Expand Down
32 changes: 19 additions & 13 deletions bin/reth/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(data_dir, &tool)?;
}
Expand All @@ -114,8 +115,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -124,8 +126,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -134,8 +137,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Expand All @@ -157,20 +161,22 @@ impl Command {

let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

let mut tool = DbTool::new(provider_factory, self.chain.clone())?;
tool.drop(db_path)?;
}
Subcommands::Clear(command) => {
let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?;

command.execute(provider_factory)?;
}
Subcommands::Snapshot(command) => {
command.execute(&db_path, self.db.log_level, self.chain.clone())?;
command.execute(data_dir, self.db.log_level, self.chain.clone())?;
}
Subcommands::Version => {
let local_db_version = match get_db_version(&db_path) {
Expand Down
9 changes: 3 additions & 6 deletions bin/reth/src/commands/db/snapshots/bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use reth_db::DatabaseEnv;
use reth_primitives::{
snapshot::{Compression, Filters},
ChainSpec, SnapshotSegment,
SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, ProviderFactory};
use std::{fmt::Debug, sync::Arc, time::Instant};
Expand All @@ -16,7 +16,7 @@ pub(crate) enum BenchKind {

pub(crate) fn bench<F1, F2, R>(
bench_kind: BenchKind,
db: (DatabaseEnv, Arc<ChainSpec>),
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
Expand All @@ -28,8 +28,6 @@ where
F2: Fn(DatabaseProviderRO<DatabaseEnv>) -> eyre::Result<R>,
R: Debug + PartialEq,
{
let (db, chain) = db;

println!();
println!("############");
println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]");
Expand All @@ -42,8 +40,7 @@ where
};

let db_result = {
let factory = ProviderFactory::new(db, chain);
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let start = Instant::now();
let result = database_method(provider)?;
let end = start.elapsed().as_micros();
Expand Down
34 changes: 12 additions & 22 deletions bin/reth/src/commands/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,25 @@ use super::{
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{mdbx::DatabaseArguments, open_db_read_only, snapshot::HeaderMask};
use reth_interfaces::db::LogLevel;
use reth_db::{snapshot::HeaderMask, DatabaseEnv};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockHash, ChainSpec, Header, SnapshotSegment,
BlockHash, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::{path::PathBuf, sync::Arc};

impl Command {
pub(crate) fn bench_headers_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let db_args = DatabaseArguments::default().log_level(log_level);

let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone());
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
Expand Down Expand Up @@ -58,7 +49,7 @@ impl Command {
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand Down Expand Up @@ -89,7 +80,7 @@ impl Command {
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand All @@ -109,15 +100,14 @@ impl Command {
// BENCHMARK QUERYING A RANDOM HEADER BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let header_hash =
ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();
let header_hash = provider_factory
.header_by_number(num)?
.ok_or(ProviderError::HeaderNotFound(num.into()))?
.hash_slow();

bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Headers,
filters,
compression,
Expand Down
37 changes: 17 additions & 20 deletions bin/reth/src/commands/db/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reth_db::{
};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_primitives::{
snapshot::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
Expand Down Expand Up @@ -81,7 +82,7 @@ impl Command {
/// Execute `db snapshot` command
pub fn execute(
self,
db_path: &Path,
data_dir: ChainPath<DataDirPath>,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
Expand All @@ -95,14 +96,16 @@ impl Command {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
});

{
let db = open_db_read_only(
db_path,
DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)?;
let factory = Arc::new(ProviderFactory::new(db, chain.clone()));
let db = open_db_read_only(
data_dir.db_path().as_path(),
DatabaseArguments::default()
.log_level(log_level)
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)?;
let provider_factory =
Arc::new(ProviderFactory::new(db, chain.clone(), data_dir.snapshots_path())?);

{
if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() {
Expand All @@ -113,17 +116,17 @@ impl Command {

match mode {
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Headers,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Transactions,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
provider_factory.clone(),
snap_segments::Receipts,
SegmentConfig { filters, compression },
)?,
Expand All @@ -136,25 +139,19 @@ impl Command {
for ((mode, compression), phf) in all_combinations {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Transactions => self.bench_transactions_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Receipts => self.bench_receipts_snapshot(
db_path,
log_level,
chain.clone(),
provider_factory.clone(),
compression,
InclusionFilter::Cuckoo,
phf,
Expand Down
33 changes: 11 additions & 22 deletions bin/reth/src/commands/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,27 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{open_db_read_only, snapshot::ReceiptMask};
use reth_interfaces::db::LogLevel;
use reth_db::{snapshot::ReceiptMask, DatabaseEnv};
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
Receipt, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider,
TransactionsProvider, TransactionsProviderExt,
};

use reth_db::mdbx::DatabaseArguments;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::{path::PathBuf, sync::Arc};

impl Command {
pub(crate) fn bench_receipts_snapshot(
&self,
db_path: &Path,
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
provider_factory: Arc<ProviderFactory<DatabaseEnv>>,
compression: Compression,
inclusion_filter: InclusionFilter,
phf: Option<PerfectHashingFunction>,
) -> eyre::Result<()> {
let db_args = DatabaseArguments::default().log_level(log_level);

let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone());
let provider = factory.provider()?;
let provider = provider_factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();
Expand All @@ -46,9 +36,8 @@ impl Command {

let mut rng = rand::thread_rng();

let tx_range = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range.clone())?;
let tx_range =
provider_factory.provider()?.transaction_range_by_block_range(block_range.clone())?;

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

Expand All @@ -67,7 +56,7 @@ impl Command {
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(
bench_kind,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Receipts,
filters,
compression,
Expand Down Expand Up @@ -98,7 +87,7 @@ impl Command {
let num = row_indexes[rng.gen_range(0..row_indexes.len())];
bench(
BenchKind::RandomOne,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory.clone(),
SnapshotSegment::Receipts,
filters,
compression,
Expand All @@ -118,14 +107,14 @@ impl Command {
// BENCHMARK QUERYING A RANDOM RECEIPT BY HASH
{
let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64;
let tx_hash = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
let tx_hash = provider_factory
.transaction_by_id(num)?
.ok_or(ProviderError::ReceiptNotFound(num.into()))?
.hash();

bench(
BenchKind::RandomHash,
(open_db_read_only(db_path, db_args)?, chain.clone()),
provider_factory,
SnapshotSegment::Receipts,
filters,
compression,
Expand Down
Loading