Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit offchain threads #1630

Merged
merged 4 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ namespace kagome::authority_discovery {
auto peer_id_str = peer.id.toBase58();
for (auto &pb : record.addresses()) {
OUTCOME_TRY(address, libp2p::multi::Multiaddress::create(str2byte(pb)));
if (address.getPeerId() != peer_id_str) {
auto id = address.getPeerId();
if (not id) {
continue;
}
if (id != peer_id_str) {
return Error::INCONSISTENT_PEER_ID;
}
peer.addresses.emplace_back(std::move(address));
Expand Down
45 changes: 15 additions & 30 deletions core/offchain/impl/offchain_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include "offchain/impl/offchain_worker_impl.hpp"

#include <thread>

#include <libp2p/host/host.hpp>

#include "api/service/author/author_api.hpp"
Expand All @@ -15,6 +13,7 @@
#include "offchain/impl/offchain_local_storage.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "runtime/common/executor.hpp"
#include "runtime/runtime_api/impl/offchain_worker_api.hpp"
#include "storage/database_error.hpp"

namespace kagome::offchain {
Expand Down Expand Up @@ -63,40 +62,26 @@ namespace kagome::offchain {
outcome::result<void> OffchainWorkerImpl::run() {
BOOST_ASSERT(not ocw_pool_->getWorker());

auto main_thread_func = [ocw = shared_from_this(), ocw_pool = ocw_pool_] {
soralog::util::setThreadName("ocw." + std::to_string(ocw->block_.number));

ocw_pool->addWorker(ocw);
soralog::util::setThreadName("ocw." + std::to_string(block_.number));

SL_TRACE(
ocw->log_, "Offchain worker is started for block {}", ocw->block_);
ocw_pool_->addWorker(shared_from_this());
auto remove = gsl::finally([&] { ocw_pool_->removeWorker(); });

auto res = ocw->executor_->callAt<void>(
ocw->block_.hash, "OffchainWorkerApi_offchain_worker", ocw->header_);
SL_TRACE(log_, "Offchain worker is started for block {}", block_);

ocw_pool->removeWorker();
auto res = runtime::callOffchainWorkerApi(*executor_, block_.hash, header_);

if (res.has_failure()) {
SL_ERROR(ocw->log_,
"Can't execute offchain worker for block {}: {}",
ocw->block_,
res.error());
return;
}

SL_DEBUG(ocw->log_,
"Offchain worker is successfully executed for block {}",
ocw->block_);
};

try {
std::thread(std::move(main_thread_func)).detach();
} catch (const std::system_error &exception) {
return outcome::failure(exception.code());
} catch (...) {
BOOST_UNREACHABLE_RETURN({});
if (res.has_error()) {
SL_ERROR(log_,
"Can't execute offchain worker for block {}: {}",
block_,
res.error());
return res.error();
}

SL_DEBUG(
log_, "Offchain worker is successfully executed for block {}", block_);

return outcome::success();
}

Expand Down
77 changes: 77 additions & 0 deletions core/offchain/impl/runner.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef KAGOME_OFFCHAIN_IMPL_RUNNER_HPP
#define KAGOME_OFFCHAIN_IMPL_RUNNER_HPP

#include <deque>
#include <gsl/gsl_util>
#include <mutex>

#include "utils/thread_pool.hpp"

namespace kagome::offchain {
/**
* Enqueue at most `max_tasks_` to run on number of `threads_`.
* Old tasks do not run and are removed when queue is full.
*/
class Runner : public std::enable_shared_from_this<Runner> {
public:
using Task = std::function<void()>;

Runner(size_t threads, size_t max_tasks)
: threads_{threads},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

мне кажется не стоит тут порождать потоки, у нас уже создается пулл для фоновых задач и можно выполнять в нем https://imgur.com/7zEUjSI.png
А в пулле нам по сути без разницы на какую работу тратится время пулла, от этого ресурсов больше не станет, так что счетчик max_tasks будет нормально работать.
Зато в дальнейшем это даст например возможность ограничить этим же кодом некоторые задачи в парачейнах.

free_threads_{threads},
max_tasks_{max_tasks},
thread_pool_{threads_} {}

void run(Task &&task) {
std::unique_lock lock{mutex_};
if (tasks_.size() >= max_tasks_) {
tasks_.pop_front();
}
if (free_threads_ == 0) {
tasks_.emplace_back(std::move(task));
return;
}
--free_threads_;
lock.unlock();
thread_pool_.io_context()->post(
[weak{weak_from_this()}, task{std::move(task)}] {
if (auto self = weak.lock()) {
auto release = gsl::finally([&] {
std::unique_lock lock{self->mutex_};
++self->free_threads_;
});
task();
self->drain();
}
});
}

private:
void drain() {
while (true) {
std::unique_lock lock{mutex_};
if (tasks_.empty()) {
break;
}
auto task{std::move(tasks_.front())};
tasks_.pop_front();
lock.unlock();
task();
}
}

std::mutex mutex_;
const size_t threads_;
size_t free_threads_;
const size_t max_tasks_;
std::deque<Task> tasks_;
ThreadPool thread_pool_;
};
} // namespace kagome::offchain

#endif // KAGOME_OFFCHAIN_IMPL_RUNNER_HPP
21 changes: 17 additions & 4 deletions core/runtime/runtime_api/impl/offchain_worker_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,29 @@
#include "runtime/runtime_api/impl/offchain_worker_api.hpp"

#include "application/app_configuration.hpp"
#include "offchain/impl/runner.hpp"
#include "offchain/offchain_worker_factory.hpp"
#include "runtime/common/executor.hpp"

namespace kagome::runtime {
constexpr size_t kMaxThreads = 3;
constexpr size_t kMaxTasks = 1000;

outcome::result<void> callOffchainWorkerApi(
Executor &executor,
const primitives::BlockHash &block,
const primitives::BlockHeader &header) {
return executor.callAt<void>(
block, "OffchainWorkerApi_offchain_worker", header);
}

OffchainWorkerApiImpl::OffchainWorkerApiImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<offchain::OffchainWorkerFactory> ocw_factory,
std::shared_ptr<Executor> executor)
: app_config_(app_config),
ocw_factory_(std::move(ocw_factory)),
runner_{std::make_shared<offchain::Runner>(kMaxThreads, kMaxTasks)},
executor_(std::move(executor)) {
BOOST_ASSERT(ocw_factory_);
BOOST_ASSERT(executor_);
Expand All @@ -39,11 +52,11 @@ namespace kagome::runtime {
}
}

auto worker = ocw_factory_->make(executor_, header);

auto res = worker->run();
runner_->run([worker = ocw_factory_->make(executor_, header)] {
std::ignore = worker->run();
});

return res;
return outcome::success();
}

} // namespace kagome::runtime
13 changes: 9 additions & 4 deletions core/runtime/runtime_api/impl/offchain_worker_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ namespace kagome::application {
}
namespace kagome::offchain {
class OffchainWorkerFactory;
}
class Runner;
} // namespace kagome::offchain

namespace kagome::runtime {

class Executor;

class OffchainWorkerApiImpl final
: public OffchainWorkerApi,
std::enable_shared_from_this<OffchainWorkerApiImpl> {
outcome::result<void> callOffchainWorkerApi(
Executor &executor,
const primitives::BlockHash &block,
const primitives::BlockHeader &header);

class OffchainWorkerApiImpl final : public OffchainWorkerApi {
public:
OffchainWorkerApiImpl(
const application::AppConfiguration &app_config,
Expand All @@ -35,6 +39,7 @@ namespace kagome::runtime {
private:
const application::AppConfiguration &app_config_;
std::shared_ptr<offchain::OffchainWorkerFactory> ocw_factory_;
std::shared_ptr<offchain::Runner> runner_;
std::shared_ptr<Executor> executor_;
};

Expand Down
5 changes: 5 additions & 0 deletions core/utils/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <memory>
#include <optional>
#include <thread>

#include "utils/non_copyable.hpp"
Expand Down Expand Up @@ -107,6 +108,10 @@ namespace kagome {
}
}

const std::shared_ptr<boost::asio::io_context> &io_context() const {
return ioc_;
}

std::shared_ptr<ThreadHandler> handler() {
BOOST_ASSERT(ioc_);
return std::make_shared<ThreadHandler>(ioc_);
Expand Down