diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index ed2299e30f73e..458656d836d47 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -43,9 +43,7 @@ macro_rules! new_full_start { .with_transaction_pool(|config, client, _fetcher| { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); let pool = sc_transaction_pool::BasicPool::new(config, pool_api); - let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client); - let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer); - Ok(maintainable_pool) + Ok(pool) })? .with_import_queue(|_config, client, mut select_chain, transaction_pool| { let select_chain = select_chain.take() @@ -207,11 +205,12 @@ pub fn new_light(config: Configuration; #[allow(dead_code)] -type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool< - sc_transaction_pool::BasicPool< - sc_transaction_pool::FullChainApi, - ConcreteBlock - >, - sc_transaction_pool::FullBasicPoolMaintainer< - ConcreteClient, - sc_transaction_pool::FullChainApi - > +type ConcreteTransactionPool = sc_transaction_pool::BasicPool< + sc_transaction_pool::FullChainApi, + ConcreteBlock >; /// A specialized configuration object for setting up the node.. @@ -322,10 +314,10 @@ pub fn new_light(config: NodeConfiguration) let fetcher = fetcher .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); - let pool = sc_transaction_pool::BasicPool::new(config, pool_api); - let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher); - let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer); - Ok(maintainable_pool) + let pool = sc_transaction_pool::BasicPool::with_revalidation_type( + config, pool_api, sc_transaction_pool::RevalidationType::Light, + ); + Ok(pool) })? .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { let fetch_checker = fetcher diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 044798701c6e1..194bd09e24b52 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -49,7 +49,7 @@ use std::{ }; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; -use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer}; +use sp_transaction_pool::MaintainedTransactionPool; use sp_blockchain; use grafana_data_source::{self, record_metrics}; @@ -740,9 +740,7 @@ ServiceBuilder< TSc: Clone, TImpQu: 'static + ImportQueue, TNetP: NetworkSpecialization, - TExPool: 'static - + TransactionPool::Hash> - + TransactionPoolMaintainer::Hash>, + TExPool: MaintainedTransactionPool::Hash> + 'static, TRpc: sc_rpc::RpcExtension + Clone, { diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c1b87e4491904..1b2e7bcd3cc71 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -61,7 +61,7 @@ pub use self::builder::{ }; pub use config::{Configuration, Roles, PruningMode}; pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension}; -pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError}; +pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError}; pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions; pub use sc_client::FinalityNotifications; pub use sc_rpc::Metadata as RpcMetadata; @@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future> + /// Chain selection algorithm. type SelectChain: sp_consensus::SelectChain; /// Transaction pool. - type TransactionPool: TransactionPool - + TransactionPoolMaintainer; + type TransactionPool: TransactionPool; /// Network specialization. type NetworkSpecialization: NetworkSpecialization; @@ -213,8 +212,7 @@ where TExec: 'static + sc_client::CallExecutor + Send + Sync + Clone, TRtApi: 'static + Send + Sync, TSc: sp_consensus::SelectChain + 'static + Clone + Send + Unpin, - TExPool: 'static + TransactionPool - + TransactionPoolMaintainer, + TExPool: 'static + TransactionPool, TOc: 'static + Send + Sync, TNetSpec: NetworkSpecialization, { diff --git a/client/transaction-pool/graph/benches/basics.rs b/client/transaction-pool/graph/benches/basics.rs index 557a2ca3d1f85..75d15cc1f1916 100644 --- a/client/transaction-pool/graph/benches/basics.rs +++ b/client/transaction-pool/graph/benches/basics.rs @@ -16,7 +16,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use futures::executor::block_on; +use futures::{future::{ready, Ready}, executor::block_on}; use sc_transaction_graph::*; use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction}; use codec::Encode; @@ -49,7 +49,8 @@ impl ChainApi for TestApi { type Block = Block; type Hash = H256; type Error = sp_transaction_pool::error::Error; - type ValidationFuture = futures::future::Ready>; + type ValidationFuture = Ready>; + type BodyFuture = Ready>>>; fn validate_transaction( &self, @@ -61,14 +62,14 @@ impl ChainApi for TestApi { match self.block_id_to_number(at) { Ok(Some(num)) if num > 5 => { - return futures::future::ready( + return ready( Ok(Err(InvalidTransaction::Stale.into())) ) }, _ => {}, } - futures::future::ready( + ready( Ok(Ok(ValidTransaction { priority: 4, requires: if nonce > 1 && self.nonce_dependant { @@ -105,6 +106,10 @@ impl ChainApi for TestApi { let encoded = uxt.encode(); (blake2_256(&encoded).into(), encoded.len()) } + + fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { + ready(Ok(None)) + } } fn uxt(transfer: Transfer) -> Extrinsic { @@ -150,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) { c.bench_function("sequential 50 tx", |b| { b.iter(|| { - bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50); + bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50); }); }); c.bench_function("random 100 tx", |b| { b.iter(|| { - bench_configured(Pool::new(Default::default(), TestApi::default()), 100); + bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100); }); }); } diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 629bd0a9a93a8..5be879f079a7b 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync { type Error: From + error::IntoPoolError; /// Validate transaction future. type ValidationFuture: Future> + Send + Unpin; + /// Body future (since block body might be remote) + type BodyFuture: Future::Extrinsic>>, Self::Error>> + Unpin + Send + 'static; /// Verify extrinsic at given block. fn validate_transaction( @@ -84,6 +86,9 @@ pub trait ChainApi: Send + Sync { /// Returns hash and encoding length of the extrinsic. fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); + + /// Returns a block body given the block id. + fn block_body(&self, at: &BlockId) -> Self::BodyFuture; } /// Pool configuration options. @@ -120,7 +125,7 @@ pub struct Pool { impl Pool { /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { + pub fn new(options: Options, api: Arc) -> Self { Pool { validated_pool: Arc::new(ValidatedPool::new(options, api)), } @@ -488,6 +493,7 @@ mod tests { type Hash = u64; type Error = error::Error; type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>>; /// Verify extrinsic at given block. fn validate_transaction( @@ -560,6 +566,10 @@ mod tests { len ) } + + fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(None)) + } } fn uxt(transfer: Transfer) -> Extrinsic { @@ -567,7 +577,7 @@ mod tests { } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default()) + Pool::new(Default::default(), TestApi::default().into()) } #[test] @@ -713,7 +723,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), @@ -748,7 +758,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); // when block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { @@ -924,7 +934,7 @@ mod tests { ready: limit.clone(), future: limit.clone(), ..Default::default() - }, TestApi::default()); + }, TestApi::default().into()); let xt = uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), @@ -958,7 +968,7 @@ mod tests { let (tx, rx) = std::sync::mpsc::sync_channel(1); let mut api = TestApi::default(); api.delay = Arc::new(Mutex::new(rx.into())); - let pool = Arc::new(Pool::new(Default::default(), api)); + let pool = Arc::new(Pool::new(Default::default(), api.into())); // when let xt = uxt(Transfer { diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index 29f82fb894ac0..34f34d580680c 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -63,7 +63,7 @@ pub type ValidatedTransactionFor = ValidatedTransaction< /// Pool that deals with validated transactions. pub(crate) struct ValidatedPool { - api: B, + api: Arc, options: Options, listener: RwLock, BlockHash>>, pool: RwLock { impl ValidatedPool { /// Create a new transaction pool. - pub fn new(options: Options, api: B) -> Self { + pub fn new(options: Options, api: Arc) -> Self { let base_pool = base::BasePool::new(options.reject_future_transactions); ValidatedPool { api, diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 8495b8b65f17b..1bf6348214842 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -19,12 +19,13 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready}, + channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, }; use sc_client_api::{ blockchain::HeaderBackend, - light::{Fetcher, RemoteCallRequest} + light::{Fetcher, RemoteCallRequest, RemoteBodyRequest}, + BlockBody, }; use sp_core::Hasher; use sp_runtime::{ @@ -63,7 +64,7 @@ impl FullChainApi where impl sc_transaction_graph::ChainApi for FullChainApi where Block: BlockT, - Client: ProvideRuntimeApi + BlockIdTo + 'static + Send + Sync, + Client: ProvideRuntimeApi + BlockBody + BlockIdTo + 'static + Send + Sync, Client::Api: TaggedTransactionQueue, sp_api::ApiErrorFor: Send, { @@ -71,6 +72,11 @@ impl sc_transaction_graph::ChainApi for FullChainApi> + Send>>; + type BodyFuture = Ready::Extrinsic>>>>; + + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + ready(self.client.block_body(&id).map_err(|e| error::Error::from(e))) + } fn validate_transaction( &self, @@ -149,6 +155,7 @@ impl sc_transaction_graph::ChainApi for LightChainApi> + Send + Unpin>; + type BodyFuture = Pin::Extrinsic>>>> + Send>>; fn validate_transaction( &self, @@ -197,4 +204,33 @@ impl sc_transaction_graph::ChainApi for LightChainApi::Hashing as HashT>::hash(x), x.len()) }) } + + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + let header = self.client.header(*id) + .and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id)))); + let header = match header { + Ok(header) => header, + Err(err) => { + log::warn!(target: "txpool", "Failed to query header: {:?}", err); + return Box::pin(ready(Ok(None))); + } + }; + + let fetcher = self.fetcher.clone(); + async move { + let transactions = fetcher.remote_body({ + RemoteBodyRequest { + header, + retry_count: None, + } + }) + .await + .unwrap_or_else(|e| { + log::warn!(target: "txpool", "Failed to fetch block body: {:?}", e); + Vec::new() + }); + + Ok(Some(transactions)) + }.boxed() + } } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 4d71307c0abd0..f6f7774935b39 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -20,26 +20,26 @@ #![warn(unused_extern_crates)] mod api; -mod maintainer; - pub mod error; + #[cfg(test)] mod tests; pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; -pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer}; -use std::{collections::HashMap, sync::Arc}; -use futures::{Future, FutureExt}; +use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant}; +use futures::{Future, FutureExt, future::ready}; +use parking_lot::Mutex; use sp_runtime::{ generic::BlockId, - traits::Block as BlockT, + traits::{Block as BlockT, NumberFor, SimpleArithmetic, Extrinsic}, }; use sp_transaction_pool::{ TransactionPool, PoolStatus, ImportNotificationStream, - TxHash, TransactionFor, TransactionStatusStreamFor, + TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, + MaintainedTransactionPool, }; /// Basic implementation of transaction pool that can be customized by providing PoolApi. @@ -49,6 +49,25 @@ pub struct BasicPool PoolApi: sc_transaction_graph::ChainApi, { pool: Arc>, + api: Arc, + revalidation_strategy: Arc>>>, +} + +/// Type of revalidation. +pub enum RevalidationType { + /// Light revalidation type. + /// + /// During maintenance, transaction pool makes periodic revalidation + /// of all transactions depending on number of blocks or time passed. + /// Also this kind of revalidation does not resubmit transactions from + /// retracted blocks, since it is too expensive. + Light, + + /// Full revalidation type. + /// + /// During maintenance, transaction pool revalidates some fixed amount of + /// transactions from the pool of valid transactions. + Full, } impl BasicPool @@ -57,16 +76,44 @@ impl BasicPool PoolApi: sc_transaction_graph::ChainApi, { /// Create new basic transaction pool with provided api. - pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self { + pub fn new( + options: sc_transaction_graph::Options, + pool_api: PoolApi, + ) -> Self { + Self::with_revalidation_type(options, pool_api, RevalidationType::Full) + } + + /// Create new basic transaction pool with provided api and custom + /// revalidation type. + pub fn with_revalidation_type( + options: sc_transaction_graph::Options, + pool_api: PoolApi, + revalidation_type: RevalidationType, + ) -> Self { + let api = Arc::new(pool_api); + let cloned_api = api.clone(); BasicPool { - pool: Arc::new(sc_transaction_graph::Pool::new(options, pool_api)), + api: cloned_api, + pool: Arc::new(sc_transaction_graph::Pool::new(options, api)), + revalidation_strategy: Arc::new(Mutex::new( + match revalidation_type { + RevalidationType::Light => RevalidationStrategy::Light(RevalidationStatus::NotScheduled), + RevalidationType::Full => RevalidationStrategy::Always, + } + )), } + } /// Gets shared reference to the underlying pool. pub fn pool(&self) -> &Arc> { &self.pool } + + #[cfg(test)] + pub fn api(&self) -> &Arc { + &self.api + } } impl TransactionPool for BasicPool @@ -130,3 +177,169 @@ impl TransactionPool for BasicPool self.pool.on_broadcasted(propagations) } } + +#[cfg_attr(test, derive(Debug))] +enum RevalidationStatus { + /// The revalidation has never been completed. + NotScheduled, + /// The revalidation is scheduled. + Scheduled(Option, Option), + /// The revalidation is in progress. + InProgress, +} + +enum RevalidationStrategy { + Always, + Light(RevalidationStatus) +} + +struct RevalidationAction { + revalidate: bool, + resubmit: bool, + revalidate_amount: Option, +} + +impl RevalidationStrategy { + pub fn clear(&mut self) { + if let Self::Light(status) = self { + status.clear() + } + } + + pub fn next( + &mut self, + block: N, + revalidate_time_period: Option, + revalidate_block_period: Option, + ) -> RevalidationAction { + match self { + Self::Light(status) => RevalidationAction { + revalidate: status.next_required( + block, + revalidate_time_period, + revalidate_block_period + ), + resubmit: false, + revalidate_amount: None, + }, + Self::Always => RevalidationAction { + revalidate: true, + resubmit: true, + revalidate_amount: Some(16), + } + } + } +} + +impl RevalidationStatus { + /// Called when revalidation is completed. + pub fn clear(&mut self) { + *self = Self::NotScheduled; + } + + /// Returns true if revalidation is required. + pub fn next_required( + &mut self, + block: N, + revalidate_time_period: Option, + revalidate_block_period: Option, + ) -> bool { + match *self { + Self::NotScheduled => { + *self = Self::Scheduled( + revalidate_time_period.map(|period| Instant::now() + period), + revalidate_block_period.map(|period| block + period), + ); + false + }, + Self::Scheduled(revalidate_at_time, revalidate_at_block) => { + let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) + || revalidate_at_block.map(|at| block >= at).unwrap_or(false); + if is_required { + *self = Self::InProgress; + } + is_required + }, + Self::InProgress => false, + } + } +} + +impl MaintainedTransactionPool for BasicPool +where + Block: BlockT, + PoolApi: 'static + sc_transaction_graph::ChainApi, +{ + fn maintain(&self, id: &BlockId, retracted: &[BlockHash]) + -> Pin + Send>> + { + let id = id.clone(); + let pool = self.pool.clone(); + let api = self.api.clone(); + + let block_number = match api.block_id_to_number(&id) { + Ok(Some(number)) => number, + _ => { + log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id); + return Box::pin(ready(())); + } + }; + + let next_action = self.revalidation_strategy.lock().next( + block_number, + Some(std::time::Duration::from_secs(60)), + Some(20.into()), + ); + let revalidation_strategy = self.revalidation_strategy.clone(); + let retracted = retracted.to_vec(); + + async move { + // We don't query block if we won't prune anything + if !pool.status().is_empty() { + let hashes = api.block_body(&id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + None + }) + .unwrap_or_default() + .into_iter() + .map(|tx| pool.hash_of(&tx)) + .collect::>(); + + if let Err(e) = pool.prune_known(&id, &hashes) { + log::error!("Cannot prune known in the pool {:?}!", e); + } + } + + if next_action.resubmit { + let mut resubmit_transactions = Vec::new(); + + for retracted_hash in retracted { + let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await + .unwrap_or_else(|e| { + log::warn!("Failed to fetch block body {:?}!", e); + None + }) + .unwrap_or_default() + .into_iter() + .filter(|tx| tx.is_signed().unwrap_or(true)); + + resubmit_transactions.extend(block_transactions); + } + if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await { + log::debug!(target: "txpool", + "[{:?}] Error re-submitting transactions: {:?}", id, e + ) + } + } + + if next_action.revalidate { + if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await { + log::warn!("Revalidate ready failed {:?}", e); + } + } + + revalidation_strategy.lock().clear(); + }.boxed() + } +} diff --git a/client/transaction-pool/src/maintainer.rs b/client/transaction-pool/src/maintainer.rs deleted file mode 100644 index 97dc7e10a6f11..0000000000000 --- a/client/transaction-pool/src/maintainer.rs +++ /dev/null @@ -1,645 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use std::{ - marker::{PhantomData, Unpin}, - sync::Arc, - time::Instant, -}; -use futures::{ - Future, FutureExt, - future::{Either, join, ready}, -}; -use log::{warn, debug, trace}; -use parking_lot::Mutex; - -use sc_client_api::{ - client::BlockBody, - light::{Fetcher, RemoteBodyRequest}, -}; -use sp_runtime::{ - generic::BlockId, - traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic}, -}; -use sp_blockchain::HeaderBackend; -use sp_transaction_pool::{TransactionPoolMaintainer, runtime_api::TaggedTransactionQueue}; -use sp_api::ProvideRuntimeApi; - -use sc_transaction_graph::{self, ChainApi}; - -/// Basic transaction pool maintainer for full clients. -pub struct FullBasicPoolMaintainer { - pool: Arc>, - client: Arc, -} - -impl FullBasicPoolMaintainer { - /// Create new basic full pool maintainer. - pub fn new( - pool: Arc>, - client: Arc, - ) -> Self { - FullBasicPoolMaintainer { pool, client } - } -} - -impl TransactionPoolMaintainer -for - FullBasicPoolMaintainer -where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, -{ - type Block = Block; - type Hash = Block::Hash; - - fn maintain( - &self, - id: &BlockId, - retracted: &[Block::Hash], - ) -> Box + Send + Unpin> { - let now = std::time::Instant::now(); - let took = move || format!("Took {} ms", now.elapsed().as_millis()); - - let id = *id; - trace!(target: "txpool", "[{:?}] Starting pool maintainance", id); - // Put transactions from retracted blocks back into the pool. - let client_copy = self.client.clone(); - let retracted_transactions = retracted.to_vec().into_iter() - .filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None)) - .flat_map(|block| block.into_iter()) - // if signed information is not present, attempt to resubmit anyway. - .filter(|tx| tx.is_signed().unwrap_or(true)); - let resubmit_future = self.pool - .submit_at(&id, retracted_transactions, true) - .then(move |resubmit_result| ready(match resubmit_result { - Ok(_) => trace!(target: "txpool", - "[{:?}] Re-submitting retracted done. {}", id, took() - ), - Err(e) => debug!(target: "txpool", - "[{:?}] Error re-submitting transactions: {:?}", id, e - ), - })); - - // Avoid calling into runtime if there is nothing to prune from the pool anyway. - if self.pool.status().is_empty() { - return Box::new(resubmit_future) - } - - let block = (self.client.header(id), self.client.block_body(&id)); - let prune_future = match block { - (Ok(Some(header)), Ok(Some(extrinsics))) => { - let parent_id = BlockId::hash(*header.parent_hash()); - let prune_future = self.pool - .prune(&id, &parent_id, &extrinsics) - .then(move |prune_result| ready(match prune_result { - Ok(_) => trace!(target: "txpool", - "[{:?}] Pruning done. {}", id, took() - ), - Err(e) => warn!(target: "txpool", - "[{:?}] Error pruning transactions: {:?}", id, e - ), - })); - - Either::Left(resubmit_future.then(|_| prune_future)) - }, - (Ok(_), Ok(_)) => Either::Right(resubmit_future), - err => { - warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err); - Either::Right(resubmit_future) - }, - }; - - let revalidate_future = self.pool - .revalidate_ready(&id, Some(16)) - .then(move |result| ready(match result { - Ok(_) => debug!(target: "txpool", - "[{:?}] Revalidation done: {}", id, took() - ), - Err(e) => warn!(target: "txpool", - "[{:?}] Encountered errors while revalidating transactions: {:?}", id, e - ), - })); - - Box::new(prune_future.then(|_| revalidate_future)) - } -} - -/// Basic transaction pool maintainer for light clients. -pub struct LightBasicPoolMaintainer { - pool: Arc>, - client: Arc, - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option>, - revalidation_status: Arc>>>, - _phantom: PhantomData, -} - -impl LightBasicPoolMaintainer - where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, - F: Fetcher + 'static, -{ - /// Create light pool maintainer with default constants. - /// - /// Default constants are: revalidate every 60 seconds or every 20 blocks - /// (whatever happens first). - pub fn with_defaults( - pool: Arc>, - client: Arc, - fetcher: Arc, - ) -> Self { - Self::new( - pool, - client, - fetcher, - Some(std::time::Duration::from_secs(60)), - Some(20.into()), - ) - } - - /// Create light pool maintainer with passed constants. - pub fn new( - pool: Arc>, - client: Arc, - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option>, - ) -> Self { - Self { - pool, - client, - fetcher, - revalidate_time_period, - revalidate_block_period, - revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)), - _phantom: Default::default(), - } - } - - /// Returns future that prunes block transactions from the pool. - fn prune( - &self, - id: &BlockId, - header: &Block::Header, - ) -> impl std::future::Future { - // fetch transactions (possible future optimization: proofs of inclusion) that - // have been included into new block and prune these from the pool - let id = id.clone(); - let pool = self.pool.clone(); - self.fetcher.remote_body(RemoteBodyRequest { - header: header.clone(), - retry_count: None, - }) - .then(move |transactions| ready( - transactions - .map_err(|e| format!("{}", e)) - .and_then(|transactions| { - let hashes = transactions - .into_iter() - .map(|tx| pool.hash_of(&tx)) - .collect::>(); - pool.prune_known(&id, &hashes) - .map_err(|e| format!("{}", e)) - }) - )) - .then(|r| { - if let Err(e) = r { - warn!("Error pruning known transactions: {}", e) - } - ready(()) - }) - } - - /// Returns future that performs in-pool transations revalidation, if required. - fn revalidate( - &self, - id: &BlockId, - header: &Block::Header, - ) -> impl std::future::Future { - // to determine whether ready transaction is still valid, we perform periodic revalidaton - // of ready transactions - let is_revalidation_required = self.revalidation_status.lock().is_required( - *header.number(), - self.revalidate_time_period, - self.revalidate_block_period, - ); - match is_revalidation_required { - true => { - let revalidation_status = self.revalidation_status.clone(); - Either::Left(self.pool - .revalidate_ready(id, None) - .map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e))) - .map(move |_| revalidation_status.lock().clear())) - }, - false => Either::Right(ready(())), - } - } -} - -impl TransactionPoolMaintainer -for - LightBasicPoolMaintainer -where - Block: BlockT, - Client: ProvideRuntimeApi + HeaderBackend + BlockBody + 'static, - Client::Api: TaggedTransactionQueue, - PoolApi: ChainApi + 'static, - F: Fetcher + 'static, -{ - type Block = Block; - type Hash = Block::Hash; - - fn maintain( - &self, - id: &BlockId, - _retracted: &[Block::Hash], - ) -> Box + Send + Unpin> { - // Do nothing if transaction pool is empty. - if self.pool.status().is_empty() { - self.revalidation_status.lock().clear(); - return Box::new(ready(())); - } - let header = self.client.header(*id) - .and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id)))); - let header = match header { - Ok(header) => header, - Err(err) => { - println!("Failed to maintain light tx pool: {:?}", err); - return Box::new(ready(())); - } - }; - - // else prune block transactions from the pool - let prune_future = self.prune(id, &header); - - // and then (optionally) revalidate in-pool transactions - let revalidate_future = self.revalidate(id, &header); - - let maintain_future = join( - prune_future, - revalidate_future, - ).map(|_| ()); - - Box::new(maintain_future) - } -} - -/// The status of transactions revalidation at light tx pool. -#[cfg_attr(test, derive(Debug))] -enum TxPoolRevalidationStatus { - /// The revalidation has never been completed. - NotScheduled, - /// The revalidation is scheduled. - Scheduled(Option, Option), - /// The revalidation is in progress. - InProgress, -} - -impl TxPoolRevalidationStatus { - /// Called when revalidation is completed. - pub fn clear(&mut self) { - *self = TxPoolRevalidationStatus::NotScheduled; - } - - /// Returns true if revalidation is required. - pub fn is_required( - &mut self, - block: N, - revalidate_time_period: Option, - revalidate_block_period: Option, - ) -> bool { - match *self { - TxPoolRevalidationStatus::NotScheduled => { - *self = TxPoolRevalidationStatus::Scheduled( - revalidate_time_period.map(|period| Instant::now() + period), - revalidate_block_period.map(|period| block + period), - ); - false - }, - TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => { - let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) - || revalidate_at_block.map(|at| block >= at).unwrap_or(false); - if is_required { - *self = TxPoolRevalidationStatus::InProgress; - } - is_required - }, - TxPoolRevalidationStatus::InProgress => false, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::executor::block_on; - use codec::Encode; - use substrate_test_runtime_client::{ - prelude::*, Client, runtime::{Block, Transfer}, sp_consensus::{BlockOrigin, SelectChain}, - LongestChain, - }; - use sp_transaction_pool::PoolStatus; - use crate::api::{FullChainApi, LightChainApi}; - - struct TestSetup { - client: Arc>, - longest_chain: LongestChain, - pool: Arc>, - } - - impl TestSetup { - fn new() -> TestSetup, Block>> { - let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); - let client = Arc::new(client); - let pool = Arc::new( - sc_transaction_graph::Pool::new(Default::default(), FullChainApi::new(client.clone())), - ); - TestSetup { - client, - longest_chain, - pool, - } - } - - fn new_light(fetcher: Arc) -> TestSetup, F, Block>> - where F: Fetcher + 'static, - { - let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); - let client = Arc::new(client); - let pool = Arc::new( - sc_transaction_graph::Pool::new( - Default::default(), - LightChainApi::new(client.clone(), fetcher) - ), - ); - TestSetup { - client, - longest_chain, - pool, - } - } - } - - fn setup() -> TestSetup, Block>> { - TestSetup::, Block>>::new() - } - - fn setup_light(fetcher: Arc) -> TestSetup, F, Block>> - where F: Fetcher + 'static, - { - TestSetup::, F, Block>>::new_light(fetcher) - } - - #[test] - fn should_remove_transactions_from_the_full_pool() { - let mut setup = setup(); - - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // import the block - let mut builder = setup.client.new_block(Default::default()).unwrap(); - builder.push(transaction.clone()).unwrap(); - let block = builder.build().unwrap().block; - let id = BlockId::hash(block.header().hash()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[])); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - } - - #[test] - fn should_remove_transactions_from_the_light_pool() { - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let fetcher_transaction = transaction.clone(); - let fetcher = Arc::new(substrate_test_runtime_client::new_light_fetcher() - .with_remote_body(Some(Box::new(move |_| Ok(vec![fetcher_transaction.clone()])))) - .with_remote_call(Some(Box::new(move |_| { - let validity: sp_runtime::transaction_validity::TransactionValidity = - Ok(sp_runtime::transaction_validity::ValidTransaction { - priority: 0, - requires: Vec::new(), - provides: vec![vec![42]], - longevity: 0, - propagate: true, - }); - Ok(validity.encode()) - })))); - - let setup = setup_light(fetcher.clone()); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(LightBasicPoolMaintainer::with_defaults(setup.pool.clone(), setup.client.clone(), fetcher).maintain( - &BlockId::Number(0), - &[], - )); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - } - - #[test] - fn should_schedule_transactions_revalidation_at_light_pool() { - // when revalidation is not scheduled, it became scheduled - let mut status = TxPoolRevalidationStatus::NotScheduled; - assert!(!status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::Scheduled(_, _) => (), - _ => panic!("Unexpected status: {:?}", status), - } - - // revalidation required at time - let mut status = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None); - assert!(status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::InProgress => (), - _ => panic!("Unexpected status: {:?}", status), - } - - // revalidation required at block - let mut status = TxPoolRevalidationStatus::Scheduled(None, Some(10)); - assert!(status.is_required(10u32, None, None)); - match status { - TxPoolRevalidationStatus::InProgress => (), - _ => panic!("Unexpected status: {:?}", status), - } - } - - #[test] - fn should_revalidate_transactions_at_light_pool() { - use std::sync::atomic; - use sp_runtime::transaction_validity::*; - - let build_fetcher = || { - let validated = Arc::new(atomic::AtomicBool::new(false)); - Arc::new(substrate_test_runtime_client::new_light_fetcher() - .with_remote_body(Some(Box::new(move |_| Ok(vec![])))) - .with_remote_call(Some(Box::new(move |_| { - let is_inserted = validated.swap(true, atomic::Ordering::SeqCst); - let validity: TransactionValidity = if is_inserted { - Err(TransactionValidityError::Invalid( - InvalidTransaction::Custom(0) - )) - } else { - Ok(ValidTransaction { - priority: 0, - requires: Vec::new(), - provides: vec![vec![42]], - longevity: 0, - propagate: true, - }) - }; - Ok(validity.encode()) - })))) - }; - - fn with_fetcher_maintain + 'static>( - fetcher: Arc, - revalidate_time_period: Option, - revalidate_block_period: Option, - prepare_maintainer: impl Fn(&Mutex>), - ) -> PoolStatus { - let setup = setup_light(fetcher.clone()); - let best = setup.longest_chain.best_chain().unwrap(); - - // let's prepare maintainer - let maintainer = LightBasicPoolMaintainer::new( - setup.pool.clone(), - setup.client.clone(), - fetcher, - revalidate_time_period, - revalidate_block_period, - ); - prepare_maintainer(&*maintainer.revalidation_status); - - // store the transaction in the pool - block_on(setup.pool.submit_one( - &BlockId::hash(best.hash()), - Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(), - )).unwrap(); - - // and run maintain procedures - block_on(maintainer.maintain(&BlockId::Number(0), &[])); - - setup.pool.status() - } - - // when revalidation is never required - nothing happens - let fetcher = build_fetcher(); - //let maintainer = DefaultLightTransactionPoolMaintainer::new(client.clone(), fetcher.clone(), None, None); - let status = with_fetcher_maintain(fetcher, None, None, |_revalidation_status| {}); - assert_eq!(status.ready, 1); - - // when revalidation is scheduled by time - it is performed - let fetcher = build_fetcher(); - let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status| - *revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None) - ); - assert_eq!(status.ready, 0); - - // when revalidation is scheduled by block number - it is performed - let fetcher = build_fetcher(); - let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status| - *revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(None, Some(0)) - ); - assert_eq!(status.ready, 0); - } - - #[test] - fn should_add_reverted_transactions_to_the_pool() { - let mut setup = setup(); - - let transaction = Transfer { - amount: 5, - nonce: 0, - from: AccountKeyring::Alice.into(), - to: Default::default(), - }.into_signed_tx(); - let best = setup.longest_chain.best_chain().unwrap(); - - // store the transaction in the pool - block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap(); - - // import the block - let mut builder = setup.client.new_block(Default::default()).unwrap(); - builder.push(transaction.clone()).unwrap(); - let block = builder.build().unwrap().block; - let block1_hash = block.header().hash(); - let id = BlockId::hash(block1_hash.clone()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should clean up the queue - assert_eq!(setup.pool.status().ready, 1); - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[])); - - // then - assert_eq!(setup.pool.status().ready, 0); - assert_eq!(setup.pool.status().future, 0); - - // import second block - let builder = setup.client.new_block_at( - &BlockId::hash(best.hash()), - Default::default(), - false, - ).unwrap(); - let block = builder.build().unwrap().block; - let id = BlockId::hash(block.header().hash()); - setup.client.import(BlockOrigin::Own, block).unwrap(); - - // fire notification - this should add the transaction back to the pool. - block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[block1_hash])); - - // then - assert_eq!(setup.pool.status().ready, 1); - assert_eq!(setup.pool.status().future, 0); - } -} diff --git a/client/transaction-pool/src/tests.rs b/client/transaction-pool/src/tests.rs index 1199e41cf8740..778536b7b9ae3 100644 --- a/client/transaction-pool/src/tests.rs +++ b/client/transaction-pool/src/tests.rs @@ -14,29 +14,53 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . - use super::*; +use crate::{BasicPool, MaintainedTransactionPool}; use codec::Encode; use futures::executor::block_on; -use sc_transaction_graph::{self, Pool}; -use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}}; +use parking_lot::RwLock; +use sc_transaction_graph::{self, ExHash, Pool}; use sp_runtime::{ generic::{self, BlockId}, - traits::{Hash as HashT, BlakeTwo256}, - transaction_validity::{TransactionValidity, ValidTransaction}, + traits::{BlakeTwo256, Hash as HashT}, + transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction}, +}; +use std::collections::HashSet; +use substrate_test_runtime_client::{ + runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer}, + AccountKeyring::{self, *}, }; struct TestApi { - pub modifier: Box, + pub modifier: RwLock>, + pub chain_block_by_number: RwLock>>, + pub chain_headers_by_number: RwLock>, + pub invalid_hashes: RwLock>>, + pub validation_requests: RwLock>, } impl TestApi { fn default() -> Self { TestApi { - modifier: Box::new(|_| {}), + modifier: RwLock::new(Box::new(|_| {})), + chain_block_by_number: RwLock::new(HashMap::new()), + invalid_hashes: RwLock::new(HashSet::new()), + chain_headers_by_number: RwLock::new(HashMap::new()), + validation_requests: RwLock::new(Default::default()), } } + + fn push_block(&self, block_number: BlockNumber, xts: Vec) { + self.chain_block_by_number.write().insert(block_number, xts); + self.chain_headers_by_number.write().insert(block_number, Header { + number: block_number, + digest: Default::default(), + extrinsics_root: Default::default(), + parent_hash: Default::default(), + state_root: Default::default(), + }); + } } impl sc_transaction_graph::ChainApi for TestApi { @@ -44,12 +68,16 @@ impl sc_transaction_graph::ChainApi for TestApi { type Hash = Hash; type Error = error::Error; type ValidationFuture = futures::future::Ready>; + type BodyFuture = futures::future::Ready>>>; fn validate_transaction( &self, at: &BlockId, uxt: sc_transaction_graph::ExtrinsicFor, ) -> Self::ValidationFuture { + + self.validation_requests.write().push(uxt.clone()); + let expected = index(at); let requires = if expected == uxt.transfer().nonce { vec![] @@ -58,6 +86,12 @@ impl sc_transaction_graph::ChainApi for TestApi { }; let provides = vec![vec![uxt.transfer().nonce as u8]]; + if self.invalid_hashes.read().contains(&self.hash_and_length(&uxt).0) { + return futures::future::ready(Ok( + Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))) + )) + } + let mut validity = ValidTransaction { priority: 1, requires, @@ -66,29 +100,43 @@ impl sc_transaction_graph::ChainApi for TestApi { propagate: true, }; - (self.modifier)(&mut validity); + (self.modifier.read())(&mut validity); - futures::future::ready(Ok( - Ok(validity) - )) + futures::future::ready(Ok(Ok(validity))) } - fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { + fn block_id_to_number( + &self, + at: &BlockId, + ) -> error::Result>> { Ok(Some(number_of(at))) } - fn block_id_to_hash(&self, at: &BlockId) -> error::Result>> { + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> error::Result>> { Ok(match at { generic::BlockId::Hash(x) => Some(x.clone()), _ => Some(Default::default()), }) } - fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor) -> (Self::Hash, usize) { + fn hash_and_length( + &self, + ex: &sc_transaction_graph::ExtrinsicFor, + ) -> (Self::Hash, usize) { let encoded = ex.encode(); (BlakeTwo256::hash(&encoded), encoded.len()) } + fn block_body(&self, id: &BlockId) -> Self::BodyFuture { + futures::future::ready(Ok(if let BlockId::Number(num) = id { + self.chain_block_by_number.read().get(num).cloned() + } else { + None + })) + } } fn index(at: &BlockId) -> u64 { @@ -114,7 +162,11 @@ fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic { } fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default()) + Pool::new(Default::default(), TestApi::default().into()) +} + +fn maintained_pool() -> BasicPool { + BasicPool::new(Default::default(), TestApi::default()) } #[test] @@ -192,11 +244,11 @@ fn should_ban_invalid_transactions() { #[test] fn should_correctly_prune_transactions_providing_more_than_one_tag() { - let mut api = TestApi::default(); - api.modifier = Box::new(|v: &mut ValidTransaction| { + let api = TestApi::default(); + *api.modifier.write() = Box::new(|v: &mut ValidTransaction| { v.provides.push(vec![155]); }); - let pool = Pool::new(Default::default(), api); + let pool = Pool::new(Default::default(), Arc::new(api)); let xt = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); @@ -220,3 +272,38 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 2); } + +#[test] +fn should_prune_old_during_maintenance() { + let xt = uxt(Alice, 209); + + let pool = maintained_pool(); + + block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); + assert_eq!(pool.status().ready, 1); + + pool.api.push_block(1, vec![xt.clone()]); + + block_on(pool.maintain(&BlockId::number(1), &[])); + assert_eq!(pool.status().ready, 0); +} + + +#[test] +fn should_revalidate_during_maintenance() { + let xt1 = uxt(Alice, 209); + let xt2 = uxt(Alice, 210); + + let pool = maintained_pool(); + block_on(pool.submit_one(&BlockId::number(0), xt1.clone())).expect("1. Imported"); + block_on(pool.submit_one(&BlockId::number(0), xt2.clone())).expect("2. Imported"); + assert_eq!(pool.status().ready, 2); + assert_eq!(pool.api.validation_requests.read().len(), 2); + + pool.api.push_block(1, vec![xt1.clone()]); + + block_on(pool.maintain(&BlockId::number(1), &[])); + assert_eq!(pool.status().ready, 1); + // test that pool revalidated transaction that left ready and not included in the block + assert_eq!(pool.api.validation_requests.read().len(), 3); +} diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index e67a9890755d7..ed24ad0619a09 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -20,6 +20,7 @@ use std::{ collections::HashMap, hash::Hash, sync::Arc, + pin::Pin, }; use futures::{ Future, Stream, @@ -225,6 +226,13 @@ pub trait TransactionPool: Send + Sync { fn hash_of(&self, xt: &TransactionFor) -> TxHash; } +/// Trait for transaction pool maintenance. +pub trait MaintainedTransactionPool : TransactionPool { + /// Perform maintenance + fn maintain(&self, block: &BlockId, retracted: &[BlockHash]) + -> Pin + Send>>; +} + /// An abstraction for transaction pool. /// /// This trait is used by offchain calls to be able to submit transactions. @@ -264,109 +272,4 @@ impl OffchainSubmitTransaction for TPool { e )) } -} - -/// Transaction pool maintainer interface. -pub trait TransactionPoolMaintainer: Send + Sync { - /// Block type. - type Block: BlockT; - /// Transaction Hash type. - type Hash: Hash + Eq + Member + Serialize; - - /// Returns a future that performs maintenance procedures on the pool when - /// with given hash is imported. - fn maintain( - &self, - id: &BlockId, - retracted: &[Self::Hash], - ) -> Box + Send + Unpin>; -} - -/// Maintainable pool implementation. -pub struct MaintainableTransactionPool { - pool: Pool, - maintainer: Maintainer, -} - -impl MaintainableTransactionPool { - /// Create new maintainable pool using underlying pool and maintainer. - pub fn new(pool: Pool, maintainer: Maintainer) -> Self { - MaintainableTransactionPool { pool, maintainer } - } -} - -impl TransactionPool for MaintainableTransactionPool - where - Pool: TransactionPool, - Maintainer: Send + Sync, -{ - type Block = Pool::Block; - type Hash = Pool::Hash; - type InPoolTransaction = Pool::InPoolTransaction; - type Error = Pool::Error; - - fn submit_at( - &self, - at: &BlockId, - xts: impl IntoIterator> + 'static, - ) -> Box, Self::Error>>, Self::Error>> + Send + Unpin> { - self.pool.submit_at(at, xts) - } - - fn submit_one( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box, Self::Error>> + Send + Unpin> { - self.pool.submit_one(at, xt) - } - - fn submit_and_watch( - &self, - at: &BlockId, - xt: TransactionFor, - ) -> Box>, Self::Error>> + Send + Unpin> { - self.pool.submit_and_watch(at, xt) - } - - fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { - self.pool.remove_invalid(hashes) - } - - fn status(&self) -> PoolStatus { - self.pool.status() - } - - fn ready(&self) -> Box>> { - self.pool.ready() - } - - fn import_notification_stream(&self) -> ImportNotificationStream { - self.pool.import_notification_stream() - } - - fn hash_of(&self, xt: &TransactionFor) -> TxHash { - self.pool.hash_of(xt) - } - - fn on_broadcasted(&self, propagations: HashMap, Vec>) { - self.pool.on_broadcasted(propagations) - } -} - -impl TransactionPoolMaintainer for MaintainableTransactionPool - where - Pool: Send + Sync, - Maintainer: TransactionPoolMaintainer -{ - type Block = Maintainer::Block; - type Hash = Maintainer::Hash; - - fn maintain( - &self, - id: &BlockId, - retracted: &[Self::Hash], - ) -> Box + Send + Unpin> { - self.maintainer.maintain(id, retracted) - } -} +} \ No newline at end of file