Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pruner): respect batch size per run #4246

Merged
merged 25 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cf670f2
feat(pruner): respect batch size per run
shekhirin Aug 17, 2023
689d267
new batch logic
shekhirin Aug 18, 2023
d9115be
fix engine prune event
shekhirin Aug 18, 2023
bbf5c0c
more trace logs
shekhirin Aug 18, 2023
adffcf6
even more logs
shekhirin Aug 18, 2023
8fa4757
fix short-circuit
shekhirin Aug 18, 2023
80a53ef
improve log fields
shekhirin Aug 18, 2023
1d149e0
increase batch sizes to 1000
shekhirin Aug 18, 2023
54a67f6
add comments
shekhirin Aug 18, 2023
4939ab9
Merge remote-tracking branch 'origin/main' into alexey/pruner-batch-size
shekhirin Aug 21, 2023
8451bed
fix stage checkpoint tests
shekhirin Aug 21, 2023
c10eeb5
fixes of receipts by logs pruner checkpoint
shekhirin Aug 21, 2023
df14e1e
return early if receipts by logs block range is empty
shekhirin Aug 22, 2023
0498b18
Revert "return early if receipts by logs block range is empty"
shekhirin Aug 22, 2023
a092d72
check empty tx range
shekhirin Aug 22, 2023
bedb5a4
check tip block number
shekhirin Aug 22, 2023
1ac798b
Merge remote-tracking branch 'origin/main' into alexey/pruner-batch-size
shekhirin Aug 22, 2023
08f71e2
add another log for contracts by logs pruning
shekhirin Aug 22, 2023
b941ed4
prune receipts by logs with range instead of iterator
shekhirin Aug 22, 2023
9b4925a
Merge remote-tracking branch 'origin/main' into alexey/pruner-batch-size
shekhirin Aug 22, 2023
de2d739
Merge remote-tracking branch 'origin/main' into alexey/pruner-batch-size
shekhirin Aug 22, 2023
d4e760a
fix checkpoints for contract log filtering on live sync
joshieDo Aug 22, 2023
a042bf6
fix lint
shekhirin Aug 23, 2023
7d43486
fixes after review
shekhirin Aug 23, 2023
407bdcc
Merge remote-tracking branch 'origin/main' into alexey/pruner-batch-size
shekhirin Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/primitives/src/prune/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ impl ContractLogsPruneConfig {
// the BTreeMap (block = 0), otherwise it will be excluded.
// Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
// other receipts.
//
// Reminder, that we increment because the [`BlockNumber`] key of the new map should be
// viewed as `PruneMode::Before(block)`
let block = (pruned_block + 1).max(
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
.map(|(block, _)| block)
.unwrap_or_default(),
.unwrap_or_default() +
1,
);

map.entry(block).or_insert_with(Vec::new).push(address)
Expand Down
235 changes: 168 additions & 67 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,25 @@
.map(|(bn, _)| bn)
.unwrap_or_default();

// Figure out what receipts have already been pruned, so we can have an accurate
// `address_filter`
let pruned_block = provider
// Get status checkpoint from latest run
let mut last_pruned_block = provider
.get_prune_checkpoint(PrunePart::ContractLogs)?
.and_then(|checkpoint| checkpoint.block_number);

let initial_last_pruned_block = last_pruned_block;

let mut from_tx_number = match initial_last_pruned_block {
Some(block) => provider
.block_body_indices(block)?
.map(|block| block.last_tx_num() + 1)
.unwrap_or(0),
None => 0,
};

// Figure out what receipts have already been pruned, so we can have an accurate
// `address_filter`
let address_filter =
self.modes.contract_logs_filter.group_by_block(tip_block_number, pruned_block)?;
self.modes.contract_logs_filter.group_by_block(tip_block_number, last_pruned_block)?;

// Splits all transactions in different block ranges. Each block range will have its own
// filter address list and will check it while going through the table
Expand Down Expand Up @@ -455,9 +466,13 @@
while let Some((start_block, addresses)) = blocks_iter.next() {
filtered_addresses.extend_from_slice(addresses);

// This will clear all receipts before the first appearance of a contract log
// This will clear all receipts before the first appearance of a contract log or since
// the block after the last pruned one.
if block_ranges.is_empty() {
block_ranges.push((0, *start_block - 1, 0));
let init = last_pruned_block.and_then(|b| Some(b + 1)).unwrap_or_default();

Check failure on line 472 in crates/prune/src/pruner.rs

View workflow job for this annotation

GitHub Actions / clippy

using `Option.and_then(|x| Some(y))`, which is more succinctly expressed as `map(|x| y)`

error: using `Option.and_then(|x| Some(y))`, which is more succinctly expressed as `map(|x| y)` --> crates/prune/src/pruner.rs:472:28 | 472 | let init = last_pruned_block.and_then(|b| Some(b + 1)).unwrap_or_default(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `last_pruned_block.map(|b| b + 1)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#bind_instead_of_map = note: `-D clippy::bind-instead-of-map` implied by `-D warnings`
if init <= *start_block - 1 {

Check failure on line 473 in crates/prune/src/pruner.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary `>= y + 1` or `x - 1 >=`

error: unnecessary `>= y + 1` or `x - 1 >=` --> crates/prune/src/pruner.rs:473:20 | 473 | if init <= *start_block - 1 { | ^^^^^^^^^^^^^^^^^^^^^^^^ help: change it to: `init < *start_block` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#int_plus_one = note: `-D clippy::int-plus-one` implied by `-D warnings`
block_ranges.push((init, *start_block - 1, 0));
}
}

let end_block =
Expand All @@ -477,16 +492,13 @@

let mut limit = self.batch_sizes.receipts;
let mut done = true;
let mut last_pruned_block = None;
let mut last_pruned_transaction = None;
for (start_block, end_block, num_addresses) in block_ranges {
let block_range = start_block..=end_block;
let tx_range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::ContractLogs,
end_block,
)? {
Some(range) => range,

// Calculate the transaction range from this block range
let tx_range_end = match provider.block_body_indices(end_block)? {
Some(body) => body.last_tx_num(),
None => {
trace!(
target: "pruner",
Expand All @@ -496,50 +508,52 @@
continue
}
};
let tx_range_end = *tx_range.end();
let tx_range = from_tx_number..=tx_range_end;

last_pruned_transaction = Some(tx_range_end);
// Delete receipts, except the ones in the inclusion list
let mut last_skipped_transaction = 0;
let deleted;
(deleted, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
limit,
|receipt| {
num_addresses > 0 &&
|(tx_num, receipt)| {
let skip = num_addresses > 0 &&
receipt.logs.iter().any(|log| {
filtered_addresses[..num_addresses].contains(&&log.address)
})
});

if skip {
last_skipped_transaction = *tx_num;
}
skip
},
|row| last_pruned_transaction = Some(row.0),
)?;
trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");

limit = limit.saturating_sub(deleted);

last_pruned_block = provider
.transaction_block(last_pruned_transaction.unwrap())?
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
// If there's more receipts to prune, set the checkpoint block number to previous,
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });

// If this is the last block range, avoid writing an unused checkpoint
if last_pruned_block != Some(to_block) {
// This allows us to query for the transactions in the next block range with
// [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate
// checkpoint, which should be adjusted in the end.
provider.save_prune_checkpoint(
PrunePart::ContractLogs,
PruneCheckpoint {
block_number: last_pruned_block,
tx_number: last_pruned_transaction,
prune_mode: PruneMode::Before(end_block + 1),
},
)?;
}
// For accurate checkpoints we need to know that we have checked every transaction.
// Example: we reached the end of the range, and the last receipt is supposed to skip
// its deletion.
last_pruned_transaction =
Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
last_pruned_block = Some(
provider
.transaction_block(last_pruned_transaction.expect("qed"))?
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
// If there's more receipts to prune, set the checkpoint block number to
// previous, so we could finish pruning its receipts on the
// next run.
.saturating_sub(if done { 0 } else { 1 }),
);

if limit == 0 {
done &= end_block == to_block;
break
}

from_tx_number = last_pruned_transaction.expect("qed") + 1;
}

// If there are contracts using `PruneMode::Distance(_)` there will be receipts before
Expand All @@ -548,41 +562,23 @@
// This ensures that in future pruner runs we can prune all these receipts between the
// previous `lowest_block_with_distance` and the new one using
// `get_next_tx_num_range_from_checkpoint`.
//
// Only applies if we were able to prune everything intended for this run, otherwise the
// checkpoing is the `last_pruned_block`.
let prune_mode_block = self
.modes
.contract_logs_filter
.lowest_block_with_distance(tip_block_number, pruned_block)?
.lowest_block_with_distance(tip_block_number, initial_last_pruned_block)?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it correct? shouldn't it be last_pruned_block?

Copy link
Collaborator

@joshieDo joshieDo Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be correct, this argument works as a lower bound.

if we set it to last_pruned_block something of the kind would happen:

example:

  • (distance(200), addrA)
  • contract log pruner goes until distance(128).
  • last_pruned_block would be the block from distance(128)

Which would mean that we'd never go through the blocks between distance(200) -> distance(128) for the tip of this particular run. However, for future runs with higher tips, they should get checked and cleaned-up

.unwrap_or(to_block);

let (checkpoint_block, checkpoint_transaction) = if done {
let Some(prune_mode_block) = prune_mode_block.checked_sub(1) else { return Ok(done) };
(
Some(prune_mode_block),
provider
.block_body_indices(prune_mode_block)?
.ok_or(PrunerError::InconsistentData(
"Block body indices for prune mode block not found",
))?
.last_tx_num(),
)
} else {
(
last_pruned_block,
last_pruned_transaction.ok_or(PrunerError::InconsistentData(
"Last pruned transaction is not present",
))?,
)
};

provider.save_prune_checkpoint(
PrunePart::ContractLogs,
PruneCheckpoint {
block_number: checkpoint_block,
tx_number: Some(checkpoint_transaction),
block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
tx_number: last_pruned_transaction,
prune_mode: PruneMode::Before(prune_mode_block),
},
)?;

Ok(done)
}

Expand Down Expand Up @@ -916,17 +912,22 @@
FoldWhile::{Continue, Done},
Itertools,
};
use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList};
use reth_db::{
cursor::DbCursorRO, tables, test_utils::create_test_rw_db, transaction::DbTx,
BlockNumberList,
};
use reth_interfaces::test_utils::{
generators,
generators::{
random_block_range, random_changeset_range, random_eoa_account_range, random_receipt,
random_block_range, random_changeset_range, random_eoa_account,
random_eoa_account_range, random_log, random_receipt,
},
};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, H256, MAINNET,
BlockNumber, ContractLogsPruneConfig, PruneCheckpoint, PruneMode, PruneModes, PrunePart,
TxNumber, H256, MAINNET,
};
use reth_provider::PruneCheckpointReader;
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::TestTransaction;
use std::{collections::BTreeMap, ops::AddAssign};

Expand Down Expand Up @@ -1502,4 +1503,104 @@
test_prune(2300, 2, true);
test_prune(3000, 3, true);
}

#[test]
fn prune_receipts_by_logs() {
let tx = TestTransaction::default();
let mut rng = generators::rng();

let tip = 300;
let blocks = random_block_range(&mut rng, 0..=tip, H256::zero(), 1..5);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

let mut receipts = Vec::new();

let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
for block in &blocks {
assert!(!block.body.is_empty());
for (txi, transaction) in block.body.iter().enumerate() {
let mut receipt = random_receipt(&mut rng, transaction, Some(1));
receipt.logs.push(random_log(
&mut rng,
if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None },
Some(1),
));
receipts.push((receipts.len() as u64, receipt));
}
}
tx.insert_receipts(receipts).expect("insert receipts");

assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::Receipts>().unwrap().len()
);

let run_prune = || {
let provider = tx.inner_rw();

let prune_before_block: usize = 20;
let prune_mode = PruneMode::Before(prune_before_block as u64);
let contract_logs_filter =
ContractLogsPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
PruneModes {
contract_logs_filter: contract_logs_filter.clone(),
..Default::default()
},
BatchSizes {
// Less than total amount of blocks to prune to test the batching logic
receipts: 10,
..Default::default()
},
);

let result = pruner.prune_receipts_by_logs(&provider, tip);
assert_matches!(result, Ok(_));
let done = result.unwrap();
provider.commit().expect("commit");

let (pruned_block, pruned_tx) = tx
.inner()
.get_prune_checkpoint(PrunePart::ContractLogs)
.unwrap()
.and_then(|checkpoint| {
Some((checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
})
.unwrap_or_default();

// All receipts are in the end of the block
let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1);

assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>() -
((pruned_tx + 1) - unprunable) as usize
);

return done
};

while !run_prune() {}

let provider = tx.inner();
let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
let walker = cursor.walk(None).unwrap();
for receipt in walker {
let (tx_num, receipt) = receipt.unwrap();

// Either we only find our contract, or the receipt is part of the unprunable receipts
// set by tip - 128
assert!(
receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128,
);
}
}
}
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,15 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limit: usize,
skip_filter: impl Fn(&T::Value) -> bool,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> std::result::Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys.clone())?;
let mut deleted = 0;

while let Some(row) = walker.next().transpose()? {
if !skip_filter(&row.1) {
if !skip_filter(&row) {
walker.delete_current()?;
deleted += 1;
delete_callback(row);
Expand Down
Loading