Skip to content

Commit

Permalink
chore: remove static file commit() from inside the stage (#6717)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Feb 22, 2024
1 parent 5a871d5 commit b0eda75
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 9 deletions.
8 changes: 7 additions & 1 deletion crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber, B256,
};
use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter};
use reth_provider::{
providers::SnapshotWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
};
use reth_tokio_util::EventListeners;
use std::pin::Pin;
use tokio::sync::watch;
Expand Down Expand Up @@ -279,7 +281,9 @@ where
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

self.provider_factory.snapshot_provider().commit()?;
provider_rw.commit()?;

provider_rw = self.provider_factory.provider_rw()?;
}
Err(err) => {
Expand Down Expand Up @@ -371,6 +375,7 @@ where
result: out.clone(),
});

self.provider_factory.snapshot_provider().commit()?;
provider_rw.commit()?;

if done {
Expand Down Expand Up @@ -428,6 +433,7 @@ fn on_stage_error<DB: Database>(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
factory.snapshot_provider().commit()?;
provider_rw.commit()?;

// We unwind because of a validation error. If the unwind itself
Expand Down
8 changes: 5 additions & 3 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
highest_block = block_number;
}

// Committing static file can be done, since we unwind it if the db tx is not committed.
snapshotter.commit()?;

// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
Expand Down Expand Up @@ -385,6 +382,7 @@ mod tests {
// Check that we only synced around `batch_size` blocks even though the number of blocks
// synced by the previous stage is higher
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down Expand Up @@ -421,6 +419,7 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput {
Expand Down Expand Up @@ -458,6 +457,7 @@ mod tests {

// Check that we synced at least 10 blocks
let first_run = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand All @@ -478,6 +478,7 @@ mod tests {

// Check that we synced more blocks
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down Expand Up @@ -518,6 +519,7 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod tests {
generators::{random_header, random_header_range},
};
use reth_primitives::SealedHeader;
use reth_provider::providers::SnapshotWriter;

stage_test_suite_ext!(FinishTestRunner, finish);

Expand Down
4 changes: 3 additions & 1 deletion crates/stages/src/stages/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,12 @@ fn stage_checkpoint_progress<DB: Database>(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_primitives::{stage::StageUnitCheckpoint, Account, U256};
use reth_provider::providers::SnapshotWriter;
use test_utils::*;

stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ mod tests {
generators::{random_block_range, random_contract_account_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, Address, SealedBlock, B256, U256};
use reth_provider::providers::SnapshotWriter;

stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ where
// Append to Headers segment
writer.append_header(header, td, header_hash)?;
}
writer.commit()?;

info!(target: "sync::stages::headers", total = total_headers, "Writing header hash index");

Expand Down Expand Up @@ -566,6 +565,7 @@ mod tests {
runner.send_tip(tip.hash());

let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ mod tests {
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{address, Address, BlockNumber, B256};
use reth_provider::providers::SnapshotWriter;
use std::collections::BTreeMap;

const ADDRESS: Address = address!("0000000000000000000000000000000000000001");
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/index_storage_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ mod tests {
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{address, b256, Address, BlockNumber, StorageEntry, B256, U256};
use reth_provider::providers::SnapshotWriter;
use std::collections::BTreeMap;

const ADDRESS: Address = address!("0000000000000000000000000000000000000001");
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ mod tests {
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock,
TransactionSigned, B256,
};
use reth_provider::{PruneCheckpointWriter, TransactionsProvider};
use reth_provider::{providers::SnapshotWriter, PruneCheckpointWriter, TransactionsProvider};

use super::*;
use crate::test_utils::{
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ mod tests {
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, B256};
use reth_provider::TransactionsProvider;
use reth_provider::{providers::SnapshotWriter, TransactionsProvider};
use std::ops::Sub;

// Implement stage test suite.
Expand Down
10 changes: 10 additions & 0 deletions crates/stages/src/test_utils/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ macro_rules! stage_test_suite {

// Run stage execution
let result = runner.execute(input).await;
runner.db().factory.snapshot_provider().commit().unwrap();

// Check that the result is returned and the stage does not panic.
// The return result with empty db is stage-specific.
assert_matches::assert_matches!(result, Ok(_));
Expand Down Expand Up @@ -44,6 +46,8 @@ macro_rules! stage_test_suite {

// Assert the successful result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down Expand Up @@ -72,6 +76,8 @@ macro_rules! stage_test_suite {

// Run stage unwind
let rx = runner.unwind(input).await;
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
rx,
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == input.unwind_to
Expand Down Expand Up @@ -104,6 +110,8 @@ macro_rules! stage_test_suite {

// Assert the successful execution result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down Expand Up @@ -171,6 +179,8 @@ macro_rules! stage_test_suite_ext {

// Assert the successful result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down
1 change: 0 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ impl<DB: Database> DerefMut for DatabaseProviderRW<DB> {
impl<DB: Database> DatabaseProviderRW<DB> {
/// Commit database transaction and snapshot if it exists.
pub fn commit(self) -> ProviderResult<bool> {
self.0.snapshot_provider.commit()?;
self.0.commit()
}

Expand Down

0 comments on commit b0eda75

Please sign in to comment.