Skip to content

Commit

Permalink
fatxpool: LocalTransactionPool implemented (paritytech#6104)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk authored and mordamax committed Oct 29, 2024
1 parent 0ac17a6 commit b8aa32c
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 56 deletions.
10 changes: 10 additions & 0 deletions prdoc/pr_6104.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "LocalTransactionPool implemented for fork aware transaction pool"

doc:
- audience: Node Dev
description: |
LocalTransactionPool trait is implemented for fork aware transaction pool.

crates:
- name: sc-transaction-pool
bump: minor
9 changes: 9 additions & 0 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ impl ChainApi for TestApi {
})))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> sc_transaction_pool_api::error::Result<TransactionValidity> {
unimplemented!();
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down
37 changes: 12 additions & 25 deletions substrate/client/transaction-pool/src/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ where
.boxed()
}

/// Validates a transaction by calling into the runtime.
///
/// Same as `validate_transaction` but blocks the current thread when performing validation.
fn validate_transaction_blocking(
&self,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down Expand Up @@ -272,28 +284,3 @@ where

result
}

impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockBackend<Block>
+ BlockIdTo<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
/// Validates a transaction by calling into the runtime, same as
/// `validate_transaction` but blocks the current thread when performing
/// validation. Only implemented for `FullChainApi` since we can call into
/// the runtime locally.
pub fn validate_transaction_blocking(
&self,
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}
}
9 changes: 9 additions & 0 deletions substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ impl ChainApi for TestApi {
futures::future::ready(Ok(res))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> error::Result<TransactionValidity> {
unimplemented!();
}

/// Returns a block number given the block id.
fn block_id_to_number(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ where
log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_result = self.mempool.extend_unwatched(source, xts.clone());
let mempool_result = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_result)).boxed()
Expand Down Expand Up @@ -838,16 +838,16 @@ where
fn submit_local(
&self,
_at: Block::Hash,
_xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
//todo [#5493]
//looks like view_store / view needs non async submit_local method ?.
let e = Err(sc_transaction_pool_api::error::Error::Unactionable.into());
log::warn!(
target: LOG_TARGET,
"LocalTransactionPool::submit_local is not implemented for ForkAwareTxPool, returning error: {e:?}",
);
e
log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
let xt = Arc::from(xt);
let result = self
.mempool
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

self.view_store.submit_local(xt).or_else(|_| Ok(result))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ where
pub(super) fn extend_unwatched(
&self,
source: TransactionSource,
xts: Vec<ExtrinsicFor<ChainApi>>,
xts: &[ExtrinsicFor<ChainApi>],
) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
let mut transactions = self.transactions.write();
let result = xts
.into_iter()
.iter()
.map(|xt| {
let hash = self.api.hash_and_length(&xt).0;
self.try_insert(
Expand Down Expand Up @@ -437,7 +437,7 @@ mod tx_mem_pool_tests {

let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
Expand All @@ -455,7 +455,7 @@ mod tx_mem_pool_tests {
let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
xts.push(xts.iter().last().unwrap().clone());

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().take(max - 1).all(Result::is_ok));
assert!(matches!(
results.into_iter().last().unwrap().unwrap_err(),
Expand All @@ -471,7 +471,7 @@ mod tx_mem_pool_tests {

let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));

let xt = Arc::from(uxt(98));
Expand All @@ -481,7 +481,7 @@ mod tx_mem_pool_tests {
sc_transaction_pool_api::error::Error::ImmediatelyDropped
));
let xt = Arc::from(uxt(99));
let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt]);
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::ImmediatelyDropped
Expand All @@ -498,15 +498,15 @@ mod tx_mem_pool_tests {
let xt0 = xts.iter().last().unwrap().clone();
let xt1 = xts.iter().next().unwrap().clone();

let results = mempool.extend_unwatched(TransactionSource::External, xts);
let results = mempool.extend_unwatched(TransactionSource::External, &xts);
assert!(results.iter().all(Result::is_ok));

let result = mempool.push_watched(TransactionSource::External, xt0);
assert!(matches!(
result.unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
));
let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt1]);
let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt1]);
assert!(matches!(
result.pop().unwrap().unwrap_err(),
sc_transaction_pool_api::error::Error::AlreadyImported(_)
Expand All @@ -521,7 +521,7 @@ mod tx_mem_pool_tests {

let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();

let results = mempool.extend_unwatched(TransactionSource::External, xts0);
let results = mempool.extend_unwatched(TransactionSource::External, &xts0);
assert!(results.iter().all(Result::is_ok));

let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::<Vec<_>>();
Expand Down
53 changes: 48 additions & 5 deletions substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use crate::{
LOG_TARGET,
};
use parking_lot::Mutex;
use sc_transaction_pool_api::{PoolStatus, TransactionSource};
use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionSource};
use sp_blockchain::HashAndNumber;
use sp_runtime::{
traits::Block as BlockT, transaction_validity::TransactionValidityError, SaturatedConversion,
generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
SaturatedConversion,
};
use std::{collections::HashMap, sync::Arc, time::Instant};

Expand Down Expand Up @@ -178,6 +179,50 @@ where
self.pool.submit_and_watch(&self.at, source, xt).await
}

/// Synchronously imports single unvalidated extrinsics into the view.
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash);

