From 7f5782cc0c06eacd747d04bddb10f01529068c01 Mon Sep 17 00:00:00 2001 From: iceseer Date: Wed, 12 Apr 2023 11:30:20 +0300 Subject: [PATCH 1/4] refactoring thread pool Signed-off-by: iceseer --- .../validator/impl/parachain_processor.cpp | 3 +- core/utils/thread_pool.hpp | 167 ++++++++++++++++-- 2 files changed, 158 insertions(+), 12 deletions(-) diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 5e58e4af73..5be24ffe2e 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -250,6 +250,7 @@ namespace kagome::parachain { } bool ParachainProcessorImpl::start() { + thread_pool_->handler().start(); return true; } @@ -607,7 +608,7 @@ namespace kagome::parachain { peer_id); sequenceIgnore( - thread_pool_->io_context()->wrap( + thread_pool_->handler().io_context()->wrap( asAsync([wself{weak_from_this()}, candidate{std::move(candidate)}, pov{std::move(pov)}, diff --git a/core/utils/thread_pool.hpp b/core/utils/thread_pool.hpp index 83623ef289..d0045adb9c 100644 --- a/core/utils/thread_pool.hpp +++ b/core/utils/thread_pool.hpp @@ -6,6 +6,7 @@ #ifndef KAGOME_UTILS_THREAD_POOL_HPP #define KAGOME_UTILS_THREAD_POOL_HPP +#include #include #include #include @@ -14,40 +15,184 @@ #include "utils/non_copyable.hpp" namespace kagome { - // TODO(turuslan): use `ThreadPool` in `RpcThreadPool` and `RpcContext` + + class ThreadHandler final { + enum struct State : uint32_t { kStopped = 0, kStarted }; + + public: + ThreadHandler(ThreadHandler &&) = delete; + ThreadHandler(ThreadHandler const &) = delete; + + ThreadHandler &operator=(ThreadHandler &&) = delete; + ThreadHandler &operator=(ThreadHandler const &) = delete; + + ThreadHandler() + : execution_state_{State::kStopped}, + ioc_{std::make_shared()}, + work_guard_{ioc_->get_executor()} {} + + explicit ThreadHandler(std::shared_ptr io_context) + : execution_state_{State::kStopped}, + ioc_{std::move(io_context)}, + work_guard_{} {} + + ~ThreadHandler() { + if (work_guard_) { + ioc_->stop(); + } + } + + void start() { + execution_state_.store(State::kStarted, std::memory_order_release); + } + + void stop() { + execution_state_.store(State::kStopped, std::memory_order_release); + } + + template + void execute(F &&func) { + BOOST_ASSERT(ioc_); + if (State::kStarted == execution_state_.load(std::memory_order_acquire)) { + ioc_->post(std::forward(func)); + } + } + + template + auto reinvoke(F &&func, Args &&...args) + -> std::optional> { + if (!isInCurrentThread()) { + execute([func(std::forward(func)), + tup{std::tuple...>{ + std::forward(args)...}}]() mutable { + std::apply([&](auto &&...a) mutable { func(std::move(a)...); }, + std::move(tup)); + }); + return std::nullopt; + } + return {std::make_tuple(std::forward(args)...)}; + } + + bool isInCurrentThread() const { + BOOST_ASSERT(ioc_); + return ioc_->get_executor().running_in_this_thread(); + } + + std::shared_ptr io_context() const { + return ioc_; + } + + private: + std::atomic execution_state_; + std::shared_ptr ioc_; + std::optional> + work_guard_; + }; /** * Creates `io_context` and runs it on `thread_count` threads. */ - class ThreadPool final : NonCopyable, NonMovable { + class ThreadPool final { + enum struct State : uint32_t { kStopped = 0, kStarted }; + public: + ThreadPool(ThreadPool &&) = delete; + ThreadPool(ThreadPool const &) = delete; + + ThreadPool &operator=(ThreadPool &&) = delete; + ThreadPool &operator=(ThreadPool const &) = delete; + explicit ThreadPool(size_t thread_count) - : io_context_{std::make_shared()}, - work_guard_{io_context_->get_executor()} { + : handler_{std::make_optional()} { + BOOST_ASSERT(handler_); BOOST_ASSERT(thread_count > 0); threads_.reserve(thread_count); + for (size_t i = 0; i < thread_count; ++i) { - threads_.emplace_back([io{io_context_}] { io->run(); }); + threads_.emplace_back([io{handler_->io_context()}] { io->run(); }); } } ~ThreadPool() { - io_context_->stop(); + BOOST_ASSERT(handler_); + handler_ = std::nullopt; + + BOOST_ASSERT(!threads_.empty()); for (auto &thread : threads_) { thread.join(); } } - const std::shared_ptr &io_context() const { - return io_context_; + template + void execute(F &&func) { + BOOST_ASSERT(handler_); + handler_->execute(std::forward(func)); + } + + template + auto reinvoke(F &&func, Args &&...args) + -> std::optional> { + BOOST_ASSERT(handler_); + return handler_->reinvoke(std::forward(func), + std::forward(args)...); + } + + bool isInCurrentThread() const { + BOOST_ASSERT(handler_); + return handler_->isInCurrentThread(); + } + + ThreadHandler &handler() { + BOOST_ASSERT(handler_); + return *handler_; } private: - std::shared_ptr io_context_; - boost::asio::executor_work_guard - work_guard_; + std::optional handler_; std::vector threads_; }; } // namespace kagome +#define REINVOKE_3(ctx, func, in_1, in_2, in_3, out_1, out_2, out_3) \ + auto res##func = (ctx).reinvoke( \ + [wself(weak_from_this())](auto &&x1, auto &&x2, auto &&x3) { \ + if (auto self = wself.lock()) { \ + self->func(std::move(x1), std::move(x2), std::move(x3)); \ + } \ + }, \ + std::move(in_1), \ + std::move(in_2), \ + std::move(in_3)); \ + if (!(res##func)) return; \ + auto &&[func##x1, func##x2, func##x3] = std::move(*(res##func)); \ + auto && (out_1) = func##x1; \ + auto && (out_2) = func##x2; \ + auto && (out_3) = func##x3; + +#define REINVOKE_2(ctx, func, in_1, in_2, out_1, out_2) \ + auto res##func = (ctx).reinvoke( \ + [wself(weak_from_this())](auto &&x1, auto &&x2) { \ + if (auto self = wself.lock()) { \ + self->func(std::move(x1), std::move(x2)); \ + } \ + }, \ + std::move(in_1), \ + std::move(in_2)); \ + if (!(res##func)) return; \ + auto &&[func##x1, func##x2] = std::move(*(res##func)); \ + auto && (out_1) = func##x1; \ + auto && (out_2) = func##x2; + +#define REINVOKE_1(ctx, func, in, out) \ + auto res##func = (ctx).reinvoke( \ + [wself(weak_from_this())](auto &&x) { \ + if (auto self = wself.lock()) { \ + self->func(std::move(x)); \ + } \ + }, \ + std::move(in)); \ + if (!(res##func)) return; \ + auto && (out) = std::get<0>(std::move(*(res##func))); + #endif // KAGOME_UTILS_THREAD_POOL_HPP From 40556924d514c481d12f2fba38a25565e180cb98 Mon Sep 17 00:00:00 2001 From: iceseer Date: Mon, 17 Apr 2023 11:47:22 +0300 Subject: [PATCH 2/4] issues Signed-off-by: iceseer --- .../validator/impl/parachain_processor.cpp | 7 ++- .../validator/parachain_processor.hpp | 1 + core/utils/thread_pool.hpp | 63 +++++-------------- 3 files changed, 20 insertions(+), 51 deletions(-) diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 5be24ffe2e..56ed1c9ac9 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -94,7 +94,8 @@ namespace kagome::parachain { parachain_host_(std::move(parachain_host)), app_config_(app_config), babe_status_observable_(std::move(babe_status_observable)), - query_audi_{std::move(query_audi)} { + query_audi_{std::move(query_audi)}, + thread_handler_{thread_pool_->handler()} { BOOST_ASSERT(pm_); BOOST_ASSERT(peer_view_); BOOST_ASSERT(crypto_provider_); @@ -250,7 +251,7 @@ namespace kagome::parachain { } bool ParachainProcessorImpl::start() { - thread_pool_->handler().start(); + thread_handler_->start(); return true; } @@ -608,7 +609,7 @@ namespace kagome::parachain { peer_id); sequenceIgnore( - thread_pool_->handler().io_context()->wrap( + thread_handler_->io_context()->wrap( asAsync([wself{weak_from_this()}, candidate{std::move(candidate)}, pov{std::move(pov)}, diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index cc7820111c..e0fcfda6d3 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -429,6 +429,7 @@ namespace kagome::parachain { std::shared_ptr query_audi_; std::shared_ptr chain_sub_; + std::shared_ptr thread_handler_; }; } // namespace kagome::parachain diff --git a/core/utils/thread_pool.hpp b/core/utils/thread_pool.hpp index d0045adb9c..00a7ef0250 100644 --- a/core/utils/thread_pool.hpp +++ b/core/utils/thread_pool.hpp @@ -26,21 +26,9 @@ namespace kagome { ThreadHandler &operator=(ThreadHandler &&) = delete; ThreadHandler &operator=(ThreadHandler const &) = delete; - ThreadHandler() - : execution_state_{State::kStopped}, - ioc_{std::make_shared()}, - work_guard_{ioc_->get_executor()} {} - explicit ThreadHandler(std::shared_ptr io_context) - : execution_state_{State::kStopped}, - ioc_{std::move(io_context)}, - work_guard_{} {} - - ~ThreadHandler() { - if (work_guard_) { - ioc_->stop(); - } - } + : execution_state_{State::kStopped}, ioc_{std::move(io_context)} {} + ~ThreadHandler() = default; void start() { execution_state_.store(State::kStarted, std::memory_order_release); @@ -85,9 +73,6 @@ namespace kagome { private: std::atomic execution_state_; std::shared_ptr ioc_; - std::optional> - work_guard_; }; /** @@ -104,52 +89,34 @@ namespace kagome { ThreadPool &operator=(ThreadPool const &) = delete; explicit ThreadPool(size_t thread_count) - : handler_{std::make_optional()} { - BOOST_ASSERT(handler_); + : ioc_{std::make_shared()}, + work_guard_{ioc_->get_executor()} { + BOOST_ASSERT(ioc_); BOOST_ASSERT(thread_count > 0); - threads_.reserve(thread_count); + threads_.reserve(thread_count); for (size_t i = 0; i < thread_count; ++i) { - threads_.emplace_back([io{handler_->io_context()}] { io->run(); }); + threads_.emplace_back([io{ioc_}] { io->run(); }); } } ~ThreadPool() { - BOOST_ASSERT(handler_); - handler_ = std::nullopt; - - BOOST_ASSERT(!threads_.empty()); + ioc_->stop(); for (auto &thread : threads_) { thread.join(); } } - template - void execute(F &&func) { - BOOST_ASSERT(handler_); - handler_->execute(std::forward(func)); - } - - template - auto reinvoke(F &&func, Args &&...args) - -> std::optional> { - BOOST_ASSERT(handler_); - return handler_->reinvoke(std::forward(func), - std::forward(args)...); - } - - bool isInCurrentThread() const { - BOOST_ASSERT(handler_); - return handler_->isInCurrentThread(); - } - - ThreadHandler &handler() { - BOOST_ASSERT(handler_); - return *handler_; + std::shared_ptr handler() { + BOOST_ASSERT(ioc_); + return std::make_shared(ioc_); } private: - std::optional handler_; + std::shared_ptr ioc_; + std::optional> + work_guard_; std::vector threads_; }; } // namespace kagome From b072b6d3cd96860c2e8c68335c998c0e478035d8 Mon Sep 17 00:00:00 2001 From: iceseer Date: Tue, 18 Apr 2023 10:35:12 +0300 Subject: [PATCH 3/4] issues Signed-off-by: iceseer --- .../validator/impl/parachain_processor.cpp | 162 +++++++++--------- .../validator/parachain_processor.hpp | 140 +++++++-------- core/utils/thread_pool.hpp | 8 +- 3 files changed, 156 insertions(+), 154 deletions(-) diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 6cf2ce6533..caec310615 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -122,8 +122,8 @@ namespace kagome::parachain { bool ParachainProcessorImpl::prepare() { bitfield_signer_->setBroadcastCallback( [log{logger_}, wptr_self{weak_from_this()}]( - primitives::BlockHash const &relay_parent, - network::SignedBitfield const &bitfield) { + const primitives::BlockHash &relay_parent, + const network::SignedBitfield &bitfield) { log->info("Distribute bitfield on {}", relay_parent); if (auto self = wptr_self.lock()) { auto msg = std::make_shared< @@ -235,7 +235,7 @@ namespace kagome::parachain { return true; } - void ParachainProcessorImpl::broadcastView(network::View const &view) const { + void ParachainProcessorImpl::broadcastView(const network::View &view) const { auto msg = std::make_shared< network::WireMessage>( network::ViewUpdate{.view = view}); @@ -273,7 +273,7 @@ namespace kagome::parachain { return Error::KEY_NOT_PRESENT; } - auto const n_cores = cores.size(); + const auto n_cores = cores.size(); std::optional assignment; std::optional required_collator; @@ -281,9 +281,9 @@ namespace kagome::parachain { for (CoreIndex core_index = 0; core_index < static_cast(cores.size()); ++core_index) { - if (auto const *scheduled = + if (const auto *scheduled = boost::get(&cores[core_index])) { - auto const group_index = + const auto group_index = group_rotation_info.groupForCore(core_index, n_cores); if (group_index < validator_groups.size()) { auto &g = validator_groups[group_index]; @@ -357,7 +357,7 @@ namespace kagome::parachain { auto &seconded = parachain_state.seconded; auto &issued_statements = parachain_state.issued_statements; - network::CandidateDescriptor const &descriptor = + const network::CandidateDescriptor &descriptor = candidateDescriptorFrom(response); primitives::BlockHash const candidate_hash = candidateHashFrom(response); @@ -378,7 +378,7 @@ namespace kagome::parachain { return; } - auto const candidate_para_id = descriptor.para_id; + const auto candidate_para_id = descriptor.para_id; if (candidate_para_id != assignment) { logger_->warn( "Try to second for para_id {} out of our assignment {}.", @@ -399,7 +399,7 @@ namespace kagome::parachain { return; } - auto const can_process = + const auto can_process = pending_candidates.exclusiveAccess([&](auto &container) { auto it = container.find(pending_collation.pending_collation.relay_parent); @@ -431,8 +431,8 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onValidationProtocolMsg( - libp2p::peer::PeerId const &peer_id, - network::ValidatorProtocolMessage const &message) { + const libp2p::peer::PeerId &peer_id, + const network::ValidatorProtocolMessage &message) { if (auto m{boost::get(&message)}) { auto bd{boost::get(m)}; BOOST_ASSERT_MSG( @@ -459,8 +459,8 @@ namespace kagome::parachain { template void ParachainProcessorImpl::requestPoV( - libp2p::peer::PeerInfo const &peer_info, - CandidateHash const &candidate_hash, + const libp2p::peer::PeerInfo &peer_info, + const CandidateHash &candidate_hash, F &&callback) { /// TODO(iceseer): request PoV from validator, who seconded candidate /// But now we can assume, that if we received either `seconded` or `valid` @@ -478,7 +478,7 @@ namespace kagome::parachain { } std::optional - ParachainProcessorImpl::retrieveSessionInfo(RelayHash const &relay_parent) { + ParachainProcessorImpl::retrieveSessionInfo(const RelayHash &relay_parent) { if (auto session_index = parachain_host_->session_index_for_child(relay_parent); session_index.has_value()) { @@ -492,17 +492,17 @@ namespace kagome::parachain { } void ParachainProcessorImpl::kickOffValidationWork( - RelayHash const &relay_parent, + const RelayHash &relay_parent, AttestingData &attesting_data, RelayParentState ¶chain_state) { - auto const candidate_hash{candidateHashFrom(attesting_data.candidate)}; + const auto candidate_hash{candidateHashFrom(attesting_data.candidate)}; BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); if (!parachain_state.awaiting_validation.insert(candidate_hash).second) { return; } - auto const &collator_id = + const auto &collator_id = collatorIdFromDescriptor(attesting_data.candidate.descriptor); if (parachain_state.required_collator && collator_id != *parachain_state.required_collator) { @@ -524,7 +524,7 @@ namespace kagome::parachain { return; } - auto const &authority_id = + const auto &authority_id = session_info->discovery_keys[attesting_data.from_validator]; if (auto peer = query_audi_->get(authority_id)) { requestPoV( @@ -580,7 +580,7 @@ namespace kagome::parachain { outcome::result ParachainProcessorImpl::OnFetchChunkRequest( - network::FetchChunkRequest const &request) { + const network::FetchChunkRequest &request) { if (auto chunk = av_store_->getChunk(request.candidate, request.chunk_index)) { return network::Chunk{ @@ -595,8 +595,8 @@ namespace kagome::parachain { void ParachainProcessorImpl::appendAsyncValidationTask( network::CandidateReceipt &&candidate, network::ParachainBlock &&pov, - primitives::BlockHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, + const primitives::BlockHash &relay_parent, + const libp2p::peer::PeerId &peer_id, RelayParentState ¶chain_state, const primitives::BlockHash &candidate_hash, size_t n_validators) { @@ -672,7 +672,7 @@ namespace kagome::parachain { template outcome::result ParachainProcessorImpl::sign( - T const &t) const { + const T &t) const { if (!keypair_) { return Error::KEY_NOT_PRESENT; } @@ -686,7 +686,7 @@ namespace kagome::parachain { ParachainProcessorImpl::tryGetStateByRelayParent( const primitives::BlockHash &relay_parent) { BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); - auto const it = our_current_state_.state_by_relay_parent.find(relay_parent); + const auto it = our_current_state_.state_by_relay_parent.find(relay_parent); if (it != our_current_state_.state_by_relay_parent.end()) { return it->second; } @@ -697,7 +697,7 @@ namespace kagome::parachain { ParachainProcessorImpl::storeStateByRelayParent( const primitives::BlockHash &relay_parent, RelayParentState &&val) { BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); - auto const &[it, inserted] = + const auto &[it, inserted] = our_current_state_.state_by_relay_parent.insert( {relay_parent, std::move(val)}); BOOST_ASSERT(inserted); @@ -705,9 +705,9 @@ namespace kagome::parachain { } void ParachainProcessorImpl::handleStatement( - libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement) { + const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement) { BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); auto opt_parachain_state = tryGetStateByRelayParent(relay_parent); if (!opt_parachain_state) { @@ -739,7 +739,7 @@ namespace kagome::parachain { std::optional> attesting_ref = visit_in_place( parachain::getPayload(statement).candidate_state, - [&](network::CommittedCandidateReceipt const &seconded) + [&](const network::CommittedCandidateReceipt &seconded) -> std::optional> { auto const &candidate_hash = result->imported.candidate; auto opt_candidate = @@ -760,7 +760,7 @@ namespace kagome::parachain { std::make_pair(candidate_hash, std::move(attesting))); return it->second; }, - [&](primitives::BlockHash const &candidate_hash) + [&](const primitives::BlockHash &candidate_hash) -> std::optional> { auto it = fallbacks.find(candidate_hash); if (it == fallbacks.end()) { @@ -777,7 +777,7 @@ namespace kagome::parachain { it->second.from_validator = statement.payload.ix; return it->second; }, - [&](auto const &) + [&](const auto &) -> std::optional> { BOOST_ASSERT(!"Not used!"); return std::nullopt; @@ -793,8 +793,8 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::importStatementToTable( ParachainProcessorImpl::RelayParentState &relayParentState, - primitives::BlockHash const &candidate_hash, - network::SignedStatement const &statement) { + const primitives::BlockHash &candidate_hash, + const network::SignedStatement &statement) { SL_TRACE( logger_, "Import statement into table.(candidate={})", candidate_hash); @@ -809,7 +809,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::notifyBackedCandidate( - network::SignedStatement const &statement) { + const network::SignedStatement &statement) { logger_->error( "Not implemented. Should notify somebody that backed candidate " "appeared."); @@ -818,10 +818,10 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::attested( network::CommittedCandidateReceipt &&candidate, - BackingStore::StatementInfo const &data, + const BackingStore::StatementInfo &data, size_t validity_threshold) { - auto const &validity_votes = data.second; - auto const valid_votes = validity_votes.size(); + const auto &validity_votes = data.second; + const auto valid_votes = validity_votes.size(); if (valid_votes < validity_threshold) { return std::nullopt; } @@ -858,16 +858,16 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::attested_candidate( - CandidateHash const &digest, - ParachainProcessorImpl::TableContext const &context) { + const CandidateHash &digest, + const ParachainProcessorImpl::TableContext &context) { if (auto opt_validity_votes = backing_store_->get_validity_votes(digest)) { auto &data = opt_validity_votes->get(); - GroupIndex const group = data.first; + const GroupIndex group = data.first; auto candidate{backing_store_->get_candidate(digest)}; BOOST_ASSERT(candidate); - auto const v_threshold = context.requisite_votes(group); + const auto v_threshold = context.requisite_votes(group); return attested(std::move(*candidate), data, v_threshold); } return std::nullopt; @@ -876,18 +876,18 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::table_attested_to_backed( AttestedCandidate &&attested, TableContext &table_context) { - auto const para_id = attested.group_id; + const auto para_id = attested.group_id; if (auto it = table_context.groups.find(para_id); it != table_context.groups.end()) { - auto const &group = it->second; + const auto &group = it->second; scale::BitVec validator_indices{}; validator_indices.bits.resize(group.size(), false); std::vector> vote_positions; vote_positions.reserve(attested.validity_votes.size()); - auto position = [](auto const &container, - auto const &val) -> std::optional { + auto position = [](const auto &container, + const auto &val) -> std::optional { for (size_t ix = 0; ix < container.size(); ++ix) { if (val == container[ix]) { return ix; @@ -898,7 +898,7 @@ namespace kagome::parachain { for (size_t orig_idx = 0; orig_idx < attested.validity_votes.size(); ++orig_idx) { - auto const &id = attested.validity_votes[orig_idx].first; + const auto &id = attested.validity_votes[orig_idx].first; if (auto p = position(group, id)) { validator_indices.bits[*p] = true; vote_positions.emplace_back(orig_idx, *p); @@ -912,11 +912,11 @@ namespace kagome::parachain { std::sort( vote_positions.begin(), vote_positions.end(), - [](auto const &l, auto const &r) { return l.second < r.second; }); + [](const auto &l, const auto &r) { return l.second < r.second; }); std::vector validity_votes; validity_votes.reserve(vote_positions.size()); - for (auto const &[pos_in_votes, _pos_in_group] : vote_positions) { + for (const auto &[pos_in_votes, _pos_in_group] : vote_positions) { validity_votes.emplace_back( std::move(attested.validity_votes[pos_in_votes].second)); } @@ -932,8 +932,8 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::importStatement( - network::RelayHash const &relay_parent, - network::SignedStatement const &statement, + const network::RelayHash &relay_parent, + const network::SignedStatement &statement, ParachainProcessorImpl::RelayParentState &relayParentState) { auto import_result = importStatementToTable( relayParentState, @@ -1034,7 +1034,7 @@ namespace kagome::parachain { template bool ParachainProcessorImpl::tryOpenOutgoingStream( - libp2p::peer::PeerId const &peer_id, + const libp2p::peer::PeerId &peer_id, std::shared_ptr protocol, F &&callback) { auto stream_engine = pm_->getStreamEngine(); @@ -1083,7 +1083,7 @@ namespace kagome::parachain { template bool ParachainProcessorImpl::tryOpenOutgoingCollatingStream( - libp2p::peer::PeerId const &peer_id, F &&callback) { + const libp2p::peer::PeerId &peer_id, F &&callback) { auto protocol = router_->getCollationProtocol(); BOOST_ASSERT(protocol); @@ -1093,7 +1093,7 @@ namespace kagome::parachain { template bool ParachainProcessorImpl::tryOpenOutgoingValidationStream( - libp2p::peer::PeerId const &peer_id, F &&callback) { + const libp2p::peer::PeerId &peer_id, F &&callback) { auto protocol = router_->getValidationProtocol(); BOOST_ASSERT(protocol); @@ -1124,7 +1124,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onIncomingCollationStream( - libp2p::peer::PeerId const &peer_id) { + const libp2p::peer::PeerId &peer_id) { if (tryOpenOutgoingCollatingStream( peer_id, [wptr{weak_from_this()}, peer_id](auto &&stream) { if (auto self = wptr.lock()) { @@ -1137,7 +1137,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onIncomingValidationStream( - libp2p::peer::PeerId const &peer_id) { + const libp2p::peer::PeerId &peer_id) { if (tryOpenOutgoingValidationStream( peer_id, [wptr{weak_from_this()}, peer_id](auto &&stream) { if (auto self = wptr.lock()) { @@ -1158,15 +1158,15 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onIncomingCollator( - libp2p::peer::PeerId const &peer_id, + const libp2p::peer::PeerId &peer_id, network::CollatorPublicKey pubkey, network::ParachainId para_id) { pm_->setCollating(peer_id, pubkey, para_id); } void ParachainProcessorImpl::handleNotify( - libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent) { + const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent) { if (tryOpenOutgoingCollatingStream( peer_id, [peer_id, relay_parent, wptr{weak_from_this()}]( @@ -1212,9 +1212,9 @@ namespace kagome::parachain { } void ParachainProcessorImpl::notify( - libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement) { + const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement) { our_current_state_.seconded_statements[peer_id].emplace_back( std::make_pair(relay_parent, statement)); handleNotify(peer_id, relay_parent); @@ -1226,8 +1226,8 @@ namespace kagome::parachain { } outcome::result ParachainProcessorImpl::advCanBeProcessed( - primitives::BlockHash const &relay_parent, - libp2p::peer::PeerId const &peer_id) { + const primitives::BlockHash &relay_parent, + const libp2p::peer::PeerId &peer_id) { BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); OUTCOME_TRY(canProcessParachains()); @@ -1245,7 +1245,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onValidationComplete( - libp2p::peer::PeerId const &peer_id, + const libp2p::peer::PeerId &peer_id, ValidateAndSecondResult &&validation_result) { logger_->warn("On validation complete. (peer={}, relay parent={})", peer_id, @@ -1262,7 +1262,7 @@ namespace kagome::parachain { auto ¶chain_state = opt_parachain_state->get(); auto &seconded = parachain_state.seconded; - auto const candidate_hash = candidateHashFrom(validation_result.candidate); + const auto candidate_hash = candidateHashFrom(validation_result.candidate); if (!validation_result.result) { logger_->warn("Candidate {} validation failed with: {}", candidate_hash, @@ -1291,8 +1291,8 @@ namespace kagome::parachain { } void ParachainProcessorImpl::notifyStatementDistributionSystem( - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement) { + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement) { auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); @@ -1313,23 +1313,23 @@ namespace kagome::parachain { outcome::result ParachainProcessorImpl::validateCandidate( - network::CandidateReceipt const &candidate, - network::ParachainBlock const &pov) { + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov) { return pvf_->pvfSync(candidate, pov); } outcome::result> ParachainProcessorImpl::validateErasureCoding( - runtime::AvailableData const &validating_data, size_t n_validators) { + const runtime::AvailableData &validating_data, size_t n_validators) { return toChunks(n_validators, validating_data); } void ParachainProcessorImpl::notifyAvailableData( std::vector &&chunks, - primitives::BlockHash const &relay_parent, - network::CandidateHash const &candidate_hash, - network::ParachainBlock const &pov, - runtime::PersistedValidationData const &data) { + const primitives::BlockHash &relay_parent, + const network::CandidateHash &candidate_hash, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &data) { makeTrieProof(chunks); /// TODO(iceseer): remove copy av_store_->storeData( @@ -1341,12 +1341,12 @@ namespace kagome::parachain { ParachainProcessorImpl::validateAndMakeAvailable( network::CandidateReceipt &&candidate, network::ParachainBlock &&pov, - libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, + const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, size_t n_validators) { TicToc _measure{"Parachain validation", logger_}; - auto const candidate_hash{candidateHashFrom(candidate)}; + const auto candidate_hash{candidateHashFrom(candidate)}; auto validation_result = validateCandidate(candidate, pov); if (!validation_result) { logger_->warn( @@ -1383,7 +1383,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onAttestComplete( - libp2p::peer::PeerId const &, ValidateAndSecondResult &&result) { + const libp2p::peer::PeerId &, ValidateAndSecondResult &&result) { auto parachain_state = tryGetStateByRelayParent(result.relay_parent); if (!parachain_state) { logger_->warn( @@ -1396,7 +1396,7 @@ namespace kagome::parachain { result.relay_parent, result.candidate.descriptor.para_id); - auto const candidate_hash = candidateHashFrom(result.candidate); + const auto candidate_hash = candidateHashFrom(result.candidate); parachain_state->get().fallbacks.erase(candidate_hash); if (parachain_state->get().issued_statements.count(candidate_hash) == 0) { @@ -1413,8 +1413,8 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onAttestNoPoVComplete( - network::RelayHash const &relay_parent, - CandidateHash const &candidate_hash) { + const network::RelayHash &relay_parent, + const CandidateHash &candidate_hash) { auto parachain_state = tryGetStateByRelayParent(relay_parent); if (!parachain_state) { logger_->warn( @@ -1443,7 +1443,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::requestCollations( - network::CollationEvent const &pending_collation) { + const network::CollationEvent &pending_collation) { router_->getReqCollationProtocol()->request( pending_collation.pending_collation.peer_id, network::CollationFetchingRequest{ diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index 5259d49914..fe7401f12b 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -99,25 +99,27 @@ namespace kagome::parachain { ~ParachainProcessorImpl() = default; bool prepare(); - void requestCollations(network::CollationEvent const &pending_collation); + bool start(); + + void requestCollations(const network::CollationEvent &pending_collation); outcome::result canProcessParachains() const; outcome::result advCanBeProcessed( - primitives::BlockHash const &relay_parent, - libp2p::peer::PeerId const &peer_id); + const primitives::BlockHash &relay_parent, + const libp2p::peer::PeerId &peer_id); - void handleStatement(libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement); - void onIncomingCollator(libp2p::peer::PeerId const &peer_id, + void handleStatement(const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement); + void onIncomingCollator(const libp2p::peer::PeerId &peer_id, network::CollatorPublicKey pubkey, network::ParachainId para_id); - void onIncomingCollationStream(libp2p::peer::PeerId const &peer_id); - void onIncomingValidationStream(libp2p::peer::PeerId const &peer_id); + void onIncomingCollationStream(const libp2p::peer::PeerId &peer_id); + void onIncomingValidationStream(const libp2p::peer::PeerId &peer_id); void onValidationProtocolMsg( - libp2p::peer::PeerId const &peer_id, - network::ValidatorProtocolMessage const &message); + const libp2p::peer::PeerId &peer_id, + const network::ValidatorProtocolMessage &message); outcome::result OnFetchChunkRequest( - network::FetchChunkRequest const &request); + const network::FetchChunkRequest &request); network::ResponsePov getPov(CandidateHash &&candidate_hash); auto getAvStore() { @@ -195,27 +197,27 @@ namespace kagome::parachain { * Validation. */ outcome::result validateCandidate( - network::CandidateReceipt const &candidate, - network::ParachainBlock const &pov); + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov); outcome::result> validateErasureCoding( - runtime::AvailableData const &validating_data, size_t n_validators); + const runtime::AvailableData &validating_data, size_t n_validators); outcome::result validateAndMakeAvailable( network::CandidateReceipt &&candidate, network::ParachainBlock &&pov, - libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, + const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, size_t n_validators); template - void requestPoV(libp2p::peer::PeerInfo const &peer_info, - CandidateHash const &candidate_hash, + void requestPoV(const libp2p::peer::PeerInfo &peer_info, + const CandidateHash &candidate_hash, F &&callback); std::optional attested_candidate( - CandidateHash const &digest, TableContext const &context); + const CandidateHash &digest, const TableContext &context); std::optional attested( network::CommittedCandidateReceipt &&candidate, - BackingStore::StatementInfo const &data, + const BackingStore::StatementInfo &data, size_t validity_threshold); std::optional table_attested_to_backed( AttestedCandidate &&attested, TableContext &table_context); @@ -223,26 +225,26 @@ namespace kagome::parachain { /* * Logic. */ - void onValidationComplete(libp2p::peer::PeerId const &peer_id, + void onValidationComplete(const libp2p::peer::PeerId &peer_id, ValidateAndSecondResult &&result); - void onAttestComplete(libp2p::peer::PeerId const &peer_id, + void onAttestComplete(const libp2p::peer::PeerId &peer_id, ValidateAndSecondResult &&result); - void onAttestNoPoVComplete(network::RelayHash const &relay_parent, - CandidateHash const &candidate_hash); + void onAttestNoPoVComplete(const network::RelayHash &relay_parent, + const CandidateHash &candidate_hash); template void appendAsyncValidationTask(network::CandidateReceipt &&candidate, network::ParachainBlock &&pov, - primitives::BlockHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, + const primitives::BlockHash &relay_parent, + const libp2p::peer::PeerId &peer_id, RelayParentState ¶chain_state, const primitives::BlockHash &candidate_hash, size_t n_validators); - void kickOffValidationWork(RelayHash const &relay_parent, + void kickOffValidationWork(const RelayHash &relay_parent, AttestingData &attesting_data, RelayParentState ¶chain_state); std::optional retrieveSessionInfo( - RelayHash const &relay_parent); + const RelayHash &relay_parent); void handleFetchedCollation(network::CollationEvent &&pending_collation, network::CollationFetchingResponse &&response); template @@ -254,61 +256,61 @@ namespace kagome::parachain { ValidatorIndex validator_ix, RelayParentState ¶chain_state); std::optional importStatement( - network::RelayHash const &relay_parent, - network::SignedStatement const &statement, + const network::RelayHash &relay_parent, + const network::SignedStatement &statement, ParachainProcessorImpl::RelayParentState &relayParentState); /* * Helpers. */ primitives::BlockHash candidateHashFrom( - network::CandidateReceipt const &candidate) { + const network::CandidateReceipt &candidate) { return hasher_->blake2b_256(scale::encode(candidate).value()); } primitives::BlockHash candidateHashFrom( - network::CollationFetchingResponse const &collation) { + const network::CollationFetchingResponse &collation) { return visit_in_place( collation.response_data, - [&](network::CollationResponse const &collation_response) + [&](const network::CollationResponse &collation_response) -> primitives::BlockHash { return candidateHashFrom(collation_response.receipt); }); } - network::CandidateDescriptor const &candidateDescriptorFrom( - network::CollationFetchingResponse const &collation) { + const network::CandidateDescriptor &candidateDescriptorFrom( + const network::CollationFetchingResponse &collation) { return visit_in_place( collation.response_data, - [](network::CollationResponse const &collation_response) - -> network::CandidateDescriptor const & { + [](const network::CollationResponse &collation_response) + -> const network::CandidateDescriptor & { return collation_response.receipt.descriptor; }); } - std::optional> - candidateDescriptorFrom(network::Statement const &statement) { + std::optional> + candidateDescriptorFrom(const network::Statement &statement) { return visit_in_place( statement.candidate_state, - [](network::CommittedCandidateReceipt const &receipt) + [](const network::CommittedCandidateReceipt &receipt) -> std::optional< - std::reference_wrapper> { + std::reference_wrapper> { return receipt.descriptor; }, [](...) -> std::optional< - std::reference_wrapper> { + std::reference_wrapper> { BOOST_ASSERT(false); return std::nullopt; }); } - network::CollatorPublicKey const &collatorIdFromDescriptor( - network::CandidateDescriptor const &descriptor) { + const network::CollatorPublicKey &collatorIdFromDescriptor( + const network::CandidateDescriptor &descriptor) { return descriptor.collator_id; } network::CandidateReceipt candidateFromCommittedCandidateReceipt( - network::CommittedCandidateReceipt const &data) { + const network::CommittedCandidateReceipt &data) { return network::CandidateReceipt{ .descriptor = data.descriptor, .commitments_hash = @@ -316,18 +318,18 @@ namespace kagome::parachain { } primitives::BlockHash candidateHashFrom( - network::Statement const &statement) { + const network::Statement &statement) { return visit_in_place( statement.candidate_state, - [&](network::CommittedCandidateReceipt const &data) { + [&](const network::CommittedCandidateReceipt &data) { return hasher_->blake2b_256( scale::encode(candidateFromCommittedCandidateReceipt(data)) .value()); }, - [&](primitives::BlockHash const &candidate_hash) { + [&](const primitives::BlockHash &candidate_hash) { return candidate_hash; }, - [](auto const &) { + [](const auto &) { BOOST_ASSERT(!"Not used!"); return primitives::BlockHash{}; }); @@ -336,26 +338,26 @@ namespace kagome::parachain { /* * Notification */ - void broadcastView(network::View const &view) const; + void broadcastView(const network::View &view) const; template void notify_internal(std::shared_ptr &context, F &&func) { BOOST_ASSERT(context); boost::asio::post(*context, std::forward(func)); } - void notifyBackedCandidate(network::SignedStatement const &statement); + void notifyBackedCandidate(const network::SignedStatement &statement); void notifyAvailableData(std::vector &&chunk_list, - primitives::BlockHash const &relay_parent, - network::CandidateHash const &candidate_hash, - network::ParachainBlock const &pov, - runtime::PersistedValidationData const &data); + const primitives::BlockHash &relay_parent, + const network::CandidateHash &candidate_hash, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &data); void notifyStatementDistributionSystem( - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement); - void notify(libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent, - network::SignedStatement const &statement); - void handleNotify(libp2p::peer::PeerId const &peer_id, - primitives::BlockHash const &relay_parent); + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement); + void notify(const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent, + const network::SignedStatement &statement); + void handleNotify(const libp2p::peer::PeerId &peer_id, + const primitives::BlockHash &relay_parent); std::optional> tryGetStateByRelayParent(const primitives::BlockHash &relay_parent); @@ -367,13 +369,13 @@ namespace kagome::parachain { const primitives::BlockHash &relay_parent); template - bool tryOpenOutgoingCollatingStream(libp2p::peer::PeerId const &peer_id, + bool tryOpenOutgoingCollatingStream(const libp2p::peer::PeerId &peer_id, F &&callback); template - bool tryOpenOutgoingValidationStream(libp2p::peer::PeerId const &peer_id, + bool tryOpenOutgoingValidationStream(const libp2p::peer::PeerId &peer_id, F &&callback); template - bool tryOpenOutgoingStream(libp2p::peer::PeerId const &peer_id, + bool tryOpenOutgoingStream(const libp2p::peer::PeerId &peer_id, std::shared_ptr protocol, F &&callback); @@ -384,12 +386,12 @@ namespace kagome::parachain { bool isValidatingNode() const; template - outcome::result sign(T const &t) const; + outcome::result sign(const T &t) const; std::optional importStatementToTable( ParachainProcessorImpl::RelayParentState &relayParentState, - primitives::BlockHash const &candidate_hash, - network::SignedStatement const &statement); + const primitives::BlockHash &candidate_hash, + const network::SignedStatement &statement); std::shared_ptr pm_; std::shared_ptr crypto_provider_; diff --git a/core/utils/thread_pool.hpp b/core/utils/thread_pool.hpp index 00a7ef0250..9f83501869 100644 --- a/core/utils/thread_pool.hpp +++ b/core/utils/thread_pool.hpp @@ -21,10 +21,10 @@ namespace kagome { public: ThreadHandler(ThreadHandler &&) = delete; - ThreadHandler(ThreadHandler const &) = delete; + ThreadHandler(const ThreadHandler &) = delete; ThreadHandler &operator=(ThreadHandler &&) = delete; - ThreadHandler &operator=(ThreadHandler const &) = delete; + ThreadHandler &operator=(const ThreadHandler &) = delete; explicit ThreadHandler(std::shared_ptr io_context) : execution_state_{State::kStopped}, ioc_{std::move(io_context)} {} @@ -83,10 +83,10 @@ namespace kagome { public: ThreadPool(ThreadPool &&) = delete; - ThreadPool(ThreadPool const &) = delete; + ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(ThreadPool &&) = delete; - ThreadPool &operator=(ThreadPool const &) = delete; + ThreadPool &operator=(const ThreadPool &) = delete; explicit ThreadPool(size_t thread_count) : ioc_{std::make_shared()}, From ee8063a6d48065e13d3f78d29dcc8d45e97f7db7 Mon Sep 17 00:00:00 2001 From: iceseer Date: Wed, 19 Apr 2023 09:12:29 +0300 Subject: [PATCH 4/4] build fix Signed-off-by: iceseer --- core/parachain/validator/parachain_processor.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index fe7401f12b..9eb305677d 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -287,18 +287,18 @@ namespace kagome::parachain { }); } - std::optional> + std::optional> candidateDescriptorFrom(const network::Statement &statement) { return visit_in_place( statement.candidate_state, [](const network::CommittedCandidateReceipt &receipt) -> std::optional< - std::reference_wrapper> { + std::reference_wrapper> { return receipt.descriptor; }, [](...) -> std::optional< - std::reference_wrapper> { + std::reference_wrapper> { BOOST_ASSERT(false); return std::nullopt; });