Skip to content

Commit

Permalink
feat(bin): delete snapshot jars in stage drop (#6307)
Browse files: browse the repository at this point in the history
  • Loading branch information
shekhirin authored Feb 2, 2024
1 parent 87476e0 commit b499acc
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 57 deletions.
4 changes: 2 additions & 2 deletions bin/reth/src/commands/db/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Command {
///
/// The discrepancies and extra elements, along with a brief summary of the diff results are
/// then written to a file in the output directory.
pub fn execute(self, tool: &DbTool<'_, DatabaseEnv>) -> eyre::Result<()> {
pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> {
// open second db
let second_db_path: PathBuf = self.secondary_datadir.join("db").into();
let second_db = open_db_read_only(
Expand All @@ -72,7 +72,7 @@ impl Command {
};

for table in tables {
let primary_tx = tool.db.tx()?;
let primary_tx = tool.provider_factory.db_ref().tx()?;
let secondary_tx = second_db.tx()?;

let output_dir = self.output.clone();
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/db/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct Command {

impl Command {
/// Execute `db get` command
pub fn execute<DB: Database>(self, tool: &DbTool<'_, DB>) -> eyre::Result<()> {
pub fn execute<DB: Database>(self, tool: &DbTool<DB>) -> eyre::Result<()> {
self.table.view(&GetValueViewer { tool, args: &self })?;

Ok(())
Expand All @@ -52,7 +52,7 @@ impl Command {
}

struct GetValueViewer<'a, DB: Database> {
tool: &'a DbTool<'a, DB>,
tool: &'a DbTool<DB>,
args: &'a Command,
}

Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/db/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct Command {

impl Command {
/// Execute `db list` command
pub fn execute(self, tool: &DbTool<'_, DatabaseEnv>) -> eyre::Result<()> {
pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> {
self.table.view(&ListTableViewer { tool, args: &self })
}

Expand Down Expand Up @@ -81,15 +81,15 @@ impl Command {
}

struct ListTableViewer<'a> {
tool: &'a DbTool<'a, DatabaseEnv>,
tool: &'a DbTool<DatabaseEnv>,
args: &'a Command,
}

impl TableViewer<()> for ListTableViewer<'_> {
type Error = eyre::Report;

fn view<T: Table>(&self) -> Result<(), Self::Error> {
self.tool.db.view(|tx| {
self.tool.provider_factory.db_ref().view(|tx| {
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(&table_db).wrap_err(format!("Could not find table: {}", stringify!($table)))?;
let total_entries = stats.entries();
Expand Down
23 changes: 17 additions & 6 deletions bin/reth/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use reth_db::{
Tables,
};
use reth_primitives::ChainSpec;
use reth_provider::ProviderFactory;
use std::{
io::{self, Write},
sync::Arc,
Expand Down Expand Up @@ -108,7 +109,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let tool = DbTool::new(&db, self.chain.clone())?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let tool = DbTool::new(provider_factory, self.chain.clone())?;
let mut stats_table = ComfyTable::new();
stats_table.load_preset(comfy_table::presets::ASCII_MARKDOWN);
stats_table.set_header([
Expand All @@ -120,7 +123,7 @@ impl Command {
"Total Size",
]);

tool.db.view(|tx| {
tool.provider_factory.db_ref().view(|tx| {
let mut tables =
Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
tables.sort();
Expand Down Expand Up @@ -195,23 +198,29 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let tool = DbTool::new(&db, self.chain.clone())?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Diff(command) => {
let db = open_db_read_only(
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let tool = DbTool::new(&db, self.chain.clone())?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Get(command) => {
let db = open_db_read_only(
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
let tool = DbTool::new(&db, self.chain.clone())?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Drop { force } => {
Expand All @@ -232,7 +241,9 @@ impl Command {

let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
let mut tool = DbTool::new(&db, self.chain.clone())?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;
let mut tool = DbTool::new(provider_factory, self.chain.clone())?;
tool.drop(db_path)?;
}
Subcommands::Clear(command) => {
Expand Down
35 changes: 30 additions & 5 deletions bin/reth/src/commands/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use crate::{
};
use clap::Parser;
use reth_db::{
database::Database, mdbx::DatabaseArguments, open_db, tables, transaction::DbTxMut, DatabaseEnv,
database::Database, mdbx::DatabaseArguments, open_db, snapshot::iter_snapshots, tables,
transaction::DbTxMut, DatabaseEnv,
};
use reth_primitives::{fs, stage::StageId, ChainSpec};
use reth_primitives::{fs, snapshot::find_fixed_range, stage::StageId, ChainSpec, SnapshotSegment};
use reth_provider::ProviderFactory;
use std::sync::Arc;
use tracing::info;

Expand Down Expand Up @@ -58,11 +60,13 @@ impl Command {

let db =
open_db(db_path.as_ref(), DatabaseArguments::default().log_level(self.db.log_level))?;
let provider_factory = ProviderFactory::new(db, self.chain.clone())
.with_snapshots(data_dir.snapshots_path())?;

let tool = DbTool::new(&db, self.chain.clone())?;
let tool = DbTool::new(provider_factory, self.chain.clone())?;

tool.db.update(|tx| {
match &self.stage {
tool.provider_factory.db_ref().update(|tx| {
match self.stage {
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
tx.clear::<tables::Transactions>()?;
Expand Down Expand Up @@ -176,6 +180,27 @@ impl Command {
Ok::<_, eyre::Error>(())
})??;

let snapshot_segment = match self.stage {
StageEnum::Headers => Some(SnapshotSegment::Headers),
StageEnum::Bodies => Some(SnapshotSegment::Transactions),
StageEnum::Execution => Some(SnapshotSegment::Receipts),
_ => None,
};

if let Some(snapshot_segment) = snapshot_segment {
let snapshot_provider = tool
.provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory");
let snapshots = iter_snapshots(snapshot_provider.directory())?;
if let Some(segment_snapshots) = snapshots.get(&snapshot_segment) {
for (block_range, _) in segment_snapshots {
snapshot_provider
.delete_jar(snapshot_segment, find_fixed_range(*block_range.start()))?;
}
}
}

Ok(())
}
}
48 changes: 36 additions & 12 deletions bin/reth/src/commands/stage/dump/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_execution_stage<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
to: u64,
output_db: &PathBuf,
Expand All @@ -35,30 +35,50 @@ pub(crate) async fn dump_execution_stage<DB: Database>(
/// Imports all the tables that can be copied over a range.
fn import_tables_with_range<DB: Database>(
output_db: &DatabaseEnv,
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
to: u64,
) -> eyre::Result<()> {
// We're not sharing the transaction in case the memory grows too much.

output_db.update(|tx| {
tx.import_table_with_range::<tables::CanonicalHeaders, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::CanonicalHeaders, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::HeaderTD, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::HeaderTD, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::Headers, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockBodyIndices, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::BlockBodyIndices, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockOmmers, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::BlockOmmers, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;

// Find range of transactions that need to be copied over
let (from_tx, to_tx) = db_tool.db.view(|read_tx| {
let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
let (_, from_block) =
read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
Expand All @@ -73,14 +93,18 @@ fn import_tables_with_range<DB: Database>(

output_db.update(|tx| {
tx.import_table_with_range::<tables::Transactions, _>(
&db_tool.db.tx()?,
&db_tool.provider_factory.db_ref().tx()?,
Some(from_tx),
to_tx,
)
})??;

output_db.update(|tx| {
tx.import_table_with_range::<tables::TxSenders, _>(&db_tool.db.tx()?, Some(from_tx), to_tx)
tx.import_table_with_range::<tables::TxSenders, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from_tx),
to_tx,
)
})??;

Ok(())
Expand All @@ -90,12 +114,12 @@ fn import_tables_with_range<DB: Database>(
/// PlainAccountState safely. There might be some state dependency from an address
/// which hasn't been changed in the given range.
async fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;

let mut exec_stage =
Expand Down
12 changes: 8 additions & 4 deletions bin/reth/src/commands/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_account_stage<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: BlockNumber,
to: BlockNumber,
output_db: &PathBuf,
Expand All @@ -19,7 +19,11 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(

// Import relevant AccountChangeSets
output_db.update(|tx| {
tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::AccountChangeSet, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
)
})??;

unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
Expand All @@ -33,12 +37,12 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
to: u64,
output_db: &PathBuf,
Expand All @@ -28,12 +28,12 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;

let mut exec_stage = StorageHashingStage::default();
Expand Down
Loading

0 comments on commit b499acc

Please sign in to comment.