Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Quickly skip invalid transactions during block authorship. (#9789)
Browse files Browse the repository at this point in the history
* Support skipping invalid transactions in the iterator.

* Expose concrete iterator.

* cargo +nightly fmt --all

* More consistent placement.

* Update Cargo.lock

* Pass transaction to 'report_invalid'
  • Loading branch information
tomusdrw committed Oct 1, 2021
1 parent 540b4fd commit c60ccc0
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 71 deletions.
28 changes: 21 additions & 7 deletions bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc};
use node_primitives::Block;
use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile};
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource,
TransactionStatusStreamFor, TxHash,
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionSource, TransactionStatusStreamFor, TxHash,
};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
Expand Down Expand Up @@ -216,6 +216,19 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction {

#[derive(Clone, Debug)]
pub struct Transactions(Vec<Arc<PoolTransaction>>);
pub struct TransactionsIterator(std::vec::IntoIter<Arc<PoolTransaction>>);

impl Iterator for TransactionsIterator {
type Item = Arc<PoolTransaction>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl ReadyTransactions for TransactionsIterator {
fn report_invalid(&mut self, _tx: &Self::Item) {}
}

impl sc_transaction_pool_api::TransactionPool for Transactions {
type Block = Block;
Expand Down Expand Up @@ -257,16 +270,17 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
_at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
let iter: Box<dyn Iterator<Item = Arc<PoolTransaction>> + Send> =
Box::new(self.0.clone().into_iter());
let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
Box::new(TransactionsIterator(self.0.clone().into_iter()));
Box::pin(futures::future::ready(iter))
}

fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send> {
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
unimplemented!()
}

Expand Down
8 changes: 6 additions & 2 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ where
let mut t2 =
futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();

let pending_iterator = select! {
let mut pending_iterator = select! {
res = t1 => res,
_ = t2 => {
log::warn!(
Expand All @@ -363,7 +363,7 @@ where
let mut transaction_pushed = false;
let mut hit_block_size_limit = false;

for pending_tx in pending_iterator {
while let Some(pending_tx) = pending_iterator.next() {
if (self.now)() > deadline {
debug!(
"Consensus deadline reached when pushing block transactions, \
Expand All @@ -378,6 +378,7 @@ where
let block_size =
block_builder.estimate_block_size(self.include_proof_in_block_size_estimation);
if block_size + pending_tx_data.encoded_size() > block_size_limit {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
Expand All @@ -400,6 +401,7 @@ where
debug!("[{:?}] Pushed to the block.", pending_tx_hash);
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
Expand All @@ -412,13 +414,15 @@ where
}
},
Err(e) if skipped > 0 => {
pending_iterator.report_invalid(&pending_tx);
trace!(
"[{:?}] Ignoring invalid transaction when skipping: {}",
pending_tx_hash,
e
);
},
Err(e) => {
pending_iterator.report_invalid(&pending_tx);
debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e);
unqueue_invalid.push(pending_tx_hash);
},
Expand Down
28 changes: 25 additions & 3 deletions client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,14 @@ pub trait TransactionPool: Send + Sync {
at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
>;

/// Get an iterator for ready transactions ordered by priority.
fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>;
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;

// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
Expand All @@ -254,6 +255,27 @@ pub trait TransactionPool: Send + Sync {
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
}

/// An iterator of ready transactions.
///
/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
/// last-returned element as invalid.
///
/// The implementation is then allowed, for performance reasons, to change the elements
/// returned next, by e.g. skipping elements that are known to depend on the reported
/// transaction, which yields them invalid as well.
pub trait ReadyTransactions: Iterator {
/// Report given transaction as invalid.
///
/// This might affect subsequent elements returned by the iterator, so dependent transactions
/// are skipped for performance reasons.
fn report_invalid(&mut self, _tx: &Self::Item);
}

/// A no-op implementation for an empty iterator.
impl<T> ReadyTransactions for std::iter::Empty<T> {
fn report_invalid(&mut self, _tx: &T) {}
}

/// Events that the transaction pool listens for.
pub enum ChainEvent<B: BlockT> {
/// New best block have been added to the chain
Expand Down
39 changes: 0 additions & 39 deletions client/transaction-pool/graph/Cargo.toml

This file was deleted.

4 changes: 2 additions & 2 deletions client/transaction-pool/src/graph/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sp_runtime::{

use super::{
future::{FutureTransactions, WaitingTransaction},
ready::ReadyTransactions,
ready::{BestIterator, ReadyTransactions},
};

/// Successful import result.
Expand Down Expand Up @@ -355,7 +355,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
}

/// Returns an iterator over ready transactions in the pool.
pub fn ready(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
pub fn ready(&self) -> BestIterator<Hash, Ex> {
self.ready.get()
}

Expand Down
114 changes: 100 additions & 14 deletions client/transaction-pool/src/graph/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

use log::trace;
use log::{debug, trace};
use sc_transaction_pool_api::error;
use serde::Serialize;
use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
Expand Down Expand Up @@ -156,11 +156,16 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
///
/// The iterator is providing a way to report transactions that the receiver considers invalid.
/// In such case the entire subgraph of transactions that depend on the reported one will be
/// skipped.
pub fn get(&self) -> BestIterator<Hash, Ex> {
BestIterator {
all: self.ready.clone(),
best: self.best.clone(),
awaiting: Default::default(),
invalid: Default::default(),
}
}

Expand Down Expand Up @@ -482,6 +487,7 @@ pub struct BestIterator<Hash, Ex> {
all: ReadOnlyTrackedMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
invalid: HashSet<Hash>,
}

impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
Expand All @@ -498,15 +504,54 @@ impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
}
}

impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
for BestIterator<Hash, Ex>
{
fn report_invalid(&mut self, tx: &Self::Item) {
BestIterator::report_invalid(self, tx)
}
}

impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
/// Report given transaction as invalid.
///
/// As a consequence, all values that depend on the invalid one will be skipped.
/// When given transaction is not in the pool it has no effect.
/// When invoked on a fully drained iterator it has no effect either.
pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
if let Some(to_report) = self.all.read().get(&tx.hash) {
debug!(
target: "txpool",
"[{:?}] Reported as invalid. Will skip sub-chains while iterating.",
to_report.transaction.transaction.hash
);
for hash in &to_report.unlocks {
self.invalid.insert(hash.clone());
}
}
}
}

impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;

fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let hash = &best.transaction.hash;

// Check if the transaction was marked invalid.
if self.invalid.contains(hash) {
debug!(
target: "txpool",
"[{:?}] Skipping invalid child transaction while iterating.",
hash
);
continue
}

let next = self.all.read().get(&best.transaction.hash).cloned();
let next = self.all.read().get(hash).cloned();
let ready = match next {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
Expand Down Expand Up @@ -635,10 +680,13 @@ mod tests {
assert_eq!(ready.get().count(), 3);
}

#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
/// Populate the pool, with a graph that looks like so:
///
/// tx1 -> tx2 \
/// -> -> tx3
/// -> tx4 -> tx5 -> tx6
/// -> tx7
fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
Expand All @@ -649,11 +697,17 @@ mod tests {
tx3.provides = vec![];
let mut tx4 = tx(4);
tx4.requires = vec![tx1.provides[0].clone()];
tx4.provides = vec![];
let tx5 = Transaction {
data: vec![5],
tx4.provides = vec![vec![107]];
let mut tx5 = tx(5);
tx5.requires = vec![tx4.provides[0].clone()];
tx5.provides = vec![vec![108]];
let mut tx6 = tx(6);
tx6.requires = vec![tx5.provides[0].clone()];
tx6.provides = vec![];
let tx7 = Transaction {
data: vec![7],
bytes: 1,
hash: 5,
hash: 7,
priority: 1,
valid_till: u64::MAX, // use the max here for testing.
requires: vec![tx1.provides[0].clone()],
Expand All @@ -663,20 +717,30 @@ mod tests {
};

// when
for tx in vec![tx1, tx2, tx3, tx4, tx5] {
import(&mut ready, tx).unwrap();
for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
import(ready, tx).unwrap();
}

// then
assert_eq!(ready.best.len(), 1);
}

#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);

// when
let mut it = ready.get().map(|tx| tx.data[0]);

// then
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), Some(6));
assert_eq!(it.next(), Some(7));
assert_eq!(it.next(), None);
}

Expand Down Expand Up @@ -725,4 +789,26 @@ mod tests {
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
);
}

#[test]
fn should_skip_invalid_transactions_while_iterating() {
// given
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);

// when
let mut it = ready.get();
let data = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];

// then
assert_eq!(it.next().as_ref().map(data), Some(1));
assert_eq!(it.next().as_ref().map(data), Some(2));
assert_eq!(it.next().as_ref().map(data), Some(3));
let tx4 = it.next();
assert_eq!(tx4.as_ref().map(data), Some(4));
// report 4 as invalid, which should skip 5 & 6.
it.report_invalid(&tx4.unwrap());
assert_eq!(it.next().as_ref().map(data), Some(7));
assert_eq!(it.next().as_ref().map(data), None);
}
}
Loading

0 comments on commit c60ccc0

Please sign in to comment.