diff --git a/core/authority_discovery/query/query_impl.cpp b/core/authority_discovery/query/query_impl.cpp index cd629a1448..4dedd55838 100644 --- a/core/authority_discovery/query/query_impl.cpp +++ b/core/authority_discovery/query/query_impl.cpp @@ -176,7 +176,11 @@ namespace kagome::authority_discovery { auto peer_id_str = peer.id.toBase58(); for (auto &pb : record.addresses()) { OUTCOME_TRY(address, libp2p::multi::Multiaddress::create(str2byte(pb))); - if (address.getPeerId() != peer_id_str) { + auto id = address.getPeerId(); + if (not id) { + continue; + } + if (id != peer_id_str) { return Error::INCONSISTENT_PEER_ID; } peer.addresses.emplace_back(std::move(address)); diff --git a/core/offchain/impl/offchain_worker_impl.cpp b/core/offchain/impl/offchain_worker_impl.cpp index c0da3b413d..8bf126cff9 100644 --- a/core/offchain/impl/offchain_worker_impl.cpp +++ b/core/offchain/impl/offchain_worker_impl.cpp @@ -5,8 +5,6 @@ #include "offchain/impl/offchain_worker_impl.hpp" -#include - #include #include "api/service/author/author_api.hpp" @@ -15,6 +13,7 @@ #include "offchain/impl/offchain_local_storage.hpp" #include "offchain/offchain_worker_pool.hpp" #include "runtime/common/executor.hpp" +#include "runtime/runtime_api/impl/offchain_worker_api.hpp" #include "storage/database_error.hpp" namespace kagome::offchain { @@ -63,40 +62,26 @@ namespace kagome::offchain { outcome::result OffchainWorkerImpl::run() { BOOST_ASSERT(not ocw_pool_->getWorker()); - auto main_thread_func = [ocw = shared_from_this(), ocw_pool = ocw_pool_] { - soralog::util::setThreadName("ocw." + std::to_string(ocw->block_.number)); - - ocw_pool->addWorker(ocw); + soralog::util::setThreadName("ocw." + std::to_string(block_.number)); - SL_TRACE( - ocw->log_, "Offchain worker is started for block {}", ocw->block_); + ocw_pool_->addWorker(shared_from_this()); + auto remove = gsl::finally([&] { ocw_pool_->removeWorker(); }); - auto res = ocw->executor_->callAt( - ocw->block_.hash, "OffchainWorkerApi_offchain_worker", ocw->header_); + SL_TRACE(log_, "Offchain worker is started for block {}", block_); - ocw_pool->removeWorker(); + auto res = runtime::callOffchainWorkerApi(*executor_, block_.hash, header_); - if (res.has_failure()) { - SL_ERROR(ocw->log_, - "Can't execute offchain worker for block {}: {}", - ocw->block_, - res.error()); - return; - } - - SL_DEBUG(ocw->log_, - "Offchain worker is successfully executed for block {}", - ocw->block_); - }; - - try { - std::thread(std::move(main_thread_func)).detach(); - } catch (const std::system_error &exception) { - return outcome::failure(exception.code()); - } catch (...) { - BOOST_UNREACHABLE_RETURN({}); + if (res.has_error()) { + SL_ERROR(log_, + "Can't execute offchain worker for block {}: {}", + block_, + res.error()); + return res.error(); } + SL_DEBUG( + log_, "Offchain worker is successfully executed for block {}", block_); + return outcome::success(); } diff --git a/core/offchain/impl/runner.hpp b/core/offchain/impl/runner.hpp new file mode 100644 index 0000000000..d90f4e0f78 --- /dev/null +++ b/core/offchain/impl/runner.hpp @@ -0,0 +1,77 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_OFFCHAIN_IMPL_RUNNER_HPP +#define KAGOME_OFFCHAIN_IMPL_RUNNER_HPP + +#include +#include +#include + +#include "utils/thread_pool.hpp" + +namespace kagome::offchain { + /** + * Enqueue at most `max_tasks_` to run on number of `threads_`. + * Old tasks do not run and are removed when queue is full. + */ + class Runner : public std::enable_shared_from_this { + public: + using Task = std::function; + + Runner(size_t threads, size_t max_tasks) + : threads_{threads}, + free_threads_{threads}, + max_tasks_{max_tasks}, + thread_pool_{threads_} {} + + void run(Task &&task) { + std::unique_lock lock{mutex_}; + if (tasks_.size() >= max_tasks_) { + tasks_.pop_front(); + } + if (free_threads_ == 0) { + tasks_.emplace_back(std::move(task)); + return; + } + --free_threads_; + lock.unlock(); + thread_pool_.io_context()->post( + [weak{weak_from_this()}, task{std::move(task)}] { + if (auto self = weak.lock()) { + auto release = gsl::finally([&] { + std::unique_lock lock{self->mutex_}; + ++self->free_threads_; + }); + task(); + self->drain(); + } + }); + } + + private: + void drain() { + while (true) { + std::unique_lock lock{mutex_}; + if (tasks_.empty()) { + break; + } + auto task{std::move(tasks_.front())}; + tasks_.pop_front(); + lock.unlock(); + task(); + } + } + + std::mutex mutex_; + const size_t threads_; + size_t free_threads_; + const size_t max_tasks_; + std::deque tasks_; + ThreadPool thread_pool_; + }; +} // namespace kagome::offchain + +#endif // KAGOME_OFFCHAIN_IMPL_RUNNER_HPP diff --git a/core/runtime/runtime_api/impl/offchain_worker_api.cpp b/core/runtime/runtime_api/impl/offchain_worker_api.cpp index afde90e7f5..e267f1c8d5 100644 --- a/core/runtime/runtime_api/impl/offchain_worker_api.cpp +++ b/core/runtime/runtime_api/impl/offchain_worker_api.cpp @@ -6,9 +6,21 @@ #include "runtime/runtime_api/impl/offchain_worker_api.hpp" #include "application/app_configuration.hpp" +#include "offchain/impl/runner.hpp" #include "offchain/offchain_worker_factory.hpp" +#include "runtime/common/executor.hpp" namespace kagome::runtime { + constexpr size_t kMaxThreads = 3; + constexpr size_t kMaxTasks = 1000; + + outcome::result callOffchainWorkerApi( + Executor &executor, + const primitives::BlockHash &block, + const primitives::BlockHeader &header) { + return executor.callAt( + block, "OffchainWorkerApi_offchain_worker", header); + } OffchainWorkerApiImpl::OffchainWorkerApiImpl( const application::AppConfiguration &app_config, @@ -16,6 +28,7 @@ namespace kagome::runtime { std::shared_ptr executor) : app_config_(app_config), ocw_factory_(std::move(ocw_factory)), + runner_{std::make_shared(kMaxThreads, kMaxTasks)}, executor_(std::move(executor)) { BOOST_ASSERT(ocw_factory_); BOOST_ASSERT(executor_); @@ -39,11 +52,11 @@ namespace kagome::runtime { } } - auto worker = ocw_factory_->make(executor_, header); - - auto res = worker->run(); + runner_->run([worker = ocw_factory_->make(executor_, header)] { + std::ignore = worker->run(); + }); - return res; + return outcome::success(); } } // namespace kagome::runtime diff --git a/core/runtime/runtime_api/impl/offchain_worker_api.hpp b/core/runtime/runtime_api/impl/offchain_worker_api.hpp index a6c85a8891..0173c76d91 100644 --- a/core/runtime/runtime_api/impl/offchain_worker_api.hpp +++ b/core/runtime/runtime_api/impl/offchain_worker_api.hpp @@ -13,15 +13,19 @@ namespace kagome::application { } namespace kagome::offchain { class OffchainWorkerFactory; -} + class Runner; +} // namespace kagome::offchain namespace kagome::runtime { class Executor; - class OffchainWorkerApiImpl final - : public OffchainWorkerApi, - std::enable_shared_from_this { + outcome::result callOffchainWorkerApi( + Executor &executor, + const primitives::BlockHash &block, + const primitives::BlockHeader &header); + + class OffchainWorkerApiImpl final : public OffchainWorkerApi { public: OffchainWorkerApiImpl( const application::AppConfiguration &app_config, @@ -35,6 +39,7 @@ namespace kagome::runtime { private: const application::AppConfiguration &app_config_; std::shared_ptr ocw_factory_; + std::shared_ptr runner_; std::shared_ptr executor_; }; diff --git a/core/utils/thread_pool.hpp b/core/utils/thread_pool.hpp index 9d89966136..3a1669b2a2 100644 --- a/core/utils/thread_pool.hpp +++ b/core/utils/thread_pool.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include "utils/non_copyable.hpp" @@ -107,6 +108,10 @@ namespace kagome { } } + const std::shared_ptr &io_context() const { + return ioc_; + } + std::shared_ptr handler() { BOOST_ASSERT(ioc_); return std::make_shared(ioc_);