Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use UnifiedStorageWriter::commit where possible #10019

Merged
merged 6 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use reth_network::{BlockDownloaderProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::StorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StorageReader,
writer::UnifiedStorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter,
HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StaticFileProviderFactory, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Command {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::full_block::FullBlockClient;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::StorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
};
use reth_revm::database::StateProviderDatabase;
Expand Down Expand Up @@ -155,7 +155,7 @@ impl Command {
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
let execution_outcome = executor.finalize();

let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

let checkpoint = Some(StageCheckpoint::new(
Expand Down
5 changes: 2 additions & 3 deletions crates/cli/commands/src/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_db_common::{
DbTool,
};
use reth_node_core::args::StageEnum;
use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory};
use reth_stages::StageId;
use reth_static_file_types::{find_fixed_range, StaticFileSegment};

Expand Down Expand Up @@ -174,8 +174,7 @@ impl Command {

tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;

static_file_provider.commit()?;
provider_rw.commit()?;
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;

Ok(())
}
Expand Down
17 changes: 7 additions & 10 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use reth_node_metrics::{
version::VersionInfo,
};
use reth_provider::{
ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
StaticFileWriter,
writer::UnifiedStorageWriter, ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
};
use reth_stages::{
stages::{
Expand Down Expand Up @@ -272,12 +272,10 @@ impl Command {
}

if self.commit {
// For unwinding it makes more sense to commit the database first, since if
// this function is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
provider_rw.commit()?;
provider_factory.static_file_provider().commit()?;
UnifiedStorageWriter::commit_unwind(
provider_rw,
provider_factory.static_file_provider(),
)?;
provider_rw = provider_factory.provider_rw()?;
}
}
Expand All @@ -300,8 +298,7 @@ impl Command {
provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
}
if self.commit {
provider_factory.static_file_provider().commit()?;
provider_rw.commit()?;
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
provider_rw = provider_factory.provider_rw()?;
}

Expand Down
10 changes: 5 additions & 5 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_primitives::{SealedBlock, B256};
use reth_provider::{writer::StorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_prune::{Pruner, PrunerOutput};
use std::sync::{
mpsc::{Receiver, SendError, Sender},
Expand Down Expand Up @@ -62,10 +62,10 @@ where
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
let sf_provider = self.provider.static_file_provider();

StorageWriter::from(&provider_rw, &sf_provider)
UnifiedStorageWriter::from(&provider_rw, &sf_provider)
.remove_blocks_above(new_tip_num)
.expect("todo: handle errors");
StorageWriter::commit_unwind(provider_rw, sf_provider)
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)
.expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand All @@ -80,10 +80,10 @@ where
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
let static_file_provider = self.provider.static_file_provider();

StorageWriter::from(&provider_rw, &static_file_provider)
UnifiedStorageWriter::from(&provider_rw, &static_file_provider)
.save_blocks(&blocks)
.expect("todo: handle errors");
StorageWriter::commit(provider_rw, static_file_provider)
UnifiedStorageWriter::commit(provider_rw, static_file_provider)
.expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand Down
5 changes: 2 additions & 3 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,13 @@ where
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;

// finally, write the receipts
let mut storage_writer = StorageWriter::new(Some(&provider), Some(static_file_producer));
let mut storage_writer = StorageWriter::from(&provider, static_file_producer);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
}

provider.commit()?;
// as static files works in file ranges, internally it will be committing when creating the
// next file range already, so we only need to call explicitly at the end.
static_file_provider.commit()?;
UnifiedStorageWriter::commit(provider, static_file_provider)?;

Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
}
Expand Down
28 changes: 11 additions & 17 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures_util::Future;
use reth_db_api::database::Database;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::StaticFileWriter, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
writer::UnifiedStorageWriter, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
Expand Down Expand Up @@ -342,12 +342,10 @@ where
))?;
}