let validity = self
.pool
.validated_pool()
.api()
.validate_transaction_blocking(
self.at.hash,
TransactionSource::Local,
Arc::from(xt.clone()),
)?
.map_err(|e| {
match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
}
.into()
})?;

let block_number = self
.pool
.validated_pool()
.api()
.block_id_to_number(&BlockId::hash(self.at.hash))?
.ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?;

let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash,
TransactionSource::Local,
Arc::from(xt),
length,
validity,
);

self.pool.validated_pool().submit(vec![validated]).remove(0)
}

/// Status of the pool associated with the view.
pub(super) fn status(&self) -> PoolStatus {
self.pool.validated_pool().status()
Expand Down Expand Up @@ -243,9 +288,7 @@ where
let validation_result = (api.validate_transaction(self.at.hash, tx.source, tx.data.clone()).await, tx.hash, tx);
validation_results.push(validation_result);
} else {
{
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
}
self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender());
should_break = true;
}
} => {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
ReadyIteratorFor, LOG_TARGET,
};
use futures::prelude::*;
use itertools::Itertools;
use parking_lot::RwLock;
use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TransactionSource};
use sp_blockchain::TreeRoute;
Expand Down Expand Up @@ -110,6 +111,37 @@ where
HashMap::<_, _>::from_iter(results.into_iter())
}

/// Synchronously imports single unverified extrinsics into every active view.
pub(super) fn submit_local(
&self,
xt: ExtrinsicFor<ChainApi>,
) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
let active_views = self
.active_views
.read()
.iter()
.map(|(_, view)| view.clone())
.collect::<Vec<_>>();

let tx_hash = self.api.hash_and_length(&xt).0;

let result = active_views
.iter()
.map(|view| {
self.dropped_stream_controller
.add_initial_views(std::iter::once(tx_hash), view.at.hash);
view.submit_local(xt.clone())
})
.find_or_first(Result::is_ok);

if let Some(Err(err)) = result {
log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
return Err(err)
};

Ok(tx_hash)
}

/// Import a single extrinsic and starts to watch its progress in the pool.
///
/// The extrinsic is imported to every view, and the individual streams providing the progress
Expand Down Expand Up @@ -155,12 +187,8 @@ where
let maybe_error = futures::future::join_all(submit_and_watch_futures)
.await
.into_iter()
.reduce(|mut r, v| {
if r.is_err() && v.is_ok() {
r = v;
}
r
});
.find_or_first(Result::is_ok);

if let Some(Err(err)) = maybe_error {
log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
return Err((err, Some(external_watcher)));
Expand Down
13 changes: 12 additions & 1 deletion substrate/client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,25 @@ pub trait ChainApi: Send + Sync {
+ Send
+ 'static;

/// Verify extrinsic at given block.
/// Asynchronously verify extrinsic at given block.
fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture;

/// Synchronously verify given extrinsic at given block.
///
/// Validates a transaction by calling into the runtime. Same as `validate_transaction` but
/// blocks the current thread when performing validation.
fn validate_transaction_blocking(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Result<TransactionValidity, Self::Error>;

/// Returns a block number given the block id.
fn block_id_to_number(
&self,
Expand Down
9 changes: 9 additions & 0 deletions substrate/test-utils/runtime/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ impl ChainApi for TestApi {
ready(Ok(Ok(validity)))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> Result<TransactionValidity, Error> {
unimplemented!();
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
Expand Down

0 comments on commit b8aa32c

Please sign in to comment.