Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Transaction pool: Ensure that we prune transactions properly #8963

Merged
merged 5 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion client/transaction-pool/graph/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sc_transaction_graph::*;
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use sp_runtime::{
generic::BlockId,
generic::BlockId, traits::Block as BlockT,
transaction_validity::{
ValidTransaction, InvalidTransaction, TransactionValidity, TransactionTag as Tag,
TransactionSource,
Expand Down Expand Up @@ -114,6 +114,13 @@ impl ChainApi for TestApi {
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}

fn block_header(
&self,
_: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
Ok(None)
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down
10 changes: 3 additions & 7 deletions client/transaction-pool/graph/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{

use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use log::{debug, trace, warn};
use log::{debug, trace};
use sp_runtime::traits;

use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
Expand Down Expand Up @@ -99,12 +99,8 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
}

/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H, warn: bool) {
if warn {
warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
} else {
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
}
Comment on lines -103 to -107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning here was meant to indicate more serious pruning issue or pool inconsistency - it was triggered when we detected invalid transaction during block production.

I understand that warning is not desirable cause it is not actionable by node operator, but I'd still differentiate the log message depending on the fact when we detected the transaction to be invalid.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody ever looked at these warnings, especially as they were also triggered by re-validating inherents for example.

If we want to improve this, we should print proper messages where this function is called, but not down here where we have no idea what actually happens.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody ever looked at these warnings

Indeed. The problem is that these warnings are not some rare event but get printed extremely often. At the beginning people were concerned, but over time they have learned to completely ignore them.

pub fn invalid(&mut self, tx: &H) {
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
self.fire(tx, |watcher| watcher.invalid());
}

Expand Down
15 changes: 14 additions & 1 deletion client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ pub trait ChainApi: Send + Sync {

/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;

/// Returns a block header given the block id.
fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
}

/// Pool configuration options.
Expand Down Expand Up @@ -237,7 +243,7 @@ impl<B: ChainApi> Pool<B> {
) -> Result<(), B::Error> {
// Get details of all extrinsics that are already in the pool
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
.into_iter().filter_map(|x| x).flat_map(|x| x);
.into_iter().filter_map(|x| x).flatten();

// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
Expand Down Expand Up @@ -579,6 +585,13 @@ mod tests {
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}

fn block_header(
&self,
_: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
Ok(None)
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down
26 changes: 11 additions & 15 deletions client/transaction-pool/graph/src/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<B: ChainApi> ValidatedPool<B> {
Err(err)
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash, false);
self.listener.write().invalid(&hash);
Err(err)
},
}
Expand Down Expand Up @@ -415,18 +415,20 @@ impl<B: ChainApi> ValidatedPool<B> {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None),
Status::Failed => listener.invalid(&hash, initial_status.is_some()),
Status::Failed => listener.invalid(&hash),
}
}
}
}

/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hashes(&hashes)
self.pool.read()
.by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.to_vec()))
.map(|existing_in_pool|
existing_in_pool.map(|transaction| transaction.provides.to_vec())
)
.collect()
}

Expand Down Expand Up @@ -599,7 +601,7 @@ impl<B: ChainApi> ValidatedPool<B> {

let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash, true);
listener.invalid(&tx.hash);
}

invalid
Expand Down Expand Up @@ -645,15 +647,9 @@ fn fire_events<H, B, Ex>(
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
for f in failed {
listener.invalid(f, true);
}
for r in removed {
listener.dropped(&r.hash, Some(hash));
}
for p in promoted {
listener.ready(p, None);
}
failed.into_iter().for_each(|f| listener.invalid(f));
removed.into_iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
promoted.into_iter().for_each(|p| listener.ready(p, None));
},
base::Imported::Future { ref hash } => {
listener.future(hash)
Expand Down
20 changes: 17 additions & 3 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -150,6 +150,13 @@ where
(<traits::HashFor::<Block> as traits::Hash>::hash(x), x.len())
})
}

fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
self.client.header(*at).map_err(Into::into)
}
}

/// Helper function to validate a transaction using a full chain API.
Expand All @@ -162,7 +169,7 @@ fn validate_transaction_blocking<Client, Block>(
) -> error::Result<TransactionValidity>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -193,7 +200,7 @@ where
impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
Expand Down Expand Up @@ -333,4 +340,11 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for
Ok(Some(transactions))
}.boxed()
}