// For unwinding it makes more sense to commit the database first, since if
// this function is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
provider_rw.commit()?;
self.provider_factory.static_file_provider().commit()?;
UnifiedStorageWriter::commit_unwind(
provider_rw,
self.provider_factory.static_file_provider(),
)?;

stage.post_unwind_commit()?;

Expand Down Expand Up @@ -455,14 +453,10 @@ where
result: out.clone(),
});

// For execution it makes more sense to commit the static files first, since if
// this function is interrupted before the database commit, we can just truncate
// the static files according to the checkpoints on the next
// start-up.
self.provider_factory.static_file_provider().commit()?;
provider_rw.commit()?;

stage.post_execute_commit()?;
UnifiedStorageWriter::commit(
provider_rw,
self.provider_factory.static_file_provider(),
)?;

if done {
let block_number = checkpoint.block_number;
Expand Down Expand Up @@ -520,8 +514,8 @@ fn on_stage_error<DB: Database>(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
factory.static_file_provider().commit()?;
provider_rw.commit()?;

UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?;

// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_primitives::{BlockNumber, Header, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::StorageWriter,
writer::UnifiedStorageWriter,
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateWriter, StatsReader, TransactionVariant,
};
Expand Down Expand Up @@ -361,7 +361,7 @@ where
let time = Instant::now();

// write output
let mut writer = StorageWriter::new(Some(provider), static_file_producer);
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;

let db_write_duration = time.elapsed();
Expand Down
9 changes: 5 additions & 4 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use reth_primitives::{
use reth_provider::{
errors::provider::ProviderResult,
providers::{StaticFileProvider, StaticFileWriter},
writer::StorageWriter,
writer::UnifiedStorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DatabaseProviderRW,
ExecutionOutcome, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory, RevertsInit, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
Expand Down Expand Up @@ -131,8 +131,9 @@ pub fn init_genesis<DB: Database>(factory: ProviderFactory<DB>) -> Result<B256,
let segment = StaticFileSegment::Transactions;
static_file_provider.latest_writer(segment)?.increment_block(0)?;

provider_rw.commit()?;
static_file_provider.commit()?;
// `commit_unwind`` will first commit the DB and then the static file provider, which is
// necessary on `init_genesis`.
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;

Ok(hash)
}
Expand Down Expand Up @@ -210,7 +211,7 @@ pub fn insert_state<'a, 'b, DB: Database>(
Vec::new(),
);

let mut storage_writer = StorageWriter::new(Some(provider), None);
let mut storage_writer = UnifiedStorageWriter::from_database(provider);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

trace!(target: "reth::cli", "Inserted state");
Expand Down
5 changes: 1 addition & 4 deletions crates/storage/errors/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::db::DatabaseError;
use reth_primitives::StaticFileSegment;

/// `StorageWriter` related errors
/// `UnifiedStorageWriter` related errors
#[derive(Clone, Debug, thiserror_no_std::Error, PartialEq, Eq)]
pub enum StorageWriterError {
/// Database writer is missing
#[error("Database writer is missing")]
MissingDatabaseWriter,
/// Static file writer is missing
#[error("Static file writer is missing")]
MissingStaticFileWriter,
Expand Down
6 changes: 3 additions & 3 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::StorageWriter,
writer::UnifiedStorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, BundleStateInit, EvmEnvProvider, FinalizedBlockReader,
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
Expand Down Expand Up @@ -3570,7 +3570,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
Ok(block_indices)
}

/// TODO(joshie): this fn should be moved to `StorageWriter` eventually
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
Expand Down Expand Up @@ -3600,7 +3600,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
// Must be written after blocks because of the receipt lookup.
// TODO: should _these_ be moved to storagewriter? seems like storagewriter should be
// _above_ db provider
let mut storage_writer = StorageWriter::new(Some(self), None);
let mut storage_writer = UnifiedStorageWriter::from_database(self);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);

Expand Down
Loading
Loading