Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor tx-pool maintenance and other high-level api #4629

Merged
merged 12 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions bin/node-template/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -207,11 +205,12 @@ pub fn new_light<C: Send + Default + 'static>(config: Configuration<C, GenesisCo
.with_transaction_pool(|config, client, fetcher| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;

let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
Expand Down
24 changes: 8 additions & 16 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ macro_rules! new_full_start {
.with_transaction_pool(|config, client, _fetcher| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
Ok(pool)
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -272,15 +270,9 @@ type ConcreteClient =
#[allow(dead_code)]
type ConcreteBackend = Backend<ConcreteBlock>;
#[allow(dead_code)]
type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool<
sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>,
sc_transaction_pool::FullBasicPoolMaintainer<
ConcreteClient,
sc_transaction_pool::FullChainApi<ConcreteClient, Block>
>
type ConcreteTransactionPool = sc_transaction_pool::BasicPool<
sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
ConcreteBlock
>;

/// A specialized configuration object for setting up the node..
Expand Down Expand Up @@ -322,10 +314,10 @@ pub fn new_light<C: Send + Default + 'static>(config: NodeConfiguration<C>)
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
Ok(maintainable_pool)
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, pool_api, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
let fetch_checker = fetcher
Expand Down
6 changes: 2 additions & 4 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use std::{
};
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer};
use sp_transaction_pool::MaintainedTransactionPool;
use sp_blockchain;
use grafana_data_source::{self, record_metrics};

Expand Down Expand Up @@ -740,9 +740,7 @@ ServiceBuilder<
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: 'static
+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
{

Expand Down
8 changes: 3 additions & 5 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use self::builder::{
};
pub use config::{Configuration, Roles, PruningMode};
pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension};
pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError};
pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
pub use sc_client::FinalityNotifications;
pub use sc_rpc::Metadata as RpcMetadata;
Expand Down Expand Up @@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
/// Chain selection algorithm.
type SelectChain: sp_consensus::SelectChain<Self::Block>;
/// Transaction pool.
type TransactionPool: TransactionPool<Block = Self::Block>
+ TransactionPoolMaintainer<Block = Self::Block>;
type TransactionPool: TransactionPool<Block = Self::Block>;
/// Network specialization.
type NetworkSpecialization: NetworkSpecialization<Self::Block>;

Expand Down Expand Up @@ -213,8 +212,7 @@ where
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TExPool: 'static + TransactionPool<Block = TBl>
+ TransactionPoolMaintainer<Block = TBl>,
TExPool: 'static + TransactionPool<Block = TBl>,
TOc: 'static + Send + Sync,
TNetSpec: NetworkSpecialization<TBl>,
{
Expand Down
17 changes: 11 additions & 6 deletions client/transaction-pool/graph/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use criterion::{criterion_group, criterion_main, Criterion};

use futures::executor::block_on;
use futures::{future::{ready, Ready}, executor::block_on};
use sc_transaction_graph::*;
use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction};
use codec::Encode;
Expand Down Expand Up @@ -49,7 +49,8 @@ impl ChainApi for TestApi {
type Block = Block;
type Hash = H256;
type Error = sp_transaction_pool::error::Error;
type ValidationFuture = futures::future::Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;

fn validate_transaction(
&self,
Expand All @@ -61,14 +62,14 @@ impl ChainApi for TestApi {

match self.block_id_to_number(at) {
Ok(Some(num)) if num > 5 => {
return futures::future::ready(
return ready(
Ok(Err(InvalidTransaction::Stale.into()))
)
},
_ => {},
}

futures::future::ready(
ready(
Ok(Ok(ValidTransaction {
priority: 4,
requires: if nonce > 1 && self.nonce_dependant {
Expand Down Expand Up @@ -105,6 +106,10 @@ impl ChainApi for TestApi {
let encoded = uxt.encode();
(blake2_256(&encoded).into(), encoded.len())
}

fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Expand Down Expand Up @@ -150,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) {

c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50);
bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50);
});
});

c.bench_function("random 100 tx", |b| {
b.iter(|| {
bench_configured(Pool::new(Default::default(), TestApi::default()), 100);
bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100);
});
});
}
Expand Down
22 changes: 16 additions & 6 deletions client/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync {
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
/// Body future (since block body might be remote)
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>> + Unpin + Send + 'static;
svyatonik marked this conversation as resolved.
Show resolved Hide resolved

/// Verify extrinsic at given block.
fn validate_transaction(
Expand All @@ -84,6 +86,9 @@ pub trait ChainApi: Send + Sync {

/// Returns hash and encoding length of the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);

/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
}

/// Pool configuration options.
Expand Down Expand Up @@ -120,7 +125,7 @@ pub struct Pool<B: ChainApi> {

impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
Pool {
validated_pool: Arc::new(ValidatedPool::new(options, api)),
}
Expand Down Expand Up @@ -488,6 +493,7 @@ mod tests {
type Hash = u64;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;

/// Verify extrinsic at given block.
fn validate_transaction(
Expand Down Expand Up @@ -560,14 +566,18 @@ mod tests {
len
)
}

fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}
}

fn uxt(transfer: Transfer) -> Extrinsic {
Extrinsic::Transfer(transfer, Default::default())
}

fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
Pool::new(Default::default(), TestApi::default().into())
}

#[test]
Expand Down Expand Up @@ -713,7 +723,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
Expand Down Expand Up @@ -748,7 +758,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

// when
block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
Expand Down Expand Up @@ -924,7 +934,7 @@ mod tests {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
}, TestApi::default().into());

let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
Expand Down Expand Up @@ -958,7 +968,7 @@ mod tests {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), api));
let pool = Arc::new(Pool::new(Default::default(), api.into()));

// when
let xt = uxt(Transfer {
Expand Down
4 changes: 2 additions & 2 deletions client/transaction-pool/graph/src/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<

/// Pool that deals with validated transactions.
pub(crate) struct ValidatedPool<B: ChainApi> {
api: B,
api: Arc<B>,
options: Options,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
Expand All @@ -76,7 +76,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {

impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
pub fn new(options: Options, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
api,
Expand Down
42 changes: 39 additions & 3 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use codec::{Decode, Encode};
use futures::{
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready},
channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
};

use sc_client_api::{
blockchain::HeaderBackend,
light::{Fetcher, RemoteCallRequest}
light::{Fetcher, RemoteCallRequest, RemoteBodyRequest},
BlockBody,
};
use sp_core::Hasher;
use sp_runtime::{
Expand Down Expand Up @@ -63,14 +64,19 @@ impl<Client, Block> FullChainApi<Client, Block> where

impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block> + 'static + Send + Sync,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send,
{
type Block = Block;
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;

fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(self.client.block_body(&id).map_err(|e| error::Error::from(e)))
}

fn validate_transaction(
&self,
Expand Down Expand Up @@ -149,6 +155,7 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Box<dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin>;
type BodyFuture = Pin<Box<dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>> + Send>>;

fn validate_transaction(
&self,
Expand Down Expand Up @@ -197,4 +204,33 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
(<<Block::Header as HeaderT>::Hashing as HashT>::hash(x), x.len())
})
}

fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
let header = self.client.header(*id)
.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
let header = match header {
Ok(header) => header,
Err(err) => {
log::warn!(target: "txpool", "Failed to query header: {:?}", err);
return Box::pin(ready(Ok(None)));
}
};

let fetcher = self.fetcher.clone();
async move {
let transactions = fetcher.remote_body({
RemoteBodyRequest {
header,
retry_count: None,
}
})
.await
.unwrap_or_else(|e| {
log::warn!(target: "txpool", "Failed to fetch block body: {:?}", e);
Vec::new()
});

Ok(Some(transactions))
}.boxed()
}
}
Loading