Skip to content

Commit

Permalink
move hihgest snapshot channel inside snapshotter
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Nov 2, 2023
1 parent 113ef23 commit 73fd00f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
23 changes: 13 additions & 10 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,16 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;

// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
db.clone(),
self.chain.clone(),
self.chain.snapshot_block_interval,
);

// setup the blockchain provider
let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
.with_snapshots(highest_snapshots_rx.clone());
.with_snapshots(snapshotter.highest_snapshot_receiver());
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
let blob_store = InMemoryBlobStore::default();
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
Expand Down Expand Up @@ -455,7 +461,11 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let mut hooks = EngineHooks::new();

let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx);
let mut pruner = self.build_pruner(
&prune_config,
db.clone(),
snapshotter.highest_snapshot_receiver(),
);

let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
Expand All @@ -466,13 +476,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Either::Right(stream::empty())
};

let _snapshotter = reth_snapshot::Snapshotter::new(
db,
self.chain.clone(),
self.chain.snapshot_block_interval,
highest_snapshots_tx,
);

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
Expand Down
27 changes: 14 additions & 13 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ impl SnapshotTargets {

impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter].
pub fn new(
db: DB,
chain_spec: Arc<ChainSpec>,
block_interval: u64,
highest_snapshots_tracker: watch::Sender<Option<HighestSnapshots>>,
) -> Self {
pub fn new(db: DB, chain_spec: Arc<ChainSpec>, block_interval: u64) -> Self {
let (highest_snapshots_tracker, _) = watch::channel(None);

let snapshotter = Self {
provider_factory: ProviderFactory::new(db, chain_spec),
// TODO(alexey): fill from on-disk snapshot data
Expand Down Expand Up @@ -112,6 +109,11 @@ impl<DB: Database> Snapshotter<DB> {
});
}

/// Returns a new [`HighestSnapshotsTracker`].
pub fn highest_snapshot_receiver(&self) -> HighestSnapshotsTracker {
self.highest_snapshots_tracker.subscribe()
}

/// Run the snapshotter
pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult {
debug_assert!(targets.is_multiple_of_block_interval(self.block_interval));
Expand Down Expand Up @@ -234,17 +236,17 @@ mod tests {
};
use reth_primitives::{snapshot::HighestSnapshots, B256, MAINNET};
use reth_stages::test_utils::TestTransaction;
use tokio::sync::watch;

#[test]
fn new() {
let tx = TestTransaction::default();

let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);
assert_eq!(*highest_snapshots_rx.borrow(), None);
let snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);

Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, highest_snapshots_tx);
assert_eq!(*highest_snapshots_rx.borrow(), Some(HighestSnapshots::default()));
assert_eq!(
*snapshotter.highest_snapshot_receiver().borrow(),
Some(HighestSnapshots::default())
);
}

#[test]
Expand All @@ -255,8 +257,7 @@ mod tests {
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

let mut snapshotter =
Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, watch::channel(None).0);
let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);

// Snapshot targets has data per part up to the passed finalized block number,
// respecting the block interval
Expand Down

0 comments on commit 73fd00f

Please sign in to comment.