Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Refactor tx-pool maintenance and other high-level api (#4629)
Browse files Browse the repository at this point in the history
* Reduction.

* Reformation.

* add locked timer stuff

* fix issues and introduce full pool

* arrange together

* fix benches

* fix new_light

* Add revalidation test case

* review fixes

* review fixes

* use just ready future

* address review
  • Loading branch information
NikVolf authored Jan 24, 2020
1 parent 219fad6 commit e5fed33
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 827 deletions.
13 changes: 6 additions & 7 deletions bin/node-template/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -207,11 +205,12 @@ pub fn new_light<C: Send + Default + 'static>(config: Configuration<C, GenesisCo
.with_transaction_pool(|config, client, fetcher| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;

let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
Expand Down
24 changes: 8 additions & 16 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -272,15 +270,9 @@ type ConcreteClient =
#[allow(dead_code)]
type ConcreteBackend = Backend<ConcreteBlock>;
#[allow(dead_code)]
type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool<
sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>,
sc_transaction_pool::FullBasicPoolMaintainer<
ConcreteClient,
sc_transaction_pool::FullChainApi<ConcreteClient, Block>
>
type ConcreteTransactionPool = sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>;

/// A specialized configuration object for setting up the node..
Expand Down Expand Up @@ -322,10 +314,10 @@ pub fn new_light<C: Send + Default + 'static>(config: NodeConfiguration<C>)
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
Expand Down
6 changes: 2 additions & 4 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use std::{
};
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer};
use sp_transaction_pool::MaintainedTransactionPool;
use sp_blockchain;
use grafana_data_source::{self, record_metrics};

Expand Down Expand Up @@ -740,9 +740,7 @@ ServiceBuilder<
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: 'static
+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
{

Expand Down
8 changes: 3 additions & 5 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use self::builder::{
};
pub use config::{Configuration, Roles, PruningMode};
pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension};
pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError};
pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
pub use sc_client::FinalityNotifications;
pub use sc_rpc::Metadata as RpcMetadata;
Expand Down Expand Up @@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
/// Chain selection algorithm.
type SelectChain: sp_consensus::SelectChain<Self::Block>;
/// Transaction pool.
type TransactionPool: TransactionPool<Block = Self::Block>
+ TransactionPoolMaintainer<Block = Self::Block>;
type TransactionPool: TransactionPool<Block = Self::Block>;
/// Network specialization.
type NetworkSpecialization: NetworkSpecialization<Self::Block>;

Expand Down Expand Up @@ -213,8 +212,7 @@ where
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TExPool: 'static + TransactionPool<Block = TBl>
+ TransactionPoolMaintainer<Block = TBl>,
TExPool: 'static + TransactionPool<Block = TBl>,
TOc: 'static + Send + Sync,
TNetSpec: NetworkSpecialization<TBl>,
{
Expand Down
17 changes: 11 additions & 6 deletions client/transaction-pool/graph/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use criterion::{criterion_group, criterion_main, Criterion};

use futures::executor::block_on;
use futures::{future::{ready, Ready}, executor::block_on};
use sc_transaction_graph::*;
use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction};
use codec::Encode;
Expand Down Expand Up @@ -49,7 +49,8 @@ impl ChainApi for TestApi {
type Block = Block;
type Hash = H256;
type Error = sp_transaction_pool::error::Error;
type ValidationFuture = futures::future::Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;

fn validate_transaction(
&self,
Expand All @@ -61,14 +62,14 @@ impl ChainApi for TestApi {

match self.block_id_to_number(at) {
Ok(Some(num)) if num > 5 => {
return futures::future::ready(
return ready(
Ok(Err(InvalidTransaction::Stale.into()))
)
},
_ => {},
}

futures::future::ready(
ready(
Ok(Ok(ValidTransaction {
priority: 4,
requires: if nonce > 1 && self.nonce_dependant {
Expand Down Expand Up @@ -105,6 +106,10 @@ impl ChainApi for TestApi {
let encoded = uxt.encode();
(blake2_256(&encoded).into(), encoded.len())
}

fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down Expand Up @@ -150,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) {

c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50);
bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50);
});
});

c.bench_function("random 100 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::default()), 100);
bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100);
});
});
}
Expand Down
22 changes: 16 additions & 6 deletions client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync {
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
/// Body future (since block body might be remote)
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>> + Unpin + Send + 'static;

/// Verify extrinsic at given block.
fn validate_transaction(
Expand All @@ -84,6 +86,9 @@ pub trait ChainApi: Send + Sync {

/// Returns hash and encoding length of the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);

/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
}

/// Pool configuration options.
Expand Down Expand Up @@ -120,7 +125,7 @@ pub struct Pool<B: ChainApi> {

impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
Pool {
validated_pool: Arc::new(ValidatedPool::new(options, api)),
}
Expand Down Expand Up @@ -488,6 +493,7 @@ mod tests {
type Hash = u64;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;

/// Verify extrinsic at given block.
fn validate_transaction(
Expand Down Expand Up @@ -560,14 +566,18 @@ mod tests {
len
)
}

fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Extrinsic::Transfer(transfer, Default::default())
}

fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
Pool::new(Default::default(), TestApi::default().into())
}

#[test]
Expand Down Expand Up @@ -713,7 +723,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
Expand Down Expand Up @@ -748,7 +758,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

// when
block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
Expand Down Expand Up @@ -924,7 +934,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
Expand Down Expand Up @@ -958,7 +968,7 @@ mod tests {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), api));
let pool = Arc::new(Pool::new(Default::default(), api.into()));

// when
let xt = uxt(Transfer {
Expand Down
4 changes: 2 additions & 2 deletions client/transaction-pool/graph/src/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<

/// Pool that deals with validated transactions.
pub(crate) struct ValidatedPool<B: ChainApi> {
api: B,
api: Arc<B>,
options: Options,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
Expand All @@ -76,7 +76,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {

impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
api,
Expand Down
42 changes: 39 additions & 3 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use codec::{Decode, Encode};
use futures::{
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready},
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
};

use sc_client_api::{
blockchain::HeaderBackend,
light::{Fetcher, RemoteCallRequest}
light::{Fetcher, RemoteCallRequest, RemoteBodyRequest},
BlockBody,
};
use sp_core::Hasher;
use sp_runtime::{
Expand Down Expand Up @@ -63,14 +64,19 @@ impl<Client, Block> FullChainApi<Client, Block> where

impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send,
{
type Block = Block;
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;

fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(self.client.block_body(&id).map_err(|e| error::Error::from(e)))
}

fn validate_transaction(
&self,
Expand Down Expand Up @@ -149,6 +155,7 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Box<dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin>;
type BodyFuture = Pin<Box<dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>> + Send>>;

fn validate_transaction(
&self,
Expand Down Expand Up @@ -197,4 +204,33 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
(<<Block::Header as HeaderT>::Hashing as HashT>::hash(x), x.len())
})
}

fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
let header = self.client.header(*id)
.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
let header = match header {
Ok(header) => header,
Err(err) => {
log::warn!(target: "txpool", "Failed to query header: {:?}", err);
return Box::pin(ready(Ok(None)));
}
};

let fetcher = self.fetcher.clone();
async move {
let transactions = fetcher.remote_body({
RemoteBodyRequest {
header,
retry_count: None,
}
})
.await
.unwrap_or_else(|e| {
log::warn!(target: "txpool", "Failed to fetch block body: {:?}", e);
Vec::new()
});

Ok(Some(transactions))
}.boxed()
}
}
Loading

0 comments on commit e5fed33

Please sign in to comment.