Skip to content

Commit

Permalink
thread_pool ref
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <iceseer@gmail.com>
  • Loading branch information
iceseer committed Nov 20, 2022
1 parent d0d7bfb commit 0d45910
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 197 deletions.
7 changes: 7 additions & 0 deletions core/parachain/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
add_library(thread_pool
thread_pool.cpp
)
target_link_libraries(thread_pool
Boost::boost
)

add_library(validator_parachain
availability/bitfield/signer.cpp
Expand All @@ -14,4 +20,5 @@ target_link_libraries(validator_parachain
collation_protocol
protocol_error
peer_view
thread_pool
)
144 changes: 144 additions & 0 deletions core/parachain/tasks_sequence.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
// Created by iceseer on 11/20/22.
//

#ifndef KAGOME_TASKS_SEQUENCE_HPP
#define KAGOME_TASKS_SEQUENCE_HPP

#include <memory>
#include <type_traits>

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/signal_set.hpp>

namespace kagome::thread {

// clang-format off
/*
* Code of `sequence` allows to execute multiple tasks in different threads
* like so the output of the task is the incoming argument for the next one.
* Additionally it makes check of the returned outcome::result<T>.
*
* ThreadQueueContext allows you to make wrapper around any sync/async subsystem.
*
* Example:
* tp_ - std::shared_ptr<kagome::thread::ThreadPool>
* c_ - std::shared_ptr<boost::asio::io_context>
*
* thread::sequence(
* thread::createTask(tp_, []() -> outcome::result<int> { return 100; }),
* thread::createTask(
* c_, [](auto a) -> outcome::result<float> { return 10.f + a; }),
* thread::createTask(tp_,
* [](auto b) -> outcome::result<std::string> {
* static_assert(std::is_floating_point_v<decltype(b)>);
* return std::to_string(static_cast<int>(b))
* + " is the result";
* }),
* thread::createTask(tp_,
* [](auto c) { assert(c == "110 is the result"); }));
* */
// clang-format on

template <typename T>
struct ThreadQueueContext {
template <typename D>
[[maybe_unused]] ThreadQueueContext(D &&);
template <typename F>
[[maybe_unused]] void operator()(F &&func);
};

template <>
struct ThreadQueueContext<std::weak_ptr<boost::asio::io_context>> {
using Type = std::weak_ptr<boost::asio::io_context>;
Type t;

template <typename D>
ThreadQueueContext(D &&arg) : t{std::forward<D>(arg)} {}

template <typename F>
void operator()(F &&func) {
if (auto call_context = t.lock()) {
boost::asio::post(*call_context, std::forward<F>(func));
}
}
};

template <>
struct ThreadQueueContext<std::shared_ptr<boost::asio::io_context>>
: ThreadQueueContext<std::weak_ptr<boost::asio::io_context>> {
template <typename D>
ThreadQueueContext(D &&arg)
: ThreadQueueContext<std::weak_ptr<boost::asio::io_context>>(
std::forward<D>(arg)) {}
};

template <typename T>
auto createThreadQueueContext(T &&t) {
return ThreadQueueContext<std::decay_t<T>>{std::forward<T>(t)};
}

template <typename C, typename F>
auto createTask(C &&c, F &&f) {
return std::make_pair(createThreadQueueContext(std::forward<C>(c)),
std::forward<F>(f));
}

template <typename T, typename K, typename... Args>
void sequence(std::pair<T, K> &&t, Args &&...args) {
__internal_contextCall(std::move(t.first),
[func{std::move(t.second)},
forwarding_func{__internal_bindArgs(
std::forward<Args>(args)...)}]() mutable {
forwarding_func(func());
});
}

template <typename T, typename K>
void sequence(std::pair<T, K> &&t) {
__internal_contextCall(std::move(t.first), std::move(t.second));
}

template <typename T, typename F>
void __internal_contextCall(ThreadQueueContext<T> &&t, F &&f) {
t(std::forward<F>(f));
}

template <typename R, typename T, typename K>
void __internal_executeI(R &&r, std::pair<T, K> &&t) {
if (std::forward<R>(r).has_value()) {
__internal_contextCall(
std::move(t.first),
[r{std::forward<R>(r).value()}, func{std::move(t.second)}]() mutable {
func(std::move(r));
});
}
}

template <typename R, typename T, typename K, typename... Args>
void __internal_executeI(R &&r, std::pair<T, K> &&t, Args &&...args) {
if (std::forward<R>(r).has_value()) {
__internal_contextCall(std::move(t.first),
[r{std::forward<R>(r).value()},
func{std::move(t.second)},
forwarding_func{__internal_bindArgs(
std::forward<Args>(args)...)}]() mutable {
forwarding_func(func(std::move(r)));
});
}
}

template <typename... Args>
auto __internal_bindArgs(Args &&...args) {
return std::bind(
[](auto &&...args) mutable { __internal_executeI(std::move(args)...); },
std::placeholders::_1,
std::forward<Args>(args)...);
}

} // namespace kagome::thread

#endif // KAGOME_TASKS_SEQUENCE_HPP
63 changes: 63 additions & 0 deletions core/parachain/thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#include "parachain/thread_pool.hpp"

namespace kagome::thread {

ThreadPool::ThreadPool(
std::shared_ptr<application::AppStateManager> app_state_manager,
size_t thread_count)
: thread_count_{thread_count} {
BOOST_ASSERT(thread_count_ > 0);

if (app_state_manager) app_state_manager->takeControl(*this);
}

ThreadPool::ThreadPool(size_t thread_count) : thread_count_{thread_count} {
BOOST_ASSERT(thread_count_ > 0);
}

ThreadPool::~ThreadPool() {
/// check that all workers are stopped.
BOOST_ASSERT(workers_.empty());
}

bool ThreadPool::prepare() {
context_ = std::make_shared<WorkersContext>();
work_guard_ = std::make_shared<WorkGuard>(context_->get_executor());
workers_.reserve(thread_count_);
return true;
}

bool ThreadPool::start() {
BOOST_ASSERT(context_);
BOOST_ASSERT(work_guard_);
for (size_t ix = 0; ix < thread_count_; ++ix) {
workers_.emplace_back(
[wptr{this->weak_from_this()}, context{context_}]() {
if (auto self = wptr.lock()) {
self->logger_->debug("Started thread worker with id: {}",
std::this_thread::get_id());
}
context->run();
});
}
return true;
}

void ThreadPool::stop() {
work_guard_.reset();
if (context_) {
context_->stop();
}
for (auto &worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
workers_.clear();
}

} // namespace kagome::thread
Loading

0 comments on commit 0d45910

Please sign in to comment.