feat(bin): delete snapshot jars in stage drop #6307

Merged
merged 3 commits on Feb 2, 2024
4 changes: 2 additions & 2 deletions bin/reth/src/commands/db/diff.rs
@@ -58,7 +58,7 @@ impl Command {
///
/// The discrepancies and extra elements, along with a brief summary of the diff results are
/// then written to a file in the output directory.
-pub fn execute(self, tool: &DbTool<'_, DatabaseEnv>) -> eyre::Result<()> {
+pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> {
// open second db
let second_db_path: PathBuf = self.secondary_datadir.join("db").into();
let second_db = open_db_read_only(
@@ -72,7 +72,7 @@ impl Command {
};

for table in tables {
-let primary_tx = tool.db.tx()?;
+let primary_tx = tool.provider_factory.db_ref().tx()?;
let secondary_tx = second_db.tx()?;

let output_dir = self.output.clone();
4 changes: 2 additions & 2 deletions bin/reth/src/commands/db/get.rs
@@ -30,7 +30,7 @@ pub struct Command {

impl Command {
/// Execute `db get` command
-pub fn execute<DB: Database>(self, tool: &DbTool<'_, DB>) -> eyre::Result<()> {
+pub fn execute<DB: Database>(self, tool: &DbTool<DB>) -> eyre::Result<()> {
self.table.view(&GetValueViewer { tool, args: &self })?;

Ok(())
@@ -52,7 +52,7 @@ impl Command {
}

struct GetValueViewer<'a, DB: Database> {
-tool: &'a DbTool<'a, DB>,
+tool: &'a DbTool<DB>,
args: &'a Command,
}

6 changes: 3 additions & 3 deletions bin/reth/src/commands/db/list.rs
@@ -50,7 +50,7 @@ pub struct Command {

impl Command {
/// Execute `db list` command
-pub fn execute(self, tool: &DbTool<'_, DatabaseEnv>) -> eyre::Result<()> {
+pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> {
self.table.view(&ListTableViewer { tool, args: &self })
}

@@ -81,15 +81,15 @@ impl Command {
}

struct ListTableViewer<'a> {
-tool: &'a DbTool<'a, DatabaseEnv>,
+tool: &'a DbTool<DatabaseEnv>,
args: &'a Command,
}

impl TableViewer<()> for ListTableViewer<'_> {
type Error = eyre::Report;

fn view<T: Table>(&self) -> Result<(), Self::Error> {
-self.tool.db.view(|tx| {
+self.tool.provider_factory.db_ref().view(|tx| {
let table_db = tx.inner.open_db(Some(self.args.table.name())).wrap_err("Could not open db.")?;
let stats = tx.inner.db_stat(&table_db).wrap_err(format!("Could not find table: {}", stringify!($table)))?;
let total_entries = stats.entries();
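Across diff.rs, get.rs, and list.rs above, the refactor is the same two-part change: `DbTool` loses the borrowed-database lifetime parameter (`DbTool<'_, DB>` becomes `DbTool<DB>`), and table access goes through the factory's database handle. Restated from the diff.rs hunk with comments added here (a fragment, not standalone code; `tool` and `second_db` come from the surrounding command):

```rust
// Before this PR, DbTool borrowed the database and exposed it as `tool.db`:
// let primary_tx = tool.db.tx()?;

// Now the tool owns a ProviderFactory and borrows the database handle from it
// whenever a read-only transaction is needed.
let primary_tx = tool.provider_factory.db_ref().tx()?;
let secondary_tx = second_db.tx()?;
```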
23 changes: 17 additions & 6 deletions bin/reth/src/commands/db/mod.rs
@@ -21,6 +21,7 @@ use reth_db::{
Tables,
};
use reth_primitives::ChainSpec;
+use reth_provider::ProviderFactory;
use std::{
io::{self, Write},
sync::Arc,
@@ -108,7 +109,9 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
-let tool = DbTool::new(&db, self.chain.clone())?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;
+let tool = DbTool::new(provider_factory, self.chain.clone())?;
let mut stats_table = ComfyTable::new();
stats_table.load_preset(comfy_table::presets::ASCII_MARKDOWN);
stats_table.set_header([
@@ -120,7 +123,7 @@
"Total Size",
]);

-tool.db.view(|tx| {
+tool.provider_factory.db_ref().view(|tx| {
let mut tables =
Tables::ALL.iter().map(|table| table.name()).collect::<Vec<_>>();
tables.sort();
@@ -195,23 +198,29 @@ impl Command {
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
-let tool = DbTool::new(&db, self.chain.clone())?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;
+let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Diff(command) => {
let db = open_db_read_only(
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
-let tool = DbTool::new(&db, self.chain.clone())?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;
+let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Get(command) => {
let db = open_db_read_only(
&db_path,
DatabaseArguments::default().log_level(self.db.log_level),
)?;
-let tool = DbTool::new(&db, self.chain.clone())?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;
+let tool = DbTool::new(provider_factory, self.chain.clone())?;
command.execute(&tool)?;
}
Subcommands::Drop { force } => {
@@ -232,7 +241,9 @@

let db =
open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?;
-let mut tool = DbTool::new(&db, self.chain.clone())?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;
+let mut tool = DbTool::new(provider_factory, self.chain.clone())?;
tool.drop(db_path)?;
}
Subcommands::Clear(command) => {
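Each `db` subcommand arm in mod.rs now builds its `DbTool` the same way. The recurring pattern from the hunks above, restated with comments added here (a fragment, not standalone code: `db_path`, `data_dir`, and `self.chain` come from the surrounding command, and the read-write arms use `open_db` instead of `open_db_read_only`):

```rust
// Open the MDBX environment as before.
let db = open_db_read_only(
    &db_path,
    DatabaseArguments::default().log_level(self.db.log_level),
)?;
// Wrap it in a ProviderFactory that also knows where the snapshot files live,
// so the tool can reach both the database tables and the snapshot jars.
let provider_factory = ProviderFactory::new(db, self.chain.clone())
    .with_snapshots(data_dir.snapshots_path())?;
// DbTool takes ownership of the factory instead of borrowing `&db`, which is
// why the `DbTool<'_, DB>` lifetime parameter disappears across this PR.
let tool = DbTool::new(provider_factory, self.chain.clone())?;
```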
35 changes: 30 additions & 5 deletions bin/reth/src/commands/stage/drop.rs
@@ -11,9 +11,11 @@ use crate::{
};
use clap::Parser;
use reth_db::{
-database::Database, mdbx::DatabaseArguments, open_db, tables, transaction::DbTxMut, DatabaseEnv,
+database::Database, mdbx::DatabaseArguments, open_db, snapshot::iter_snapshots, tables,
+transaction::DbTxMut, DatabaseEnv,
};
-use reth_primitives::{fs, stage::StageId, ChainSpec};
+use reth_primitives::{fs, snapshot::find_fixed_range, stage::StageId, ChainSpec, SnapshotSegment};
+use reth_provider::ProviderFactory;
use std::sync::Arc;
use tracing::info;

@@ -58,11 +60,13 @@ impl Command {

let db =
open_db(db_path.as_ref(), DatabaseArguments::default().log_level(self.db.log_level))?;
+let provider_factory = ProviderFactory::new(db, self.chain.clone())
+.with_snapshots(data_dir.snapshots_path())?;

-let tool = DbTool::new(&db, self.chain.clone())?;
+let tool = DbTool::new(provider_factory, self.chain.clone())?;

-tool.db.update(|tx| {
-match &self.stage {
+tool.provider_factory.db_ref().update(|tx| {
+match self.stage {
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
tx.clear::<tables::Transactions>()?;
@@ -176,6 +180,27 @@ impl Command {
Ok::<_, eyre::Error>(())
})??;

+let snapshot_segment = match self.stage {
+StageEnum::Headers => Some(SnapshotSegment::Headers),
+StageEnum::Bodies => Some(SnapshotSegment::Transactions),
+StageEnum::Execution => Some(SnapshotSegment::Receipts),
+_ => None,
+};
+
+if let Some(snapshot_segment) = snapshot_segment {
+let snapshot_provider = tool
+.provider_factory
+.snapshot_provider()
+.expect("snapshot provider initialized via provider factory");
+let snapshots = iter_snapshots(snapshot_provider.directory())?;
+if let Some(segment_snapshots) = snapshots.get(&snapshot_segment) {
+for (block_range, _) in segment_snapshots {
+snapshot_provider
+.delete_jar(snapshot_segment, find_fixed_range(*block_range.start()))?;
+}
+}
+}
+
Ok(())
}
}
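This is the hunk that implements the PR title: after the stage's tables are cleared, the dropped stage is mapped to the snapshot segment that holds the same data, and every jar for that segment is deleted through the factory's snapshot provider. The mapping, restated from the added lines above with comments added here (the comments are this restatement's interpretation, not text from the PR):

```rust
// Which snapshot segment stores the data produced by the dropped stage. Stages
// without a snapshotted counterpart map to None and leave the snapshot files alone.
let snapshot_segment = match self.stage {
    // The Headers stage fills the Headers segment.
    StageEnum::Headers => Some(SnapshotSegment::Headers),
    // The Bodies stage writes transactions, which live in the Transactions segment.
    StageEnum::Bodies => Some(SnapshotSegment::Transactions),
    // The Execution stage produces receipts, which live in the Receipts segment.
    StageEnum::Execution => Some(SnapshotSegment::Receipts),
    _ => None,
};
```

The deletion itself, as shown in the added lines, walks `iter_snapshots(snapshot_provider.directory())?` for that segment and calls `delete_jar` with the fixed block range returned by `find_fixed_range(*block_range.start())`, which is how each jar file is keyed.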
48 changes: 36 additions & 12 deletions bin/reth/src/commands/stage/dump/execution.rs
@@ -13,7 +13,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_execution_stage<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
to: u64,
output_db: &PathBuf,
@@ -35,30 +35,50 @@ pub(crate) async fn dump_execution_stage<DB: Database>(
/// Imports all the tables that can be copied over a range.
fn import_tables_with_range<DB: Database>(
output_db: &DatabaseEnv,
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
to: u64,
) -> eyre::Result<()> {
// We're not sharing the transaction in case the memory grows too much.

output_db.update(|tx| {
-tx.import_table_with_range::<tables::CanonicalHeaders, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::CanonicalHeaders, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;
output_db.update(|tx| {
-tx.import_table_with_range::<tables::HeaderTD, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::HeaderTD, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;
output_db.update(|tx| {
-tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::Headers, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;
output_db.update(|tx| {
-tx.import_table_with_range::<tables::BlockBodyIndices, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::BlockBodyIndices, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;
output_db.update(|tx| {
-tx.import_table_with_range::<tables::BlockOmmers, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::BlockOmmers, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;

// Find range of transactions that need to be copied over
-let (from_tx, to_tx) = db_tool.db.view(|read_tx| {
+let (from_tx, to_tx) = db_tool.provider_factory.db_ref().view(|read_tx| {
let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
let (_, from_block) =
read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
@@ -73,14 +93,18 @@ fn import_tables_with_range<DB: Database>(

output_db.update(|tx| {
tx.import_table_with_range::<tables::Transactions, _>(
-&db_tool.db.tx()?,
+&db_tool.provider_factory.db_ref().tx()?,
Some(from_tx),
to_tx,
)
})??;

output_db.update(|tx| {
-tx.import_table_with_range::<tables::TxSenders, _>(&db_tool.db.tx()?, Some(from_tx), to_tx)
+tx.import_table_with_range::<tables::TxSenders, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from_tx),
+to_tx,
+)
})??;

Ok(())
@@ -90,12 +114,12 @@ fn import_tables_with_range<DB: Database>(
/// PlainAccountState safely. There might be some state dependency from an address
/// which hasn't been changed in the given range.
async fn unwind_and_copy<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
-let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
+let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;

let mut exec_stage =
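The dump helpers change mechanically in the same direction: every `db_tool.db` access becomes `db_tool.provider_factory.db_ref()`. One detail from the `unwind_and_copy` hunks, restated here as a fragment with comments added:

```rust
// The dry-run unwind builds a second, temporary ProviderFactory on top of the
// database handle owned by DbTool's factory. Note that this inner factory is not
// given a snapshots directory here, presumably because only the database side is
// needed for the unwind.
let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;
```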
12 changes: 8 additions & 4 deletions bin/reth/src/commands/stage/dump/hashing_account.rs
@@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_account_stage<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: BlockNumber,
to: BlockNumber,
output_db: &PathBuf,
@@ -19,7 +19,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(

// Import relevant AccountChangeSets
output_db.update(|tx| {
-tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
+tx.import_table_with_range::<tables::AccountChangeSet, _>(
+&db_tool.provider_factory.db_ref().tx()?,
+Some(from),
+to,
+)
})??;

unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
@@ -33,12 +37,12 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
-let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
+let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

6 changes: 3 additions & 3 deletions bin/reth/src/commands/stage/dump/hashing_storage.rs
@@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
to: u64,
output_db: &PathBuf,
@@ -28,12 +28,12 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy<DB: Database>(
-db_tool: &DbTool<'_, DB>,
+db_tool: &DbTool<DB>,
from: u64,
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
-let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
+let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone());
let provider = factory.provider_rw()?;

let mut exec_stage = StorageHashingStage::default();