diff --git a/CMakeLists.txt b/CMakeLists.txt index cda72b54bd..c972e7ee05 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,8 @@ endif() if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") link_libraries(atomic) +elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") + link_libraries(atomic) endif() set(CMAKE_EXPORT_COMPILE_COMMANDS ON) diff --git a/core/consensus/grandpa/catch_up_observer.hpp b/core/consensus/grandpa/catch_up_observer.hpp index 7002f8239b..bd97d012b6 100644 --- a/core/consensus/grandpa/catch_up_observer.hpp +++ b/core/consensus/grandpa/catch_up_observer.hpp @@ -9,6 +9,7 @@ #include #include "consensus/grandpa/grandpa_context.hpp" +#include "network/peer_manager.hpp" #include "network/types/grandpa_message.hpp" namespace kagome::consensus::grandpa { @@ -24,8 +25,10 @@ namespace kagome::consensus::grandpa { * Handler of grandpa catch-up-request messages * @param msg catch-up-request messages */ - virtual void onCatchUpRequest(const libp2p::peer::PeerId &peer_id, - network::CatchUpRequest &&msg) = 0; + virtual void onCatchUpRequest( + const libp2p::peer::PeerId &peer_id, + std::optional &&info, + network::CatchUpRequest &&msg) = 0; /** * Handler of grandpa catch-up-response messages diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 113a5524e8..a8b1e9f3e6 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -417,10 +417,15 @@ namespace kagome::consensus::grandpa { } } - void GrandpaImpl::onNeighborMessage(const libp2p::peer::PeerId &peer_id, - network::GrandpaNeighborMessage &&msg) { - REINVOKE( - *internal_thread_context_, onNeighborMessage, peer_id, std::move(msg)); + void GrandpaImpl::onNeighborMessage( + const libp2p::peer::PeerId &peer_id, + std::optional &&info, + network::GrandpaNeighborMessage &&msg) { + REINVOKE(*internal_thread_context_, + onNeighborMessage, + peer_id, + std::move(info), + std::move(msg)); BOOST_ASSERT(internal_thread_context_->isInCurrentThread()); SL_DEBUG(logger_, @@ -431,13 +436,13 @@ namespace kagome::consensus::grandpa { msg.last_finalized, peer_id); - auto info = peer_manager_->getPeerState(peer_id); std::optional info_set; std::optional info_round; // copy values before `updatePeerState` + if (info) { - info_set = info->get().set_id; - info_round = info->get().round_number; + info_set = info->set_id; + info_round = info->round_number; } bool reputation_changed = false; @@ -524,7 +529,7 @@ namespace kagome::consensus::grandpa { return; } - if (info->get().last_finalized > block_tree_->getLastFinalized().number) { + if (msg.last_finalized > block_tree_->getLastFinalized().number) { // Trying to substitute with justifications' request only main_thread_context_.execute([wself{weak_from_this()}, peer_id, @@ -560,14 +565,18 @@ namespace kagome::consensus::grandpa { } } - void GrandpaImpl::onCatchUpRequest(const libp2p::peer::PeerId &peer_id, - network::CatchUpRequest &&msg) { - REINVOKE( - *internal_thread_context_, onCatchUpRequest, peer_id, std::move(msg)); + void GrandpaImpl::onCatchUpRequest( + const libp2p::peer::PeerId &peer_id, + std::optional &&info_opt, + network::CatchUpRequest &&msg) { + REINVOKE(*internal_thread_context_, + onCatchUpRequest, + peer_id, + std::move(info_opt), + std::move(msg)); - auto info_opt = peer_manager_->getPeerState(peer_id); - if (not info_opt.has_value() or not info_opt->get().set_id.has_value() - or not info_opt->get().round_number.has_value()) { + if (not info_opt.has_value() or not info_opt->set_id.has_value() + or not info_opt->round_number.has_value()) { SL_DEBUG(logger_, "Catch-up request to round #{} received from {} was rejected: " "we are not have our view about remote peer", @@ -577,7 +586,7 @@ namespace kagome::consensus::grandpa { peer_id, network::reputation::cost::OUT_OF_SCOPE_MESSAGE); return; } - const auto &info = info_opt->get(); + const auto &info = *info_opt; // Check if request is corresponding our view about remote peer by set id if (msg.voter_set_id != info.set_id.value()) { @@ -934,16 +943,17 @@ namespace kagome::consensus::grandpa { void GrandpaImpl::onVoteMessage( std::optional> &&existed_context, const libp2p::peer::PeerId &peer_id, + std::optional &&info, const VoteMessage &msg) { REINVOKE(*internal_thread_context_, onVoteMessage, std::move(existed_context), peer_id, + std::move(info), msg); - auto info = peer_manager_->getPeerState(peer_id); - if (not info.has_value() or not info->get().set_id.has_value() - or not info->get().round_number.has_value()) { + if (not info.has_value() or not info->set_id.has_value() + or not info->round_number.has_value()) { SL_DEBUG( logger_, "{} signed by {} with set_id={} in round={} has received from {} " @@ -1502,7 +1512,11 @@ namespace kagome::consensus::grandpa { if (grandpa_context->vote.has_value()) { auto const &peer_id = grandpa_context->peer_id.value(); auto const &vote = grandpa_context->vote.value(); - self->onVoteMessage(std::move(grandpa_context), peer_id, vote); + auto info = self->peer_manager_->getPeerState(peer_id); + self->onVoteMessage(std::move(grandpa_context), + peer_id, + compactFromRefToOwn(info), + vote); } else if (grandpa_context->catch_up_response.has_value()) { auto const &peer_id = grandpa_context->peer_id.value(); auto const &catch_up_response = diff --git a/core/consensus/grandpa/impl/grandpa_impl.hpp b/core/consensus/grandpa/impl/grandpa_impl.hpp index e478006668..778d649cbc 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.hpp +++ b/core/consensus/grandpa/impl/grandpa_impl.hpp @@ -140,6 +140,7 @@ namespace kagome::consensus::grandpa { * @param msg received grandpa neighbour message */ void onNeighborMessage(const libp2p::peer::PeerId &peer_id, + std::optional &&info_opt, network::GrandpaNeighborMessage &&msg) override; // Catch-up methods @@ -155,6 +156,7 @@ namespace kagome::consensus::grandpa { * @param msg network message containing catch up request */ void onCatchUpRequest(const libp2p::peer::PeerId &peer_id, + std::optional &&info, network::CatchUpRequest &&msg) override; /** @@ -190,6 +192,7 @@ namespace kagome::consensus::grandpa { void onVoteMessage( std::optional> &&existed_context, const libp2p::peer::PeerId &peer_id, + std::optional &&info_opt, const network::VoteMessage &msg) override; /** diff --git a/core/consensus/grandpa/neighbor_observer.hpp b/core/consensus/grandpa/neighbor_observer.hpp index e941053d52..aba85f134b 100644 --- a/core/consensus/grandpa/neighbor_observer.hpp +++ b/core/consensus/grandpa/neighbor_observer.hpp @@ -24,8 +24,10 @@ namespace kagome::consensus::grandpa { * Handler of grandpa neighbor messages * @param msg neighbor messages */ - virtual void onNeighborMessage(const libp2p::peer::PeerId &peer_id, - network::GrandpaNeighborMessage &&msg) = 0; + virtual void onNeighborMessage( + const libp2p::peer::PeerId &peer_id, + std::optional &&info_opt, + network::GrandpaNeighborMessage &&msg) = 0; }; } // namespace kagome::consensus::grandpa diff --git a/core/consensus/grandpa/round_observer.hpp b/core/consensus/grandpa/round_observer.hpp index 5565272676..d0c6d7dd18 100644 --- a/core/consensus/grandpa/round_observer.hpp +++ b/core/consensus/grandpa/round_observer.hpp @@ -7,6 +7,7 @@ #pragma once #include "consensus/grandpa/grandpa_context.hpp" +#include "network/peer_manager.hpp" namespace libp2p::peer { class PeerId; @@ -37,6 +38,7 @@ namespace kagome::consensus::grandpa { virtual void onVoteMessage( std::optional> &&existed_context, const libp2p::peer::PeerId &peer_id, + std::optional &&info_opt, const VoteMessage &msg) = 0; /** diff --git a/core/network/helpers/stream_read_buffer.hpp b/core/network/helpers/stream_read_buffer.hpp index 7935650a95..06a571d8d1 100644 --- a/core/network/helpers/stream_read_buffer.hpp +++ b/core/network/helpers/stream_read_buffer.hpp @@ -89,14 +89,102 @@ namespace libp2p::connection { } // namespace libp2p::connection namespace kagome::network { + struct StreamWrapper final : libp2p::connection::Stream { + std::shared_ptr stream_; + log::Logger logger_ = log::createLogger("Stream", "network"); + const std::thread::id this_id_{std::this_thread::get_id()}; + + void check() const { + BOOST_ASSERT(this_id_ == std::this_thread::get_id()); + } + + StreamWrapper(std::shared_ptr stream) + : stream_{std::move(stream)} {} + + bool isClosedForRead() const { + return stream_->isClosedForRead(); + } + + bool isClosedForWrite() const { + return stream_->isClosedForWrite(); + } + + bool isClosed() const { + return stream_->isClosed(); + } + + void close(VoidResultHandlerFunc cb) { + check(); + stream_->close(std::move(cb)); + } + + void reset() { + check(); + stream_->reset(); + } + + void adjustWindowSize(uint32_t new_size, VoidResultHandlerFunc cb) { + stream_->adjustWindowSize(new_size, std::move(cb)); + } + + outcome::result isInitiator() const { + return stream_->isInitiator(); + } + + outcome::result remotePeerId() const { + return stream_->remotePeerId(); + } + + outcome::result localMultiaddr() const { + return stream_->localMultiaddr(); + } + + outcome::result remoteMultiaddr() const { + return stream_->remoteMultiaddr(); + } + + void read(gsl::span out, size_t bytes, ReadCallbackFunc cb) { + check(); + stream_->read(out, bytes, std::move(cb)); + } + + void readSome(gsl::span out, size_t bytes, ReadCallbackFunc cb) { + check(); + stream_->readSome(out, bytes, std::move(cb)); + } + + void deferReadCallback(outcome::result res, ReadCallbackFunc cb) { + stream_->deferReadCallback(std::move(res), std::move(cb)); + } + + void write(gsl::span in, + size_t bytes, + WriteCallbackFunc cb) { + check(); + stream_->write(in, bytes, std::move(cb)); + } + + void writeSome(gsl::span in, + size_t bytes, + WriteCallbackFunc cb) { + check(); + stream_->writeSome(in, bytes, std::move(cb)); + } + + void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) { + stream_->deferWriteCallback(ec, std::move(cb)); + } + }; + /** * Wrap stream from `setProtocolHandler`. * Makes reading from stream buffered. */ inline void streamReadBuffer(libp2p::StreamAndProtocol &result) { constexpr size_t kBuffer{1 << 16}; - result.stream = std::make_shared( - std::move(result.stream), kBuffer); + result.stream = std::make_shared( + std::make_shared( + std::move(result.stream), kBuffer)); } /** diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index 1c00e5863a..46e8cbab6e 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -131,7 +131,9 @@ namespace kagome::network { [&](network::GrandpaVote &&vote_message) { SL_VERBOSE( base_.logger(), "VoteMessage has received from {}", peer_id); - grandpa_observer_->onVoteMessage(std::nullopt, peer_id, vote_message); + auto info = peer_manager_->getPeerState(peer_id); + grandpa_observer_->onVoteMessage( + std::nullopt, peer_id, compactFromRefToOwn(info), vote_message); addKnown(peer_id, hash); }, [&](FullCommitMessage &&commit_message) { @@ -146,15 +148,18 @@ namespace kagome::network { SL_VERBOSE(base_.logger(), "NeighborMessage has received from {}", peer_id); + auto info = peer_manager_->getPeerState(peer_id); grandpa_observer_->onNeighborMessage(peer_id, + compactFromRefToOwn(info), std::move(neighbor_message)); } }, [&](network::CatchUpRequest &&catch_up_request) { SL_VERBOSE( base_.logger(), "CatchUpRequest has received from {}", peer_id); - grandpa_observer_->onCatchUpRequest(peer_id, - std::move(catch_up_request)); + auto info = peer_manager_->getPeerState(peer_id); + grandpa_observer_->onCatchUpRequest( + peer_id, compactFromRefToOwn(info), std::move(catch_up_request)); }, [&](network::CatchUpResponse &&catch_up_response) { SL_VERBOSE( diff --git a/core/network/notifications/connect_and_handshake.hpp b/core/network/notifications/connect_and_handshake.hpp index 6484cde25b..c50d9a47ad 100644 --- a/core/network/notifications/connect_and_handshake.hpp +++ b/core/network/notifications/connect_and_handshake.hpp @@ -63,6 +63,7 @@ namespace kagome::network::notifications { notifications::handshake( std::move(stream), std::move(frame_stream), handshake, std::move(cb)); }; + base.host().newStream(peer, base.protocolIds(), std::move(cb)); } } // namespace kagome::network::notifications diff --git a/core/network/peer_manager.hpp b/core/network/peer_manager.hpp index 529783bd8f..c3fc71dfbf 100644 --- a/core/network/peer_manager.hpp +++ b/core/network/peer_manager.hpp @@ -44,6 +44,12 @@ namespace kagome::network { using OurView = network::View; + struct PeerStateCompact { + std::optional round_number; + std::optional set_id; + BlockNumber last_finalized; + }; + struct PeerState { clock::SteadyClock::TimePoint time; Roles roles = 0; @@ -57,8 +63,24 @@ namespace kagome::network { LruSet known_grandpa_messages{ kPeerStateMaxKnownGrandpaMessages, }; + + PeerStateCompact compact() const { + return PeerStateCompact{ + .round_number = round_number, + .set_id = set_id, + .last_finalized = last_finalized, + }; + } }; + inline std::optional compactFromRefToOwn( + const std::optional> &opt_ref) { + if (opt_ref) { + return opt_ref->get().compact(); + } + return std::nullopt; + } + struct StreamEngine; /** diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 14c5d2378a..56362594ed 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -1134,28 +1134,44 @@ namespace kagome::parachain { } } - for (auto it = self->pending_known_.begin(); - it != self->pending_known_.end();) { - if (!self->storedDistribBlockEntries().get(it->first)) { - ++it; - } else { - self->logger_->trace( - "Processing pending assignment/approvals.(count={})", - it->second.size()); - for (auto i = it->second.begin(); i != it->second.end(); ++i) { - visit_in_place( - i->second, - [&](const network::Assignment &assignment) { - self->import_and_circulate_assignment( - i->first, - assignment.indirect_assignment_cert, - assignment.candidate_ix); - }, - [&](const network::IndirectSignedApprovalVote &approval) { - self->import_and_circulate_approval(i->first, approval); - }); + DeferedSender + approval_defered_sender{[wself](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeApproval(std::move(msgs)); + } + }}; + { /// Assignments should be sent first + DeferedSender assignment_defered_sender{ + [wself](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeAssignment(std::move(msgs)); + } + }}; + for (auto it = self->pending_known_.begin(); + it != self->pending_known_.end();) { + if (!self->storedDistribBlockEntries().get(it->first)) { + ++it; + } else { + SL_TRACE(self->logger_, + "Processing pending assignment/approvals.(count={})", + it->second.size()); + for (auto i = it->second.begin(); i != it->second.end(); ++i) { + visit_in_place( + i->second, + [&](const network::Assignment &assignment) { + self->import_and_circulate_assignment( + i->first, + assignment_defered_sender, + assignment.indirect_assignment_cert, + assignment.candidate_ix); + }, + [&](const network::IndirectSignedApprovalVote &approval) { + self->import_and_circulate_approval( + i->first, approval_defered_sender, approval); + }); + } + it = self->pending_known_.erase(it); } - it = self->pending_known_.erase(it); } } } @@ -1578,6 +1594,7 @@ namespace kagome::parachain { void ApprovalDistribution::import_and_circulate_assignment( const MessageSource &source, + DeferedSender &defered_sender, const approval::IndirectAssignmentCert &assignment, CandidateIndex claimed_candidate_index) { BOOST_ASSERT(internal_context_->io_context() @@ -1760,13 +1777,17 @@ namespace kagome::parachain { } if (!peers.empty()) { - runDistributeAssignment( - assignment, claimed_candidate_index, std::move(peers)); + defered_sender.postponeSend(peers, + network::Assignment{ + .indirect_assignment_cert = assignment, + .candidate_ix = claimed_candidate_index, + }); } } void ApprovalDistribution::import_and_circulate_approval( const MessageSource &source, + DeferedSender &defered_sender, const network::IndirectSignedApprovalVote &vote) { BOOST_ASSERT(internal_context_->io_context() ->get_executor() @@ -1947,7 +1968,7 @@ namespace kagome::parachain { } if (!peers.empty()) { - runDistributeApproval(vote, std::move(peers)); + defered_sender.postponeSend(peers, vote); } } @@ -2041,6 +2062,12 @@ namespace kagome::parachain { "Received assignments.(peer_id={}, count={})", peer_id, assignments.assignments.size()); + DeferedSender assignment_defered_sender{ + [wself{weak_from_this()}](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeAssignment(std::move(msgs)); + } + }}; for (auto const &assignment : assignments.assignments) { if (auto it = pending_known_.find( assignment.indirect_assignment_cert.block_hash); @@ -2059,6 +2086,7 @@ namespace kagome::parachain { import_and_circulate_assignment( peer_id, + assignment_defered_sender, assignment.indirect_assignment_cert, assignment.candidate_ix); } @@ -2068,6 +2096,12 @@ namespace kagome::parachain { "Received approvals.(peer_id={}, count={})", peer_id, approvals.approvals.size()); + DeferedSender + approval_defered_sender{[wself{weak_from_this()}](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeApproval(std::move(msgs)); + } + }}; for (auto const &approval_vote : approvals.approvals) { if (auto it = pending_known_.find( approval_vote.payload.payload.block_hash); @@ -2084,7 +2118,8 @@ namespace kagome::parachain { continue; } - import_and_circulate_approval(peer_id, approval_vote); + import_and_circulate_approval( + peer_id, approval_defered_sender, approval_vote); } }, [&](const auto &) { UNREACHABLE; }); @@ -2092,34 +2127,16 @@ namespace kagome::parachain { } void ApprovalDistribution::runDistributeAssignment( - const approval::IndirectAssignmentCert &indirect_cert, - CandidateIndex candidate_index, - std::unordered_set &&peers) { - REINVOKE(this_context_, - runDistributeAssignment, - indirect_cert, - candidate_index, - std::move(peers)); + std::unordered_map> + &&messages) { + REINVOKE(this_context_, runDistributeAssignment, std::move(messages)); - SL_DEBUG(logger_, - "Distributing assignment on candidate (block hash={}, candidate " - "index={})", - indirect_cert.block_hash, - candidate_index); - - auto se = pm_->getStreamEngine(); - BOOST_ASSERT(se); - - se->broadcast( - router_->getValidationProtocol(), - std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Assignments{ - .assignments = {network::Assignment{ - .indirect_assignment_cert = indirect_cert, - .candidate_ix = candidate_index, - }}}}), - [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); + SL_TRACE(logger_, + "Distributing assignments to peers. (peers count={})", + messages.size()); + for (auto &&[peer, msg_pack] : messages) { + send_assignments_batched(std::move(msg_pack), peer); + } } void ApprovalDistribution::send_assignments_batched( @@ -2133,6 +2150,20 @@ namespace kagome::parachain { auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); // kMaxAssignmentBatchSize + /** TODO(iceseer): optimize + std::shared_ptr> + pack = std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Assignments{ + .assignments = {},}}); + auto &vp = if_type(*pack); + auto &adm = if_type(vp->get()); + auto &a = if_type(adm->get()); + a->get().assignments = std::move(msg_pack); + se->send(peer, router_->getValidationProtocol(), pack); + * + */ + while (!assignments.empty()) { auto begin = assignments.begin(); auto end = (assignments.size() > kMaxAssignmentBatchSize) @@ -2159,6 +2190,25 @@ namespace kagome::parachain { auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); // kMaxApprovalBatchSize + /** TODO(iceseer): optimize + std::shared_ptr> + pack = std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Approvals{ + .approvals = {}, + }}); + auto &vp = if_type(*pack); + auto &adm = if_type(vp->get()); + auto &a = if_type(adm->get()); + + loop { + a->get().approvals = std::move(msg_pack); + se->send(peer, router_->getValidationProtocol(), pack); + } + * + * + */ + while (!approvals.empty()) { auto begin = approvals.begin(); auto end = (approvals.size() > kMaxApprovalBatchSize) @@ -2178,27 +2228,17 @@ namespace kagome::parachain { } void ApprovalDistribution::runDistributeApproval( - const network::IndirectSignedApprovalVote &vote, - std::unordered_set &&peers) { - REINVOKE(this_context_, runDistributeApproval, vote, std::move(peers)); + std::unordered_map> + &&messages) { + REINVOKE(this_context_, runDistributeApproval, std::move(messages)); - logger_->info( - "Sending an approval to peers. (block={}, index={}, num peers={})", - vote.payload.payload.block_hash, - vote.payload.payload.candidate_index, - peers.size()); - - auto se = pm_->getStreamEngine(); - BOOST_ASSERT(se); - - se->broadcast( - router_->getValidationProtocol(), - std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Approvals{ - .approvals = {vote}, - }}), - [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); + SL_TRACE(logger_, + "Sending an approval messages to peers. (num peers={})", + messages.size()); + for (auto &&[peer, msg_pack] : messages) { + send_approvals_batched(std::move(msg_pack), peer); + } } void ApprovalDistribution::issue_approval(const CandidateHash &candidate_hash, @@ -2294,8 +2334,15 @@ namespace kagome::parachain { .validator_sig = *sig, }); + DeferedSender approval_defered_sender{ + [wself{weak_from_this()}](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeApproval(std::move(msgs)); + } + }}; import_and_circulate_approval( std::nullopt, + approval_defered_sender, network::IndirectSignedApprovalVote{ .payload = { @@ -2348,8 +2395,20 @@ namespace kagome::parachain { const auto &block_hash = indirect_cert.block_hash; const auto validator_index = indirect_cert.validator; - import_and_circulate_assignment( - std::nullopt, indirect_cert, candidate_index); + { + /// Defered send ~dctor() should be called before `launch_approval` + /// That's the reason for brakets + DeferedSender assignment_defered_sender{ + [wself{weak_from_this()}](auto &&msgs) { + if (auto self = wself.lock()) { + self->runDistributeAssignment(std::move(msgs)); + } + }}; + import_and_circulate_assignment(std::nullopt, + assignment_defered_sender, + indirect_cert, + candidate_index); + } std::optional approval_state = approvals_cache_.exclusiveAccess( diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index e83d652660..c4aede2c6b 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -356,6 +356,36 @@ namespace kagome::parachain { bool local; }; + template + struct DeferedSender final { + using ContainerT = + std::unordered_map>; + ContainerT messages; + std::function f; + + template + DeferedSender(F &&f_) : f{std::forward(f_)} {} + + DeferedSender(const DeferedSender &) = delete; + DeferedSender &operator=(const DeferedSender &) = delete; + + DeferedSender(DeferedSender &&) = default; + DeferedSender &operator=(DeferedSender &&) = default; + + void postponeSend(const std::unordered_set &peers, + const T &msg) { + for (const auto &peer : peers) { + messages[peer].emplace_back(msg); + } + } + + ~DeferedSender() { + if (f && !messages.empty()) { + f(std::move(messages)); + } + } + }; + /// Information about candidates in the context of a particular block they /// are included in. In other words, multiple `CandidateEntry`s may exist /// for the same candidate, if it is included by multiple blocks - this is @@ -509,10 +539,12 @@ namespace kagome::parachain { const network::IndirectSignedApprovalVote &vote); void import_and_circulate_assignment( const MessageSource &source, + DeferedSender &defered_sender, const approval::IndirectAssignmentCert &assignment, CandidateIndex claimed_candidate_index); void import_and_circulate_approval( const MessageSource &source, + DeferedSender &defered_sender, const network::IndirectSignedApprovalVote &vote); template @@ -626,9 +658,8 @@ namespace kagome::parachain { BlockImportedCandidates &&candidate); void runDistributeAssignment( - const approval::IndirectAssignmentCert &indirect_cert, - CandidateIndex candidate_index, - std::unordered_set &&peers); + std::unordered_map> &&messages); void send_assignments_batched(std::deque &&assignments, const libp2p::peer::PeerId &peer_id); @@ -638,8 +669,9 @@ namespace kagome::parachain { const libp2p::peer::PeerId &peer_id); void runDistributeApproval( - const network::IndirectSignedApprovalVote &vote, - std::unordered_set &&peers); + std::unordered_map> + &&messages); void runScheduleWakeup(const primitives::BlockHash &block_hash, primitives::BlockNumber block_number, diff --git a/test/core/consensus/babe/CMakeLists.txt b/test/core/consensus/babe/CMakeLists.txt index 2dd3ff7cc7..4208b872a8 100644 --- a/test/core/consensus/babe/CMakeLists.txt +++ b/test/core/consensus/babe/CMakeLists.txt @@ -8,6 +8,7 @@ addtest(babe_test babe_test.cpp ) target_link_libraries(babe_test + primitives consensus network clock diff --git a/test/mock/core/consensus/grandpa/grandpa_mock.hpp b/test/mock/core/consensus/grandpa/grandpa_mock.hpp index 373dcf4061..fa554e9be5 100644 --- a/test/mock/core/consensus/grandpa/grandpa_mock.hpp +++ b/test/mock/core/consensus/grandpa/grandpa_mock.hpp @@ -22,6 +22,7 @@ namespace kagome::consensus::grandpa { MOCK_METHOD(void, onNeighborMessage, (const libp2p::peer::PeerId &peer_id, + std::optional &&, network::GrandpaNeighborMessage &&msg), (override)); @@ -29,6 +30,7 @@ namespace kagome::consensus::grandpa { onVoteMessage, (std::optional> &&, const PeerId &peer_id, + std::optional &&, const VoteMessage &), (override)); @@ -57,7 +59,9 @@ namespace kagome::consensus::grandpa { MOCK_METHOD(void, onCatchUpRequest, - (const PeerId &peer_id, CatchUpRequest &&), + (const PeerId &peer_id, + std::optional &&, + CatchUpRequest &&), (override)); MOCK_METHOD(void,