Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Account for transaction priority when enforcing limits
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Nov 30, 2021
1 parent e2a89b2 commit 6011a11
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
38 changes: 23 additions & 15 deletions client/transaction-pool/src/graph/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! For a more full-featured pool, have a look at the `pool` module.

use std::{collections::HashSet, fmt, hash, sync::Arc};
use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc};

use log::{debug, trace, warn};
use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
Expand All @@ -36,7 +36,7 @@ use sp_runtime::{

use super::{
future::{FutureTransactions, WaitingTransaction},
ready::{BestIterator, ReadyTransactions},
ready::{BestIterator, ReadyTransactions, TransactionRef},
};

/// Successful import result.
Expand Down Expand Up @@ -385,7 +385,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
/// Removes and returns worst transactions from the queues and all transactions that depend on
/// them. Technically the worst transaction should be evaluated by computing the entire pending
/// set. We use a simplified approach to remove the transaction that occupies the pool for the
/// longest time.
/// longest time or has the lowest priority.
pub fn enforce_limits(
&mut self,
ready: &Limit,
Expand All @@ -395,33 +395,41 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,

while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
// find the worst transaction
let minimal = self.ready.fold(|minimal, current| {
let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
let transaction = &current.transaction;
match minimal {
None => Some(transaction.clone()),
Some(ref tx) if tx.insertion_id > transaction.insertion_id =>
Some(transaction.clone()),
other => other,
}
worst
.map(|worst| {
match worst.transaction.priority.cmp(&transaction.transaction.priority) {
Ordering::Less => worst,
Ordering::Equal =>
if worst.insertion_id > transaction.insertion_id {
transaction.clone()
} else {
worst
},
Ordering::Greater => transaction.clone(),
}
})
.or_else(|| Some(transaction.clone()))
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
}

while future.is_exceeded(self.future.len(), self.future.bytes()) {
// find the worst transaction
let minimal = self.future.fold(|minimal, current| match minimal {
let worst = self.future.fold(|worst, current| match worst {
None => Some(current.clone()),
Some(ref tx) if tx.imported_at > current.imported_at => Some(current.clone()),
other => other,
});

if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
Expand Down
70 changes: 69 additions & 1 deletion client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ mod tests {
longevity: 9001,
propagate: false,
}),
Extrinsic::Store(_) => Ok(ValidTransaction {
priority: 9001,
requires: vec![],
provides: vec![vec![43]],
longevity: 9001,
propagate: false,
}),
_ => unimplemented!(),
};

Expand Down Expand Up @@ -1044,7 +1051,7 @@ mod tests {
}

#[test]
fn should_trigger_dropped() {
fn should_trigger_dropped_older() {
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Expand Down Expand Up @@ -1077,6 +1084,67 @@ mod tests {
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}

#[test]
fn should_trigger_dropped_lower_priority() {
{
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let pool = Pool::new(options, true.into(), TestApi::default().into());

let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);

// then
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(2)),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
let result = block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt));
assert!(matches!(
result,
Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
));
}
{
// given
let limit = Limit { count: 2, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let pool = Pool::new(options, true.into(), TestApi::default().into());

let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);

let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);

// when
let xt = Extrinsic::Store(Vec::new());
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);

// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
}

#[test]
fn should_handle_pruning_in_the_middle_of_import() {
// given
Expand Down

0 comments on commit 6011a11

Please sign in to comment.