Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove static file commit() from inside the stage #6717

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber, B256,
};
use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter};
use reth_provider::{
providers::SnapshotWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
};
use reth_tokio_util::EventListeners;
use std::pin::Pin;
use tokio::sync::watch;
Expand Down Expand Up @@ -279,7 +281,9 @@ where
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

self.provider_factory.snapshot_provider().commit()?;
provider_rw.commit()?;

provider_rw = self.provider_factory.provider_rw()?;
}
Err(err) => {
Expand Down Expand Up @@ -371,6 +375,7 @@ where
result: out.clone(),
});

self.provider_factory.snapshot_provider().commit()?;
provider_rw.commit()?;

if done {
Expand Down Expand Up @@ -428,6 +433,7 @@ fn on_stage_error<DB: Database>(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
factory.snapshot_provider().commit()?;
provider_rw.commit()?;

// We unwind because of a validation error. If the unwind itself
Expand Down
8 changes: 5 additions & 3 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
highest_block = block_number;
}

// Committing static file can be done, since we unwind it if the db tx is not committed.
snapshotter.commit()?;

// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
Expand Down Expand Up @@ -385,6 +382,7 @@ mod tests {
// Check that we only synced around `batch_size` blocks even though the number of blocks
// synced by the previous stage is higher
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down Expand Up @@ -421,6 +419,7 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput {
Expand Down Expand Up @@ -458,6 +457,7 @@ mod tests {

// Check that we synced at least 10 blocks
let first_run = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand All @@ -478,6 +478,7 @@ mod tests {

// Check that we synced more blocks
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down Expand Up @@ -518,6 +519,7 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint {
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod tests {
generators::{random_header, random_header_range},
};
use reth_primitives::SealedHeader;
use reth_provider::providers::SnapshotWriter;

stage_test_suite_ext!(FinishTestRunner, finish);

Expand Down
4 changes: 3 additions & 1 deletion crates/stages/src/stages/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,12 @@ fn stage_checkpoint_progress<DB: Database>(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_primitives::{stage::StageUnitCheckpoint, Account, U256};
use reth_provider::providers::SnapshotWriter;
use test_utils::*;

stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ mod tests {
generators::{random_block_range, random_contract_account_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, Address, SealedBlock, B256, U256};
use reth_provider::providers::SnapshotWriter;

stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ where
// Append to Headers segment
writer.append_header(header, td, header_hash)?;
}
writer.commit()?;

info!(target: "sync::stages::headers", total = total_headers, "Writing header hash index");

Expand Down Expand Up @@ -566,6 +565,7 @@ mod tests {
runner.send_tip(tip.hash());

let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();
assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ mod tests {
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{address, Address, BlockNumber, B256};
use reth_provider::providers::SnapshotWriter;
use std::collections::BTreeMap;

const ADDRESS: Address = address!("0000000000000000000000000000000000000001");
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/stages/index_storage_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ mod tests {
generators::{random_block_range, random_changeset_range, random_contract_account_range},
};
use reth_primitives::{address, b256, Address, BlockNumber, StorageEntry, B256, U256};
use reth_provider::providers::SnapshotWriter;
use std::collections::BTreeMap;

const ADDRESS: Address = address!("0000000000000000000000000000000000000001");
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ mod tests {
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock,
TransactionSigned, B256,
};
use reth_provider::{PruneCheckpointWriter, TransactionsProvider};
use reth_provider::{providers::SnapshotWriter, PruneCheckpointWriter, TransactionsProvider};

use super::*;
use crate::test_utils::{
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ mod tests {
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, B256};
use reth_provider::TransactionsProvider;
use reth_provider::{providers::SnapshotWriter, TransactionsProvider};
use std::ops::Sub;

// Implement stage test suite.
Expand Down
10 changes: 10 additions & 0 deletions crates/stages/src/test_utils/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ macro_rules! stage_test_suite {

// Run stage execution
let result = runner.execute(input).await;
runner.db().factory.snapshot_provider().commit().unwrap();

// Check that the result is returned and the stage does not panic.
// The return result with empty db is stage-specific.
assert_matches::assert_matches!(result, Ok(_));
Expand Down Expand Up @@ -44,6 +46,8 @@ macro_rules! stage_test_suite {

// Assert the successful result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down Expand Up @@ -72,6 +76,8 @@ macro_rules! stage_test_suite {

// Run stage unwind
let rx = runner.unwind(input).await;
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
rx,
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == input.unwind_to
Expand Down Expand Up @@ -104,6 +110,8 @@ macro_rules! stage_test_suite {

// Assert the successful execution result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down Expand Up @@ -171,6 +179,8 @@ macro_rules! stage_test_suite_ext {

// Assert the successful result
let result = rx.await.unwrap();
runner.db().factory.snapshot_provider().commit().unwrap();

assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
Expand Down
1 change: 0 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ impl<DB: Database> DerefMut for DatabaseProviderRW<DB> {
impl<DB: Database> DatabaseProviderRW<DB> {
/// Commit database transaction and snapshot if it exists.
pub fn commit(self) -> ProviderResult<bool> {
self.0.snapshot_provider.commit()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this, this is a lot cleaner now imo

self.0.commit()
}

Expand Down
Loading