From 195c623638146459e82ccebd2f3e622fb509059f Mon Sep 17 00:00:00 2001 From: Igor Egorov Date: Thu, 14 Jul 2022 11:52:29 +0300 Subject: [PATCH 01/24] Attempt to fix finalization lag by loading missing justifications Signed-off-by: Igor Egorov --- core/consensus/babe/block_executor.hpp | 4 + .../babe/impl/block_executor_impl.cpp | 11 +- .../babe/impl/block_executor_impl.hpp | 4 + core/consensus/grandpa/impl/grandpa_impl.cpp | 29 ++- core/network/impl/synchronizer_impl.cpp | 182 ++++++++++++++++-- core/network/impl/synchronizer_impl.hpp | 32 ++- core/network/synchronizer.hpp | 9 + .../consensus/babe/block_executor_mock.hpp | 6 + test/mock/core/network/synchronizer_mock.hpp | 14 ++ 9 files changed, 270 insertions(+), 21 deletions(-) diff --git a/core/consensus/babe/block_executor.hpp b/core/consensus/babe/block_executor.hpp index dd7b914d4c..ba22aef345 100644 --- a/core/consensus/babe/block_executor.hpp +++ b/core/consensus/babe/block_executor.hpp @@ -16,6 +16,10 @@ namespace kagome::consensus { virtual ~BlockExecutor() = default; virtual outcome::result applyBlock(primitives::BlockData &&block) = 0; + + virtual outcome::result applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) = 0; }; } // namespace kagome::consensus diff --git a/core/consensus/babe/impl/block_executor_impl.cpp b/core/consensus/babe/impl/block_executor_impl.cpp index e02259f8c5..50e00e74bb 100644 --- a/core/consensus/babe/impl/block_executor_impl.cpp +++ b/core/consensus/babe/impl/block_executor_impl.cpp @@ -284,8 +284,7 @@ namespace kagome::consensus { if (not justifications_.empty()) { std::vector to_remove; for (const auto &[block_info, justification] : justifications_) { - auto res = grandpa_environment_->applyJustification(block_info, - justification); + auto res = applyJustification(block_info, justification); if (res) { to_remove.push_back(block_info); } @@ -297,7 +296,7 @@ namespace kagome::consensus { } } - auto res = 
grandpa_environment_->applyJustification( + auto res = applyJustification( primitives::BlockInfo(block.header.number, block_hash), b.justification.value()); if (res.has_error()) { @@ -358,6 +357,12 @@ namespace kagome::consensus { return outcome::success(); } + outcome::result BlockExecutorImpl::applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) { + return grandpa_environment_->applyJustification(block_info, justification); + } + void BlockExecutorImpl::rollbackBlock(const primitives::BlockInfo &block) { // Remove possible authority changes scheduled on block authority_update_observer_->cancel(block); diff --git a/core/consensus/babe/impl/block_executor_impl.hpp b/core/consensus/babe/impl/block_executor_impl.hpp index 99b9118248..66a199d2af 100644 --- a/core/consensus/babe/impl/block_executor_impl.hpp +++ b/core/consensus/babe/impl/block_executor_impl.hpp @@ -52,6 +52,10 @@ namespace kagome::consensus { outcome::result applyBlock(primitives::BlockData &&block) override; + outcome::result applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) override; + private: void rollbackBlock(const primitives::BlockInfo &block); diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index ce521639fa..934f169ce9 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -323,10 +323,37 @@ namespace kagome::consensus::grandpa { peer_id); // Ignore peer whose voter_set is different - if (msg.voter_set_id != current_round_->voterSetId()) { + if (msg.voter_set_id < current_round_->voterSetId()) { return; } + if (msg.voter_set_id > current_round_->voterSetId()) { + auto last_finalized = block_tree_->getLastFinalized(); + synchronizer_->syncMissingJustifications( + peer_id, + last_finalized, + std::nullopt, + [wp = weak_from_this(), last_finalized, msg](auto res) { 
+ auto self = wp.lock(); + if (not self) { + return; + } + if (res.has_error()) { + SL_WARN(self->logger_, + "Missing justifications between blocks {} and " + "{} was not loaded: {}", + last_finalized, + msg.last_finalized, + res.error().message()); + } else { + SL_DEBUG(self->logger_, + "Loaded justifications for blocks in range {} - {}", + last_finalized, + res.value()); + } + }); + } + // Check if needed to catch-up peer, then do that if (msg.round_number >= current_round_->roundNumber() + kCatchUpThreshold) { std::ignore = environment_->onCatchUpRequested( diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index f9dd167ae9..64c113c21a 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -297,6 +297,15 @@ namespace kagome::network { false); } + void SynchronizerImpl::syncMissingJustifications( + const PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + Synchronizer::SyncResultHandler &&handler) { + loadJustifications( + peer_id, std::move(target_block), std::move(limit), std::move(handler)); + } + void SynchronizerImpl::findCommonBlock( const libp2p::peer::PeerId &peer_id, primitives::BlockNumber lower, @@ -331,14 +340,7 @@ namespace kagome::network { return; } - scheduler_->schedule( - [wp = weak_from_this(), peer_id, request_fingerprint] { - if (auto self = wp.lock()) { - self->recent_requests_.erase( - std::tuple(peer_id, request_fingerprint)); - } - }, - kRecentnessDuration); + scheduleRecentRequestRemoval(peer_id, request_fingerprint); auto response_handler = [wp = weak_from_this(), lower, @@ -511,14 +513,7 @@ namespace kagome::network { return; } - scheduler_->schedule( - [wp = weak_from_this(), peer_id, request_fingerprint] { - if (auto self = wp.lock()) { - self->recent_requests_.erase( - std::tuple(peer_id, request_fingerprint)); - } - }, - kRecentnessDuration); + scheduleRecentRequestRemoval(peer_id, request_fingerprint); auto 
response_handler = [wp = weak_from_this(), from, @@ -712,6 +707,114 @@ namespace kagome::network { protocol->request(peer_id, std::move(request), std::move(response_handler)); } + void SynchronizerImpl::loadJustifications(const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) { + if (node_is_shutting_down_) { + if (handler) handler(Error::SHUTTING_DOWN); + return; + } + + BlocksRequest request{ + BlockAttribute::HEADER | BlockAttribute::JUSTIFICATION, + target_block.hash, + std::nullopt, + Direction::ASCENDING, + limit}; + + auto request_fingerprint = request.fingerprint(); + if (not recent_requests_.emplace(peer_id, request_fingerprint).second) { + SL_ERROR( + log_, + "Can't load justification from {} for block {}: {}", + peer_id, + target_block, + outcome::result(Error::DUPLICATE_REQUEST).error().message()); + if (handler) { + handler(Error::DUPLICATE_REQUEST); + } + return; + } + + scheduleRecentRequestRemoval(peer_id, request_fingerprint); + + auto response_handler = [wp = weak_from_this(), + peer_id, + target_block, + handler = std::move(handler)]( + auto &&response_res) mutable { + auto self = wp.lock(); + if (not self) { + return; + } + + if (response_res.has_error()) { + SL_ERROR(self->log_, + "Can't load justification from {} for block {}: {}", + peer_id, + target_block, + response_res.error().message()); + if (handler) { + handler(response_res.as_failure()); + } + return; + } + + auto &blocks = response_res.value().blocks; + + if (blocks.empty()) { + SL_ERROR(self->log_, + "Can't load block justification from {} for block {}: " + "Response does not have any contents", + peer_id, + target_block); + if (handler) handler(Error::EMPTY_RESPONSE); + return; + } + + bool justification_received = false; + BlockInfo last_justified_block; + for (auto &block : blocks) { + if (not block.header) { + SL_ERROR(self->log_, + "No header was provided from {} for block {} while " + "requesting 
justifications", + peer_id, + target_block); + if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER); + return; + } + if (block.justification) { + justification_received = true; + last_justified_block = + primitives::BlockInfo{block.header->number, block.hash}; + { + std::lock_guard lock(self->justifications_mutex_); + self->justifications_.emplace(last_justified_block, + *block.justification); + } + } + } + + if (justification_received) { + SL_TRACE(self->log_, "Enqueued new justifications: schedule applying"); + self->scheduler_->schedule([wp] { + if (auto self = wp.lock()) { + self->applyNextJustification(); + } + }); + } + if (handler) { + handler(last_justified_block); + } + }; + + auto protocol = router_->getSyncProtocol(); + BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol"); + protocol->request(peer_id, std::move(request), std::move(response_handler)); + } + void SynchronizerImpl::applyNextBlock() { if (generations_.empty()) { SL_TRACE(log_, "No block for applying"); @@ -818,6 +921,41 @@ namespace kagome::network { }); } + void SynchronizerImpl::applyNextJustification() { + // Operate over the same lock as for the whole blocks application + bool false_val = false; + if (not applying_in_progress_.compare_exchange_strong(false_val, true)) { + SL_TRACE(log_, "Applying justification in progress"); + return; + } + SL_TRACE(log_, "Begin justification applying"); + auto cleanup = gsl::finally([this] { + SL_TRACE(log_, "End justification applying"); + applying_in_progress_ = false; + }); + + std::queue justifications; + { + std::lock_guard lock(justifications_mutex_); + justifications.swap(justifications_); + } + + while (not justifications.empty()) { + auto [block_info, justification] = std::move(justifications.front()); + const auto &block = block_info; // SL_WARN compilation WA + justifications.pop(); + auto res = block_executor_->applyJustification(block_info, justification); + if (res.has_error()) { + SL_WARN(log_, + "Justification for 
block {} was not applied: {}", + block, + res.error().message()); + } else { + SL_TRACE(log_, "Applied justification for block {}", block); + } + } + } + size_t SynchronizerImpl::discardBlock( const primitives::BlockHash &hash_of_discarding_block) { std::queue queue; @@ -879,6 +1017,18 @@ namespace kagome::network { metric_import_queue_length_->set(known_blocks_.size()); } + void SynchronizerImpl::scheduleRecentRequestRemoval( + const libp2p::peer::PeerId &peer_id, + const BlocksRequest::Fingerprint &fingerprint) { + scheduler_->schedule( + [wp = weak_from_this(), peer_id, fingerprint] { + if (auto self = wp.lock()) { + self->recent_requests_.erase(std::tuple(peer_id, fingerprint)); + } + }, + kRecentnessDuration); + } + void SynchronizerImpl::askNextPortionOfBlocks() { bool false_val = false; if (not asking_blocks_portion_in_progress_.compare_exchange_strong( diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 2b377870fb..5373b5b85c 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -8,6 +8,7 @@ #include "network/synchronizer.hpp" +#include #include #include @@ -49,7 +50,7 @@ namespace kagome::network { ALREADY_IN_QUEUE, PEER_BUSY, ARRIVED_TOO_EARLY, - DUPLICATE_REQUEST + DUPLICATE_REQUEST, }; SynchronizerImpl( @@ -78,6 +79,12 @@ namespace kagome::network { const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler) override; + // See loadJustifications + void syncMissingJustifications(const PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) override; + /// Finds best common block with peer {@param peer_id} in provided interval. 
/// It is using tail-recursive algorithm, till {@param hint} is /// the needed block @@ -100,6 +107,14 @@ namespace kagome::network { primitives::BlockInfo from, SyncResultHandler &&handler); + /// Loads block justification from {@param peer_id} for {@param + /// target_block} or a range of blocks up to {@param upper_bound_block}. + /// Calls {@param handler} when operation finishes + void loadJustifications(const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler); + private: /// Subscribes handler for block with provided {@param block_info} /// {@param handler} will be called When block is received or discarded @@ -117,6 +132,9 @@ namespace kagome::network { /// Pops next block from queue and tries to apply that void applyNextBlock(); + /// Pops next justification from queue and tries to apply it + void applyNextJustification(); + /// Removes block {@param block} and all all dependent on it from the queue /// @returns number of affected blocks size_t discardBlock(const primitives::BlockHash &block); @@ -125,6 +143,12 @@ namespace kagome::network { /// side-branch for provided finalized block {@param finalized_block} void prune(const primitives::BlockInfo &finalized_block); + /// Purges internal cache of recent requests for specified {@param peer_id} + /// and {@param fingerprint} after kRecentnessDuration timeout + void scheduleRecentRequestRemoval( + const libp2p::peer::PeerId &peer_id, + const BlocksRequest::Fingerprint &fingerprint); + std::shared_ptr block_tree_; std::shared_ptr block_executor_; std::shared_ptr router_; @@ -157,6 +181,12 @@ namespace kagome::network { std::unordered_multimap ancestry_; + // Loaded justifications to apply + using JustificationPair = + std::pair; + std::queue justifications_; + std::mutex justifications_mutex_; + // BlockNumber of blocks (aka height) that is potentially best now primitives::BlockNumber watched_blocks_number_{}; diff --git 
a/core/network/synchronizer.hpp b/core/network/synchronizer.hpp index ff90ff4408..b8f9871539 100644 --- a/core/network/synchronizer.hpp +++ b/core/network/synchronizer.hpp @@ -40,6 +40,15 @@ namespace kagome::network { virtual bool syncByBlockHeader(const primitives::BlockHeader &header, const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler) = 0; + + /// Tries to request block justifications from {@param peer_id} for {@param + /// target_block} or a range of blocks up to {@param limit} count. + /// Calls {@param handler} when operation finishes + virtual void syncMissingJustifications( + const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) = 0; }; } // namespace kagome::network diff --git a/test/mock/core/consensus/babe/block_executor_mock.hpp b/test/mock/core/consensus/babe/block_executor_mock.hpp index 0f600a24a3..2ccbde1883 100644 --- a/test/mock/core/consensus/babe/block_executor_mock.hpp +++ b/test/mock/core/consensus/babe/block_executor_mock.hpp @@ -21,6 +21,12 @@ namespace kagome::consensus { outcome::result applyBlock(primitives::BlockData &&block) override { return applyBlock(block); } + + MOCK_METHOD(outcome::result, + applyJustification, + (const primitives::BlockInfo &block_info, + const primitives::Justification &justification), + (override)); }; } // namespace kagome::consensus diff --git a/test/mock/core/network/synchronizer_mock.hpp b/test/mock/core/network/synchronizer_mock.hpp index 8028256d3a..f91ece9278 100644 --- a/test/mock/core/network/synchronizer_mock.hpp +++ b/test/mock/core/network/synchronizer_mock.hpp @@ -39,6 +39,20 @@ namespace kagome::network { SyncResultHandler &&handler) override { return syncByBlockHeader(block_header, peer_id, handler); }; + + MOCK_METHOD(void, + syncMissingJustifications, + (const libp2p::peer::PeerId &, + primitives::BlockInfo, + std::optional, + const SyncResultHandler &), + ()); + void syncMissingJustifications(const 
libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) override { + return syncMissingJustifications(peer_id, target_block, limit, handler); + } }; } // namespace kagome::network From 04f68d8be0b4db436416ff2fbddc01b7ead79bdb Mon Sep 17 00:00:00 2001 From: Igor Egorov Date: Thu, 14 Jul 2022 15:04:04 +0300 Subject: [PATCH 02/24] The second iteration Signed-off-by: Igor Egorov --- core/consensus/grandpa/impl/grandpa_impl.cpp | 25 ++++++++++++----- core/network/impl/synchronizer_impl.cpp | 29 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 934f169ce9..b821be5bd9 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -326,8 +326,16 @@ namespace kagome::consensus::grandpa { if (msg.voter_set_id < current_round_->voterSetId()) { return; } + const auto &new_set_id = msg.voter_set_id; + const auto &set_id = current_round_->voterSetId(); + const auto &new_round = msg.round_number; + const auto &round = current_round_->roundNumber(); - if (msg.voter_set_id > current_round_->voterSetId()) { + bool greater_voter_set = new_set_id > set_id; + bool same_voter_set = new_set_id == set_id; + bool greater_round = new_round > round; + + if (greater_voter_set or (same_voter_set and greater_round)) { auto last_finalized = block_tree_->getLastFinalized(); synchronizer_->syncMissingJustifications( peer_id, @@ -354,12 +362,15 @@ namespace kagome::consensus::grandpa { }); } - // Check if needed to catch-up peer, then do that - if (msg.round_number >= current_round_->roundNumber() + kCatchUpThreshold) { - std::ignore = environment_->onCatchUpRequested( - peer_id, msg.voter_set_id, msg.round_number - 1); - return; - } + // Trying to substitute with justifications' request only + // + // // Check if needed to catch-up peer, then do that + 
// if (msg.round_number >= current_round_->roundNumber() + + // kCatchUpThreshold) { + // std::ignore = environment_->onCatchUpRequested( + // peer_id, msg.voter_set_id, msg.round_number - 1); + // return; + // } // Iff peer just reached one of recent round, then share known votes auto info = peer_manager_->getPeerState(peer_id); diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 64c113c21a..fe0cf78591 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -302,6 +302,27 @@ namespace kagome::network { primitives::BlockInfo target_block, std::optional limit, Synchronizer::SyncResultHandler &&handler) { + if (busy_peers_.find(peer_id) != busy_peers_.end()) { + SL_DEBUG( + log_, + "Justifications load since block {} was rescheduled, peer {} is busy", + target_block, + peer_id); + scheduler_->schedule([wp = weak_from_this(), + peer_id, + block = std::move(target_block), + limit = std::move(limit), + handler = std::move(handler)]() mutable { + auto self = wp.lock(); + if (not self) { + return; + } + self->syncMissingJustifications( + peer_id, std::move(block), std::move(limit), std::move(handler)); + }); + return; + } + loadJustifications( peer_id, std::move(target_block), std::move(limit), std::move(handler)); } @@ -716,6 +737,14 @@ namespace kagome::network { return; } + busy_peers_.insert(peer_id); + auto cleanup = gsl::finally([this, peer_id] { + auto peer = busy_peers_.find(peer_id); + if (peer != busy_peers_.end()) { + busy_peers_.erase(peer); + } + }); + BlocksRequest request{ BlockAttribute::HEADER | BlockAttribute::JUSTIFICATION, target_block.hash, From 88dc3a631185917342f6e2b4888252abfc866898 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:21:37 +0300 Subject: [PATCH 03/24] fix: log of next epoch digest Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/babe/impl/block_executor_impl.cpp | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/core/consensus/babe/impl/block_executor_impl.cpp b/core/consensus/babe/impl/block_executor_impl.cpp index 50e00e74bb..9df9149091 100644 --- a/core/consensus/babe/impl/block_executor_impl.cpp +++ b/core/consensus/babe/impl/block_executor_impl.cpp @@ -209,8 +209,8 @@ namespace kagome::consensus { SL_VERBOSE(logger_, "Next epoch digest has gotten in block {} (slot {}). " "Randomness: {}", - slot_number, primitives::BlockInfo(block.header.number, block_hash), + slot_number, next_epoch_digest.randomness); } From 4459b32aeae4250b563dbd0cbe91e5c5fd566c87 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:27:52 +0300 Subject: [PATCH 04/24] fix: send neighbor message in case of passive round Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 5 +++++ core/consensus/grandpa/impl/voting_round_impl.hpp | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index b821be5bd9..b5546c6fe3 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -298,6 +298,11 @@ namespace kagome::consensus::grandpa { metric_highest_round_->set(current_round_->roundNumber()); if (keypair_) { current_round_->play(); + } else { + auto round = std::dynamic_pointer_cast(current_round_); + if (round) { + round->sendNeighborMessage(); + } } } diff --git a/core/consensus/grandpa/impl/voting_round_impl.hpp b/core/consensus/grandpa/impl/voting_round_impl.hpp index 6c573eda94..eb7689ac2c 100644 --- a/core/consensus/grandpa/impl/voting_round_impl.hpp +++ b/core/consensus/grandpa/impl/voting_round_impl.hpp @@ -239,6 +239,8 @@ namespace kagome::consensus::grandpa { */ MovableRoundState state() const override; + void sendNeighborMessage(); + private: /// Check if peer \param id is primary bool isPrimary(const Id &id) const; @@ -290,7 
+292,6 @@ namespace kagome::consensus::grandpa { outcome::result validatePrecommitJustification( const BlockInfo &vote, const GrandpaJustification &justification) const; - void sendNeighborMessage(); void sendProposal(const PrimaryPropose &primary_proposal); void sendPrevote(const Prevote &prevote); void sendPrecommit(const Precommit &precommit); From 681c7c57805259db0e57ee07fa96a5c4309ecf46 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:29:13 +0300 Subject: [PATCH 05/24] fix: logging of ping protocol handling Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/router_libp2p.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/network/impl/router_libp2p.cpp b/core/network/impl/router_libp2p.cpp index 6554e12224..b40e69c078 100644 --- a/core/network/impl/router_libp2p.cpp +++ b/core/network/impl/router_libp2p.cpp @@ -46,7 +46,8 @@ namespace kagome::network { [wp = weak_from_this()](auto &&stream) { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { - self->log_->info( + SL_TRACE( + self->log_, "Handled {} protocol stream from: {}", self->ping_protocol_->getProtocolId(), peer_id.value().toBase58()); From 0bf1be362ebae9793c43f19ca85986b4919caa7b Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:34:22 +0300 Subject: [PATCH 06/24] fix: return check if peer is alive Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/peer_manager_impl.cpp | 19 +++++++++++++++++++ core/network/impl/stream_engine.hpp | 18 ++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 6ac6c72ccc..a7872b6a53 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -215,6 +215,25 @@ namespace kagome::network { disconnectFromPeer(peer_id); } + // Check if disconnected + auto block_announce_protocol = 
router_->getBlockAnnounceProtocol(); + BOOST_ASSERT_MSG(block_announce_protocol, + "Router did not provide block announce protocol"); + + for (auto it = active_peers_.begin(); it != active_peers_.end();) { + auto [peer_id, data] = *it++; + // TODO(d.khaustov) consider better alive check logic + if (not stream_engine_->isAlive(peer_id, block_announce_protocol)) { + // Found disconnected + const auto &peer_id_ref = peer_id; + SL_DEBUG(log_, "Found dead peer_id={}", peer_id_ref.toBase58()); + disconnectFromPeer(peer_id); + // if (not disconnected_peer.has_value()) { + // disconnected_peer = peer_id; + // } + } + } + // Soft limit is exceeded if (active_peers_.size() > soft_limit) { // Get oldest peer diff --git a/core/network/impl/stream_engine.hpp b/core/network/impl/stream_engine.hpp index f5e5df9ac0..54c523881a 100644 --- a/core/network/impl/stream_engine.hpp +++ b/core/network/impl/stream_engine.hpp @@ -248,6 +248,24 @@ namespace kagome::network { }); } + bool isAlive(const PeerId &peer_id, + const std::shared_ptr &protocol) { + BOOST_ASSERT(protocol); + return streams_.exclusiveAccess([&](auto &streams) { + bool is_alive = false; + forSubscriber(peer_id, streams, protocol, [&](auto, auto &descr) { + if (descr.incoming.stream and not descr.incoming.stream->isClosed()) { + is_alive = true; + } + if (descr.outgoing.stream and not descr.outgoing.stream->isClosed()) { + is_alive = true; + } + }); + + return is_alive; + }); + }; + template void send(const PeerId &peer_id, const std::shared_ptr &protocol, From 6a4677b19a05424874796756c9850a981abf272f Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:37:31 +0300 Subject: [PATCH 07/24] feature: remove disconnected peer from stream engine Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/stream_engine.hpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/network/impl/stream_engine.hpp b/core/network/impl/stream_engine.hpp index 
54c523881a..94f008bacb 100644 --- a/core/network/impl/stream_engine.hpp +++ b/core/network/impl/stream_engine.hpp @@ -480,11 +480,20 @@ namespace kagome::network { if (!stream_res) { self->logger_->error( - "Could not send message to {} stream with {}: {}", + "Could not send message to new {} stream with {}: {}", protocol->protocol(), peer_id, stream_res.error().message()); + if (stream_res + == outcome::failure( + std::make_error_code(std::errc::not_connected))) { + self->logger_->error("GOTCHA!"); // FIXME + + self->del(peer_id); + return; + } + self->streams_.exclusiveAccess([&](auto &streams) { self->forSubscriber( peer_id, streams, protocol, [&](auto, auto &descr) { From bf2f158b7f036d5931d84439babe743945ce14b3 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:38:30 +0300 Subject: [PATCH 08/24] feature: forget fingerprint of sync-request in case of empty response Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/synchronizer_impl.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index fe0cf78591..3fe0ad7b5b 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -369,8 +369,8 @@ namespace kagome::network { target = hint, peer_id, handler = std::move(handler), - observed = std::move(observed)]( - auto &&response_res) mutable { + observed = std::move(observed), + request_fingerprint](auto &&response_res) mutable { auto self = wp.lock(); if (not self) { return; @@ -402,6 +402,7 @@ namespace kagome::network { upper - 1, peer_id); handler(Error::EMPTY_RESPONSE); + self->recent_requests_.erase(std::tuple(peer_id, request_fingerprint)); return; } From c18c76038221e796ff5fda4c2ac1351e80734b40 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 12:39:56 +0300 Subject: [PATCH 09/24] refactor: conditions of sync-request and normal catch-up of 
round Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 93 +++++++++----------- 1 file changed, 43 insertions(+), 50 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index b5546c6fe3..b12c8dc5b5 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -327,56 +327,6 @@ namespace kagome::consensus::grandpa { msg.last_finalized, peer_id); - // Ignore peer whose voter_set is different - if (msg.voter_set_id < current_round_->voterSetId()) { - return; - } - const auto &new_set_id = msg.voter_set_id; - const auto &set_id = current_round_->voterSetId(); - const auto &new_round = msg.round_number; - const auto &round = current_round_->roundNumber(); - - bool greater_voter_set = new_set_id > set_id; - bool same_voter_set = new_set_id == set_id; - bool greater_round = new_round > round; - - if (greater_voter_set or (same_voter_set and greater_round)) { - auto last_finalized = block_tree_->getLastFinalized(); - synchronizer_->syncMissingJustifications( - peer_id, - last_finalized, - std::nullopt, - [wp = weak_from_this(), last_finalized, msg](auto res) { - auto self = wp.lock(); - if (not self) { - return; - } - if (res.has_error()) { - SL_WARN(self->logger_, - "Missing justifications between blocks {} and " - "{} was not loaded: {}", - last_finalized, - msg.last_finalized, - res.error().message()); - } else { - SL_DEBUG(self->logger_, - "Loaded justifications for blocks in range {} - {}", - last_finalized, - res.value()); - } - }); - } - - // Trying to substitute with justifications' request only - // - // // Check if needed to catch-up peer, then do that - // if (msg.round_number >= current_round_->roundNumber() + - // kCatchUpThreshold) { - // std::ignore = environment_->onCatchUpRequested( - // peer_id, msg.voter_set_id, msg.round_number - 1); - // return; - // } - // Iff peer just reached one of recent 
round, then share known votes auto info = peer_manager_->getPeerState(peer_id); if (not info.has_value() || msg.voter_set_id != info->set_id @@ -387,6 +337,49 @@ namespace kagome::consensus::grandpa { environment_->sendState(peer_id, round->state(), msg.voter_set_id); } } + + // If peer has the same voter set id + if (msg.voter_set_id == current_round_->voterSetId()) { + // Check if needed to catch-up peer, then do that + if (msg.round_number + >= current_round_->roundNumber() + kCatchUpThreshold) { + std::ignore = environment_->onCatchUpRequested( + peer_id, msg.voter_set_id, msg.round_number - 1); + } + + return; + } + + // Ignore peer whose voter set id lower than our current + if (msg.voter_set_id < current_round_->voterSetId()) { + return; + } + + // Trying to substitute with justifications' request only + auto last_finalized = block_tree_->getLastFinalized(); + synchronizer_->syncMissingJustifications( + peer_id, + last_finalized, + std::nullopt, + [wp = weak_from_this(), last_finalized, msg](auto res) { + auto self = wp.lock(); + if (not self) { + return; + } + if (res.has_error()) { + SL_WARN(self->logger_, + "Missing justifications between blocks {} and " + "{} was not loaded: {}", + last_finalized, + msg.last_finalized, + res.error().message()); + } else { + SL_DEBUG(self->logger_, + "Loaded justifications for blocks in range {} - {}", + last_finalized, + res.value()); + } + }); } void GrandpaImpl::onCatchUpRequest(const libp2p::peer::PeerId &peer_id, From 1ea42d205278055666ab3686aba1be4ff4ccbd7f Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 19 Jul 2022 18:25:58 +0300 Subject: [PATCH 10/24] feature: catch-up based sync-request in according to applyed blocks Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 54 +++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 
b12c8dc5b5..822cd5eed8 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -355,31 +355,35 @@ namespace kagome::consensus::grandpa { return; } - // Trying to substitute with justifications' request only - auto last_finalized = block_tree_->getLastFinalized(); - synchronizer_->syncMissingJustifications( - peer_id, - last_finalized, - std::nullopt, - [wp = weak_from_this(), last_finalized, msg](auto res) { - auto self = wp.lock(); - if (not self) { - return; - } - if (res.has_error()) { - SL_WARN(self->logger_, - "Missing justifications between blocks {} and " - "{} was not loaded: {}", - last_finalized, - msg.last_finalized, - res.error().message()); - } else { - SL_DEBUG(self->logger_, - "Loaded justifications for blocks in range {} - {}", - last_finalized, - res.value()); - } - }); + if (info.has_value()) { + if (info->last_finalized <= block_tree_->deepestLeaf().number) { + // Trying to substitute with justifications' request only + auto last_finalized = block_tree_->getLastFinalized(); + synchronizer_->syncMissingJustifications( + peer_id, + last_finalized, + std::nullopt, + [wp = weak_from_this(), last_finalized, msg](auto res) { + auto self = wp.lock(); + if (not self) { + return; + } + if (res.has_error()) { + SL_WARN(self->logger_, + "Missing justifications between blocks {} and " + "{} was not loaded: {}", + last_finalized, + msg.last_finalized, + res.error().message()); + } else { + SL_DEBUG(self->logger_, + "Loaded justifications for blocks in range {} - {}", + last_finalized, + res.value()); + } + }); + } + } } void GrandpaImpl::onCatchUpRequest(const libp2p::peer::PeerId &peer_id, From d736237ab1112b18f4b8ba0534e9b41740fb551e Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 21 Jul 2022 18:13:58 +0300 Subject: [PATCH 11/24] fix: Mixing full and authority roles Signed-off-by: Dmitriy Khaustov aka xDimon --- core/application/impl/app_configuration_impl.cpp | 4 +++- 
core/network/impl/protocols/block_announce_protocol.cpp | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index 839d94a8ec..c8b6892a49 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -295,6 +295,7 @@ namespace kagome::application { bool validator_mode = false; load_bool(val, "validator", validator_mode); if (validator_mode) { + roles_.flags.full = 0; roles_.flags.authority = 1; } @@ -800,7 +801,7 @@ namespace kagome::application { } } - roles_.flags.full = 1; + roles_.flags.full = 0; roles_.flags.authority = 1; p2p_port_ = def_p2p_port; rpc_http_host_ = def_rpc_http_host; @@ -822,6 +823,7 @@ namespace kagome::application { }); if (vm.end() != vm.find("validator")) { + roles_.flags.full = 0; roles_.flags.authority = 1; } diff --git a/core/network/impl/protocols/block_announce_protocol.cpp b/core/network/impl/protocols/block_announce_protocol.cpp index 8bc88f9e2a..80ec6bd05e 100644 --- a/core/network/impl/protocols/block_announce_protocol.cpp +++ b/core/network/impl/protocols/block_announce_protocol.cpp @@ -275,7 +275,9 @@ namespace kagome::network { return; } - read_writer->write(status_res.value(), + const auto &status = status_res.value(); + + read_writer->write(status, [stream = std::move(stream), direction, wp = weak_from_this(), From 415c7ae484a1c6618b6626797f7d9bef5d50e914 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 21 Jul 2022 19:03:40 +0300 Subject: [PATCH 12/24] refactor: use short peer_id in log Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/peer_manager_impl.cpp | 88 +++++++++---------- .../protocols/block_announce_protocol.cpp | 10 +-- .../impl/protocols/grandpa_protocol.cpp | 6 +- .../propagate_transactions_protocol.cpp | 32 +++---- core/network/impl/stream_engine.hpp | 16 ++-- 5 files changed, 73 insertions(+), 79 
deletions(-) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index a7872b6a53..38cfe9acfb 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -100,9 +100,9 @@ namespace kagome::network { self->peer_rating_repository_->rating(peer_id); rating < 0) { SL_DEBUG(self->log_, - "Disconnecting from peer_id={} due to its negative " + "Disconnecting from peer {} due to its negative " "rating {}", - peer_id.toBase58(), + peer_id, rating); self->disconnectFromPeer(peer_id); return; @@ -111,25 +111,22 @@ namespace kagome::network { } }); - identify_->onIdentifyReceived( - [wp = weak_from_this()](const PeerId &peer_id) { - if (auto self = wp.lock()) { - SL_DEBUG(self->log_, - "Identify received from peer_id={}", - peer_id.toBase58()); - if (auto rating = self->peer_rating_repository_->rating(peer_id); - rating < 0) { - SL_DEBUG( - self->log_, - "Disconnecting from peer_id={} due to its negative rating {}", - peer_id.toBase58(), - rating); - self->disconnectFromPeer(peer_id); - return; - } - self->processFullyConnectedPeer(peer_id); - } - }); + identify_->onIdentifyReceived([wp = weak_from_this()]( + const PeerId &peer_id) { + if (auto self = wp.lock()) { + SL_DEBUG(self->log_, "Identify received from peer {}", peer_id); + if (auto rating = self->peer_rating_repository_->rating(peer_id); + rating < 0) { + SL_DEBUG(self->log_, + "Disconnecting from peer {} due to its negative rating {}", + peer_id, + rating); + self->disconnectFromPeer(peer_id); + return; + } + self->processFullyConnectedPeer(peer_id); + } + }); // Start Identify protocol identify_->start(); @@ -210,8 +207,8 @@ namespace kagome::network { } for (const auto &peer_id : peers_to_disconnect) { SL_DEBUG(log_, - "Disconnecting from peer_id={} due to its negative rating", - peer_id.toBase58()); + "Disconnecting from peer {} due to its negative rating", + peer_id); disconnectFromPeer(peer_id); } @@ -226,11 +223,8 @@ namespace 
kagome::network { if (not stream_engine_->isAlive(peer_id, block_announce_protocol)) { // Found disconnected const auto &peer_id_ref = peer_id; - SL_DEBUG(log_, "Found dead peer_id={}", peer_id_ref.toBase58()); + SL_DEBUG(log_, "Found dead peer: {}", peer_id_ref); disconnectFromPeer(peer_id); - // if (not disconnected_peer.has_value()) { - // disconnected_peer = peer_id; - // } } } @@ -253,8 +247,7 @@ namespace kagome::network { } else if (oldest_descr.time_point + peer_ttl < clock_->now()) { // Peer is inactive long time auto &oldest_peer_id_ref = oldest_peer_id; - SL_DEBUG( - log_, "Found inactive peer_id={}", oldest_peer_id_ref.toBase58()); + SL_DEBUG(log_, "Found inactive peer: {}", oldest_peer_id_ref); disconnectFromPeer(oldest_peer_id); } else { @@ -318,19 +311,19 @@ namespace kagome::network { auto peer_info = host_.getPeerRepository().getPeerInfo(peer_id); if (peer_info.addresses.empty()) { - SL_DEBUG(log_, "Not found addresses for peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Not found addresses for peer {}", peer_id); connecting_peers_.erase(peer_id); return; } auto connectedness = host_.connectedness(peer_info); if (connectedness == libp2p::Host::Connectedness::CAN_NOT_CONNECT) { - SL_DEBUG(log_, "Can not connect to peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Can not connect to peer {}", peer_id); connecting_peers_.erase(peer_id); return; } - SL_DEBUG(log_, "Try to connect to peer_id={}", peer_info.id.toBase58()); + SL_DEBUG(log_, "Try to connect to peer {}", peer_info.id); for (auto addr : peer_info.addresses) { SL_DEBUG(log_, " address: {}", addr.getStringAddress()); } @@ -345,8 +338,8 @@ namespace kagome::network { if (not res.has_value()) { SL_DEBUG(self->log_, - "Connecting to peer_id={} is failed: {}", - peer_id.toBase58(), + "Connecting to peer {} is failed: {}", + peer_id, res.error().message()); self->connecting_peers_.erase(peer_id); return; @@ -355,16 +348,17 @@ namespace kagome::network { auto &connection = res.value(); auto 
remote_peer_id_res = connection->remotePeer(); if (not remote_peer_id_res.has_value()) { - SL_DEBUG(self->log_, - "Connected, but not identified yet (expecting peer_id={})", - peer_id.toBase58()); + SL_DEBUG( + self->log_, + "Connected, but not identified yet (expecting peer_id={:l})", + peer_id); self->connecting_peers_.erase(peer_id); return; } auto &remote_peer_id = remote_peer_id_res.value(); if (remote_peer_id == peer_id) { - SL_DEBUG(self->log_, "Connected to peer_id={}", peer_id.toBase58()); + SL_DEBUG(self->log_, "Connected to peer {}", peer_id); self->processFullyConnectedPeer(peer_id); } @@ -375,7 +369,7 @@ namespace kagome::network { void PeerManagerImpl::disconnectFromPeer(const PeerId &peer_id) { auto it = active_peers_.find(peer_id); if (it != active_peers_.end()) { - SL_DEBUG(log_, "Disconnect from peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Disconnect from peer {}", peer_id); stream_engine_->del(peer_id); active_peers_.erase(it); sync_peer_num_->set(active_peers_.size()); @@ -404,7 +398,7 @@ namespace kagome::network { if (conn == nullptr) { SL_DEBUG(log_, "Failed to start pinging {}: No connection to this peer exists", - peer_id.toBase58()); + peer_id); return; } auto [_, is_emplaced] = pinging_connections_.emplace(conn); @@ -415,7 +409,7 @@ namespace kagome::network { SL_DEBUG(log_, "Start pinging of {} (conn={})", - peer_id.toBase58(), + peer_id, static_cast(conn.get())); ping_protocol->startPinging( @@ -427,7 +421,7 @@ namespace kagome::network { if (session_res.has_error()) { SL_DEBUG(self->log_, "Stop pinging of {} (conn={}): {}", - peer_id.toBase58(), + peer_id, static_cast(conn.get()), session_res.error().message()); self->pinging_connections_.erase(conn); @@ -435,7 +429,7 @@ namespace kagome::network { } else { SL_DEBUG(self->log_, "Pinging: {} (conn={}) is alive", - peer_id.toBase58(), + peer_id, static_cast(conn.get())); self->keepAlive(peer_id); } @@ -500,8 +494,8 @@ namespace kagome::network { 
BOOST_ASSERT(queue_to_connect_.size() == peers_in_queue_.size()); SL_DEBUG(log_, - "New peer_id={} enqueued. In queue: {}", - peer_id.toBase58(), + "New peer_id enqueued: {:l}. In queue: {}", + peer_id, queue_to_connect_.size()); } @@ -582,7 +576,7 @@ namespace kagome::network { if (not stream_res.has_value()) { self->log_->warn("Unable to create stream {} with {}: {}", protocol->protocol(), - peer_id.toBase58(), + peer_id, stream_res.error().message()); self->connecting_peers_.erase(peer_id); self->disconnectFromPeer(peer_id); @@ -628,7 +622,7 @@ namespace kagome::network { SL_DEBUG(log_, "Stream {} with {} is alive", block_announce_protocol->protocol(), - peer_id.toBase58()); + peer_id); connecting_peers_.erase(peer_id); } diff --git a/core/network/impl/protocols/block_announce_protocol.cpp b/core/network/impl/protocols/block_announce_protocol.cpp index 80ec6bd05e..e73ff3d605 100644 --- a/core/network/impl/protocols/block_announce_protocol.cpp +++ b/core/network/impl/protocols/block_announce_protocol.cpp @@ -42,9 +42,9 @@ namespace kagome::network { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { SL_TRACE(self->log_, - "Handled {} protocol stream from: {}", + "Handled {} protocol stream from {}", self->protocol_, - peer_id.value().toBase58()); + peer_id.value()); self->onIncomingStream(std::forward(stream)); return; } @@ -102,7 +102,7 @@ namespace kagome::network { self->log_, "Handshake failed on incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -113,7 +113,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -125,7 +125,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established incoming {} stream with {}", self->protocol_, - peer_id.toBase58()); + peer_id); }); } diff --git 
a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index eecf4ea578..3aa76158c0 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -439,7 +439,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Commit with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", set_id, round_number, peer_id); @@ -513,7 +513,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", catch_up_request.voter_set_id, catch_up_request.round_number, peer_id); @@ -561,7 +561,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Catch-up-response with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", catch_up_response.voter_set_id, catch_up_response.round_number, peer_id); diff --git a/core/network/impl/protocols/propagate_transactions_protocol.cpp b/core/network/impl/protocols/propagate_transactions_protocol.cpp index 8b1cfb490e..36f3ecba42 100644 --- a/core/network/impl/protocols/propagate_transactions_protocol.cpp +++ b/core/network/impl/protocols/propagate_transactions_protocol.cpp @@ -54,9 +54,9 @@ namespace kagome::network { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { SL_TRACE(self->log_, - "Handled {} protocol stream from: {}", + "Handled {} protocol stream from {}", self->protocol_, - peer_id.value().toBase58()); + peer_id.value()); self->onIncomingStream(std::forward(stream)); return; } @@ -92,7 +92,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Handshake failed on incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, 
res.error().message()); stream->reset(); return; @@ -103,7 +103,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -112,7 +112,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established incoming {} stream with {}", self->protocol_, - peer_id.toBase58()); + peer_id); }); } @@ -134,7 +134,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't create outgoing {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, stream_res.error().message()); cb(stream_res.as_failure()); return; @@ -153,7 +153,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Handshake failed on outgoing {} stream with {}: {}", self->protocol_, - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), res.error().message()); stream->reset(); cb(res.as_failure()); @@ -165,7 +165,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register outgoing {} stream with {}: {}", self->protocol_, - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), res.error().message()); stream->reset(); cb(res.as_failure()); @@ -175,7 +175,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established outgoing {} stream with {}", self->protocol_, - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); cb(std::move(stream)); }; @@ -203,7 +203,7 @@ namespace kagome::network { if (not remote_handshake_res.has_value()) { SL_VERBOSE(self->log_, "Can't read handshake from {}: {}", - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), remote_handshake_res.error().message()); stream->reset(); cb(remote_handshake_res.as_failure()); @@ -212,7 +212,7 @@ namespace kagome::network { SL_TRACE(self->log_, "Handshake has received from {}", - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); 
switch (direction) { case Direction::OUTGOING: @@ -247,7 +247,7 @@ namespace kagome::network { if (not write_res.has_value()) { SL_VERBOSE(self->log_, "Can't send handshake to {}: {}", - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), write_res.error().message()); stream->reset(); cb(write_res.as_failure()); @@ -256,7 +256,7 @@ namespace kagome::network { SL_TRACE(self->log_, "Handshake has sent to {}", - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); switch (direction) { case Direction::OUTGOING: @@ -287,8 +287,8 @@ namespace kagome::network { if (not message_res.has_value()) { SL_VERBOSE(self->log_, - "Can't read grandpa message from {}: {}", - stream->remotePeerId().value().toBase58(), + "Can't read propagated transactions from {}: {}", + stream->remotePeerId().value(), message_res.error().message()); stream->reset(); return; @@ -300,7 +300,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Received {} propagated transactions from {}", message.extrinsics.size(), - peer_id.toBase58()); + peer_id); if (self->babe_->wasSynchronized()) { for (auto &ext : message.extrinsics) { diff --git a/core/network/impl/stream_engine.hpp b/core/network/impl/stream_engine.hpp index 94f008bacb..4adf98f6b7 100644 --- a/core/network/impl/stream_engine.hpp +++ b/core/network/impl/stream_engine.hpp @@ -163,12 +163,12 @@ namespace kagome::network { is_incoming ? stream : nullptr, is_outgoing ? stream : nullptr}); SL_DEBUG(logger_, - "Added {} {} stream with peer_id={}", + "Added {} {} stream with peer {}", direction == Direction::INCOMING ? "incoming" : direction == Direction::OUTGOING ? 
"outgoing" : "bidirectional", protocol->protocol(), - peer_id.toBase58()); + peer_id); } return outcome::success(); }); @@ -204,9 +204,9 @@ namespace kagome::network { if (reserved) { SL_DEBUG(logger_, - "Reserved {} stream with {}", + "Reserved {} stream with peer {}", protocol->protocol(), - peer_id.toBase58()); + peer_id); } } @@ -384,14 +384,14 @@ namespace kagome::network { dst = src; SL_DEBUG(logger_, - "{} {} stream with peer_id={} was {}", + "{} {} stream with peer {} was {}", direction == Direction::BIDIRECTIONAL ? "Bidirectional" : direction == Direction::INCOMING ? "Incoming" : "Outgoing", protocol->protocol(), dst->remotePeerId().has_value() - ? dst->remotePeerId().value().toBase58() - : "{no PeerId}", + ? fmt::format("{}", dst->remotePeerId().value()) + : "without PeerId", replaced ? "replaced" : "stored"); } @@ -452,7 +452,7 @@ namespace kagome::network { logger_->debug("DUMP: vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"); logger_->debug("DUMP: {}", msg); forEachPeer([&](const auto &peer_id, auto const &proto_map) { - logger_->debug("DUMP: Peer {}", peer_id.toBase58()); + logger_->debug("DUMP: Peer {}", peer_id); for (auto const &[protocol, descr] : proto_map) { logger_->debug("DUMP: Protocol {}", protocol); logger_->debug("DUMP: I={} O={} Messages:{}", From a1c245a6f9dbbb15e6a8f63b42b168c969ba7100 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Fri, 22 Jul 2022 11:20:44 +0300 Subject: [PATCH 13/24] fix: crash at execute next round after applying of justification Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 822cd5eed8..18f6568560 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -809,7 +809,7 @@ namespace kagome::consensus::grandpa { auto opt_round = 
selectRound(justification.round_number, std::nullopt); std::shared_ptr round; bool need_to_make_round_current = false; - if (!opt_round.has_value()) { + if (not opt_round.has_value()) { // This is justification for already finalized block if (current_round_->lastFinalizedBlock().number > block_info.number) { return VotingRoundError::JUSTIFICATION_FOR_BLOCK_IN_PAST; @@ -862,17 +862,17 @@ namespace kagome::consensus::grandpa { "Rewind grandpa till round #{} by received justification", justification.round_number); } else { - round = *opt_round; + round = std::move(opt_round.value()); } OUTCOME_TRY(round->applyJustification(block_info, justification)); if (need_to_make_round_current) { current_round_->end(); - current_round_ = std::move(round); + current_round_ = round; } - tryExecuteNextRound(current_round_); + tryExecuteNextRound(round); // if round == current round, then execution of the next round will be // elsewhere From a4aca09b0ab44534d5a76a791c53e4a45794f129 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Fri, 22 Jul 2022 11:25:04 +0300 Subject: [PATCH 14/24] refactor: make sit_id and round in peer data optional Signed-off-by: Dmitriy Khaustov aka xDimon --- .../impl/protocols/grandpa_protocol.cpp | 93 ++++++++++++++----- core/network/peer_manager.hpp | 4 +- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index 3aa76158c0..098611bd60 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -175,8 +175,8 @@ namespace kagome::network { self->peer_manager_->getPeerState(self->own_info_.id); if (own_peer_state.has_value()) { self->neighbor(GrandpaNeighborMessage{ - .round_number = own_peer_state->round_number, - .voter_set_id = own_peer_state->set_id, + .round_number = own_peer_state->round_number.value_or(1), + .voter_set_id = own_peer_state->set_id.value_or(0), 
.last_finalized = own_peer_state->last_finalized}); } @@ -302,26 +302,33 @@ namespace kagome::network { auto peer_id = stream->remotePeerId().value(); auto &grandpa_message = grandpa_message_res.value(); - SL_VERBOSE(self->log_, "Message has received from {}", peer_id); - visit_in_place( grandpa_message, [&](const network::GrandpaVote &vote_message) { + SL_VERBOSE(self->log_, "VoteMessage has received from {}", peer_id); self->grandpa_observer_->onVoteMessage(peer_id, vote_message); }, [&](const FullCommitMessage &commit_message) { + SL_VERBOSE( + self->log_, "CommitMessage has received from {}", peer_id); self->grandpa_observer_->onCommitMessage(peer_id, commit_message); }, [&](const GrandpaNeighborMessage &neighbor_message) { + SL_VERBOSE( + self->log_, "NeighborMessage has received from {}", peer_id); self->grandpa_observer_->onNeighborMessage(peer_id, neighbor_message); self->peer_manager_->updatePeerState(peer_id, neighbor_message); }, [&](const network::CatchUpRequest &catch_up_request) { + SL_VERBOSE( + self->log_, "CatchUpRequest has received from {}", peer_id); self->grandpa_observer_->onCatchUpRequest(peer_id, catch_up_request); }, [&](const network::CatchUpResponse &catch_up_response) { + SL_VERBOSE( + self->log_, "CatchUpResponse has received from {}", peer_id); self->grandpa_observer_->onCatchUpResponse(peer_id, catch_up_response); }); @@ -341,7 +348,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Vote signed by {} with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", msg.id(), msg.counter, msg.round_number, @@ -350,6 +357,17 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Vote signed by {} with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + msg.id(), + msg.counter, + msg.round_number, + 
peer_id); + return false; + } + // If a peer is at a given voter set, it is impolite to send messages // from an earlier voter set. It is extremely impolite to send messages // from a future voter set. @@ -361,13 +379,13 @@ namespace kagome::network { msg.counter, msg.round_number, peer_id, - info.set_id); + info.set_id.value()); return false; } // If a peer is at round r, is impolite to send messages about r-2 or // earlier - if (msg.round_number + 2 < info.round_number) { + if (msg.round_number + 2 < info.round_number.value()) { SL_DEBUG( log_, "Vote signed by {} with set_id={} in round={} " @@ -376,21 +394,21 @@ namespace kagome::network { msg.counter, msg.round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } // If a peer is at round r, is extremely impolite to send messages about // r+1 or later - if (msg.round_number > info.round_number) { + if (msg.round_number > info.round_number.value()) { SL_DEBUG(log_, "Vote signed by {} with set_id={} in round={} " - "has not been sent to {} as impolite: their round too old: {}", + "has not been sent to {} as impolite: their round is old: {}", msg.id(), msg.counter, msg.round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -447,6 +465,16 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Commit with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + set_id, + round_number, + peer_id); + return false; + } + // It is especially impolite to send commits which are invalid, or from // a different Set ID than the receiving peer has indicated. 
if (set_id != info.set_id) { @@ -456,12 +484,12 @@ namespace kagome::network { set_id, round_number, peer_id, - info.set_id); + info.set_id.value()); return false; } // Don't send commit if that has not actual for remote peer already - if (round_number < info.round_number) { + if (round_number < info.round_number.value()) { SL_DEBUG( log_, "Commit with set_id={} in round={} " @@ -469,7 +497,7 @@ namespace kagome::network { set_id, round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -483,7 +511,7 @@ namespace kagome::network { set_id, round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -505,9 +533,11 @@ namespace kagome::network { void GrandpaProtocol::catchUpRequest(const libp2p::peer::PeerId &peer_id, CatchUpRequest &&catch_up_request) { - SL_DEBUG(log_, - "Send catch-up request: beginning with grandpa round number {}", - catch_up_request.round_number); + SL_DEBUG( + log_, + "Send catch-up-request to {} beginning with grandpa round number {}", + peer_id, + catch_up_request.round_number); auto info_opt = peer_manager_->getPeerState(peer_id); if (not info_opt.has_value()) { @@ -521,6 +551,16 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Catch-up-request with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + catch_up_request.voter_set_id, + catch_up_request.round_number, + peer_id); + return; + } + /// Impolite to send a catch up request to a peer in a new different Set ID. if (catch_up_request.voter_set_id != info.set_id) { SL_DEBUG(log_, @@ -534,7 +574,7 @@ namespace kagome::network { /// It is impolite to send a catch up request for a round `R` to a peer /// whose announced view is behind `R`. 
- if (catch_up_request.round_number < info.round_number - 1) { + if (catch_up_request.round_number < info.round_number.value() - 1) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " "has not been sent to {}: too old round for requested", @@ -569,14 +609,25 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Catch-up-response with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + catch_up_response.voter_set_id, + catch_up_response.round_number, + peer_id); + return; + } + /// Impolite to send a catch up request to a peer in a new different Set ID. if (catch_up_response.voter_set_id != info.set_id) { SL_DEBUG(log_, "Catch-up-response with set_id={} in round={} " - "has not been sent to {}: different set id", + "has not been sent to {}: {} set id", catch_up_response.voter_set_id, catch_up_response.round_number, - peer_id); + peer_id, + info.set_id.has_value() ? 
"different" : "unknown"); return; } diff --git a/core/network/peer_manager.hpp b/core/network/peer_manager.hpp index ca2c911c25..ddd7787ff3 100644 --- a/core/network/peer_manager.hpp +++ b/core/network/peer_manager.hpp @@ -21,8 +21,8 @@ namespace kagome::network { clock::SteadyClock::TimePoint time; Roles roles = 0; BlockInfo best_block = {0, {}}; - RoundNumber round_number = 0; - MembershipCounter set_id = 0; + std::optional round_number = std::nullopt; + std::optional set_id = std::nullopt; BlockNumber last_finalized = 0; }; From 6dfc212ac285c8da60d74ff9b0abc0dacab4adf0 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Fri, 22 Jul 2022 11:32:05 +0300 Subject: [PATCH 15/24] refactor: order of peer data (about grandpa) initialization Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 65 ++++++++++--------- .../impl/protocols/grandpa_protocol.cpp | 1 - 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 18f6568560..ef6a72da1d 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -327,10 +327,13 @@ namespace kagome::consensus::grandpa { msg.last_finalized, peer_id); - // Iff peer just reached one of recent round, then share known votes auto info = peer_manager_->getPeerState(peer_id); - if (not info.has_value() || msg.voter_set_id != info->set_id - || msg.round_number > info->round_number) { + + // Iff peer just reached one of recent round, then share known votes + if (not info.has_value() + or (info->set_id.has_value() and msg.voter_set_id != info->set_id) + or (info->round_number.has_value() + and msg.round_number > info->round_number)) { if (auto opt_round = selectRound(msg.round_number, msg.voter_set_id); opt_round.has_value()) { auto &round = opt_round.value(); @@ -355,34 +358,34 @@ namespace kagome::consensus::grandpa { return; } - if 
(info.has_value()) { - if (info->last_finalized <= block_tree_->deepestLeaf().number) { - // Trying to substitute with justifications' request only - auto last_finalized = block_tree_->getLastFinalized(); - synchronizer_->syncMissingJustifications( - peer_id, - last_finalized, - std::nullopt, - [wp = weak_from_this(), last_finalized, msg](auto res) { - auto self = wp.lock(); - if (not self) { - return; - } - if (res.has_error()) { - SL_WARN(self->logger_, - "Missing justifications between blocks {} and " - "{} was not loaded: {}", - last_finalized, - msg.last_finalized, - res.error().message()); - } else { - SL_DEBUG(self->logger_, - "Loaded justifications for blocks in range {} - {}", - last_finalized, - res.value()); - } - }); - } + peer_manager_->updatePeerState(peer_id, msg); + + if (info->last_finalized <= block_tree_->deepestLeaf().number) { + // Trying to substitute with justifications' request only + auto last_finalized = block_tree_->getLastFinalized(); + synchronizer_->syncMissingJustifications( + peer_id, + last_finalized, + std::nullopt, + [wp = weak_from_this(), last_finalized, msg](auto res) { + auto self = wp.lock(); + if (not self) { + return; + } + if (res.has_error()) { + SL_WARN(self->logger_, + "Missing justifications between blocks {} and " + "{} was not loaded: {}", + last_finalized, + msg.last_finalized, + res.error().message()); + } else { + SL_DEBUG(self->logger_, + "Loaded justifications for blocks in range {} - {}", + last_finalized, + res.value()); + } + }); } } diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index 098611bd60..853d85d3dd 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -318,7 +318,6 @@ namespace kagome::network { self->log_, "NeighborMessage has received from {}", peer_id); self->grandpa_observer_->onNeighborMessage(peer_id, neighbor_message); - 
self->peer_manager_->updatePeerState(peer_id, neighbor_message); }, [&](const network::CatchUpRequest &catch_up_request) { SL_VERBOSE( From 49960d019d4ddd21c0b2904f8f30fbfb0704dc2c Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Tue, 26 Jul 2022 12:35:45 +0300 Subject: [PATCH 16/24] refactor: comment disconnecting of peer by liveliness Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/peer_manager_impl.cpp | 32 ++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 38cfe9acfb..daace663ee 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -212,22 +212,22 @@ namespace kagome::network { disconnectFromPeer(peer_id); } - // Check if disconnected - auto block_announce_protocol = router_->getBlockAnnounceProtocol(); - BOOST_ASSERT_MSG(block_announce_protocol, - "Router did not provide block announce protocol"); - - for (auto it = active_peers_.begin(); it != active_peers_.end();) { - auto [peer_id, data] = *it++; - // TODO(d.khaustov) consider better alive check logic - if (not stream_engine_->isAlive(peer_id, block_announce_protocol)) { - // Found disconnected - const auto &peer_id_ref = peer_id; - SL_DEBUG(log_, "Found dead peer: {}", peer_id_ref); - disconnectFromPeer(peer_id); - } - } - +// // Check if disconnected +// auto block_announce_protocol = router_->getBlockAnnounceProtocol(); +// BOOST_ASSERT_MSG(block_announce_protocol, +// "Router did not provide block announce protocol"); +// +// for (auto it = active_peers_.begin(); it != active_peers_.end();) { +// auto [peer_id, data] = *it++; +// // TODO(d.khaustov) consider better alive check logic +// if (not stream_engine_->isAlive(peer_id, block_announce_protocol)) { +// // Found disconnected +// const auto &peer_id_ref = peer_id; +// SL_DEBUG(log_, "Found dead peer: {}", peer_id_ref); +// 
disconnectFromPeer(peer_id); +// } +// } +// // Soft limit is exceeded if (active_peers_.size() > soft_limit) { // Get oldest peer From 0408f7c1f4233ba69befc71555d422f9c2efeb9b Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 11:28:00 +0300 Subject: [PATCH 17/24] fix: getting of descending chain to block Signed-off-by: Dmitriy Khaustov aka xDimon --- core/blockchain/impl/block_tree_impl.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 9e628d5e2e..3ef15baf15 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -985,11 +985,13 @@ namespace kagome::blockchain { } const auto &header = header_res.value(); + chain.emplace_back(hash); + if (header.parent_hash == primitives::BlockHash{}) { break; } - chain.emplace_back(header.parent_hash); + hash = header.parent_hash; } return std::vector(chain.begin(), chain.end()); From 381c90f0555b6b0d9e89cfacc545ae5721c6c8fd Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 11:56:04 +0300 Subject: [PATCH 18/24] feature: suppression of catch-up request to one peer for time Signed-off-by: Dmitriy Khaustov aka xDimon --- .../impl/protocols/grandpa_protocol.cpp | 35 ++++++++++++++++--- .../impl/protocols/grandpa_protocol.hpp | 13 ++++++- .../impl/protocols/protocol_factory.cpp | 10 ++++-- .../impl/protocols/protocol_factory.hpp | 6 +++- core/network/impl/synchronizer_impl.hpp | 3 +- core/network/types/grandpa_message.hpp | 9 +++++ 6 files changed, 65 insertions(+), 11 deletions(-) diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index 853d85d3dd..0c26bfa652 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -26,14 +26,16 @@ namespace kagome::network { std::shared_ptr 
grandpa_observer, const OwnPeerInfo &own_info, std::shared_ptr stream_engine, - std::shared_ptr peer_manager) + std::shared_ptr peer_manager, + std::shared_ptr scheduler) : host_(host), io_context_(std::move(io_context)), app_config_(app_config), grandpa_observer_(std::move(grandpa_observer)), own_info_(own_info), stream_engine_(std::move(stream_engine)), - peer_manager_(std::move(peer_manager)) { + peer_manager_(std::move(peer_manager)), + scheduler_(std::move(scheduler)) { const_cast(protocol_) = kGrandpaProtocol; } @@ -560,7 +562,7 @@ namespace kagome::network { return; } - /// Impolite to send a catch up request to a peer in a new different Set ID. + // Impolite to send a catch up request to a peer in a new different Set ID. if (catch_up_request.voter_set_id != info.set_id) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " @@ -571,8 +573,8 @@ namespace kagome::network { return; } - /// It is impolite to send a catch up request for a round `R` to a peer - /// whose announced view is behind `R`. + // It is impolite to send a catch-up request for a round `R` to a peer + // whose announced view is behind `R`. 
if (catch_up_request.round_number < info.round_number.value() - 1) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " @@ -583,6 +585,29 @@ namespace kagome::network { return; } + auto fingerprint = 0;//catch_up_request.fingerprint(); + auto [_, ok] = recent_catchup_requests_.emplace(peer_id, fingerprint); + + // It is impolite to replay a catch-up request + if (not ok) { + SL_DEBUG(log_, + "Catch-up-request with set_id={} in round={} " + "has not been sent to {}: impolite to replay catch-up request", + catch_up_request.voter_set_id, + catch_up_request.round_number, + peer_id); + return; + } + + scheduler_->schedule( + [wp = weak_from_this(), peer_id, fingerprint] { + if (auto self = wp.lock()) { + self->recent_catchup_requests_.erase( + std::tuple(peer_id, fingerprint)); + } + }, + kRecentnessDuration); + auto shared_msg = KAGOME_EXTRACT_SHARED_CACHE(GrandpaProtocol, GrandpaMessage); (*shared_msg) = GrandpaMessage(std::move(catch_up_request)); diff --git a/core/network/impl/protocols/grandpa_protocol.hpp b/core/network/impl/protocols/grandpa_protocol.hpp index 251e51296a..6684ffdbc9 100644 --- a/core/network/impl/protocols/grandpa_protocol.hpp +++ b/core/network/impl/protocols/grandpa_protocol.hpp @@ -12,6 +12,7 @@ #include #include +#include #include "application/app_configuration.hpp" #include "consensus/grandpa/grandpa_observer.hpp" @@ -48,7 +49,9 @@ namespace kagome::network { std::shared_ptr grandpa_observer, const OwnPeerInfo &own_info, std::shared_ptr stream_engine, - std::shared_ptr peer_manager); + std::shared_ptr peer_manager, + std::shared_ptr scheduler + ); const Protocol &protocol() const override { return protocol_; @@ -94,6 +97,9 @@ namespace kagome::network { const int &msg, std::function>)> &&cb); + static constexpr std::chrono::milliseconds kRecentnessDuration = + std::chrono::seconds(300); + libp2p::Host &host_; std::shared_ptr io_context_; const application::AppConfiguration &app_config_; @@ -101,7 +107,12 @@ namespace 
kagome::network { const OwnPeerInfo &own_info_; std::shared_ptr stream_engine_; std::shared_ptr peer_manager_; + std::shared_ptr scheduler_; const libp2p::peer::Protocol protocol_; + + std::set> + recent_catchup_requests_; + log::Logger log_ = log::createLogger("GrandpaProtocol", "grandpa_protocol"); }; diff --git a/core/network/impl/protocols/protocol_factory.cpp b/core/network/impl/protocols/protocol_factory.cpp index db56c8b5d3..da28008561 100644 --- a/core/network/impl/protocols/protocol_factory.cpp +++ b/core/network/impl/protocols/protocol_factory.cpp @@ -19,7 +19,8 @@ namespace kagome::network { extrinsic_events_engine, std::shared_ptr ext_event_key_repo, - std::shared_ptr peer_rating_repository) + std::shared_ptr peer_rating_repository, + std::shared_ptr scheduler) : host_(host), app_config_(app_config), chain_spec_(chain_spec), @@ -29,13 +30,15 @@ namespace kagome::network { stream_engine_(std::move(stream_engine)), extrinsic_events_engine_{std::move(extrinsic_events_engine)}, ext_event_key_repo_{std::move(ext_event_key_repo)}, - peer_rating_repository_{std::move(peer_rating_repository)} { + peer_rating_repository_{std::move(peer_rating_repository)}, + scheduler_{std::move(scheduler)} { BOOST_ASSERT(io_context_ != nullptr); BOOST_ASSERT(hasher_ != nullptr); BOOST_ASSERT(stream_engine_ != nullptr); BOOST_ASSERT(extrinsic_events_engine_ != nullptr); BOOST_ASSERT(ext_event_key_repo_ != nullptr); BOOST_ASSERT(peer_rating_repository_ != nullptr); + BOOST_ASSERT(scheduler_ != nullptr); } std::shared_ptr @@ -57,7 +60,8 @@ namespace kagome::network { grandpa_observer_.lock(), own_info_, stream_engine_, - peer_manager_.lock()); + peer_manager_.lock(), + scheduler_); } std::shared_ptr diff --git a/core/network/impl/protocols/protocol_factory.hpp b/core/network/impl/protocols/protocol_factory.hpp index 81eefa1907..f7659b7f4c 100644 --- a/core/network/impl/protocols/protocol_factory.hpp +++ b/core/network/impl/protocols/protocol_factory.hpp @@ -17,6 +17,8 @@ #include 
"network/rating_repository.hpp" #include "primitives/event_types.hpp" +#include + namespace kagome::network { class ProtocolFactory final { @@ -33,7 +35,8 @@ namespace kagome::network { extrinsic_events_engine, std::shared_ptr ext_event_key_repo, - std::shared_ptr peer_rating_repository); + std::shared_ptr peer_rating_repository, + std::shared_ptr scheduler); void setBlockTree( const std::shared_ptr &block_tree) { @@ -92,6 +95,7 @@ namespace kagome::network { std::shared_ptr ext_event_key_repo_; std::shared_ptr peer_rating_repository_; + std::shared_ptr scheduler_; std::weak_ptr block_tree_; std::weak_ptr babe_; diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 5373b5b85c..01c60d1577 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -200,7 +200,8 @@ namespace kagome::network { std::atomic_bool asking_blocks_portion_in_progress_ = false; std::set busy_peers_; - std::set> recent_requests_; + std::set> + recent_requests_; }; } // namespace kagome::network diff --git a/core/network/types/grandpa_message.hpp b/core/network/types/grandpa_message.hpp index 7f284f3899..4a3cf787fe 100644 --- a/core/network/types/grandpa_message.hpp +++ b/core/network/types/grandpa_message.hpp @@ -78,6 +78,15 @@ namespace kagome::network { struct CatchUpRequest { RoundNumber round_number; MembershipCounter voter_set_id; + + using Fingerprint = size_t; + + inline Fingerprint fingerprint() const { + auto result = std::hash()(round_number); + + boost::hash_combine(result, std::hash()(voter_set_id)); + return result; + }; }; template Date: Thu, 28 Jul 2022 11:58:09 +0300 Subject: [PATCH 19/24] fix: log and some corner case in sync protocol Signed-off-by: Dmitriy Khaustov aka xDimon --- .../impl/sync_protocol_observer_impl.cpp | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/core/network/impl/sync_protocol_observer_impl.cpp 
b/core/network/impl/sync_protocol_observer_impl.cpp index 1c64b62020..d215d28853 100644 --- a/core/network/impl/sync_protocol_observer_impl.cpp +++ b/core/network/impl/sync_protocol_observer_impl.cpp @@ -10,6 +10,7 @@ #include "application/app_configuration.hpp" #include "network/common.hpp" #include "network/helpers/peer_id_formatter.hpp" +#include "primitives/common.hpp" OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SyncProtocolObserverImpl::Error, @@ -68,17 +69,37 @@ namespace kagome::network { if (response.blocks.empty()) { SL_DEBUG(log_, "Return response id={}: no blocks", request_id); } else if (response.blocks.size() == 1) { - SL_DEBUG(log_, - "Return response id={}: {}, count 1", - request_id, - response.blocks.front().hash); + if (response.blocks.front().header.has_value()) { + SL_DEBUG(log_, + "Return response id={}: {}, count 1", + request_id, + primitives::BlockInfo(response.blocks.front().header->number, + response.blocks.front().hash)); + } else { + SL_DEBUG(log_, + "Return response id={}: {}, count 1", + request_id, + response.blocks.front().hash); + } } else { - SL_DEBUG(log_, - "Return response id={}: from {} to {}, count {}", - request_id, - response.blocks.front().hash, - response.blocks.back().hash, - response.blocks.size()); + if (response.blocks.front().header.has_value() + and response.blocks.back().header.has_value()) { + SL_DEBUG(log_, + "Return response id={}: from {} to {}, count {}", + request_id, + primitives::BlockInfo(response.blocks.front().header->number, + response.blocks.front().hash), + primitives::BlockInfo(response.blocks.back().header->number, + response.blocks.back().hash), + response.blocks.size()); + } else { + SL_DEBUG(log_, + "Return response id={}: from {} to {}, count {}", + request_id, + response.blocks.front().hash, + response.blocks.back().hash, + response.blocks.size()); + } } requested_ids_.erase(request_id); @@ -141,12 +162,18 @@ namespace kagome::network { auto header_res = blocks_headers_->getBlockHeader(hash); 
if (header_res) { new_block.header = std::move(header_res.value()); + } else { + response.blocks.pop_back(); + break; } } if (body_needed) { auto body_res = block_tree_->getBlockBody(hash); if (body_res) { new_block.body = std::move(body_res.value()); + } else { + response.blocks.pop_back(); + break; } } if (justification_needed) { From be61d472583c343eb3ef20a1b5174cbf2e78022e Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 11:59:13 +0300 Subject: [PATCH 20/24] refactor: use cli bootstrap nodes early then spec's ones Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/peer_manager_impl.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index daace663ee..87caf08db0 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -131,6 +131,11 @@ namespace kagome::network { // Start Identify protocol identify_->start(); + // Enqueue bootstrap nodes with permanent lifetime + for (const auto &bootstrap_node : bootstrap_nodes_) { + kademlia_->addPeer(bootstrap_node, true); + } + // Enqueue last active peers as first peers set but with limited lifetime auto last_active_peers = loadLastActivePeers(); SL_DEBUG(log_, @@ -140,11 +145,6 @@ namespace kagome::network { kademlia_->addPeer(peer_info, false); } - // Enqueue bootstrap nodes with permanent lifetime - for (const auto &bootstrap_node : bootstrap_nodes_) { - kademlia_->addPeer(bootstrap_node, true); - } - // Start Kademlia (processing incoming message and random walking) kademlia_->start(); From db0900303810691dd0e38b408cc5a4f4d400a555 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 12:05:07 +0300 Subject: [PATCH 21/24] fix: allow to create next round by commit for non finalized Signed-off-by: Dmitriy Khaustov aka xDimon --- core/consensus/grandpa/impl/grandpa_impl.cpp | 110 
++++++++++-------- .../grandpa/impl/voting_round_impl.cpp | 4 +- 2 files changed, 64 insertions(+), 50 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index ef6a72da1d..6dc7eeb459 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -164,9 +164,8 @@ namespace kagome::consensus::grandpa { std::shared_ptr GrandpaImpl::makeNextRound( const std::shared_ptr &round) { - BOOST_ASSERT(round->finalizedBlock().has_value()); - - BlockInfo best_block = round->finalizedBlock().value(); + BlockInfo best_block = + round->finalizedBlock().value_or(round->lastFinalizedBlock()); auto authorities_opt = authority_manager_->authorities(best_block, IsBlockFinalized{true}); @@ -809,63 +808,78 @@ namespace kagome::consensus::grandpa { outcome::result GrandpaImpl::applyJustification( const BlockInfo &block_info, const GrandpaJustification &justification) { - auto opt_round = selectRound(justification.round_number, std::nullopt); + auto round_opt = selectRound(justification.round_number, std::nullopt); std::shared_ptr round; bool need_to_make_round_current = false; - if (not opt_round.has_value()) { + if (round_opt.has_value()) { + round = std::move(round_opt.value()); + } else { // This is justification for already finalized block if (current_round_->lastFinalizedBlock().number > block_info.number) { return VotingRoundError::JUSTIFICATION_FOR_BLOCK_IN_PAST; } - MovableRoundState round_state{ - .round_number = justification.round_number, - .last_finalized_block = current_round_->lastFinalizedBlock(), - .votes = {}, - .finalized = block_info}; - - auto authorities_opt = - authority_manager_->authorities(block_info, IsBlockFinalized{false}); - if (!authorities_opt) { - SL_WARN(logger_, - "Can't retrieve authorities to apply a justification " - "at block {}", - block_info); - return VotingRoundError::NO_KNOWN_AUTHORITIES_FOR_BLOCK; - } - auto &authorities = 
authorities_opt.value(); - - // This is justification for non-actual round - if (authorities->id < current_round_->voterSetId() - or (authorities->id == current_round_->voterSetId() - && justification.round_number < current_round_->roundNumber())) { - return VotingRoundError::JUSTIFICATION_FOR_ROUND_IN_PAST; - } + auto prev_round_opt = + selectRound(justification.round_number - 1, std::nullopt); + + if (prev_round_opt.has_value()) { + const auto &prev_round = prev_round_opt.value(); + round = makeNextRound(prev_round); + need_to_make_round_current = true; + BOOST_ASSERT(round); + + SL_DEBUG(logger_, + "Hop grandpa to round #{} by received justification", + justification.round_number); + } else { + MovableRoundState round_state{ + .round_number = justification.round_number, + .last_finalized_block = current_round_->lastFinalizedBlock(), + .votes = {}, + .finalized = block_info}; + + auto authorities_opt = authority_manager_->authorities( + block_info, IsBlockFinalized{false}); + if (!authorities_opt) { + SL_WARN(logger_, + "Can't retrieve authorities to apply a justification " + "at block {}", + block_info); + return VotingRoundError::NO_KNOWN_AUTHORITIES_FOR_BLOCK; + } + auto &authorities = authorities_opt.value(); + + // This is justification for non-actual round + if (authorities->id < current_round_->voterSetId() + or (authorities->id == current_round_->voterSetId() + && justification.round_number + < current_round_->roundNumber())) { + return VotingRoundError::JUSTIFICATION_FOR_ROUND_IN_PAST; + } - if (authorities->id > current_round_->voterSetId() + 1) { - return VotingRoundError::WRONG_ORDER_OF_VOTER_SET_ID; - } + if (authorities->id > current_round_->voterSetId() + 1) { + return VotingRoundError::WRONG_ORDER_OF_VOTER_SET_ID; + } - auto voters = std::make_shared(authorities->id); - for (const auto &authority : *authorities) { - auto res = voters->insert( - primitives::GrandpaSessionKey(authority.id.id), authority.weight); - if (res.has_error()) { - 
SL_CRITICAL( - logger_, "Can't make voter set: {}", res.error().message()); - return res.as_failure(); + auto voters = std::make_shared(authorities->id); + for (const auto &authority : *authorities) { + auto res = voters->insert( + primitives::GrandpaSessionKey(authority.id.id), authority.weight); + if (res.has_error()) { + SL_CRITICAL( + logger_, "Can't make voter set: {}", res.error().message()); + return res.as_failure(); + } } - } - round = makeInitialRound(round_state, std::move(voters)); - need_to_make_round_current = true; - BOOST_ASSERT(round); + round = makeInitialRound(round_state, std::move(voters)); + need_to_make_round_current = true; + BOOST_ASSERT(round); - SL_DEBUG(logger_, - "Rewind grandpa till round #{} by received justification", - justification.round_number); - } else { - round = std::move(opt_round.value()); + SL_DEBUG(logger_, + "Rewind grandpa till round #{} by received justification", + justification.round_number); + } } OUTCOME_TRY(round->applyJustification(block_info, justification)); diff --git a/core/consensus/grandpa/impl/voting_round_impl.cpp b/core/consensus/grandpa/impl/voting_round_impl.cpp index 51f7ae7423..9ac3d349e2 100644 --- a/core/consensus/grandpa/impl/voting_round_impl.cpp +++ b/core/consensus/grandpa/impl/voting_round_impl.cpp @@ -114,10 +114,10 @@ namespace kagome::consensus::grandpa { clock, scheduler) { BOOST_ASSERT(previous_round != nullptr); - BOOST_ASSERT(previous_round->finalizedBlock().has_value()); previous_round_ = previous_round; - last_finalized_block_ = previous_round->finalizedBlock().value(); + last_finalized_block_ = previous_round->finalizedBlock().value_or( + previous_round->lastFinalizedBlock()); } VotingRoundImpl::VotingRoundImpl( From d94eb5cbb56ddb983d8d84cf01897c8d09963038 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 12:05:57 +0300 Subject: [PATCH 22/24] fix: order of peer state update Signed-off-by: Dmitriy Khaustov aka xDimon --- 
core/consensus/grandpa/impl/grandpa_impl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index 6dc7eeb459..6efdf997b4 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -340,6 +340,8 @@ namespace kagome::consensus::grandpa { } } + peer_manager_->updatePeerState(peer_id, msg); + // If peer has the same voter set id if (msg.voter_set_id == current_round_->voterSetId()) { // Check if needed to catch-up peer, then do that @@ -357,8 +359,6 @@ namespace kagome::consensus::grandpa { return; } - peer_manager_->updatePeerState(peer_id, msg); - if (info->last_finalized <= block_tree_->deepestLeaf().number) { // Trying to substitute with justifications' request only auto last_finalized = block_tree_->getLastFinalized(); From 025af5a982ac8c340083d6dbd28db578b155682f Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Thu, 28 Jul 2022 12:13:48 +0300 Subject: [PATCH 23/24] fix: typo Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/peer_manager_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 87caf08db0..47eb662414 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -148,7 +148,7 @@ namespace kagome::network { // Start Kademlia (processing incoming message and random walking) kademlia_->start(); - // Do first aligning of peers count + // Do first alignment of peers count align(); return true; From a861af587f14c8ba600dbb9cb821531375750816 Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov aka xDimon Date: Mon, 1 Aug 2022 10:47:40 +0300 Subject: [PATCH 24/24] fix: review issues Signed-off-by: Dmitriy Khaustov aka xDimon --- core/network/impl/protocols/grandpa_protocol.hpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 
deletions(-) diff --git a/core/network/impl/protocols/grandpa_protocol.hpp b/core/network/impl/protocols/grandpa_protocol.hpp index 6684ffdbc9..f054cf168b 100644 --- a/core/network/impl/protocols/grandpa_protocol.hpp +++ b/core/network/impl/protocols/grandpa_protocol.hpp @@ -10,9 +10,9 @@ #include +#include #include #include -#include #include "application/app_configuration.hpp" #include "consensus/grandpa/grandpa_observer.hpp" @@ -50,8 +50,7 @@ namespace kagome::network { const OwnPeerInfo &own_info, std::shared_ptr stream_engine, std::shared_ptr peer_manager, - std::shared_ptr scheduler - ); + std::shared_ptr scheduler); const Protocol &protocol() const override { return protocol_; @@ -97,6 +96,9 @@ namespace kagome::network { const int &msg, std::function>)> &&cb); + /// Node should send catch-up requests rarely to be polite, because + /// processing them consumes a significant amount of resources. + /// How long replaying outgoing catch-up requests must be suppressed static constexpr std::chrono::milliseconds kRecentnessDuration = std::chrono::seconds(300);