From 886d3ab4727a55c946189db3ee79c9a88223b0be Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 13:18:54 +0200 Subject: [PATCH 01/18] tx/tests: Move tests to dedicated module Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests/mod.rs | 20 +++++++++++++++++++ .../transaction_broadcast_tests.rs} | 17 +++++++++------- 2 files changed, 30 insertions(+), 7 deletions(-) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs rename substrate/client/rpc-spec-v2/src/transaction/{tests.rs => tests/transaction_broadcast_tests.rs} (91%) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs new file mode 100644 index 000000000000..bcb815fb9b22 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs @@ -0,0 +1,20 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#[cfg(test)] +mod transaction_broadcast_tests; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs similarity index 91% rename from substrate/client/rpc-spec-v2/src/transaction/tests.rs rename to substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 45477494768a..0abad67e56f4 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -16,10 +16,13 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use super::*; use crate::{ - chain_head::test_utils::ChainHeadMockClient, hex_string, - transaction::TransactionBroadcast as RpcTransactionBroadcast, + chain_head::test_utils::ChainHeadMockClient, + hex_string, + transaction::{ + api::TransactionBroadcastApiServer, error::json_rpc_spec, + TransactionBroadcast as RpcTransactionBroadcast, + }, }; use assert_matches::assert_matches; use codec::Encode; @@ -119,7 +122,7 @@ fn setup_api() -> ( Arc>, Arc>>, RpcModule< - TransactionBroadcast, ChainHeadMockClient>>, + RpcTransactionBroadcast, ChainHeadMockClient>>, >, TaskExecutorRecv, ) { @@ -194,7 +197,7 @@ async fn tx_broadcast_invalid_tx() { .await .unwrap_err(); assert_matches!(err, - Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" + Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" ); assert_eq!(0, pool.status().ready); @@ -219,7 +222,7 @@ async fn tx_broadcast_invalid_tx() { .await .unwrap_err(); assert_matches!(err, - Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); } @@ -233,6 +236,6 @@ async fn tx_invalid_stop() { .await .unwrap_err(); assert_matches!(err, - Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); } From 52c3afb83b8071280292db749702be5bcbf9d58a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 13:36:26 +0200 Subject: [PATCH 02/18] tx/tests: Add mock transaction pool to gain access to tx status Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/middleware_pool.rs | 186 ++++++++++++++++++ .../rpc-spec-v2/src/transaction/tests/mod.rs | 3 +- 2 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs new file mode 100644 index 000000000000..bfb42c67bfdf --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs @@ -0,0 +1,186 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use codec::Encode; +use futures::Future; +use sc_transaction_pool::BasicPool; +use sc_transaction_pool_api::{ + ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, + TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, +}; + +use crate::hex_string; +use futures::{FutureExt, StreamExt}; + +use sp_runtime::traits::{Block as BlockT, NumberFor}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use substrate_test_runtime_transaction_pool::TestApi; +use tokio::sync::mpsc; + +pub type Block = substrate_test_runtime_client::runtime::Block; + +pub type TxTestPool = MiddlewarePool; +pub type TxStatusType = sc_transaction_pool_api::TransactionStatus< + sc_transaction_pool_api::TxHash, + sc_transaction_pool_api::BlockHash, +>; +pub type TxStatusTypeTest = TxStatusType; + +/// The type of the event that the middleware captures. +#[derive(Debug, PartialEq)] +pub enum MiddlewarePoolEvent { + TransactionStatus { + transaction: String, + status: sc_transaction_pool_api::TransactionStatus< + ::Hash, + ::Hash, + >, + }, + PoolError { + transaction: String, + err: String, + }, +} + +/// Add a middleware to the transaction pool. +/// +/// This wraps the `submit_and_watch` to gain access to the events. +pub struct MiddlewarePool { + pub inner_pool: Arc>, + /// Send the middleware events to the test. + sender: mpsc::UnboundedSender, +} + +impl MiddlewarePool { + /// Construct a new [`MiddlewarePool`]. + pub fn new( + pool: Arc>, + ) -> (Self, mpsc::UnboundedReceiver) { + let (sender, recv) = mpsc::unbounded_channel(); + (MiddlewarePool { inner_pool: pool, sender }, recv) + } +} + +impl TransactionPool for MiddlewarePool { + type Block = as TransactionPool>::Block; + type Hash = as TransactionPool>::Hash; + type InPoolTransaction = as TransactionPool>::InPoolTransaction; + type Error = as TransactionPool>::Error; + + fn submit_at( + &self, + at: ::Hash, + source: TransactionSource, + xts: Vec>, + ) -> PoolFuture, Self::Error>>, Self::Error> { + self.inner_pool.submit_at(at, source, xts) + } + + fn submit_one( + &self, + at: ::Hash, + source: TransactionSource, + xt: TransactionFor, + ) -> PoolFuture, Self::Error> { + self.inner_pool.submit_one(at, source, xt) + } + + fn submit_and_watch( + &self, + at: ::Hash, + source: TransactionSource, + xt: TransactionFor, + ) -> PoolFuture>>, Self::Error> { + let pool = self.inner_pool.clone(); + let sender = self.sender.clone(); + let transaction = hex_string(&xt.encode()); + + async move { + let watcher = match pool.submit_and_watch(at, source, xt).await { + Ok(watcher) => watcher, + Err(err) => { + let _ = sender.send(MiddlewarePoolEvent::PoolError { + transaction: transaction.clone(), + err: err.to_string(), + }); + return Err(err); + }, + }; + + let watcher = watcher.map(move |status| { + let sender = sender.clone(); + let transaction = transaction.clone(); + + let _ = sender.send(MiddlewarePoolEvent::TransactionStatus { + transaction, + status: status.clone(), + }); + + status + }); + + Ok(watcher.boxed()) + } + .boxed() + } + + fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { + self.inner_pool.remove_invalid(hashes) + } + + fn status(&self) -> PoolStatus { + self.inner_pool.status() + } + + fn import_notification_stream(&self) -> ImportNotificationStream> { + self.inner_pool.import_notification_stream() + } + + fn hash_of(&self, xt: &TransactionFor) -> TxHash { + self.inner_pool.hash_of(xt) + } + + fn on_broadcasted(&self, propagations: HashMap, Vec>) { + self.inner_pool.on_broadcasted(propagations) + } + + fn ready_transaction(&self, hash: &TxHash) -> Option> { + self.inner_pool.ready_transaction(hash) + } + + fn ready_at( + &self, + at: NumberFor, + ) -> Pin< + Box< + dyn Future< + Output = Box> + Send>, + > + Send, + >, + > { + self.inner_pool.ready_at(at) + } + + fn ready(&self) -> Box> + Send> { + self.inner_pool.ready() + } + + fn futures(&self) -> Vec { + self.inner_pool.futures() + } +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs index bcb815fb9b22..11e87f65ee9d 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs @@ -16,5 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -#[cfg(test)] +mod middleware_pool; + mod transaction_broadcast_tests; From 58b3f5e6d3a3b80420892d79682ebd8c973d9d8c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 13:41:53 +0200 Subject: [PATCH 03/18] tx/tests: Add task executor mock Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/executor.rs | 72 +++++++++++++++++++ .../rpc-spec-v2/src/transaction/tests/mod.rs | 1 + 2 files changed, 73 insertions(+) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs new file mode 100644 index 000000000000..a0677b73f9c9 --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs @@ -0,0 +1,72 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; +use tokio::sync::mpsc; + +/// Wrap the `TaskExecutor` to know when the broadcast future is dropped. +#[derive(Clone)] +pub struct TaskExecutorBroadcast { + executor: TaskExecutor, + sender: mpsc::UnboundedSender<()>, +} + +/// The channel that receives events when the broadcast futures are dropped. +pub type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; + +impl TaskExecutorBroadcast { + /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures + /// are dropped. + pub fn new() -> (Self, TaskExecutorRecv) { + let (sender, recv) = mpsc::unbounded_channel(); + + (Self { executor: TaskExecutor::new(), sender }, recv) + } +} + +impl SpawnNamed for TaskExecutorBroadcast { + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn(name, group, future) + } + + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn_blocking(name, group, future) + } +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs index 11e87f65ee9d..ba2cb3f2a9ce 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +mod executor; mod middleware_pool; mod transaction_broadcast_tests; From 8fc233916f077df34cf397f67846ced4e1861996 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 13:50:32 +0200 Subject: [PATCH 04/18] tx/tests: Add setup file to use middlewares Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/middleware_pool.rs | 7 +- .../rpc-spec-v2/src/transaction/tests/mod.rs | 1 + .../src/transaction/tests/setup.rs | 88 ++++++++++++ .../tests/transaction_broadcast_tests.rs | 135 ++---------------- 4 files changed, 103 insertions(+), 128 deletions(-) create mode 100644 substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs index bfb42c67bfdf..aa8ac572dec9 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs @@ -57,6 +57,9 @@ pub enum MiddlewarePoolEvent { }, } +/// The channel that receives events when the broadcast futures are dropped. +pub type MiddlewarePoolRecv = mpsc::UnboundedReceiver; + /// Add a middleware to the transaction pool. /// /// This wraps the `submit_and_watch` to gain access to the events. @@ -68,9 +71,7 @@ pub struct MiddlewarePool { impl MiddlewarePool { /// Construct a new [`MiddlewarePool`]. - pub fn new( - pool: Arc>, - ) -> (Self, mpsc::UnboundedReceiver) { + pub fn new(pool: Arc>) -> (Self, MiddlewarePoolRecv) { let (sender, recv) = mpsc::unbounded_channel(); (MiddlewarePool { inner_pool: pool, sender }, recv) } diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs index ba2cb3f2a9ce..535d25b128ad 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs @@ -18,5 +18,6 @@ mod executor; mod middleware_pool; +mod setup; mod transaction_broadcast_tests; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs new file mode 100644 index 000000000000..d0acdf7380ce --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -0,0 +1,88 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{ + chain_head::test_utils::ChainHeadMockClient, + transaction::{ + api::TransactionBroadcastApiServer, + tests::executor::{TaskExecutorBroadcast, TaskExecutorRecv}, + TransactionBroadcast as RpcTransactionBroadcast, + }, +}; +use futures::Future; +use jsonrpsee::RpcModule; +use sc_transaction_pool::*; +use std::{pin::Pin, sync::Arc}; +use substrate_test_runtime_client::{prelude::*, Client}; +use substrate_test_runtime_transaction_pool::TestApi; + +use crate::transaction::tests::middleware_pool::{MiddlewarePool, MiddlewarePoolRecv}; + +pub type Block = substrate_test_runtime_client::runtime::Block; + +/// Initial Alice account nonce. +pub const ALICE_NONCE: u64 = 209; + +fn create_basic_pool_with_genesis( + test_api: Arc, +) -> (BasicPool, Pin + Send>>) { + let genesis_hash = { + test_api + .chain() + .read() + .block_by_number + .get(&0) + .map(|blocks| blocks[0].0.header.hash()) + .expect("there is block 0. qed") + }; + BasicPool::new_test(test_api, genesis_hash, genesis_hash) +} + +fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { + let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); + let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); + + let thread_pool = futures::executor::ThreadPool::new().unwrap(); + thread_pool.spawn_ok(background_task); + (pool, api, thread_pool) +} + +pub fn setup_api() -> ( + Arc, + Arc, + Arc>>, + RpcModule>>>, + TaskExecutorRecv, + MiddlewarePoolRecv, +) { + let (pool, api, _) = maintained_pool(); + let (pool, pool_recv) = MiddlewarePool::new(Arc::new(pool).clone()); + let pool = Arc::new(pool); + + let builder = TestClientBuilder::new(); + let client = Arc::new(builder.build()); + let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + + let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); + + let tx_api = + RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) + .into_rpc(); + + (api, pool, client_mock, tx_api, executor_recv, pool_recv) +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 0abad67e56f4..680a0455aba3 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -16,135 +16,20 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ - chain_head::test_utils::ChainHeadMockClient, - hex_string, - transaction::{ - api::TransactionBroadcastApiServer, error::json_rpc_spec, - TransactionBroadcast as RpcTransactionBroadcast, - }, -}; +use crate::{hex_string, transaction::error::json_rpc_spec}; use assert_matches::assert_matches; use codec::Encode; -use futures::Future; -use jsonrpsee::{core::error::Error, rpc_params, RpcModule}; -use sc_transaction_pool::*; +use jsonrpsee::{core::error::Error, rpc_params}; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; -use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; -use std::{pin::Pin, sync::Arc, time::Duration}; -use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client}; -use substrate_test_runtime_transaction_pool::{uxt, TestApi}; -use tokio::sync::mpsc; - -type Block = substrate_test_runtime_client::runtime::Block; - -/// Wrap the `TaskExecutor` to know when the broadcast future is dropped. -#[derive(Clone)] -struct TaskExecutorBroadcast { - executor: TaskExecutor, - sender: mpsc::UnboundedSender<()>, -} - -/// The channel that receives events when the broadcast futures are dropped. -type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; - -impl TaskExecutorBroadcast { - /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures - /// are dropped. - fn new() -> (Self, TaskExecutorRecv) { - let (sender, recv) = mpsc::unbounded_channel(); +use std::time::Duration; +use substrate_test_runtime_client::AccountKeyring::*; +use substrate_test_runtime_transaction_pool::uxt; - (Self { executor: TaskExecutor::new(), sender }, recv) - } -} - -impl SpawnNamed for TaskExecutorBroadcast { - fn spawn( - &self, - name: &'static str, - group: Option<&'static str>, - future: futures::future::BoxFuture<'static, ()>, - ) { - let sender = self.sender.clone(); - let future = Box::pin(async move { - future.await; - let _ = sender.send(()); - }); - - self.executor.spawn(name, group, future) - } - - fn spawn_blocking( - &self, - name: &'static str, - group: Option<&'static str>, - future: futures::future::BoxFuture<'static, ()>, - ) { - let sender = self.sender.clone(); - let future = Box::pin(async move { - future.await; - let _ = sender.send(()); - }); - - self.executor.spawn_blocking(name, group, future) - } -} - -/// Initial Alice account nonce. -const ALICE_NONCE: u64 = 209; - -fn create_basic_pool_with_genesis( - test_api: Arc, -) -> (BasicPool, Pin + Send>>) { - let genesis_hash = { - test_api - .chain() - .read() - .block_by_number - .get(&0) - .map(|blocks| blocks[0].0.header.hash()) - .expect("there is block 0. qed") - }; - BasicPool::new_test(test_api, genesis_hash, genesis_hash) -} - -fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { - let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); - let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); - - let thread_pool = futures::executor::ThreadPool::new().unwrap(); - thread_pool.spawn_ok(background_task); - (pool, api, thread_pool) -} - -fn setup_api() -> ( - Arc, - Arc>, - Arc>>, - RpcModule< - RpcTransactionBroadcast, ChainHeadMockClient>>, - >, - TaskExecutorRecv, -) { - let (pool, api, _) = maintained_pool(); - let pool = Arc::new(pool); - - let builder = TestClientBuilder::new(); - let client = Arc::new(builder.build()); - let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); - - let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); - - let tx_api = - RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) - .into_rpc(); - - (api, pool, client_mock, tx_api, executor_recv) -} +use crate::transaction::tests::setup::{setup_api, ALICE_NONCE}; #[tokio::test] async fn tx_broadcast_enters_pool() { - let (api, pool, client_mock, tx_api, _) = setup_api(); + let (api, pool, client_mock, tx_api, _, _) = setup_api(); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); @@ -176,7 +61,7 @@ async fn tx_broadcast_enters_pool() { // Announce block 2 to the pool. let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; - pool.maintain(event).await; + pool.inner_pool.maintain(event).await; assert_eq!(0, pool.status().ready); @@ -189,7 +74,7 @@ async fn tx_broadcast_enters_pool() { #[tokio::test] async fn tx_broadcast_invalid_tx() { - let (_, pool, _, tx_api, mut exec_recv) = setup_api(); + let (_, pool, _, tx_api, mut exec_recv, _) = setup_api(); // Invalid parameters. let err = tx_api @@ -228,7 +113,7 @@ async fn tx_broadcast_invalid_tx() { #[tokio::test] async fn tx_invalid_stop() { - let (_, _, _, tx_api, _) = setup_api(); + let (_, _, _, tx_api, _, _) = setup_api(); // Make an invalid stop call. let err = tx_api From 9a543865ffad47c4f9cbf4fc35271795e96164ca Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 13:59:29 +0200 Subject: [PATCH 05/18] tx/tests: Use the testing infrastructure Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests/mod.rs | 1 + .../src/transaction/tests/setup.rs | 10 ++++ .../tests/transaction_broadcast_tests.rs | 50 ++++++++++++------- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs index 535d25b128ad..ab0caaf906fd 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/mod.rs @@ -18,6 +18,7 @@ mod executor; mod middleware_pool; +#[macro_use] mod setup; mod transaction_broadcast_tests; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index d0acdf7380ce..a38fac315e2e 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -86,3 +86,13 @@ pub fn setup_api() -> ( (api, pool, client_mock, tx_api, executor_recv, pool_recv) } + +/// Get the next event from the provided middleware in at most 60 seconds. +macro_rules! get_next_event { + ($middleware:expr) => { + tokio::time::timeout(std::time::Duration::from_secs(60), $middleware.recv()) + .await + .unwrap() + .unwrap() + }; +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 680a0455aba3..8f5d689527ce 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -21,15 +21,18 @@ use assert_matches::assert_matches; use codec::Encode; use jsonrpsee::{core::error::Error, rpc_params}; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; -use std::time::Duration; use substrate_test_runtime_client::AccountKeyring::*; use substrate_test_runtime_transaction_pool::uxt; -use crate::transaction::tests::setup::{setup_api, ALICE_NONCE}; +// Test helpers. +use crate::transaction::tests::{ + middleware_pool::{MiddlewarePoolEvent, TxStatusTypeTest}, + setup::{setup_api, ALICE_NONCE}, +}; #[tokio::test] async fn tx_broadcast_enters_pool() { - let (api, pool, client_mock, tx_api, _, _) = setup_api(); + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); @@ -44,16 +47,17 @@ async fn tx_broadcast_enters_pool() { client_mock.trigger_import_stream(block_1_header).await; // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Ready + } + ); - // TODO: Improve testability by extending the `transaction_unstable_broadcast` with - // a middleware trait that intercepts the transaction status for testing. - let mut num_retries = 12; - while num_retries > 0 && pool.status().ready != 1 { - tokio::time::sleep(Duration::from_secs(5)).await; - num_retries -= 1; - } - assert_eq!(1, pool.status().ready); - assert_eq!(uxt.encode().len(), pool.status().ready_bytes); + assert_eq!(1, pool.inner_pool.status().ready); + assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes); // Import block 2 with the transaction included. let block_2_header = api.push_block(2, vec![uxt.clone()], true); @@ -62,19 +66,31 @@ async fn tx_broadcast_enters_pool() { // Announce block 2 to the pool. let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; pool.inner_pool.maintain(event).await; + assert_eq!(0, pool.inner_pool.status().ready); + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::InBlock((block_2, 0)) + } + ); - assert_eq!(0, pool.status().ready); - - // Stop call can still be made. + // The future broadcast awaits for the finalized status to be reached. + // Force the future to exit by calling stop. let _: () = tx_api .call("transaction_unstable_stop", rpc_params![&operation_id]) .await .unwrap(); + + // Ensure the broadcast future finishes. + let _ = get_next_event!(&mut exec_middleware); } #[tokio::test] async fn tx_broadcast_invalid_tx() { - let (_, pool, _, tx_api, mut exec_recv, _) = setup_api(); + let (_, pool, _, tx_api, mut exec_middleware, _) = setup_api(); // Invalid parameters. let err = tx_api @@ -97,7 +113,7 @@ async fn tx_broadcast_invalid_tx() { // Await the broadcast future to exit. // Without this we'd be subject to races, where we try to call the stop before the tx is // dropped. - exec_recv.recv().await.unwrap(); + let _ = get_next_event!(&mut exec_middleware); // The broadcast future was dropped, and the operation is no longer active. // When the operation is not active, either from the tx being finalized or a From d98f713e03f30682a6dc9fcf4beebcc61a25bb37 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 14:06:38 +0200 Subject: [PATCH 06/18] tx/tests: Ensure a future tx is propagated Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/setup.rs | 17 ++++ .../tests/transaction_broadcast_tests.rs | 85 +++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index a38fac315e2e..857c68fa4765 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -96,3 +96,20 @@ macro_rules! get_next_event { .unwrap() }; } + +/// Collect the next number of transaction events from the provided middleware. +macro_rules! get_next_tx_events { + ($middleware:expr, $num:expr) => {{ + let mut events = std::collections::HashMap::new(); + for _ in 0..$num { + let event = get_next_event!($middleware); + match event { + crate::transaction::tests::middleware_pool::MiddlewarePoolEvent::TransactionStatus { transaction, status } => { + events.insert(transaction, status); + }, + _ => panic!("Expected TransactionStatus"), + }; + } + events + }}; +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 8f5d689527ce..30bbc64eb441 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -21,6 +21,7 @@ use assert_matches::assert_matches; use codec::Encode; use jsonrpsee::{core::error::Error, rpc_params}; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; +use std::sync::Arc; use substrate_test_runtime_client::AccountKeyring::*; use substrate_test_runtime_transaction_pool::uxt; @@ -140,3 +141,87 @@ async fn tx_invalid_stop() { Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); } + +#[tokio::test] +async fn tx_broadcast_resubmits_future_nonce_tx() { + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); + + // Start at block 1. + let block_1_header = api.push_block(1, vec![], true); + let block_1 = block_1_header.hash(); + + let current_uxt = uxt(Alice, ALICE_NONCE); + let current_xt = hex_string(¤t_uxt.encode()); + // This lives in the future. + let future_uxt = uxt(Alice, ALICE_NONCE + 1); + let future_xt = hex_string(&future_uxt.encode()); + + let future_operation_id: String = tx_api + .call("transaction_unstable_broadcast", rpc_params![&future_xt]) + .await + .unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: future_xt.clone(), + status: TxStatusTypeTest::Future + } + ); + + let event = ChainEvent::NewBestBlock { hash: block_1, tree_route: None }; + pool.inner_pool.maintain(event).await; + assert_eq!(0, pool.inner_pool.status().ready); + // Ensure the tx is in the future. + assert_eq!(1, pool.inner_pool.status().future); + + let block_2_header = api.push_block(2, vec![], true); + let block_2 = block_2_header.hash(); + + let operation_id: String = tx_api + .call("transaction_unstable_broadcast", rpc_params![¤t_xt]) + .await + .unwrap(); + assert_ne!(future_operation_id, operation_id); + + // Announce block 2 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_2_header).await; + + // Collect the events of both transactions. + let events = get_next_tx_events!(&mut pool_middleware, 2); + // Transactions entered the ready queue. + assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::Ready); + assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::Ready); + + let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; + pool.inner_pool.maintain(event).await; + assert_eq!(2, pool.inner_pool.status().ready); + assert_eq!(0, pool.inner_pool.status().future); + + // Finalize transactions. + let block_3_header = api.push_block(3, vec![current_uxt, future_uxt], true); + let block_3 = block_3_header.hash(); + client_mock.trigger_import_stream(block_3_header).await; + + let event = ChainEvent::Finalized { hash: block_3, tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + assert_eq!(0, pool.inner_pool.status().ready); + assert_eq!(0, pool.inner_pool.status().future); + + let events = get_next_tx_events!(&mut pool_middleware, 2); + assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::InBlock((block_3, 0))); + assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::InBlock((block_3, 1))); + + let events = get_next_tx_events!(&mut pool_middleware, 2); + assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::Finalized((block_3, 0))); + assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::Finalized((block_3, 1))); + + // Both broadcast futures must exit. + let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware); +} From c93a386e3367d07fb8ccccd3a4d3bba231919b47 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 14:10:15 +0200 Subject: [PATCH 07/18] tx/tests: Check stop cannot be called after broadcast finishes Signed-off-by: Alexandru Vasile --- .../tests/transaction_broadcast_tests.rs | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 30bbc64eb441..d7146b2897c6 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -225,3 +225,78 @@ async fn tx_broadcast_resubmits_future_nonce_tx() { let _ = get_next_event!(&mut exec_middleware); let _ = get_next_event!(&mut exec_middleware); } + +/// This test is similar to `tx_broadcast_enters_pool` +/// However the last block is announced as finalized to force the +/// broadcast future to exit before the `stop` is called. +#[tokio::test] +async fn tx_broadcast_stop_after_broadcast_finishes() { + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); + + // Start at block 1. + let block_1_header = api.push_block(1, vec![], true); + + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction + // pool.inner_pool. + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Ready + } + ); + + assert_eq!(1, pool.inner_pool.status().ready); + assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes); + + // Import block 2 with the transaction included. + let block_2_header = api.push_block(2, vec![uxt.clone()], true); + let block_2 = block_2_header.hash(); + + // Announce block 2 to the pool.inner_pool. + let event = ChainEvent::Finalized { hash: block_2, tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + + assert_eq!(0, pool.inner_pool.status().ready); + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::InBlock((block_2, 0)) + } + ); + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Finalized((block_2, 0)) + } + ); + + // Ensure the broadcast future terminated properly. + let _ = get_next_event!(&mut exec_middleware); + + // The operation ID is no longer valid, check that the broadcast future + // cleared out the inner state of the operation. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); +} From 274b47a1c2e7a6961f480ca8f4a6379f66b37c64 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 14:12:05 +0200 Subject: [PATCH 08/18] tx/tests: Fix race between InBlock and Finalized events Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/setup.rs | 2 +- .../tests/transaction_broadcast_tests.rs | 20 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index 857c68fa4765..666555104534 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -105,7 +105,7 @@ macro_rules! get_next_tx_events { let event = get_next_event!($middleware); match event { crate::transaction::tests::middleware_pool::MiddlewarePoolEvent::TransactionStatus { transaction, status } => { - events.insert(transaction, status); + events.entry(transaction).or_insert_with(|| vec![]).push(status); }, _ => panic!("Expected TransactionStatus"), }; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index d7146b2897c6..95a277b97b1c 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -195,8 +195,8 @@ async fn tx_broadcast_resubmits_future_nonce_tx() { // Collect the events of both transactions. let events = get_next_tx_events!(&mut pool_middleware, 2); // Transactions entered the ready queue. - assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::Ready); - assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::Ready); + assert_eq!(events.get(¤t_xt).unwrap(), &vec![TxStatusTypeTest::Ready]); + assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]); let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; pool.inner_pool.maintain(event).await; @@ -213,13 +213,15 @@ async fn tx_broadcast_resubmits_future_nonce_tx() { assert_eq!(0, pool.inner_pool.status().ready); assert_eq!(0, pool.inner_pool.status().future); - let events = get_next_tx_events!(&mut pool_middleware, 2); - assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::InBlock((block_3, 0))); - assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::InBlock((block_3, 1))); - - let events = get_next_tx_events!(&mut pool_middleware, 2); - assert_eq!(events.get(¤t_xt).unwrap(), &TxStatusTypeTest::Finalized((block_3, 0))); - assert_eq!(events.get(&future_xt).unwrap(), &TxStatusTypeTest::Finalized((block_3, 1))); + let events = get_next_tx_events!(&mut pool_middleware, 4); + assert_eq!( + events.get(¤t_xt).unwrap(), + &vec![TxStatusTypeTest::InBlock((block_3, 0)), TxStatusTypeTest::Finalized((block_3, 0))] + ); + assert_eq!( + events.get(&future_xt).unwrap(), + &vec![TxStatusTypeTest::InBlock((block_3, 1)), TxStatusTypeTest::Finalized((block_3, 1))] + ); // Both broadcast futures must exit. let _ = get_next_event!(&mut exec_middleware); From 94bae37ac9875f545a47b44bb8fbdf3a6ce1f12a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 14:57:32 +0200 Subject: [PATCH 09/18] tx-pool: Extend basic pool with options Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs | 2 +- substrate/client/transaction-pool/src/lib.rs | 3 ++- substrate/client/transaction-pool/tests/pool.rs | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index 666555104534..8e25696fd6a6 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -50,7 +50,7 @@ fn create_basic_pool_with_genesis( .map(|blocks| blocks[0].0.header.hash()) .expect("there is block 0. qed") }; - BasicPool::new_test(test_api, genesis_hash, genesis_hash) + BasicPool::new_test(test_api, genesis_hash, genesis_hash, Default::default()) } fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index faa3f455a580..64b301e6bf36 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -164,8 +164,9 @@ where pool_api: Arc, best_block_hash: Block::Hash, finalized_hash: Block::Hash, + options: graph::Options, ) -> (Self, Pin + Send>>) { - let pool = Arc::new(graph::Pool::new(Default::default(), true.into(), pool_api.clone())); + let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone())); let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background( pool_api.clone(), pool.clone(), diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index 6b1a197440c1..461b9860d414 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -73,7 +73,7 @@ fn create_basic_pool_with_genesis( .map(|blocks| blocks[0].0.header.hash()) .expect("there is block 0. qed") }; - BasicPool::new_test(test_api, genesis_hash, genesis_hash) + BasicPool::new_test(test_api, genesis_hash, genesis_hash, Default::default()) } fn create_basic_pool(test_api: TestApi) -> BasicPool { @@ -994,6 +994,7 @@ fn import_notification_to_pool_maintain_works() { )), best_hash, finalized_hash, + Default::default(), ) .0, ); From 233f789331d8a8c1f9ecb079e50fa2267e86bdbd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 15:01:16 +0200 Subject: [PATCH 10/18] tx/tests: Use options as default Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/transaction/tests/setup.rs | 15 ++++++++++----- .../tests/transaction_broadcast_tests.rs | 13 ++++++++----- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index 8e25696fd6a6..e193c84aceba 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -40,6 +40,7 @@ pub const ALICE_NONCE: u64 = 209; fn create_basic_pool_with_genesis( test_api: Arc, + options: Options, ) -> (BasicPool, Pin + Send>>) { let genesis_hash = { test_api @@ -50,19 +51,23 @@ fn create_basic_pool_with_genesis( .map(|blocks| blocks[0].0.header.hash()) .expect("there is block 0. qed") }; - BasicPool::new_test(test_api, genesis_hash, genesis_hash, Default::default()) + BasicPool::new_test(test_api, genesis_hash, genesis_hash, options) } -fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { +fn maintained_pool( + options: Options, +) -> (BasicPool, Arc, futures::executor::ThreadPool) { let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); - let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); + let (pool, background_task) = create_basic_pool_with_genesis(api.clone(), options); let thread_pool = futures::executor::ThreadPool::new().unwrap(); thread_pool.spawn_ok(background_task); (pool, api, thread_pool) } -pub fn setup_api() -> ( +pub fn setup_api( + options: Options, +) -> ( Arc, Arc, Arc>>, @@ -70,7 +75,7 @@ pub fn setup_api() -> ( TaskExecutorRecv, MiddlewarePoolRecv, ) { - let (pool, api, _) = maintained_pool(); + let (pool, api, _) = maintained_pool(options); let (pool, pool_recv) = MiddlewarePool::new(Arc::new(pool).clone()); let pool = Arc::new(pool); diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 95a277b97b1c..13ad6686bb7d 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -33,7 +33,8 @@ use crate::transaction::tests::{ #[tokio::test] async fn tx_broadcast_enters_pool() { - let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = + setup_api(Default::default()); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); @@ -91,7 +92,7 @@ async fn tx_broadcast_enters_pool() { #[tokio::test] async fn tx_broadcast_invalid_tx() { - let (_, pool, _, tx_api, mut exec_middleware, _) = setup_api(); + let (_, pool, _, tx_api, mut exec_middleware, _) = setup_api(Default::default()); // Invalid parameters. let err = tx_api @@ -130,7 +131,7 @@ async fn tx_broadcast_invalid_tx() { #[tokio::test] async fn tx_invalid_stop() { - let (_, _, _, tx_api, _, _) = setup_api(); + let (_, _, _, tx_api, _, _) = setup_api(Default::default()); // Make an invalid stop call. let err = tx_api @@ -144,7 +145,8 @@ async fn tx_invalid_stop() { #[tokio::test] async fn tx_broadcast_resubmits_future_nonce_tx() { - let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = + setup_api(Default::default()); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); @@ -233,7 +235,8 @@ async fn tx_broadcast_resubmits_future_nonce_tx() { /// broadcast future to exit before the `stop` is called. #[tokio::test] async fn tx_broadcast_stop_after_broadcast_finishes() { - let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = setup_api(); + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = + setup_api(Default::default()); // Start at block 1. let block_1_header = api.push_block(1, vec![], true); From f8177ab28519616c327ac97ed8b649d038fdd72e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 15:06:44 +0200 Subject: [PATCH 11/18] tx/tests: Check that an invalid tx is resubmited in the future Signed-off-by: Alexandru Vasile --- .../tests/transaction_broadcast_tests.rs | 117 ++++++++++++++++++ .../runtime/transaction-pool/src/lib.rs | 8 ++ 2 files changed, 125 insertions(+) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 13ad6686bb7d..6c5c686e1a3b 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -20,6 +20,7 @@ use crate::{hex_string, transaction::error::json_rpc_spec}; use assert_matches::assert_matches; use codec::Encode; use jsonrpsee::{core::error::Error, rpc_params}; +use sc_transaction_pool::{Options, PoolLimit}; use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; use std::sync::Arc; use substrate_test_runtime_client::AccountKeyring::*; @@ -305,3 +306,119 @@ async fn tx_broadcast_stop_after_broadcast_finishes() { Error::Call(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" ); } + +#[tokio::test] +async fn tx_broadcast_resubmits_invalid_tx() { + let limits = PoolLimit { count: 8192, total_bytes: 20 * 1024 * 1024 }; + let options = Options { + ready: limits.clone(), + future: limits, + reject_future_transactions: false, + // This ensures that a transaction is not banned. + ban_time: std::time::Duration::ZERO, + }; + + let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) = + setup_api(options); + + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + let _operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + let block_1_header = api.push_block(1, vec![], true); + let block_1 = block_1_header.hash(); + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Ready, + } + ); + assert_eq!(1, pool.inner_pool.status().ready); + assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes); + + // Mark the transaction as invalid from the API, causing a temporary ban. + api.add_invalid(&uxt); + + // Push an event to the pool to ensure the transaction is excluded. + let event = ChainEvent::NewBestBlock { hash: block_1, tree_route: None }; + pool.inner_pool.maintain(event).await; + assert_eq!(1, pool.inner_pool.status().ready); + + // Ensure the `transaction_unstable_broadcast` is aware of the invalid transaction. + let event = get_next_event!(&mut pool_middleware); + // Because we have received an `Invalid` status, we try to broadcast the transaction with the + // next announced block. + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Invalid + } + ); + + // Import block 2. + let block_2_header = api.push_block(2, vec![], true); + client_mock.trigger_import_stream(block_2_header).await; + + // Ensure we propagate the temporary ban error to `submit_and_watch`. + // This ensures we'll loop again with the next annmounced block and try to resubmit the + // transaction. The transaction remains temporarily banned until the pool is maintained. + let event = get_next_event!(&mut pool_middleware); + assert_matches!(event, MiddlewarePoolEvent::PoolError { transaction, err } if transaction == xt && err.contains("Transaction temporarily Banned")); + + // Import block 3. + let block_3_header = api.push_block(3, vec![], true); + let block_3 = block_3_header.hash(); + // Remove the invalid transaction from the pool to allow it to pass through. + api.remove_invalid(&uxt); + let event = ChainEvent::NewBestBlock { hash: block_3, tree_route: None }; + // We have to maintain the pool to ensure the transaction is no longer invalid. + // This clears out the banned transactions. + pool.inner_pool.maintain(event).await; + assert_eq!(0, pool.inner_pool.status().ready); + + // Announce block to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_3_header).await; + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Ready, + } + ); + assert_eq!(1, pool.inner_pool.status().ready); + + let block_4_header = api.push_block(4, vec![uxt], true); + let block_4 = block_4_header.hash(); + let event = ChainEvent::Finalized { hash: block_4, tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::InBlock((block_4, 0)), + } + ); + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: xt.clone(), + status: TxStatusTypeTest::Finalized((block_4, 0)), + } + ); + + // Ensure the broadcast future terminated properly. + let _ = get_next_event!(&mut exec_middleware); +} diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 8c8345b06bd3..3bc3882a8800 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -214,6 +214,14 @@ impl TestApi { self.chain.write().invalid_hashes.insert(Self::hash_and_length_inner(xts).0); } + /// Remove a transaction that was prior declared as invalid via `[Self::add_invalid]`. + /// + /// Next time transaction pool will try to validate this + /// extrinsic, api will succeed. + pub fn remove_invalid(&self, xts: &Extrinsic) { + self.chain.write().invalid_hashes.remove(&Self::hash_and_length_inner(xts).0); + } + /// Query validation requests received. pub fn validation_requests(&self) -> Vec { self.validation_requests.read().clone() From 7004451ecc8fd82a7346e66163844a7fa38fec06 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 17:53:52 +0200 Subject: [PATCH 12/18] chainHead/tests: Do not panic on dropped receivers from mocks Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs b/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs index d63a98a5cb0d..e81bd4bfa0b0 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs @@ -63,7 +63,7 @@ impl ChainHeadMockClient { BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink); for sink in self.import_sinks.lock().iter_mut() { - sink.unbounded_send(notification.clone()).unwrap(); + let _ = sink.unbounded_send(notification.clone()); } } @@ -83,7 +83,7 @@ impl ChainHeadMockClient { let notification = FinalityNotification::from_summary(summary, sink); for sink in self.finality_sinks.lock().iter_mut() { - sink.unbounded_send(notification.clone()).unwrap(); + let _ = sink.unbounded_send(notification.clone()); } } } From 5c76f2933b44ad27584462e4c07192848701b3bc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 17:54:58 +0200 Subject: [PATCH 13/18] test-utils: Inject tx priority to the test pool api Signed-off-by: Alexandru Vasile --- .../runtime/transaction-pool/src/lib.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 3bc3882a8800..c866bdef65b2 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -81,6 +81,7 @@ pub struct ChainState { pub block_by_hash: HashMap, pub nonces: HashMap, pub invalid_hashes: HashSet, + pub priorities: HashMap, } /// Test Api for transaction pool. @@ -222,6 +223,14 @@ impl TestApi { self.chain.write().invalid_hashes.remove(&Self::hash_and_length_inner(xts).0); } + /// Set a transaction priority. + pub fn set_priority(&self, xts: &Extrinsic, priority: u64) { + self.chain + .write() + .priorities + .insert(Self::hash_and_length_inner(xts).0, priority); + } + /// Query validation requests received. pub fn validation_requests(&self) -> Vec { self.validation_requests.read().clone() @@ -308,8 +317,14 @@ impl ChainApi for TestApi { return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))))) } - let mut validity = - ValidTransaction { priority: 1, requires, provides, longevity: 64, propagate: true }; + let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned(); + let mut validity = ValidTransaction { + priority: priority.unwrap_or(1), + requires, + provides, + longevity: 64, + propagate: true, + }; (self.valid_modifier.read())(&mut validity); From 2d764ef5c7a27b2ef46e8a16e7a2d732e636df06 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 13 Feb 2024 17:59:46 +0200 Subject: [PATCH 14/18] tx/tests: Check immediately dropped tx are resubmitted Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/setup.rs | 2 +- .../tests/transaction_broadcast_tests.rs | 94 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index e193c84aceba..bd155b86f1ee 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -112,7 +112,7 @@ macro_rules! get_next_tx_events { crate::transaction::tests::middleware_pool::MiddlewarePoolEvent::TransactionStatus { transaction, status } => { events.entry(transaction).or_insert_with(|| vec![]).push(status); }, - _ => panic!("Expected TransactionStatus"), + other => panic!("Expected TransactionStatus, received {:?}", other), }; } events diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 6c5c686e1a3b..0c396aadb3cb 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -422,3 +422,97 @@ async fn tx_broadcast_resubmits_invalid_tx() { // Ensure the broadcast future terminated properly. let _ = get_next_event!(&mut exec_middleware); } + +/// This is similar to `tx_broadcast_resubmits_invalid_tx`. +/// However, it forces the tx to be resubmited because of the pool +/// limits. Which is a different code path than the invalid tx. +#[tokio::test] +async fn tx_broadcast_resubmits_dropped_tx() { + let limits = PoolLimit { count: 1, total_bytes: 1000 }; + let options = Options { + ready: limits.clone(), + future: limits, + reject_future_transactions: false, + // This ensures that a transaction is not banned. + ban_time: std::time::Duration::ZERO, + }; + + let (api, pool, client_mock, tx_api, _, mut pool_middleware) = setup_api(options); + + let current_uxt = uxt(Alice, ALICE_NONCE); + let current_xt = hex_string(¤t_uxt.encode()); + // This lives in the future. + let future_uxt = uxt(Alice, ALICE_NONCE + 1); + let future_xt = hex_string(&future_uxt.encode()); + + // By default the `validate_transaction` mock uses priority 1 for + // transactions. Bump the priority to ensure other transactions + // are immediately dropped. + api.set_priority(¤t_uxt, 10); + + let current_operation_id: String = tx_api + .call("transaction_unstable_broadcast", rpc_params![¤t_xt]) + .await + .unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + let block_1_header = api.push_block(1, vec![], true); + let event = + ChainEvent::Finalized { hash: block_1_header.hash(), tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + client_mock.trigger_import_stream(block_1_header).await; + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::TransactionStatus { + transaction: current_xt.clone(), + status: TxStatusTypeTest::Ready, + } + ); + assert_eq!(1, pool.inner_pool.status().ready); + + // The future tx has priority 2, smaller than the current 10. + api.set_priority(&future_uxt, 2); + let future_operation_id: String = tx_api + .call("transaction_unstable_broadcast", rpc_params![&future_xt]) + .await + .unwrap(); + assert_ne!(current_operation_id, future_operation_id); + + let block_2_header = api.push_block(2, vec![], true); + let event = + ChainEvent::Finalized { hash: block_2_header.hash(), tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + client_mock.trigger_import_stream(block_2_header).await; + + // We must have at most 1 transaction in the pool, as per limits above. + assert_eq!(1, pool.inner_pool.status().ready); + + let event = get_next_event!(&mut pool_middleware); + assert_eq!( + event, + MiddlewarePoolEvent::PoolError { + transaction: future_xt.clone(), + err: "Transaction couldn't enter the pool because of the limit".into() + } + ); + + let block_3_header = api.push_block(3, vec![current_uxt], true); + let event = + ChainEvent::Finalized { hash: block_3_header.hash(), tree_route: Arc::from(vec![]) }; + pool.inner_pool.maintain(event).await; + client_mock.trigger_import_stream(block_3_header.clone()).await; + + // The first tx is in a finalzied block; the future tx must enter the pool. + let events = get_next_tx_events!(&mut pool_middleware, 3); + assert_eq!( + events.get(¤t_xt).unwrap(), + &vec![ + TxStatusTypeTest::InBlock((block_3_header.hash(), 0)), + TxStatusTypeTest::Finalized((block_3_header.hash(), 0)) + ] + ); + // The dropped transaction was resubmitted. + assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]); +} From 373ae27f9132d77f1ec07e643c38e213f6baeb8f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 15 Feb 2024 14:18:50 +0200 Subject: [PATCH 15/18] tx/tests: Decrease timeout from 60 secs to 5 secs Signed-off-by: Alexandru Vasile --- substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index bd155b86f1ee..8d97398900f6 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -92,10 +92,10 @@ pub fn setup_api( (api, pool, client_mock, tx_api, executor_recv, pool_recv) } -/// Get the next event from the provided middleware in at most 60 seconds. +/// Get the next event from the provided middleware in at most 5 seconds. macro_rules! get_next_event { ($middleware:expr) => { - tokio::time::timeout(std::time::Duration::from_secs(60), $middleware.recv()) + tokio::time::timeout(std::time::Duration::from_secs(5), $middleware.recv()) .await .unwrap() .unwrap() From 72dd3aeefb578b9188e1658cea4006aa92d4f1c4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 27 Feb 2024 13:48:07 +0200 Subject: [PATCH 16/18] tx/tests: Check number of active spawned tasks Signed-off-by: Alexandru Vasile --- .../src/transaction/tests/executor.rs | 32 +++++++++++++++++-- .../src/transaction/tests/setup.rs | 8 ++--- .../tests/transaction_broadcast_tests.rs | 17 ++++++---- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs index a0677b73f9c9..ff9aca79887c 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/executor.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; +use std::sync::{atomic::AtomicUsize, Arc}; use tokio::sync::mpsc; /// Wrap the `TaskExecutor` to know when the broadcast future is dropped. @@ -24,18 +25,35 @@ use tokio::sync::mpsc; pub struct TaskExecutorBroadcast { executor: TaskExecutor, sender: mpsc::UnboundedSender<()>, + num_tasks: Arc, } /// The channel that receives events when the broadcast futures are dropped. pub type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; +/// The state of the `TaskExecutorBroadcast`. +pub struct TaskExecutorState { + pub recv: TaskExecutorRecv, + pub num_tasks: Arc, +} + +impl TaskExecutorState { + pub fn num_tasks(&self) -> usize { + self.num_tasks.load(std::sync::atomic::Ordering::Acquire) + } +} + impl TaskExecutorBroadcast { /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures /// are dropped. - pub fn new() -> (Self, TaskExecutorRecv) { + pub fn new() -> (Self, TaskExecutorState) { let (sender, recv) = mpsc::unbounded_channel(); + let num_tasks = Arc::new(AtomicUsize::new(0)); - (Self { executor: TaskExecutor::new(), sender }, recv) + ( + Self { executor: TaskExecutor::new(), sender, num_tasks: num_tasks.clone() }, + TaskExecutorState { recv, num_tasks }, + ) } } @@ -47,8 +65,13 @@ impl SpawnNamed for TaskExecutorBroadcast { future: futures::future::BoxFuture<'static, ()>, ) { let sender = self.sender.clone(); + let num_tasks = self.num_tasks.clone(); + let future = Box::pin(async move { + num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel); future.await; + num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel); + let _ = sender.send(()); }); @@ -62,8 +85,13 @@ impl SpawnNamed for TaskExecutorBroadcast { future: futures::future::BoxFuture<'static, ()>, ) { let sender = self.sender.clone(); + let num_tasks = self.num_tasks.clone(); + let future = Box::pin(async move { + num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel); future.await; + num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel); + let _ = sender.send(()); }); diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs index 8d97398900f6..04ee7b9b4c94 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/setup.rs @@ -20,7 +20,7 @@ use crate::{ chain_head::test_utils::ChainHeadMockClient, transaction::{ api::TransactionBroadcastApiServer, - tests::executor::{TaskExecutorBroadcast, TaskExecutorRecv}, + tests::executor::{TaskExecutorBroadcast, TaskExecutorState}, TransactionBroadcast as RpcTransactionBroadcast, }, }; @@ -72,11 +72,11 @@ pub fn setup_api( Arc, Arc>>, RpcModule>>>, - TaskExecutorRecv, + TaskExecutorState, MiddlewarePoolRecv, ) { let (pool, api, _) = maintained_pool(options); - let (pool, pool_recv) = MiddlewarePool::new(Arc::new(pool).clone()); + let (pool, pool_state) = MiddlewarePool::new(Arc::new(pool).clone()); let pool = Arc::new(pool); let builder = TestClientBuilder::new(); @@ -89,7 +89,7 @@ pub fn setup_api( RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) .into_rpc(); - (api, pool, client_mock, tx_api, executor_recv, pool_recv) + (api, pool, client_mock, tx_api, executor_recv, pool_state) } /// Get the next event from the provided middleware in at most 5 seconds. diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 78731c3566c1..903378c1fa86 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -88,7 +88,8 @@ async fn tx_broadcast_enters_pool() { .unwrap(); // Ensure the broadcast future finishes. - let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware.recv); + assert_eq!(0, exec_middleware.num_tasks()); } #[tokio::test] @@ -116,7 +117,8 @@ async fn tx_broadcast_invalid_tx() { // Await the broadcast future to exit. // Without this we'd be subject to races, where we try to call the stop before the tx is // dropped. - let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware.recv); + assert_eq!(0, exec_middleware.num_tasks()); // The broadcast future was dropped, and the operation is no longer active. // When the operation is not active, either from the tx being finalized or a @@ -227,8 +229,9 @@ async fn tx_broadcast_resubmits_future_nonce_tx() { ); // Both broadcast futures must exit. - let _ = get_next_event!(&mut exec_middleware); - let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware.recv); + let _ = get_next_event!(&mut exec_middleware.recv); + assert_eq!(0, exec_middleware.num_tasks()); } /// This test is similar to `tx_broadcast_enters_pool` @@ -294,7 +297,8 @@ async fn tx_broadcast_stop_after_broadcast_finishes() { ); // Ensure the broadcast future terminated properly. - let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware.recv); + assert_eq!(0, exec_middleware.num_tasks()); // The operation ID is no longer valid, check that the broadcast future // cleared out the inner state of the operation. @@ -420,7 +424,8 @@ async fn tx_broadcast_resubmits_invalid_tx() { ); // Ensure the broadcast future terminated properly. - let _ = get_next_event!(&mut exec_middleware); + let _ = get_next_event!(&mut exec_middleware.recv); + assert_eq!(0, exec_middleware.num_tasks()); } /// This is similar to `tx_broadcast_resubmits_invalid_tx`. From 661458ac664f60c3d11ddb021bfffa20119fb1e6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:21:59 +0200 Subject: [PATCH 17/18] Update substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs Co-authored-by: James Wilson --- .../src/transaction/tests/transaction_broadcast_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs index 903378c1fa86..690a1a64d746 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/transaction_broadcast_tests.rs @@ -133,7 +133,7 @@ async fn tx_broadcast_invalid_tx() { } #[tokio::test] -async fn tx_invalid_stop() { +async fn tx_stop_with_invalid_operation_id() { let (_, _, _, tx_api, _, _) = setup_api(Default::default()); // Make an invalid stop call. From c06012a80d42c56f4e7c65def0eba29f23320518 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:22:08 +0200 Subject: [PATCH 18/18] Update substrate/test-utils/runtime/transaction-pool/src/lib.rs Co-authored-by: James Wilson --- substrate/test-utils/runtime/transaction-pool/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index c866bdef65b2..5202e6e65154 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -215,7 +215,7 @@ impl TestApi { self.chain.write().invalid_hashes.insert(Self::hash_and_length_inner(xts).0); } - /// Remove a transaction that was prior declared as invalid via `[Self::add_invalid]`. + /// Remove a transaction that was previously declared as invalid via `[Self::add_invalid]`. /// /// Next time transaction pool will try to validate this /// extrinsic, api will succeed.