Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Revalidate transactions only on latest best block #6824

Merged
merged 5 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ mod tests {
setup_handles = Some((block_import.clone(), babe_link.clone()));
}
)?;

let node = sc_service_test::TestNetComponents::new(
keep_alive, client, network, transaction_pool
);
Expand All @@ -521,11 +521,9 @@ mod tests {

futures::executor::block_on(
service.transaction_pool().maintain(
ChainEvent::NewBlock {
is_new_best: true,
ChainEvent::NewBestBlock {
hash: parent_header.hash(),
tree_route: None,
header: parent_header.clone(),
},
)
);
Expand Down
20 changes: 12 additions & 8 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! A set of APIs supported by the client along with their primitives.

use std::{fmt, collections::HashSet, sync::Arc};
use std::{fmt, collections::HashSet, sync::Arc, convert::TryFrom};
use sp_core::storage::StorageKey;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -252,13 +252,17 @@ pub struct FinalityNotification<Block: BlockT> {
pub header: Block::Header,
}

impl<B: BlockT> From<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
fn from(n: BlockImportNotification<B>) -> Self {
Self::NewBlock {
is_new_best: n.is_new_best,
hash: n.hash,
header: n.header,
tree_route: n.tree_route,
impl<B: BlockT> TryFrom<BlockImportNotification<B>> for sp_transaction_pool::ChainEvent<B> {
type Error = ();

fn try_from(n: BlockImportNotification<B>) -> Result<Self, ()> {
if n.is_new_best {
Ok(Self::NewBestBlock {
hash: n.hash,
tree_route: n.tree_route,
})
} else {
Err(())
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,9 @@ mod tests {
fn chain_event<B: BlockT>(header: B::Header) -> ChainEvent<B>
where NumberFor<B>: From<u64>
{
ChainEvent::NewBlock {
ChainEvent::NewBestBlock {
hash: header.hash(),
tree_route: None,
is_new_best: true,
header,
}
}

Expand Down
14 changes: 7 additions & 7 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ mod tests {
use sp_consensus::ImportedAux;
use sp_inherents::InherentDataProviders;
use sc_basic_authorship::ProposerFactory;
use sc_client_api::BlockBackend;

fn api() -> Arc<TestApi> {
Arc::new(TestApi::empty())
Expand Down Expand Up @@ -415,15 +416,13 @@ mod tests {
}
}
);
// assert that there's a new block in the db.
assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block;
pool_api.add_block(block, true);
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());

let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above");
pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
pool.maintain(sp_transaction_pool::ChainEvent::NewBestBlock {
hash: header.hash(),
header,
is_new_best: true,
tree_route: None,
}).await;

Expand All @@ -438,10 +437,11 @@ mod tests {
rx1.await.expect("should be no error receiving"),
Ok(_)
);
assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block;
pool_api.add_block(block, true);
pool_api.increment_nonce(Alice.into());

assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 2)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
Expand Down
17 changes: 8 additions & 9 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};

use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin};
use std::{collections::{HashMap, HashSet}, sync::Arc, pin::Pin, convert::TryInto};
use futures::{prelude::*, future::{self, ready}, channel::oneshot};
use parking_lot::Mutex;

Expand Down Expand Up @@ -549,7 +549,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> {
match event {
ChainEvent::NewBlock { hash, tree_route, is_new_best, .. } => {
ChainEvent::NewBestBlock { hash, tree_route } => {
let pool = self.pool.clone();
let api = self.api.clone();

Expand Down Expand Up @@ -608,10 +608,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
})
}

// If this is a new best block, we need to prune its transactions from the pool.
if is_new_best {
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);
}
pruned_log.extend(prune_known_txs_for_block(id.clone(), &*api, &*pool).await);

metrics.report(
|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
Expand Down Expand Up @@ -690,9 +687,9 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
.map(|tx| tx.hash.clone())
.collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}

revalidation_strategy.lock().clear();
revalidation_strategy.lock().clear();
}
}.boxed()
}
ChainEvent::Finalized { hash } => {
Expand Down Expand Up @@ -721,7 +718,9 @@ pub async fn notification_future<Client, Pool, Block>(
Client: sc_client_api::BlockchainEvents<Block>,
Pool: MaintainedTransactionPool<Block=Block>,
{
let import_stream = client.import_notification_stream().map(Into::into).fuse();
let import_stream = client.import_notification_stream()
.filter_map(|n| ready(n.try_into().ok()))
.fuse();
let finality_stream = client.finality_notification_stream()
.map(Into::into)
.fuse();
Expand Down
21 changes: 12 additions & 9 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
mut self,
from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
interval: R,
) where R: Send, R::Guard: Send
{
) where R: Send, R::Guard: Send {
let interval = interval.into_stream().fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
Expand Down Expand Up @@ -253,7 +252,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
if this.members.len() > 0 {
log::debug!(
target: "txpool",
"Updated revalidation queue at {}. Transactions: {:?}",
"Updated revalidation queue at {:?}. Transactions: {:?}",
this.best_block,
this.members,
);
Expand Down Expand Up @@ -298,9 +297,7 @@ where
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: R,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
where R: Send + 'static, R::Guard: Send
{
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>) where R: Send + 'static, R::Guard: Send {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");

let worker = RevalidationWorker::new(api.clone(), pool.clone());
Expand Down Expand Up @@ -338,16 +335,22 @@ where
/// If queue configured with background worker, this will return immediately.
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExtrinsicHash<Api>>) {
pub async fn revalidate_later(
&self,
at: NumberFor<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
log::debug!(
target: "txpool", "Sent {} transactions to revalidation queue",
transactions.len(),
);
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
return;
} else {
let pool = self.pool.clone();
let api = self.api.clone();
Expand Down
Loading