fn block_header(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
self.client.header(*at).map_err(Into::into)
}
}
25 changes: 20 additions & 5 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use parking_lot::Mutex;

use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero},
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT},
};
use sp_core::traits::SpawnNamed;
use sp_transaction_pool::{
Expand Down Expand Up @@ -357,6 +357,7 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
Expand Down Expand Up @@ -387,6 +388,7 @@ where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
Expand Down Expand Up @@ -523,19 +525,32 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
api: &Api,
pool: &sc_transaction_graph::Pool<Api>,
) -> Vec<ExtrinsicHash<Api>> {
let hashes = api.block_body(&block_id).await
let extrinsics = api.block_body(&block_id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.unwrap_or_default();

let hashes = extrinsics.iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();

log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes);

if let Err(e) = pool.prune_known(&block_id, &hashes) {
let header = match api.block_header(&block_id) {
Ok(Some(h)) => h,
Ok(None) => {
log::debug!(target: "txpool", "Could not find header for {:?}.", block_id);
return hashes
},
Err(e) => {
log::debug!(target: "txpool", "Error retrieving header for {:?}: {:?}", block_id, e);
return hashes
}
};

if let Err(e) = pool.prune(&block_id, &BlockId::hash(*header.parent_hash()), &extrinsics).await {
log::error!("Cannot prune known in the pool {:?}!", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the error line be updated to not say prune known?

}

Expand Down
113 changes: 78 additions & 35 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,6 @@ fn should_not_retain_invalid_hashes_from_retracted() {
assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_transaction_multiple_times() {
let xt = uxt(Alice, 209);

let (pool, _guard, mut notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

let header = pool.api.push_block(1, vec![xt.clone()], true);

block_on(pool.maintain(block_event(header)));

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

let header = pool.api.push_block(2, vec![], true);
pool.api.add_invalid(&xt);

block_on(pool.maintain(block_event(header)));
block_on(notifier.next());

assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_across_many_blocks() {
let xt1 = uxt(Alice, 209);
Expand Down Expand Up @@ -1002,21 +977,13 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() {
let xt1 = Extrinsic::IncludeData(Vec::new());

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_block(1, vec![xt1.clone()], true);

// This will prune `xt1`.
block_on(pool.maintain(block_event(header)));

// Submit the tx again.
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("2. Imported");

let mut iterator = block_on(pool.ready_at(1));

assert_eq!(iterator.next().unwrap().data, xt1.clone());

// If the tx was not removed from the best txs, the tx would be
// returned a second time by the iterator.
assert!(iterator.next().is_none());
assert_eq!(pool.status().ready, 0);
}

#[test]
Expand All @@ -1038,3 +1005,79 @@ fn only_revalidate_on_best_block() {

assert_eq!(pool.status().ready, 1);
}

#[test]
fn stale_transactions_are_pruned() {
sp_tracing::try_init_simple();

// Our initial transactions
let xts = vec![
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 1,
amount: 1,
},
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 2,
amount: 1,
},
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 3,
amount: 1,
},
];

let (pool, _guard, _notifier) = maintained_pool();

xts.into_iter().for_each(|xt| {
block_on(
pool.submit_one(&BlockId::number(0), SOURCE, xt.into_signed_tx()),
).expect("1. Imported");
});
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 3);

// Almost the same as our initial transactions, but with some different `amount`s to make them
// generate a different hash
let xts = vec![
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 1,
amount: 2,
}.into_signed_tx(),
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 2,
amount: 2,
}.into_signed_tx(),
Transfer {
from: Alice.into(),
to: Bob.into(),
nonce: 3,
amount: 2,
}.into_signed_tx(),
];

// Import block
let header = pool.api.push_block(1, xts, true);
block_on(pool.maintain(block_event(header)));
// The imported transactions have a different hash and should not evict our initial
// transactions.
assert_eq!(pool.status().future, 3);

// Import enough blocks to make our transactions stale
for n in 1..66 {
let header = pool.api.push_block(n, vec![], true);
block_on(pool.maintain(block_event(header)));
}

assert_eq!(pool.status().future, 0);
assert_eq!(pool.status().ready, 0);
}
Loading