Skip to content

Commit

Permalink
Refactor RPC threads (#1956)
Browse files Browse the repository at this point in the history
* refactor: replace injected rpc thread pool by common custom created
* fix: use rpc context in rpc thread pool
* refactor: remove useless code
* fix: review issues
* fix: test

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
  • Loading branch information
xDimon authored Jan 30, 2024
1 parent bfdc75d commit d1538f8
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 142 deletions.
1 change: 0 additions & 1 deletion core/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ add_library(api
transport/impl/ws/ws_session.cpp
transport/impl/ws/ws_listener_impl.cpp
transport/tuner.cpp
transport/rpc_thread_pool.cpp
transport/error.cpp
jrpc/jrpc_handle_batch.cpp
jrpc/jrpc_server_impl.cpp
Expand Down
15 changes: 7 additions & 8 deletions core/api/service/impl/api_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ namespace kagome::api {

ApiServiceImpl::ApiServiceImpl(
application::AppStateManager &app_state_manager,
std::shared_ptr<api::RpcThreadPool> thread_pool,
std::vector<std::shared_ptr<Listener>> listeners,
std::shared_ptr<JRpcServer> server,
std::vector<std::shared_ptr<JRpcProcessor>> processors,
Expand All @@ -135,9 +134,10 @@ namespace kagome::api {
extrinsic_event_key_repo,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<storage::trie::TrieStorage> trie_storage,
std::shared_ptr<runtime::Core> core)
: thread_pool_(std::move(thread_pool)),
listeners_(std::move(listeners)),
std::shared_ptr<runtime::Core> core,
std::shared_ptr<Watchdog> watchdog,
std::shared_ptr<RpcContext> rpc_context)
: listeners_(std::move(listeners)),
server_(std::move(server)),
logger_{log::createLogger("ApiService", "api")},
block_tree_{std::move(block_tree)},
Expand All @@ -146,8 +146,9 @@ namespace kagome::api {
subscription_engines_{.storage = std::move(storage_sub_engine),
.chain = std::move(chain_sub_engine),
.ext = std::move(ext_sub_engine)},
extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)} {
BOOST_ASSERT(thread_pool_);
extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)},
execution_thread_pool_{std::make_shared<ThreadPool>(
std::move(watchdog), "rpc", 1ull, std::move(rpc_context))} {
BOOST_ASSERT(block_tree_);
BOOST_ASSERT(trie_storage_);
BOOST_ASSERT(core_);
Expand Down Expand Up @@ -234,13 +235,11 @@ namespace kagome::api {
} // namespace kagome::api

bool ApiServiceImpl::start() {
thread_pool_->start();
SL_DEBUG(logger_, "API Service started");
return true;
}

void ApiServiceImpl::stop() {
thread_pool_->stop();
SL_DEBUG(logger_, "API Service stopped");
}

Expand Down
10 changes: 6 additions & 4 deletions core/api/service/impl/api_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

#include <jsonrpc-lean/fault.h>

#include "api/transport/rpc_thread_pool.hpp"
#include "api/transport/session.hpp"
#include "common/buffer.hpp"
#include "containers/objects_cache.hpp"
#include "log/logger.hpp"
#include "primitives/block_id.hpp"
#include "primitives/event_types.hpp"
#include "subscription/subscription_engine.hpp"
#include "utils/thread_pool.hpp"

namespace kagome::api {
class JRpcProcessor;
Expand Down Expand Up @@ -112,7 +112,6 @@ namespace kagome::api {

public:
ApiServiceImpl(application::AppStateManager &app_state_manager,
std::shared_ptr<api::RpcThreadPool> thread_pool,
std::vector<std::shared_ptr<Listener>> listeners,
std::shared_ptr<JRpcServer> server,
std::vector<std::shared_ptr<JRpcProcessor>> processors,
Expand All @@ -123,7 +122,9 @@ namespace kagome::api {
extrinsic_event_key_repo,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<storage::trie::TrieStorage> trie_storage,
std::shared_ptr<runtime::Core> core);
std::shared_ptr<runtime::Core> core,
std::shared_ptr<Watchdog> watchdog,
std::shared_ptr<RpcContext> rpc_context);

~ApiServiceImpl() override = default;

Expand Down Expand Up @@ -224,7 +225,6 @@ namespace kagome::api {
return obj;
}

std::shared_ptr<api::RpcThreadPool> thread_pool_;
std::vector<std::shared_ptr<Listener>> listeners_;
std::shared_ptr<JRpcServer> server_;
log::Logger logger_;
Expand All @@ -244,5 +244,7 @@ namespace kagome::api {
} subscription_engines_;
std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
extrinsic_event_key_repo_;

std::shared_ptr<ThreadPool> execution_thread_pool_;
};
} // namespace kagome::api
44 changes: 0 additions & 44 deletions core/api/transport/rpc_thread_pool.cpp

This file was deleted.

58 changes: 0 additions & 58 deletions core/api/transport/rpc_thread_pool.hpp

This file was deleted.

1 change: 0 additions & 1 deletion core/api/transport/tuner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <memory>

#include <boost/asio/ip/tcp.hpp>
#include "api/transport/rpc_io_context.hpp"
#include "log/logger.hpp"

namespace kagome::api {
Expand Down
3 changes: 0 additions & 3 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "api/service/system/system_jrpc_processor.hpp"
#include "api/transport/impl/ws/ws_listener_impl.hpp"
#include "api/transport/impl/ws/ws_session.hpp"
#include "api/transport/rpc_thread_pool.hpp"
#include "application/app_configuration.hpp"
#include "application/impl/app_state_manager_impl.hpp"
#include "application/impl/chain_spec_impl.hpp"
Expand Down Expand Up @@ -598,7 +597,6 @@ namespace {
auto makeApplicationInjector(sptr<application::AppConfiguration> config,
Ts &&...args) {
// default values for configurations
api::RpcThreadPool::Configuration rpc_thread_pool_config{};
api::WsSession::Configuration ws_config{};
transaction_pool::PoolModeratorImpl::Params pool_moderator_config{};
transaction_pool::TransactionPool::Limits tp_pool_limits{};
Expand All @@ -624,7 +622,6 @@ namespace {
// clang-format off
return di::make_injector(
// bind configs
useConfig(rpc_thread_pool_config),
useConfig(ws_config),
useConfig(pool_moderator_config),
useConfig(tp_pool_limits),
Expand Down
7 changes: 5 additions & 2 deletions core/utils/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ namespace kagome {

ThreadPool(std::shared_ptr<Watchdog> watchdog,
std::string_view pool_tag,
size_t thread_count)
: ioc_{std::make_shared<boost::asio::io_context>()},
size_t thread_count,
std::optional<std::shared_ptr<boost::asio::io_context>> ioc =
std::nullopt)
: ioc_{ioc.has_value() ? std::move(ioc.value())
: std::make_shared<boost::asio::io_context>()},
work_guard_{ioc_->get_executor()} {
BOOST_ASSERT(ioc_);
BOOST_ASSERT(thread_count > 0);
Expand Down
32 changes: 27 additions & 5 deletions test/core/api/transport/http_listener_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,31 @@ TEST_F(HttpListenerTest, EchoSuccess) {
backward::SignalHandling sh;
#endif

app_state_manager->atLaunch([ctx{main_context}] {
std::thread([ctx] { ctx->run_for(3s); }).detach();
std::unique_ptr<std::thread> asio_runner;

app_state_manager->atLaunch([&] {
asio_runner =
std::make_unique<std::thread>([ctx{main_context}, watchdog{watchdog}] {
soralog::util::setThreadName("asio_runner");
watchdog->run(ctx);
});
return true;
});

std::thread watchdog_thread([watchdog{watchdog}] {
soralog::util::setThreadName("watchdog");
watchdog->checkLoop(kagome::kWatchdogDefaultTimeout);
});

app_state_manager->atShutdown([ctx{main_context}] { ctx->stop(); });

std::unique_ptr<std::thread> client_thread;

main_context->post([&] {
std::thread(
client_thread = std::make_unique<std::thread>(
[&](Endpoint endpoint, std::string request, std::string response) {
soralog::util::setThreadName("client");

auto local_context = std::make_shared<Context>();

bool time_is_out;
Expand Down Expand Up @@ -64,9 +81,14 @@ TEST_F(HttpListenerTest, EchoSuccess) {
},
endpoint,
request,
response)
.detach();
response);
});

app_state_manager->run();

watchdog->stop();

client_thread->join();
asio_runner->join();
watchdog_thread.join();
}
22 changes: 11 additions & 11 deletions test/core/api/transport/listener_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "api/jrpc/jrpc_processor.hpp"
#include "api/jrpc/jrpc_server.hpp"
#include "api/service/impl/api_service_impl.hpp"
#include "api/transport/rpc_thread_pool.hpp"
#include "application/impl/app_state_manager_impl.hpp"
#include "common/buffer.hpp"
#include "core/api/client/http_client.hpp"
Expand All @@ -28,6 +27,7 @@
#include "runtime/runtime_context.hpp"
#include "subscription/extrinsic_event_key_repository.hpp"
#include "subscription/subscriber.hpp"
#include "testutil/asio_wait.hpp"
#include "testutil/outcome.hpp"
#include "testutil/prepare_loggers.hpp"
#include "transaction_pool/transaction_pool_error.hpp"
Expand All @@ -37,6 +37,7 @@ using namespace kagome::api;
using namespace kagome::common;
using namespace kagome::subscription;
using namespace kagome::primitives;
using kagome::ThreadPool;
using kagome::Watchdog;
using kagome::application::AppConfigurationMock;
using kagome::application::AppStateManager;
Expand Down Expand Up @@ -88,8 +89,12 @@ struct ListenerTest : public ::testing::Test {
response =
R"({"jsonrpc":"2.0","id":0,"result":)" + std::to_string(payload) + "}";

auto seed = static_cast<unsigned int>(
std::chrono::system_clock::now().time_since_epoch().count());
auto rand = rand_r(&seed);

endpoint.address(boost::asio::ip::address::from_string("127.0.0.1"));
endpoint.port(4321);
endpoint.port(1024 + rand % (65536 - 1024)); // random non-sudo port
ON_CALL(app_config, rpcEndpoint()).WillByDefault(ReturnRef(endpoint));
ON_CALL(app_config, maxWsConnections()).WillByDefault(Return(100));

Expand All @@ -98,7 +103,6 @@ struct ListenerTest : public ::testing::Test {

service = std::make_shared<ApiServiceImpl>(
*app_state_manager,
thread_pool,
std::vector<std::shared_ptr<Listener>>({listener}),
server,
std::vector<std::shared_ptr<JRpcProcessor>>(processors),
Expand All @@ -108,13 +112,14 @@ struct ListenerTest : public ::testing::Test {
ext_event_key_repo,
block_tree,
trie_storage,
core);
core,
watchdog,
rpc_context);
}

void TearDown() override {
request.clear();
response.clear();

service.reset();
}

Expand All @@ -126,12 +131,6 @@ struct ListenerTest : public ::testing::Test {
sptr<kagome::application::AppStateManager> app_state_manager =
std::make_shared<kagome::application::AppStateManagerImpl>();

kagome::api::RpcThreadPool::Configuration config = {1, 1};

sptr<kagome::api::RpcThreadPool> thread_pool =
std::make_shared<kagome::api::RpcThreadPool>(
rpc_context, std::make_shared<Watchdog>(), config);

sptr<ApiStub> api = std::make_shared<ApiStub>();

sptr<JRpcServer> server = std::make_shared<JRpcServerImpl>();
Expand All @@ -155,6 +154,7 @@ struct ListenerTest : public ::testing::Test {
std::shared_ptr<TrieStorage> trie_storage =
std::make_shared<TrieStorageMock>();
std::shared_ptr<CoreMock> core = std::make_shared<CoreMock>();
std::shared_ptr<Watchdog> watchdog = std::make_shared<Watchdog>();

sptr<ApiService> service;
};
Loading

0 comments on commit d1538f8

Please sign in to comment.