From 455ba5b2196f47ccb65c7b6d54f82ef3d84280d1 Mon Sep 17 00:00:00 2001 From: Ashley Ruglys Date: Tue, 1 Jun 2021 16:43:40 +0200 Subject: [PATCH] Revert "Use `SpawnTaskHandle`s for spawning tasks in the tx pool (#8958)" This reverts commit bfef07c0d22ead3ab3c4e0e90ddf9b0e3537566e. --- Cargo.lock | 17 +++++++++++++++++ client/transaction-pool/Cargo.toml | 1 + client/transaction-pool/src/api.rs | 16 +++++++++------- client/transaction-pool/src/lib.rs | 4 ++-- client/transaction-pool/src/testing/pool.rs | 5 ++--- 5 files changed, 31 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc107b0a53bc7..fca6465198aa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2059,6 +2059,22 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "futures-diagnose" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9" +dependencies = [ + "futures 0.1.31", + "futures 0.3.13", + "lazy_static", + "log", + "parking_lot 0.9.0", + "pin-project 0.4.27", + "serde", + "serde_json", +] + [[package]] name = "futures-executor" version = "0.3.13" @@ -8100,6 +8116,7 @@ version = "3.0.0" dependencies = [ "assert_matches", "futures 0.3.13", + "futures-diagnose", "hex", "intervalier", "log", diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index 6b105520baec5..d457d709d1222 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "2.0.0" } thiserror = "1.0.21" futures = { version = "0.3.1", features = ["compat"] } +futures-diagnose = "1.0" intervalier = "0.4.0" log = "0.4.8" parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index fe1f99e0a3c2e..2ebf038844fab 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -21,7 +21,7 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, future::{Future, FutureExt, ready, Ready}, + channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, }; use sc_client_api::{ @@ -31,7 +31,6 @@ use sp_runtime::{ generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT}, transaction_validity::{TransactionValidity, TransactionSource}, }; -use sp_core::traits::SpawnNamed; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_api::{ProvideRuntimeApi, ApiExt}; use prometheus_endpoint::Registry as PrometheusRegistry; @@ -41,7 +40,7 @@ use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}}; /// The transaction pool logic for full client. pub struct FullChainApi { client: Arc, - spawner: Box, + pool: ThreadPool, _marker: PhantomData, metrics: Option>, } @@ -51,7 +50,6 @@ impl FullChainApi { pub fn new( client: Arc, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed + 'static, ) -> Self { let metrics = prometheus.map(ApiMetrics::register).and_then(|r| { match r { @@ -69,9 +67,13 @@ impl FullChainApi { FullChainApi { client, + pool: ThreadPoolBuilder::new() + .pool_size(2) + .name_prefix("txpool-verifier") + .create() + .expect("Failed to spawn verifier threads, that are critical for node operation."), _marker: Default::default(), metrics, - spawner: Box::new(spawner) , } } } @@ -107,9 +109,9 @@ where let metrics = self.metrics.clone(); metrics.report(|m| m.validations_scheduled.inc()); - self.spawner.spawn_blocking( + self.pool.spawn_ok(futures_diagnose::diagnose( "validate-transaction", - Box::pin(async move { + async move { let res = validate_transaction_blocking(&*client, &at, source, uxt); if let Err(e) = tx.send(res) { log::warn!("Unable to send a validate transaction result: {:?}", e); diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index bcabc5b873997..bc5f6e367ff86 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -366,10 +366,10 @@ where options: sc_transaction_graph::Options, is_validator: txpool::IsValidator, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed + Clone + 'static, + spawner: impl SpawnNamed, client: Arc, ) -> Arc { - let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, spawner.clone())); + let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); let pool = Arc::new(Self::with_revalidation_type( options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner )); diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index 1a76c28a0e0d4..904870ae0ece9 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -35,7 +35,6 @@ use std::collections::BTreeSet; use sc_client_api::client::BlockchainEvents; use sc_block_builder::BlockBuilderProvider; use sp_consensus::BlockOrigin; -use sp_core::testing::TaskExecutor; fn pool() -> Pool { Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into()) @@ -936,7 +935,7 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client, None, TaskExecutor::new()))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0 ); let transfer = Transfer { @@ -972,7 +971,7 @@ fn import_notification_to_pool_maintain_works() { let mut client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None, TaskExecutor::new()))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0 ); // Prepare the extrisic, push it to the pool and check that it was added.