diff --git a/core/api/CMakeLists.txt b/core/api/CMakeLists.txt index 980496cc27..b537f1095f 100644 --- a/core/api/CMakeLists.txt +++ b/core/api/CMakeLists.txt @@ -46,7 +46,6 @@ add_library(api transport/impl/ws/ws_session.cpp transport/impl/ws/ws_listener_impl.cpp transport/tuner.cpp - transport/rpc_thread_pool.cpp transport/error.cpp jrpc/jrpc_handle_batch.cpp jrpc/jrpc_server_impl.cpp diff --git a/core/api/service/impl/api_service_impl.cpp b/core/api/service/impl/api_service_impl.cpp index 1929a36691..30bd05ba69 100644 --- a/core/api/service/impl/api_service_impl.cpp +++ b/core/api/service/impl/api_service_impl.cpp @@ -124,7 +124,6 @@ namespace kagome::api { ApiServiceImpl::ApiServiceImpl( application::AppStateManager &app_state_manager, - std::shared_ptr thread_pool, std::vector> listeners, std::shared_ptr server, std::vector> processors, @@ -135,9 +134,10 @@ namespace kagome::api { extrinsic_event_key_repo, std::shared_ptr block_tree, std::shared_ptr trie_storage, - std::shared_ptr core) - : thread_pool_(std::move(thread_pool)), - listeners_(std::move(listeners)), + std::shared_ptr core, + std::shared_ptr watchdog, + std::shared_ptr rpc_context) + : listeners_(std::move(listeners)), server_(std::move(server)), logger_{log::createLogger("ApiService", "api")}, block_tree_{std::move(block_tree)}, @@ -146,8 +146,9 @@ namespace kagome::api { subscription_engines_{.storage = std::move(storage_sub_engine), .chain = std::move(chain_sub_engine), .ext = std::move(ext_sub_engine)}, - extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)} { - BOOST_ASSERT(thread_pool_); + extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)}, + execution_thread_pool_{std::make_shared( + std::move(watchdog), "rpc", 1ull, std::move(rpc_context))} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(trie_storage_); BOOST_ASSERT(core_); @@ -234,13 +235,11 @@ namespace kagome::api { } // namespace kagome::api bool ApiServiceImpl::start() { - thread_pool_->start(); SL_DEBUG(logger_, "API Service started"); return true; } void ApiServiceImpl::stop() { - thread_pool_->stop(); SL_DEBUG(logger_, "API Service stopped"); } diff --git a/core/api/service/impl/api_service_impl.hpp b/core/api/service/impl/api_service_impl.hpp index dae1ab3a84..3d95cb67f7 100644 --- a/core/api/service/impl/api_service_impl.hpp +++ b/core/api/service/impl/api_service_impl.hpp @@ -15,7 +15,6 @@ #include -#include "api/transport/rpc_thread_pool.hpp" #include "api/transport/session.hpp" #include "common/buffer.hpp" #include "containers/objects_cache.hpp" @@ -23,6 +22,7 @@ #include "primitives/block_id.hpp" #include "primitives/event_types.hpp" #include "subscription/subscription_engine.hpp" +#include "utils/thread_pool.hpp" namespace kagome::api { class JRpcProcessor; @@ -112,7 +112,6 @@ namespace kagome::api { public: ApiServiceImpl(application::AppStateManager &app_state_manager, - std::shared_ptr thread_pool, std::vector> listeners, std::shared_ptr server, std::vector> processors, @@ -123,7 +122,9 @@ namespace kagome::api { extrinsic_event_key_repo, std::shared_ptr block_tree, std::shared_ptr trie_storage, - std::shared_ptr core); + std::shared_ptr core, + std::shared_ptr watchdog, + std::shared_ptr rpc_context); ~ApiServiceImpl() override = default; @@ -224,7 +225,6 @@ namespace kagome::api { return obj; } - std::shared_ptr thread_pool_; std::vector> listeners_; std::shared_ptr server_; log::Logger logger_; @@ -244,5 +244,7 @@ namespace kagome::api { } subscription_engines_; std::shared_ptr extrinsic_event_key_repo_; + + std::shared_ptr execution_thread_pool_; }; } // namespace kagome::api diff --git a/core/api/transport/rpc_thread_pool.cpp b/core/api/transport/rpc_thread_pool.cpp deleted file mode 100644 index 96fae5edcf..0000000000 --- a/core/api/transport/rpc_thread_pool.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#include "api/transport/rpc_thread_pool.hpp" - -#include -#include - -namespace kagome::api { - - RpcThreadPool::RpcThreadPool(std::shared_ptr context, - std::shared_ptr watchdog, - const Configuration &configuration) - : context_{std::move(context)}, - watchdog_{std::move(watchdog)}, - config_{configuration} { - BOOST_ASSERT(context_); - } - - void RpcThreadPool::start() { - threads_.reserve(config_.max_thread_number); - // Create a pool of threads to run all of the io_contexts. - for (std::size_t i = 0; i < config_.min_thread_number; ++i) { - auto thread = std::make_shared([context = context_, - watchdog = watchdog_, - rpc_thread_number = i + 1] { - soralog::util::setThreadName(fmt::format("rpc.{}", rpc_thread_number)); - watchdog->run(context); - }); - thread->detach(); - threads_.emplace_back(std::move(thread)); - } - SL_DEBUG(logger_, "Thread pool started"); - } - - void RpcThreadPool::stop() { - context_->stop(); - SL_DEBUG(logger_, "Thread pool stopped"); - } - -} // namespace kagome::api diff --git a/core/api/transport/rpc_thread_pool.hpp b/core/api/transport/rpc_thread_pool.hpp deleted file mode 100644 index eac52435f7..0000000000 --- a/core/api/transport/rpc_thread_pool.hpp +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include -#include -#include -#include - -#include "api/transport/rpc_io_context.hpp" -#include "log/logger.hpp" -#include "utils/watchdog.hpp" - -namespace kagome::api { - - /** - * @brief thread pool for serve RPC calls - */ - class RpcThreadPool : public std::enable_shared_from_this { - public: - using Context = RpcContext; - - struct Configuration { - size_t min_thread_number = 1; - size_t max_thread_number = 10; - }; - - RpcThreadPool(std::shared_ptr context, - std::shared_ptr watchdog, - const Configuration &configuration); - - ~RpcThreadPool() = default; - - /** - * @brief starts pool - */ - void start(); - - /** - * @brief stops pool - */ - void stop(); - - private: - std::shared_ptr context_; - std::shared_ptr watchdog_; - const Configuration config_; - - std::vector> threads_; - - log::Logger logger_ = log::createLogger("RpcThreadPool", "rpc_transport"); - }; - -} // namespace kagome::api diff --git a/core/api/transport/tuner.hpp b/core/api/transport/tuner.hpp index 0442fd8b4c..0a0a008a46 100644 --- a/core/api/transport/tuner.hpp +++ b/core/api/transport/tuner.hpp @@ -9,7 +9,6 @@ #include #include -#include "api/transport/rpc_io_context.hpp" #include "log/logger.hpp" namespace kagome::api { diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 17eb0da629..be7e347e2a 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -41,7 +41,6 @@ #include "api/service/system/system_jrpc_processor.hpp" #include "api/transport/impl/ws/ws_listener_impl.hpp" #include "api/transport/impl/ws/ws_session.hpp" -#include "api/transport/rpc_thread_pool.hpp" #include "application/app_configuration.hpp" #include "application/impl/app_state_manager_impl.hpp" #include "application/impl/chain_spec_impl.hpp" @@ -598,7 +597,6 @@ namespace { auto makeApplicationInjector(sptr config, Ts &&...args) { // default values for configurations - api::RpcThreadPool::Configuration rpc_thread_pool_config{}; api::WsSession::Configuration ws_config{}; transaction_pool::PoolModeratorImpl::Params pool_moderator_config{}; transaction_pool::TransactionPool::Limits tp_pool_limits{}; @@ -624,7 +622,6 @@ namespace { // clang-format off return di::make_injector( // bind configs - useConfig(rpc_thread_pool_config), useConfig(ws_config), useConfig(pool_moderator_config), useConfig(tp_pool_limits), diff --git a/core/utils/thread_pool.hpp b/core/utils/thread_pool.hpp index a30c54c0bf..041ef9343c 100644 --- a/core/utils/thread_pool.hpp +++ b/core/utils/thread_pool.hpp @@ -81,8 +81,11 @@ namespace kagome { ThreadPool(std::shared_ptr watchdog, std::string_view pool_tag, - size_t thread_count) - : ioc_{std::make_shared()}, + size_t thread_count, + std::optional> ioc = + std::nullopt) + : ioc_{ioc.has_value() ? std::move(ioc.value()) + : std::make_shared()}, work_guard_{ioc_->get_executor()} { BOOST_ASSERT(ioc_); BOOST_ASSERT(thread_count > 0); diff --git a/test/core/api/transport/http_listener_test.cpp b/test/core/api/transport/http_listener_test.cpp index 843dc67b33..2a742e3f38 100644 --- a/test/core/api/transport/http_listener_test.cpp +++ b/test/core/api/transport/http_listener_test.cpp @@ -28,14 +28,31 @@ TEST_F(HttpListenerTest, EchoSuccess) { backward::SignalHandling sh; #endif - app_state_manager->atLaunch([ctx{main_context}] { - std::thread([ctx] { ctx->run_for(3s); }).detach(); + std::unique_ptr asio_runner; + + app_state_manager->atLaunch([&] { + asio_runner = + std::make_unique([ctx{main_context}, watchdog{watchdog}] { + soralog::util::setThreadName("asio_runner"); + watchdog->run(ctx); + }); return true; }); + std::thread watchdog_thread([watchdog{watchdog}] { + soralog::util::setThreadName("watchdog"); + watchdog->checkLoop(kagome::kWatchdogDefaultTimeout); + }); + + app_state_manager->atShutdown([ctx{main_context}] { ctx->stop(); }); + + std::unique_ptr client_thread; + main_context->post([&] { - std::thread( + client_thread = std::make_unique( [&](Endpoint endpoint, std::string request, std::string response) { + soralog::util::setThreadName("client"); + auto local_context = std::make_shared(); bool time_is_out; @@ -64,9 +81,14 @@ TEST_F(HttpListenerTest, EchoSuccess) { }, endpoint, request, - response) - .detach(); + response); }); app_state_manager->run(); + + watchdog->stop(); + + client_thread->join(); + asio_runner->join(); + watchdog_thread.join(); } diff --git a/test/core/api/transport/listener_test.hpp b/test/core/api/transport/listener_test.hpp index 1319c63ce1..e4199f747f 100644 --- a/test/core/api/transport/listener_test.hpp +++ b/test/core/api/transport/listener_test.hpp @@ -13,7 +13,6 @@ #include "api/jrpc/jrpc_processor.hpp" #include "api/jrpc/jrpc_server.hpp" #include "api/service/impl/api_service_impl.hpp" -#include "api/transport/rpc_thread_pool.hpp" #include "application/impl/app_state_manager_impl.hpp" #include "common/buffer.hpp" #include "core/api/client/http_client.hpp" @@ -28,6 +27,7 @@ #include "runtime/runtime_context.hpp" #include "subscription/extrinsic_event_key_repository.hpp" #include "subscription/subscriber.hpp" +#include "testutil/asio_wait.hpp" #include "testutil/outcome.hpp" #include "testutil/prepare_loggers.hpp" #include "transaction_pool/transaction_pool_error.hpp" @@ -37,6 +37,7 @@ using namespace kagome::api; using namespace kagome::common; using namespace kagome::subscription; using namespace kagome::primitives; +using kagome::ThreadPool; using kagome::Watchdog; using kagome::application::AppConfigurationMock; using kagome::application::AppStateManager; @@ -88,8 +89,12 @@ struct ListenerTest : public ::testing::Test { response = R"({"jsonrpc":"2.0","id":0,"result":)" + std::to_string(payload) + "}"; + auto seed = static_cast( + std::chrono::system_clock::now().time_since_epoch().count()); + auto rand = rand_r(&seed); + endpoint.address(boost::asio::ip::address::from_string("127.0.0.1")); - endpoint.port(4321); + endpoint.port(1024 + rand % (65536 - 1024)); // random non-sudo port ON_CALL(app_config, rpcEndpoint()).WillByDefault(ReturnRef(endpoint)); ON_CALL(app_config, maxWsConnections()).WillByDefault(Return(100)); @@ -98,7 +103,6 @@ struct ListenerTest : public ::testing::Test { service = std::make_shared( *app_state_manager, - thread_pool, std::vector>({listener}), server, std::vector>(processors), @@ -108,13 +112,14 @@ struct ListenerTest : public ::testing::Test { ext_event_key_repo, block_tree, trie_storage, - core); + core, + watchdog, + rpc_context); } void TearDown() override { request.clear(); response.clear(); - service.reset(); } @@ -126,12 +131,6 @@ struct ListenerTest : public ::testing::Test { sptr app_state_manager = std::make_shared(); - kagome::api::RpcThreadPool::Configuration config = {1, 1}; - - sptr thread_pool = - std::make_shared( - rpc_context, std::make_shared(), config); - sptr api = std::make_shared(); sptr server = std::make_shared(); @@ -155,6 +154,7 @@ struct ListenerTest : public ::testing::Test { std::shared_ptr trie_storage = std::make_shared(); std::shared_ptr core = std::make_shared(); + std::shared_ptr watchdog = std::make_shared(); sptr service; }; diff --git a/test/core/api/transport/ws_listener_test.cpp b/test/core/api/transport/ws_listener_test.cpp index 343a997644..7a83e9d8be 100644 --- a/test/core/api/transport/ws_listener_test.cpp +++ b/test/core/api/transport/ws_listener_test.cpp @@ -28,14 +28,31 @@ TEST_F(WsListenerTest, EchoSuccess) { backward::SignalHandling sh; #endif - app_state_manager->atLaunch([ctx{main_context}] { - std::thread([ctx] { ctx->run_for(3s); }).detach(); + std::unique_ptr asio_runner; + + app_state_manager->atLaunch([&] { + asio_runner = + std::make_unique([ctx{main_context}, watchdog{watchdog}] { + soralog::util::setThreadName("asio_runner"); + watchdog->run(ctx); + }); return true; }); + std::thread watchdog_thread([watchdog{watchdog}] { + soralog::util::setThreadName("watchdog"); + watchdog->checkLoop(kagome::kWatchdogDefaultTimeout); + }); + + app_state_manager->atShutdown([ctx{main_context}] { ctx->stop(); }); + + std::unique_ptr client_thread; + main_context->post([&] { - std::thread( + client_thread = std::make_unique( [&](Endpoint endpoint, std::string request, std::string response) { + soralog::util::setThreadName("client"); + auto local_context = std::make_shared(); bool time_is_out; @@ -64,9 +81,14 @@ TEST_F(WsListenerTest, EchoSuccess) { }, endpoint, request, - response) - .detach(); + response); }); app_state_manager->run(); + + watchdog->stop(); + + client_thread->join(); + asio_runner->join(); + watchdog_thread.join(); }