diff --git a/prdoc/pr_6104.prdoc b/prdoc/pr_6104.prdoc new file mode 100644 index 000000000000..2b62a68c9f0e --- /dev/null +++ b/prdoc/pr_6104.prdoc @@ -0,0 +1,10 @@ +title: "LocalTransactionPool implemented for fork aware transaction pool" + +doc: + - audience: Node Dev + description: | + LocalTransactionPool trait is implemented for fork aware transaction pool. + +crates: + - name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs index 2db34bc3f32f..0d8c1cbba9b4 100644 --- a/substrate/client/transaction-pool/benches/basics.rs +++ b/substrate/client/transaction-pool/benches/basics.rs @@ -91,6 +91,15 @@ impl ChainApi for TestApi { }))) } + fn validate_transaction_blocking( + &self, + _at: ::Hash, + _source: TransactionSource, + _uxt: Arc<::Extrinsic>, + ) -> sc_transaction_pool_api::error::Result { + unimplemented!(); + } + fn block_id_to_number( &self, at: &BlockId, diff --git a/substrate/client/transaction-pool/src/common/api.rs b/substrate/client/transaction-pool/src/common/api.rs index a5185ba606ef..e16c0f2efa51 100644 --- a/substrate/client/transaction-pool/src/common/api.rs +++ b/substrate/client/transaction-pool/src/common/api.rs @@ -162,6 +162,18 @@ where .boxed() } + /// Validates a transaction by calling into the runtime. + /// + /// Same as `validate_transaction` but blocks the current thread when performing validation. + fn validate_transaction_blocking( + &self, + at: Block::Hash, + source: TransactionSource, + uxt: graph::ExtrinsicFor, + ) -> error::Result { + validate_transaction_blocking(&*self.client, at, source, uxt) + } + fn block_id_to_number( &self, at: &BlockId, @@ -272,28 +284,3 @@ where result } - -impl FullChainApi -where - Block: BlockT, - Client: ProvideRuntimeApi - + BlockBackend - + BlockIdTo - + HeaderBackend - + HeaderMetadata, - Client: Send + Sync + 'static, - Client::Api: TaggedTransactionQueue, -{ - /// Validates a transaction by calling into the runtime, same as - /// `validate_transaction` but blocks the current thread when performing - /// validation. Only implemented for `FullChainApi` since we can call into - /// the runtime locally. - pub fn validate_transaction_blocking( - &self, - at: Block::Hash, - source: TransactionSource, - uxt: graph::ExtrinsicFor, - ) -> error::Result { - validate_transaction_blocking(&*self.client, at, source, uxt) - } -} diff --git a/substrate/client/transaction-pool/src/common/tests.rs b/substrate/client/transaction-pool/src/common/tests.rs index 1cbabf8b5fde..b00cf5fbfede 100644 --- a/substrate/client/transaction-pool/src/common/tests.rs +++ b/substrate/client/transaction-pool/src/common/tests.rs @@ -156,6 +156,15 @@ impl ChainApi for TestApi { futures::future::ready(Ok(res)) } + fn validate_transaction_blocking( + &self, + _at: ::Hash, + _source: TransactionSource, + _uxt: Arc<::Extrinsic>, + ) -> error::Result { + unimplemented!(); + } + /// Returns a block number given the block id. fn block_id_to_number( &self, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 11e30bef7ea2..7e72b44adf38 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -599,7 +599,7 @@ where log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count()); log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at"); let xts = xts.into_iter().map(Arc::from).collect::>(); - let mempool_result = self.mempool.extend_unwatched(source, xts.clone()); + let mempool_result = self.mempool.extend_unwatched(source, &xts); if view_store.is_empty() { return future::ready(Ok(mempool_result)).boxed() @@ -838,16 +838,16 @@ where fn submit_local( &self, _at: Block::Hash, - _xt: sc_transaction_pool_api::LocalTransactionFor, + xt: sc_transaction_pool_api::LocalTransactionFor, ) -> Result { - //todo [#5493] - //looks like view_store / view needs non async submit_local method ?. - let e = Err(sc_transaction_pool_api::error::Error::Unactionable.into()); - log::warn!( - target: LOG_TARGET, - "LocalTransactionPool::submit_local is not implemented for ForkAwareTxPool, returning error: {e:?}", - ); - e + log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count()); + let xt = Arc::from(xt); + let result = self + .mempool + .extend_unwatched(TransactionSource::Local, &[xt.clone()]) + .remove(0)?; + + self.view_store.submit_local(xt).or_else(|_| Ok(result)) } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 86ea27dcf451..989c7e8ef356 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -237,11 +237,11 @@ where pub(super) fn extend_unwatched( &self, source: TransactionSource, - xts: Vec>, + xts: &[ExtrinsicFor], ) -> Vec, ChainApi::Error>> { let mut transactions = self.transactions.write(); let result = xts - .into_iter() + .iter() .map(|xt| { let hash = self.api.hash_and_length(&xt).0; self.try_insert( @@ -437,7 +437,7 @@ mod tx_mem_pool_tests { let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::>(); - let results = mempool.extend_unwatched(TransactionSource::External, xts); + let results = mempool.extend_unwatched(TransactionSource::External, &xts); assert!(results.iter().take(max).all(Result::is_ok)); assert!(matches!( results.into_iter().last().unwrap().unwrap_err(), @@ -455,7 +455,7 @@ mod tx_mem_pool_tests { let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::>(); xts.push(xts.iter().last().unwrap().clone()); - let results = mempool.extend_unwatched(TransactionSource::External, xts); + let results = mempool.extend_unwatched(TransactionSource::External, &xts); assert!(results.iter().take(max - 1).all(Result::is_ok)); assert!(matches!( results.into_iter().last().unwrap().unwrap_err(), @@ -471,7 +471,7 @@ mod tx_mem_pool_tests { let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::>(); - let results = mempool.extend_unwatched(TransactionSource::External, xts); + let results = mempool.extend_unwatched(TransactionSource::External, &xts); assert!(results.iter().all(Result::is_ok)); let xt = Arc::from(uxt(98)); @@ -481,7 +481,7 @@ mod tx_mem_pool_tests { sc_transaction_pool_api::error::Error::ImmediatelyDropped )); let xt = Arc::from(uxt(99)); - let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt]); + let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]); assert!(matches!( result.pop().unwrap().unwrap_err(), sc_transaction_pool_api::error::Error::ImmediatelyDropped @@ -498,7 +498,7 @@ mod tx_mem_pool_tests { let xt0 = xts.iter().last().unwrap().clone(); let xt1 = xts.iter().next().unwrap().clone(); - let results = mempool.extend_unwatched(TransactionSource::External, xts); + let results = mempool.extend_unwatched(TransactionSource::External, &xts); assert!(results.iter().all(Result::is_ok)); let result = mempool.push_watched(TransactionSource::External, xt0); @@ -506,7 +506,7 @@ mod tx_mem_pool_tests { result.unwrap_err(), sc_transaction_pool_api::error::Error::AlreadyImported(_) )); - let mut result = mempool.extend_unwatched(TransactionSource::External, vec![xt1]); + let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt1]); assert!(matches!( result.pop().unwrap().unwrap_err(), sc_transaction_pool_api::error::Error::AlreadyImported(_) @@ -521,7 +521,7 @@ mod tx_mem_pool_tests { let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::>(); - let results = mempool.extend_unwatched(TransactionSource::External, xts0); + let results = mempool.extend_unwatched(TransactionSource::External, &xts0); assert!(results.iter().all(Result::is_ok)); let xts1 = (0..5).map(|x| Arc::from(uxt(2 * x))).collect::>(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index fd5bfa8312c0..99095d88cb0a 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -33,10 +33,11 @@ use crate::{ LOG_TARGET, }; use parking_lot::Mutex; -use sc_transaction_pool_api::{PoolStatus, TransactionSource}; +use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionSource}; use sp_blockchain::HashAndNumber; use sp_runtime::{ - traits::Block as BlockT, transaction_validity::TransactionValidityError, SaturatedConversion, + generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError, + SaturatedConversion, }; use std::{collections::HashMap, sync::Arc, time::Instant}; @@ -178,6 +179,50 @@ where self.pool.submit_and_watch(&self.at, source, xt).await } + /// Synchronously imports single unvalidated extrinsics into the view. + pub(super) fn submit_local( + &self, + xt: ExtrinsicFor, + ) -> Result, ChainApi::Error> { + let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt); + log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash); + + let validity = self + .pool + .validated_pool() + .api() + .validate_transaction_blocking( + self.at.hash, + TransactionSource::Local, + Arc::from(xt.clone()), + )? + .map_err(|e| { + match e { + TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i), + TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u), + } + .into() + })?; + + let block_number = self + .pool + .validated_pool() + .api() + .block_id_to_number(&BlockId::hash(self.at.hash))? + .ok_or_else(|| TxPoolError::InvalidBlockId(format!("{:?}", self.at.hash)))?; + + let validated = ValidatedTransaction::valid_at( + block_number.saturated_into::(), + hash, + TransactionSource::Local, + Arc::from(xt), + length, + validity, + ); + + self.pool.validated_pool().submit(vec![validated]).remove(0) + } + /// Status of the pool associated with the view. pub(super) fn status(&self) -> PoolStatus { self.pool.validated_pool().status() @@ -243,9 +288,7 @@ where let validation_result = (api.validate_transaction(self.at.hash, tx.source, tx.data.clone()).await, tx.hash, tx); validation_results.push(validation_result); } else { - { - self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender()); - } + self.revalidation_worker_channels.lock().as_mut().map(|ch| ch.remove_sender()); should_break = true; } } => {} diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 953d6d860338..413fca223242 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -29,6 +29,7 @@ use crate::{ ReadyIteratorFor, LOG_TARGET, }; use futures::prelude::*; +use itertools::Itertools; use parking_lot::RwLock; use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TransactionSource}; use sp_blockchain::TreeRoute; @@ -110,6 +111,37 @@ where HashMap::<_, _>::from_iter(results.into_iter()) } + /// Synchronously imports single unverified extrinsics into every active view. + pub(super) fn submit_local( + &self, + xt: ExtrinsicFor, + ) -> Result, ChainApi::Error> { + let active_views = self + .active_views + .read() + .iter() + .map(|(_, view)| view.clone()) + .collect::>(); + + let tx_hash = self.api.hash_and_length(&xt).0; + + let result = active_views + .iter() + .map(|view| { + self.dropped_stream_controller + .add_initial_views(std::iter::once(tx_hash), view.at.hash); + view.submit_local(xt.clone()) + }) + .find_or_first(Result::is_ok); + + if let Some(Err(err)) = result { + log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err); + return Err(err) + }; + + Ok(tx_hash) + } + /// Import a single extrinsic and starts to watch its progress in the pool. /// /// The extrinsic is imported to every view, and the individual streams providing the progress @@ -155,12 +187,8 @@ where let maybe_error = futures::future::join_all(submit_and_watch_futures) .await .into_iter() - .reduce(|mut r, v| { - if r.is_err() && v.is_ok() { - r = v; - } - r - }); + .find_or_first(Result::is_ok); + if let Some(Err(err)) = maybe_error { log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err); return Err((err, Some(external_watcher))); diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index 6d08a0f0b93c..2dd8de352c6b 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -73,7 +73,7 @@ pub trait ChainApi: Send + Sync { + Send + 'static; - /// Verify extrinsic at given block. + /// Asynchronously verify extrinsic at given block. fn validate_transaction( &self, at: ::Hash, @@ -81,6 +81,17 @@ pub trait ChainApi: Send + Sync { uxt: ExtrinsicFor, ) -> Self::ValidationFuture; + /// Synchronously verify given extrinsic at given block. + /// + /// Validates a transaction by calling into the runtime. Same as `validate_transaction` but + /// blocks the current thread when performing validation. + fn validate_transaction_blocking( + &self, + at: ::Hash, + source: TransactionSource, + uxt: ExtrinsicFor, + ) -> Result; + /// Returns a block number given the block id. fn block_id_to_number( &self, diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 2d19dbfb6d49..6a4f38f63e82 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -450,6 +450,15 @@ impl ChainApi for TestApi { ready(Ok(Ok(validity))) } + fn validate_transaction_blocking( + &self, + _at: ::Hash, + _source: TransactionSource, + _uxt: Arc<::Extrinsic>, + ) -> Result { + unimplemented!(); + } + fn block_id_to_number( &self, at: &BlockId,