diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index 839d94a8ec..c8b6892a49 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -295,6 +295,7 @@ namespace kagome::application { bool validator_mode = false; load_bool(val, "validator", validator_mode); if (validator_mode) { + roles_.flags.full = 0; roles_.flags.authority = 1; } @@ -800,7 +801,7 @@ namespace kagome::application { } } - roles_.flags.full = 1; + roles_.flags.full = 0; roles_.flags.authority = 1; p2p_port_ = def_p2p_port; rpc_http_host_ = def_rpc_http_host; @@ -822,6 +823,7 @@ namespace kagome::application { }); if (vm.end() != vm.find("validator")) { + roles_.flags.full = 0; roles_.flags.authority = 1; } diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 9e628d5e2e..3ef15baf15 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -985,11 +985,13 @@ namespace kagome::blockchain { } const auto &header = header_res.value(); + chain.emplace_back(hash); + if (header.parent_hash == primitives::BlockHash{}) { break; } - chain.emplace_back(header.parent_hash); + hash = header.parent_hash; } return std::vector(chain.begin(), chain.end()); diff --git a/core/consensus/babe/block_executor.hpp b/core/consensus/babe/block_executor.hpp index dd7b914d4c..ba22aef345 100644 --- a/core/consensus/babe/block_executor.hpp +++ b/core/consensus/babe/block_executor.hpp @@ -16,6 +16,10 @@ namespace kagome::consensus { virtual ~BlockExecutor() = default; virtual outcome::result applyBlock(primitives::BlockData &&block) = 0; + + virtual outcome::result applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) = 0; }; } // namespace kagome::consensus diff --git a/core/consensus/babe/impl/block_executor_impl.cpp b/core/consensus/babe/impl/block_executor_impl.cpp index e02259f8c5..9df9149091 100644 --- a/core/consensus/babe/impl/block_executor_impl.cpp +++ b/core/consensus/babe/impl/block_executor_impl.cpp @@ -209,8 +209,8 @@ namespace kagome::consensus { SL_VERBOSE(logger_, "Next epoch digest has gotten in block {} (slot {}). " "Randomness: {}", - slot_number, primitives::BlockInfo(block.header.number, block_hash), + slot_number, next_epoch_digest.randomness); } @@ -284,8 +284,7 @@ namespace kagome::consensus { if (not justifications_.empty()) { std::vector to_remove; for (const auto &[block_info, justification] : justifications_) { - auto res = grandpa_environment_->applyJustification(block_info, - justification); + auto res = applyJustification(block_info, justification); if (res) { to_remove.push_back(block_info); } @@ -297,7 +296,7 @@ namespace kagome::consensus { } } - auto res = grandpa_environment_->applyJustification( + auto res = applyJustification( primitives::BlockInfo(block.header.number, block_hash), b.justification.value()); if (res.has_error()) { @@ -358,6 +357,12 @@ namespace kagome::consensus { return outcome::success(); } + outcome::result BlockExecutorImpl::applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) { + return grandpa_environment_->applyJustification(block_info, justification); + } + void BlockExecutorImpl::rollbackBlock(const primitives::BlockInfo &block) { // Remove possible authority changes scheduled on block authority_update_observer_->cancel(block); diff --git a/core/consensus/babe/impl/block_executor_impl.hpp b/core/consensus/babe/impl/block_executor_impl.hpp index 99b9118248..66a199d2af 100644 --- a/core/consensus/babe/impl/block_executor_impl.hpp +++ b/core/consensus/babe/impl/block_executor_impl.hpp @@ -52,6 +52,10 @@ namespace kagome::consensus { outcome::result applyBlock(primitives::BlockData &&block) override; + outcome::result applyJustification( + const primitives::BlockInfo &block_info, + const primitives::Justification &justification) override; + private: void rollbackBlock(const primitives::BlockInfo &block); diff --git a/core/consensus/grandpa/impl/grandpa_impl.cpp b/core/consensus/grandpa/impl/grandpa_impl.cpp index ce521639fa..6efdf997b4 100644 --- a/core/consensus/grandpa/impl/grandpa_impl.cpp +++ b/core/consensus/grandpa/impl/grandpa_impl.cpp @@ -164,9 +164,8 @@ namespace kagome::consensus::grandpa { std::shared_ptr GrandpaImpl::makeNextRound( const std::shared_ptr &round) { - BOOST_ASSERT(round->finalizedBlock().has_value()); - - BlockInfo best_block = round->finalizedBlock().value(); + BlockInfo best_block = + round->finalizedBlock().value_or(round->lastFinalizedBlock()); auto authorities_opt = authority_manager_->authorities(best_block, IsBlockFinalized{true}); @@ -298,6 +297,11 @@ namespace kagome::consensus::grandpa { metric_highest_round_->set(current_round_->roundNumber()); if (keypair_) { current_round_->play(); + } else { + auto round = std::dynamic_pointer_cast(current_round_); + if (round) { + round->sendNeighborMessage(); + } } } @@ -322,28 +326,66 @@ namespace kagome::consensus::grandpa { msg.last_finalized, peer_id); - // Ignore peer whose voter_set is different - if (msg.voter_set_id != current_round_->voterSetId()) { - return; - } - - // Check if needed to catch-up peer, then do that - if (msg.round_number >= current_round_->roundNumber() + kCatchUpThreshold) { - std::ignore = environment_->onCatchUpRequested( - peer_id, msg.voter_set_id, msg.round_number - 1); - return; - } + auto info = peer_manager_->getPeerState(peer_id); // Iff peer just reached one of recent round, then share known votes - auto info = peer_manager_->getPeerState(peer_id); - if (not info.has_value() || msg.voter_set_id != info->set_id - || msg.round_number > info->round_number) { + if (not info.has_value() + or (info->set_id.has_value() and msg.voter_set_id != info->set_id) + or (info->round_number.has_value() + and msg.round_number > info->round_number)) { if (auto opt_round = selectRound(msg.round_number, msg.voter_set_id); opt_round.has_value()) { auto &round = opt_round.value(); environment_->sendState(peer_id, round->state(), msg.voter_set_id); } } + + peer_manager_->updatePeerState(peer_id, msg); + + // If peer has the same voter set id + if (msg.voter_set_id == current_round_->voterSetId()) { + // Check if needed to catch-up peer, then do that + if (msg.round_number + >= current_round_->roundNumber() + kCatchUpThreshold) { + std::ignore = environment_->onCatchUpRequested( + peer_id, msg.voter_set_id, msg.round_number - 1); + } + + return; + } + + // Ignore peer whose voter set id lower than our current + if (msg.voter_set_id < current_round_->voterSetId()) { + return; + } + + if (info->last_finalized <= block_tree_->deepestLeaf().number) { + // Trying to substitute with justifications' request only + auto last_finalized = block_tree_->getLastFinalized(); + synchronizer_->syncMissingJustifications( + peer_id, + last_finalized, + std::nullopt, + [wp = weak_from_this(), last_finalized, msg](auto res) { + auto self = wp.lock(); + if (not self) { + return; + } + if (res.has_error()) { + SL_WARN(self->logger_, + "Missing justifications between blocks {} and " + "{} was not loaded: {}", + last_finalized, + msg.last_finalized, + res.error().message()); + } else { + SL_DEBUG(self->logger_, + "Loaded justifications for blocks in range {} - {}", + last_finalized, + res.value()); + } + }); + } } void GrandpaImpl::onCatchUpRequest(const libp2p::peer::PeerId &peer_id, @@ -766,73 +808,88 @@ namespace kagome::consensus::grandpa { outcome::result GrandpaImpl::applyJustification( const BlockInfo &block_info, const GrandpaJustification &justification) { - auto opt_round = selectRound(justification.round_number, std::nullopt); + auto round_opt = selectRound(justification.round_number, std::nullopt); std::shared_ptr round; bool need_to_make_round_current = false; - if (!opt_round.has_value()) { + if (round_opt.has_value()) { + round = std::move(round_opt.value()); + } else { // This is justification for already finalized block if (current_round_->lastFinalizedBlock().number > block_info.number) { return VotingRoundError::JUSTIFICATION_FOR_BLOCK_IN_PAST; } - MovableRoundState round_state{ - .round_number = justification.round_number, - .last_finalized_block = current_round_->lastFinalizedBlock(), - .votes = {}, - .finalized = block_info}; - - auto authorities_opt = - authority_manager_->authorities(block_info, IsBlockFinalized{false}); - if (!authorities_opt) { - SL_WARN(logger_, - "Can't retrieve authorities to apply a justification " - "at block {}", - block_info); - return VotingRoundError::NO_KNOWN_AUTHORITIES_FOR_BLOCK; - } - auto &authorities = authorities_opt.value(); - - // This is justification for non-actual round - if (authorities->id < current_round_->voterSetId() - or (authorities->id == current_round_->voterSetId() - && justification.round_number < current_round_->roundNumber())) { - return VotingRoundError::JUSTIFICATION_FOR_ROUND_IN_PAST; - } + auto prev_round_opt = + selectRound(justification.round_number - 1, std::nullopt); + + if (prev_round_opt.has_value()) { + const auto &prev_round = prev_round_opt.value(); + round = makeNextRound(prev_round); + need_to_make_round_current = true; + BOOST_ASSERT(round); + + SL_DEBUG(logger_, + "Hop grandpa to round #{} by received justification", + justification.round_number); + } else { + MovableRoundState round_state{ + .round_number = justification.round_number, + .last_finalized_block = current_round_->lastFinalizedBlock(), + .votes = {}, + .finalized = block_info}; + + auto authorities_opt = authority_manager_->authorities( + block_info, IsBlockFinalized{false}); + if (!authorities_opt) { + SL_WARN(logger_, + "Can't retrieve authorities to apply a justification " + "at block {}", + block_info); + return VotingRoundError::NO_KNOWN_AUTHORITIES_FOR_BLOCK; + } + auto &authorities = authorities_opt.value(); + + // This is justification for non-actual round + if (authorities->id < current_round_->voterSetId() + or (authorities->id == current_round_->voterSetId() + && justification.round_number + < current_round_->roundNumber())) { + return VotingRoundError::JUSTIFICATION_FOR_ROUND_IN_PAST; + } - if (authorities->id > current_round_->voterSetId() + 1) { - return VotingRoundError::WRONG_ORDER_OF_VOTER_SET_ID; - } + if (authorities->id > current_round_->voterSetId() + 1) { + return VotingRoundError::WRONG_ORDER_OF_VOTER_SET_ID; + } - auto voters = std::make_shared(authorities->id); - for (const auto &authority : *authorities) { - auto res = voters->insert( - primitives::GrandpaSessionKey(authority.id.id), authority.weight); - if (res.has_error()) { - SL_CRITICAL( - logger_, "Can't make voter set: {}", res.error().message()); - return res.as_failure(); + auto voters = std::make_shared(authorities->id); + for (const auto &authority : *authorities) { + auto res = voters->insert( + primitives::GrandpaSessionKey(authority.id.id), authority.weight); + if (res.has_error()) { + SL_CRITICAL( + logger_, "Can't make voter set: {}", res.error().message()); + return res.as_failure(); + } } - } - round = makeInitialRound(round_state, std::move(voters)); - need_to_make_round_current = true; - BOOST_ASSERT(round); + round = makeInitialRound(round_state, std::move(voters)); + need_to_make_round_current = true; + BOOST_ASSERT(round); - SL_DEBUG(logger_, - "Rewind grandpa till round #{} by received justification", - justification.round_number); - } else { - round = *opt_round; + SL_DEBUG(logger_, + "Rewind grandpa till round #{} by received justification", + justification.round_number); + } } OUTCOME_TRY(round->applyJustification(block_info, justification)); if (need_to_make_round_current) { current_round_->end(); - current_round_ = std::move(round); + current_round_ = round; } - tryExecuteNextRound(current_round_); + tryExecuteNextRound(round); // if round == current round, then execution of the next round will be // elsewhere diff --git a/core/consensus/grandpa/impl/voting_round_impl.cpp b/core/consensus/grandpa/impl/voting_round_impl.cpp index 51f7ae7423..9ac3d349e2 100644 --- a/core/consensus/grandpa/impl/voting_round_impl.cpp +++ b/core/consensus/grandpa/impl/voting_round_impl.cpp @@ -114,10 +114,10 @@ namespace kagome::consensus::grandpa { clock, scheduler) { BOOST_ASSERT(previous_round != nullptr); - BOOST_ASSERT(previous_round->finalizedBlock().has_value()); previous_round_ = previous_round; - last_finalized_block_ = previous_round->finalizedBlock().value(); + last_finalized_block_ = previous_round->finalizedBlock().value_or( + previous_round->lastFinalizedBlock()); } VotingRoundImpl::VotingRoundImpl( diff --git a/core/consensus/grandpa/impl/voting_round_impl.hpp b/core/consensus/grandpa/impl/voting_round_impl.hpp index 6c573eda94..eb7689ac2c 100644 --- a/core/consensus/grandpa/impl/voting_round_impl.hpp +++ b/core/consensus/grandpa/impl/voting_round_impl.hpp @@ -239,6 +239,8 @@ namespace kagome::consensus::grandpa { */ MovableRoundState state() const override; + void sendNeighborMessage(); + private: /// Check if peer \param id is primary bool isPrimary(const Id &id) const; @@ -290,7 +292,6 @@ namespace kagome::consensus::grandpa { outcome::result validatePrecommitJustification( const BlockInfo &vote, const GrandpaJustification &justification) const; - void sendNeighborMessage(); void sendProposal(const PrimaryPropose &primary_proposal); void sendPrevote(const Prevote &prevote); void sendPrecommit(const Precommit &precommit); diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index 6ac6c72ccc..47eb662414 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -100,9 +100,9 @@ namespace kagome::network { self->peer_rating_repository_->rating(peer_id); rating < 0) { SL_DEBUG(self->log_, - "Disconnecting from peer_id={} due to its negative " + "Disconnecting from peer {} due to its negative " "rating {}", - peer_id.toBase58(), + peer_id, rating); self->disconnectFromPeer(peer_id); return; @@ -111,29 +111,31 @@ namespace kagome::network { } }); - identify_->onIdentifyReceived( - [wp = weak_from_this()](const PeerId &peer_id) { - if (auto self = wp.lock()) { - SL_DEBUG(self->log_, - "Identify received from peer_id={}", - peer_id.toBase58()); - if (auto rating = self->peer_rating_repository_->rating(peer_id); - rating < 0) { - SL_DEBUG( - self->log_, - "Disconnecting from peer_id={} due to its negative rating {}", - peer_id.toBase58(), - rating); - self->disconnectFromPeer(peer_id); - return; - } - self->processFullyConnectedPeer(peer_id); - } - }); + identify_->onIdentifyReceived([wp = weak_from_this()]( + const PeerId &peer_id) { + if (auto self = wp.lock()) { + SL_DEBUG(self->log_, "Identify received from peer {}", peer_id); + if (auto rating = self->peer_rating_repository_->rating(peer_id); + rating < 0) { + SL_DEBUG(self->log_, + "Disconnecting from peer {} due to its negative rating {}", + peer_id, + rating); + self->disconnectFromPeer(peer_id); + return; + } + self->processFullyConnectedPeer(peer_id); + } + }); // Start Identify protocol identify_->start(); + // Enqueue bootstrap nodes with permanent lifetime + for (const auto &bootstrap_node : bootstrap_nodes_) { + kademlia_->addPeer(bootstrap_node, true); + } + // Enqueue last active peers as first peers set but with limited lifetime auto last_active_peers = loadLastActivePeers(); SL_DEBUG(log_, @@ -143,15 +145,10 @@ namespace kagome::network { kademlia_->addPeer(peer_info, false); } - // Enqueue bootstrap nodes with permanent lifetime - for (const auto &bootstrap_node : bootstrap_nodes_) { - kademlia_->addPeer(bootstrap_node, true); - } - // Start Kademlia (processing incoming message and random walking) kademlia_->start(); - // Do first aligning of peers count + // Do first alignment of peers count align(); return true; @@ -210,11 +207,27 @@ namespace kagome::network { } for (const auto &peer_id : peers_to_disconnect) { SL_DEBUG(log_, - "Disconnecting from peer_id={} due to its negative rating", - peer_id.toBase58()); + "Disconnecting from peer {} due to its negative rating", + peer_id); disconnectFromPeer(peer_id); } +// // Check if disconnected +// auto block_announce_protocol = router_->getBlockAnnounceProtocol(); +// BOOST_ASSERT_MSG(block_announce_protocol, +// "Router did not provide block announce protocol"); +// +// for (auto it = active_peers_.begin(); it != active_peers_.end();) { +// auto [peer_id, data] = *it++; +// // TODO(d.khaustov) consider better alive check logic +// if (not stream_engine_->isAlive(peer_id, block_announce_protocol)) { +// // Found disconnected +// const auto &peer_id_ref = peer_id; +// SL_DEBUG(log_, "Found dead peer: {}", peer_id_ref); +// disconnectFromPeer(peer_id); +// } +// } +// // Soft limit is exceeded if (active_peers_.size() > soft_limit) { // Get oldest peer @@ -234,8 +247,7 @@ namespace kagome::network { } else if (oldest_descr.time_point + peer_ttl < clock_->now()) { // Peer is inactive long time auto &oldest_peer_id_ref = oldest_peer_id; - SL_DEBUG( - log_, "Found inactive peer_id={}", oldest_peer_id_ref.toBase58()); + SL_DEBUG(log_, "Found inactive peer: {}", oldest_peer_id_ref); disconnectFromPeer(oldest_peer_id); } else { @@ -299,19 +311,19 @@ namespace kagome::network { auto peer_info = host_.getPeerRepository().getPeerInfo(peer_id); if (peer_info.addresses.empty()) { - SL_DEBUG(log_, "Not found addresses for peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Not found addresses for peer {}", peer_id); connecting_peers_.erase(peer_id); return; } auto connectedness = host_.connectedness(peer_info); if (connectedness == libp2p::Host::Connectedness::CAN_NOT_CONNECT) { - SL_DEBUG(log_, "Can not connect to peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Can not connect to peer {}", peer_id); connecting_peers_.erase(peer_id); return; } - SL_DEBUG(log_, "Try to connect to peer_id={}", peer_info.id.toBase58()); + SL_DEBUG(log_, "Try to connect to peer {}", peer_info.id); for (auto addr : peer_info.addresses) { SL_DEBUG(log_, " address: {}", addr.getStringAddress()); } @@ -326,8 +338,8 @@ namespace kagome::network { if (not res.has_value()) { SL_DEBUG(self->log_, - "Connecting to peer_id={} is failed: {}", - peer_id.toBase58(), + "Connecting to peer {} is failed: {}", + peer_id, res.error().message()); self->connecting_peers_.erase(peer_id); return; @@ -336,16 +348,17 @@ namespace kagome::network { auto &connection = res.value(); auto remote_peer_id_res = connection->remotePeer(); if (not remote_peer_id_res.has_value()) { - SL_DEBUG(self->log_, - "Connected, but not identified yet (expecting peer_id={})", - peer_id.toBase58()); + SL_DEBUG( + self->log_, + "Connected, but not identified yet (expecting peer_id={:l})", + peer_id); self->connecting_peers_.erase(peer_id); return; } auto &remote_peer_id = remote_peer_id_res.value(); if (remote_peer_id == peer_id) { - SL_DEBUG(self->log_, "Connected to peer_id={}", peer_id.toBase58()); + SL_DEBUG(self->log_, "Connected to peer {}", peer_id); self->processFullyConnectedPeer(peer_id); } @@ -356,7 +369,7 @@ namespace kagome::network { void PeerManagerImpl::disconnectFromPeer(const PeerId &peer_id) { auto it = active_peers_.find(peer_id); if (it != active_peers_.end()) { - SL_DEBUG(log_, "Disconnect from peer_id={}", peer_id.toBase58()); + SL_DEBUG(log_, "Disconnect from peer {}", peer_id); stream_engine_->del(peer_id); active_peers_.erase(it); sync_peer_num_->set(active_peers_.size()); @@ -385,7 +398,7 @@ namespace kagome::network { if (conn == nullptr) { SL_DEBUG(log_, "Failed to start pinging {}: No connection to this peer exists", - peer_id.toBase58()); + peer_id); return; } auto [_, is_emplaced] = pinging_connections_.emplace(conn); @@ -396,7 +409,7 @@ namespace kagome::network { SL_DEBUG(log_, "Start pinging of {} (conn={})", - peer_id.toBase58(), + peer_id, static_cast(conn.get())); ping_protocol->startPinging( @@ -408,7 +421,7 @@ namespace kagome::network { if (session_res.has_error()) { SL_DEBUG(self->log_, "Stop pinging of {} (conn={}): {}", - peer_id.toBase58(), + peer_id, static_cast(conn.get()), session_res.error().message()); self->pinging_connections_.erase(conn); @@ -416,7 +429,7 @@ namespace kagome::network { } else { SL_DEBUG(self->log_, "Pinging: {} (conn={}) is alive", - peer_id.toBase58(), + peer_id, static_cast(conn.get())); self->keepAlive(peer_id); } @@ -481,8 +494,8 @@ namespace kagome::network { BOOST_ASSERT(queue_to_connect_.size() == peers_in_queue_.size()); SL_DEBUG(log_, - "New peer_id={} enqueued. In queue: {}", - peer_id.toBase58(), + "New peer_id enqueued: {:l}. In queue: {}", + peer_id, queue_to_connect_.size()); } @@ -563,7 +576,7 @@ namespace kagome::network { if (not stream_res.has_value()) { self->log_->warn("Unable to create stream {} with {}: {}", protocol->protocol(), - peer_id.toBase58(), + peer_id, stream_res.error().message()); self->connecting_peers_.erase(peer_id); self->disconnectFromPeer(peer_id); @@ -609,7 +622,7 @@ namespace kagome::network { SL_DEBUG(log_, "Stream {} with {} is alive", block_announce_protocol->protocol(), - peer_id.toBase58()); + peer_id); connecting_peers_.erase(peer_id); } diff --git a/core/network/impl/protocols/block_announce_protocol.cpp b/core/network/impl/protocols/block_announce_protocol.cpp index 8bc88f9e2a..e73ff3d605 100644 --- a/core/network/impl/protocols/block_announce_protocol.cpp +++ b/core/network/impl/protocols/block_announce_protocol.cpp @@ -42,9 +42,9 @@ namespace kagome::network { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { SL_TRACE(self->log_, - "Handled {} protocol stream from: {}", + "Handled {} protocol stream from {}", self->protocol_, - peer_id.value().toBase58()); + peer_id.value()); self->onIncomingStream(std::forward(stream)); return; } @@ -102,7 +102,7 @@ namespace kagome::network { self->log_, "Handshake failed on incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -113,7 +113,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -125,7 +125,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established incoming {} stream with {}", self->protocol_, - peer_id.toBase58()); + peer_id); }); } @@ -275,7 +275,9 @@ namespace kagome::network { return; } - read_writer->write(status_res.value(), + const auto &status = status_res.value(); + + read_writer->write(status, [stream = std::move(stream), direction, wp = weak_from_this(), diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index eecf4ea578..0c26bfa652 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -26,14 +26,16 @@ namespace kagome::network { std::shared_ptr grandpa_observer, const OwnPeerInfo &own_info, std::shared_ptr stream_engine, - std::shared_ptr peer_manager) + std::shared_ptr peer_manager, + std::shared_ptr scheduler) : host_(host), io_context_(std::move(io_context)), app_config_(app_config), grandpa_observer_(std::move(grandpa_observer)), own_info_(own_info), stream_engine_(std::move(stream_engine)), - peer_manager_(std::move(peer_manager)) { + peer_manager_(std::move(peer_manager)), + scheduler_(std::move(scheduler)) { const_cast(protocol_) = kGrandpaProtocol; } @@ -175,8 +177,8 @@ namespace kagome::network { self->peer_manager_->getPeerState(self->own_info_.id); if (own_peer_state.has_value()) { self->neighbor(GrandpaNeighborMessage{ - .round_number = own_peer_state->round_number, - .voter_set_id = own_peer_state->set_id, + .round_number = own_peer_state->round_number.value_or(1), + .voter_set_id = own_peer_state->set_id.value_or(0), .last_finalized = own_peer_state->last_finalized}); } @@ -302,26 +304,32 @@ namespace kagome::network { auto peer_id = stream->remotePeerId().value(); auto &grandpa_message = grandpa_message_res.value(); - SL_VERBOSE(self->log_, "Message has received from {}", peer_id); - visit_in_place( grandpa_message, [&](const network::GrandpaVote &vote_message) { + SL_VERBOSE(self->log_, "VoteMessage has received from {}", peer_id); self->grandpa_observer_->onVoteMessage(peer_id, vote_message); }, [&](const FullCommitMessage &commit_message) { + SL_VERBOSE( + self->log_, "CommitMessage has received from {}", peer_id); self->grandpa_observer_->onCommitMessage(peer_id, commit_message); }, [&](const GrandpaNeighborMessage &neighbor_message) { + SL_VERBOSE( + self->log_, "NeighborMessage has received from {}", peer_id); self->grandpa_observer_->onNeighborMessage(peer_id, neighbor_message); - self->peer_manager_->updatePeerState(peer_id, neighbor_message); }, [&](const network::CatchUpRequest &catch_up_request) { + SL_VERBOSE( + self->log_, "CatchUpRequest has received from {}", peer_id); self->grandpa_observer_->onCatchUpRequest(peer_id, catch_up_request); }, [&](const network::CatchUpResponse &catch_up_response) { + SL_VERBOSE( + self->log_, "CatchUpResponse has received from {}", peer_id); self->grandpa_observer_->onCatchUpResponse(peer_id, catch_up_response); }); @@ -341,7 +349,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Vote signed by {} with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", msg.id(), msg.counter, msg.round_number, @@ -350,6 +358,17 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Vote signed by {} with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + msg.id(), + msg.counter, + msg.round_number, + peer_id); + return false; + } + // If a peer is at a given voter set, it is impolite to send messages // from an earlier voter set. It is extremely impolite to send messages // from a future voter set. @@ -361,13 +380,13 @@ namespace kagome::network { msg.counter, msg.round_number, peer_id, - info.set_id); + info.set_id.value()); return false; } // If a peer is at round r, is impolite to send messages about r-2 or // earlier - if (msg.round_number + 2 < info.round_number) { + if (msg.round_number + 2 < info.round_number.value()) { SL_DEBUG( log_, "Vote signed by {} with set_id={} in round={} " @@ -376,21 +395,21 @@ namespace kagome::network { msg.counter, msg.round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } // If a peer is at round r, is extremely impolite to send messages about // r+1 or later - if (msg.round_number > info.round_number) { + if (msg.round_number > info.round_number.value()) { SL_DEBUG(log_, "Vote signed by {} with set_id={} in round={} " - "has not been sent to {} as impolite: their round too old: {}", + "has not been sent to {} as impolite: their round is old: {}", msg.id(), msg.counter, msg.round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -439,7 +458,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Commit with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", set_id, round_number, peer_id); @@ -447,6 +466,16 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Commit with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + set_id, + round_number, + peer_id); + return false; + } + // It is especially impolite to send commits which are invalid, or from // a different Set ID than the receiving peer has indicated. if (set_id != info.set_id) { @@ -456,12 +485,12 @@ namespace kagome::network { set_id, round_number, peer_id, - info.set_id); + info.set_id.value()); return false; } // Don't send commit if that has not actual for remote peer already - if (round_number < info.round_number) { + if (round_number < info.round_number.value()) { SL_DEBUG( log_, "Commit with set_id={} in round={} " @@ -469,7 +498,7 @@ namespace kagome::network { set_id, round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -483,7 +512,7 @@ namespace kagome::network { set_id, round_number, peer_id, - info.round_number); + info.round_number.value()); return false; } @@ -505,15 +534,17 @@ namespace kagome::network { void GrandpaProtocol::catchUpRequest(const libp2p::peer::PeerId &peer_id, CatchUpRequest &&catch_up_request) { - SL_DEBUG(log_, - "Send catch-up request: beginning with grandpa round number {}", - catch_up_request.round_number); + SL_DEBUG( + log_, + "Send catch-up-request to {} beginning with grandpa round number {}", + peer_id, + catch_up_request.round_number); auto info_opt = peer_manager_->getPeerState(peer_id); if (not info_opt.has_value()) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", catch_up_request.voter_set_id, catch_up_request.round_number, peer_id); @@ -521,7 +552,17 @@ namespace kagome::network { } const auto &info = info_opt.value(); - /// Impolite to send a catch up request to a peer in a new different Set ID. + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Catch-up-request with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + catch_up_request.voter_set_id, + catch_up_request.round_number, + peer_id); + return; + } + + // Impolite to send a catch up request to a peer in a new different Set ID. if (catch_up_request.voter_set_id != info.set_id) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " @@ -532,9 +573,9 @@ namespace kagome::network { return; } - /// It is impolite to send a catch up request for a round `R` to a peer - /// whose announced view is behind `R`. - if (catch_up_request.round_number < info.round_number - 1) { + // It is impolite to send a catch-up request for a round `R` to a peer + // whose announced view is behind `R`. + if (catch_up_request.round_number < info.round_number.value() - 1) { SL_DEBUG(log_, "Catch-up-request with set_id={} in round={} " "has not been sent to {}: too old round for requested", @@ -544,6 +585,29 @@ namespace kagome::network { return; } + auto fingerprint = 0;//catch_up_request.fingerprint(); + auto [_, ok] = recent_catchup_requests_.emplace(peer_id, fingerprint); + + // It is impolite to replay a catch-up request + if (not ok) { + SL_DEBUG(log_, + "Catch-up-request with set_id={} in round={} " + "has not been sent to {}: impolite to replay catch-up request", + catch_up_request.voter_set_id, + catch_up_request.round_number, + peer_id); + return; + } + + scheduler_->schedule( + [wp = weak_from_this(), peer_id, fingerprint] { + if (auto self = wp.lock()) { + self->recent_catchup_requests_.erase( + std::tuple(peer_id, fingerprint)); + } + }, + kRecentnessDuration); + auto shared_msg = KAGOME_EXTRACT_SHARED_CACHE(GrandpaProtocol, GrandpaMessage); (*shared_msg) = GrandpaMessage(std::move(catch_up_request)); @@ -561,7 +625,7 @@ namespace kagome::network { if (not info_opt.has_value()) { SL_DEBUG(log_, "Catch-up-response with set_id={} in round={} " - "has not been sent to {}: peen is not connected", + "has not been sent to {}: peer is not connected", catch_up_response.voter_set_id, catch_up_response.round_number, peer_id); @@ -569,14 +633,25 @@ namespace kagome::network { } const auto &info = info_opt.value(); + if (not info.set_id.has_value() or not info.round_number.has_value()) { + SL_DEBUG(log_, + "Catch-up-response with set_id={} in round={} " + "has not been sent to {}: set id or round number unknown", + catch_up_response.voter_set_id, + catch_up_response.round_number, + peer_id); + return; + } + /// Impolite to send a catch up request to a peer in a new different Set ID. if (catch_up_response.voter_set_id != info.set_id) { SL_DEBUG(log_, "Catch-up-response with set_id={} in round={} " - "has not been sent to {}: different set id", + "has not been sent to {}: {} set id", catch_up_response.voter_set_id, catch_up_response.round_number, - peer_id); + peer_id, + info.set_id.has_value() ? "different" : "unknown"); return; } diff --git a/core/network/impl/protocols/grandpa_protocol.hpp b/core/network/impl/protocols/grandpa_protocol.hpp index 251e51296a..f054cf168b 100644 --- a/core/network/impl/protocols/grandpa_protocol.hpp +++ b/core/network/impl/protocols/grandpa_protocol.hpp @@ -10,6 +10,7 @@ #include +#include #include #include @@ -48,7 +49,8 @@ namespace kagome::network { std::shared_ptr grandpa_observer, const OwnPeerInfo &own_info, std::shared_ptr stream_engine, - std::shared_ptr peer_manager); + std::shared_ptr peer_manager, + std::shared_ptr scheduler); const Protocol &protocol() const override { return protocol_; @@ -94,6 +96,12 @@ namespace kagome::network { const int &msg, std::function>)> &&cb); + /// Node should send catch-up requests rarely to be polite, because + /// processing of them consume more enough resources. + /// How long replying outgoing catch-up requests must be suppressed + static constexpr std::chrono::milliseconds kRecentnessDuration = + std::chrono::seconds(300); + libp2p::Host &host_; std::shared_ptr io_context_; const application::AppConfiguration &app_config_; @@ -101,7 +109,12 @@ namespace kagome::network { const OwnPeerInfo &own_info_; std::shared_ptr stream_engine_; std::shared_ptr peer_manager_; + std::shared_ptr scheduler_; const libp2p::peer::Protocol protocol_; + + std::set> + recent_catchup_requests_; + log::Logger log_ = log::createLogger("GrandpaProtocol", "grandpa_protocol"); }; diff --git a/core/network/impl/protocols/propagate_transactions_protocol.cpp b/core/network/impl/protocols/propagate_transactions_protocol.cpp index 8b1cfb490e..36f3ecba42 100644 --- a/core/network/impl/protocols/propagate_transactions_protocol.cpp +++ b/core/network/impl/protocols/propagate_transactions_protocol.cpp @@ -54,9 +54,9 @@ namespace kagome::network { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { SL_TRACE(self->log_, - "Handled {} protocol stream from: {}", + "Handled {} protocol stream from {}", self->protocol_, - peer_id.value().toBase58()); + peer_id.value()); self->onIncomingStream(std::forward(stream)); return; } @@ -92,7 +92,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Handshake failed on incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -103,7 +103,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register incoming {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, res.error().message()); stream->reset(); return; @@ -112,7 +112,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established incoming {} stream with {}", self->protocol_, - peer_id.toBase58()); + peer_id); }); } @@ -134,7 +134,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't create outgoing {} stream with {}: {}", self->protocol_, - peer_id.toBase58(), + peer_id, stream_res.error().message()); cb(stream_res.as_failure()); return; @@ -153,7 +153,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Handshake failed on outgoing {} stream with {}: {}", self->protocol_, - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), res.error().message()); stream->reset(); cb(res.as_failure()); @@ -165,7 +165,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Can't register outgoing {} stream with {}: {}", self->protocol_, - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), res.error().message()); stream->reset(); cb(res.as_failure()); @@ -175,7 +175,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Fully established outgoing {} stream with {}", self->protocol_, - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); cb(std::move(stream)); }; @@ -203,7 +203,7 @@ namespace kagome::network { if (not remote_handshake_res.has_value()) { SL_VERBOSE(self->log_, "Can't read handshake from {}: {}", - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), remote_handshake_res.error().message()); stream->reset(); cb(remote_handshake_res.as_failure()); @@ -212,7 +212,7 @@ namespace kagome::network { SL_TRACE(self->log_, "Handshake has received from {}", - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); switch (direction) { case Direction::OUTGOING: @@ -247,7 +247,7 @@ namespace kagome::network { if (not write_res.has_value()) { SL_VERBOSE(self->log_, "Can't send handshake to {}: {}", - stream->remotePeerId().value().toBase58(), + stream->remotePeerId().value(), write_res.error().message()); stream->reset(); cb(write_res.as_failure()); @@ -256,7 +256,7 @@ namespace kagome::network { SL_TRACE(self->log_, "Handshake has sent to {}", - stream->remotePeerId().value().toBase58()); + stream->remotePeerId().value()); switch (direction) { case Direction::OUTGOING: @@ -287,8 +287,8 @@ namespace kagome::network { if (not message_res.has_value()) { SL_VERBOSE(self->log_, - "Can't read grandpa message from {}: {}", - stream->remotePeerId().value().toBase58(), + "Can't read propagated transactions from {}: {}", + stream->remotePeerId().value(), message_res.error().message()); stream->reset(); return; @@ -300,7 +300,7 @@ namespace kagome::network { SL_VERBOSE(self->log_, "Received {} propagated transactions from {}", message.extrinsics.size(), - peer_id.toBase58()); + peer_id); if (self->babe_->wasSynchronized()) { for (auto &ext : message.extrinsics) { diff --git a/core/network/impl/protocols/protocol_factory.cpp b/core/network/impl/protocols/protocol_factory.cpp index db56c8b5d3..da28008561 100644 --- a/core/network/impl/protocols/protocol_factory.cpp +++ b/core/network/impl/protocols/protocol_factory.cpp @@ -19,7 +19,8 @@ namespace kagome::network { extrinsic_events_engine, std::shared_ptr ext_event_key_repo, - std::shared_ptr peer_rating_repository) + std::shared_ptr peer_rating_repository, + std::shared_ptr scheduler) : host_(host), app_config_(app_config), chain_spec_(chain_spec), @@ -29,13 +30,15 @@ namespace kagome::network { stream_engine_(std::move(stream_engine)), extrinsic_events_engine_{std::move(extrinsic_events_engine)}, ext_event_key_repo_{std::move(ext_event_key_repo)}, - peer_rating_repository_{std::move(peer_rating_repository)} { + peer_rating_repository_{std::move(peer_rating_repository)}, + scheduler_{std::move(scheduler)} { BOOST_ASSERT(io_context_ != nullptr); BOOST_ASSERT(hasher_ != nullptr); BOOST_ASSERT(stream_engine_ != nullptr); BOOST_ASSERT(extrinsic_events_engine_ != nullptr); BOOST_ASSERT(ext_event_key_repo_ != nullptr); BOOST_ASSERT(peer_rating_repository_ != nullptr); + BOOST_ASSERT(scheduler_ != nullptr); } std::shared_ptr @@ -57,7 +60,8 @@ namespace kagome::network { grandpa_observer_.lock(), own_info_, stream_engine_, - peer_manager_.lock()); + peer_manager_.lock(), + scheduler_); } std::shared_ptr diff --git a/core/network/impl/protocols/protocol_factory.hpp b/core/network/impl/protocols/protocol_factory.hpp index 81eefa1907..f7659b7f4c 100644 --- a/core/network/impl/protocols/protocol_factory.hpp +++ b/core/network/impl/protocols/protocol_factory.hpp @@ -17,6 +17,8 @@ #include "network/rating_repository.hpp" #include "primitives/event_types.hpp" +#include + namespace kagome::network { class ProtocolFactory final { @@ -33,7 +35,8 @@ namespace kagome::network { extrinsic_events_engine, std::shared_ptr ext_event_key_repo, - std::shared_ptr peer_rating_repository); + std::shared_ptr peer_rating_repository, + std::shared_ptr scheduler); void setBlockTree( const std::shared_ptr &block_tree) { @@ -92,6 +95,7 @@ namespace kagome::network { std::shared_ptr ext_event_key_repo_; std::shared_ptr peer_rating_repository_; + std::shared_ptr scheduler_; std::weak_ptr block_tree_; std::weak_ptr babe_; diff --git a/core/network/impl/router_libp2p.cpp b/core/network/impl/router_libp2p.cpp index 6554e12224..b40e69c078 100644 --- a/core/network/impl/router_libp2p.cpp +++ b/core/network/impl/router_libp2p.cpp @@ -46,7 +46,8 @@ namespace kagome::network { [wp = weak_from_this()](auto &&stream) { if (auto self = wp.lock()) { if (auto peer_id = stream->remotePeerId()) { - self->log_->info( + SL_TRACE( + self->log_, "Handled {} protocol stream from: {}", self->ping_protocol_->getProtocolId(), peer_id.value().toBase58()); diff --git a/core/network/impl/stream_engine.hpp b/core/network/impl/stream_engine.hpp index f5e5df9ac0..4adf98f6b7 100644 --- a/core/network/impl/stream_engine.hpp +++ b/core/network/impl/stream_engine.hpp @@ -163,12 +163,12 @@ namespace kagome::network { is_incoming ? stream : nullptr, is_outgoing ? stream : nullptr}); SL_DEBUG(logger_, - "Added {} {} stream with peer_id={}", + "Added {} {} stream with peer {}", direction == Direction::INCOMING ? "incoming" : direction == Direction::OUTGOING ? "outgoing" : "bidirectional", protocol->protocol(), - peer_id.toBase58()); + peer_id); } return outcome::success(); }); @@ -204,9 +204,9 @@ namespace kagome::network { if (reserved) { SL_DEBUG(logger_, - "Reserved {} stream with {}", + "Reserved {} stream with peer {}", protocol->protocol(), - peer_id.toBase58()); + peer_id); } } @@ -248,6 +248,24 @@ namespace kagome::network { }); } + bool isAlive(const PeerId &peer_id, + const std::shared_ptr &protocol) { + BOOST_ASSERT(protocol); + return streams_.exclusiveAccess([&](auto &streams) { + bool is_alive = false; + forSubscriber(peer_id, streams, protocol, [&](auto, auto &descr) { + if (descr.incoming.stream and not descr.incoming.stream->isClosed()) { + is_alive = true; + } + if (descr.outgoing.stream and not descr.outgoing.stream->isClosed()) { + is_alive = true; + } + }); + + return is_alive; + }); + }; + template void send(const PeerId &peer_id, const std::shared_ptr &protocol, @@ -366,14 +384,14 @@ namespace kagome::network { dst = src; SL_DEBUG(logger_, - "{} {} stream with peer_id={} was {}", + "{} {} stream with peer {} was {}", direction == Direction::BIDIRECTIONAL ? "Bidirectional" : direction == Direction::INCOMING ? "Incoming" : "Outgoing", protocol->protocol(), dst->remotePeerId().has_value() - ? dst->remotePeerId().value().toBase58() - : "{no PeerId}", + ? fmt::format("{}", dst->remotePeerId().value()) + : "without PeerId", replaced ? "replaced" : "stored"); } @@ -434,7 +452,7 @@ namespace kagome::network { logger_->debug("DUMP: vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"); logger_->debug("DUMP: {}", msg); forEachPeer([&](const auto &peer_id, auto const &proto_map) { - logger_->debug("DUMP: Peer {}", peer_id.toBase58()); + logger_->debug("DUMP: Peer {}", peer_id); for (auto const &[protocol, descr] : proto_map) { logger_->debug("DUMP: Protocol {}", protocol); logger_->debug("DUMP: I={} O={} Messages:{}", @@ -462,11 +480,20 @@ namespace kagome::network { if (!stream_res) { self->logger_->error( - "Could not send message to {} stream with {}: {}", + "Could not send message to new {} stream with {}: {}", protocol->protocol(), peer_id, stream_res.error().message()); + if (stream_res + == outcome::failure( + std::make_error_code(std::errc::not_connected))) { + self->logger_->error("GOTCHA!"); // FIXME + + self->del(peer_id); + return; + } + self->streams_.exclusiveAccess([&](auto &streams) { self->forSubscriber( peer_id, streams, protocol, [&](auto, auto &descr) { diff --git a/core/network/impl/sync_protocol_observer_impl.cpp b/core/network/impl/sync_protocol_observer_impl.cpp index 1c64b62020..d215d28853 100644 --- a/core/network/impl/sync_protocol_observer_impl.cpp +++ b/core/network/impl/sync_protocol_observer_impl.cpp @@ -10,6 +10,7 @@ #include "application/app_configuration.hpp" #include "network/common.hpp" #include "network/helpers/peer_id_formatter.hpp" +#include "primitives/common.hpp" OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SyncProtocolObserverImpl::Error, @@ -68,17 +69,37 @@ namespace kagome::network { if (response.blocks.empty()) { SL_DEBUG(log_, "Return response id={}: no blocks", request_id); } else if (response.blocks.size() == 1) { - SL_DEBUG(log_, - "Return response id={}: {}, count 1", - request_id, - response.blocks.front().hash); + if (response.blocks.front().header.has_value()) { + SL_DEBUG(log_, + "Return response id={}: {}, count 1", + request_id, + primitives::BlockInfo(response.blocks.front().header->number, + response.blocks.front().hash)); + } else { + SL_DEBUG(log_, + "Return response id={}: {}, count 1", + request_id, + response.blocks.front().hash); + } } else { - SL_DEBUG(log_, - "Return response id={}: from {} to {}, count {}", - request_id, - response.blocks.front().hash, - response.blocks.back().hash, - response.blocks.size()); + if (response.blocks.front().header.has_value() + and response.blocks.back().header.has_value()) { + SL_DEBUG(log_, + "Return response id={}: from {} to {}, count {}", + request_id, + primitives::BlockInfo(response.blocks.front().header->number, + response.blocks.front().hash), + primitives::BlockInfo(response.blocks.back().header->number, + response.blocks.back().hash), + response.blocks.size()); + } else { + SL_DEBUG(log_, + "Return response id={}: from {} to {}, count {}", + request_id, + response.blocks.front().hash, + response.blocks.back().hash, + response.blocks.size()); + } } requested_ids_.erase(request_id); @@ -141,12 +162,18 @@ namespace kagome::network { auto header_res = blocks_headers_->getBlockHeader(hash); if (header_res) { new_block.header = std::move(header_res.value()); + } else { + response.blocks.pop_back(); + break; } } if (body_needed) { auto body_res = block_tree_->getBlockBody(hash); if (body_res) { new_block.body = std::move(body_res.value()); + } else { + response.blocks.pop_back(); + break; } } if (justification_needed) { diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index f9dd167ae9..3fe0ad7b5b 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -297,6 +297,36 @@ namespace kagome::network { false); } + void SynchronizerImpl::syncMissingJustifications( + const PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + Synchronizer::SyncResultHandler &&handler) { + if (busy_peers_.find(peer_id) != busy_peers_.end()) { + SL_DEBUG( + log_, + "Justifications load since block {} was rescheduled, peer {} is busy", + target_block, + peer_id); + scheduler_->schedule([wp = weak_from_this(), + peer_id, + block = std::move(target_block), + limit = std::move(limit), + handler = std::move(handler)]() mutable { + auto self = wp.lock(); + if (not self) { + return; + } + self->syncMissingJustifications( + peer_id, std::move(block), std::move(limit), std::move(handler)); + }); + return; + } + + loadJustifications( + peer_id, std::move(target_block), std::move(limit), std::move(handler)); + } + void SynchronizerImpl::findCommonBlock( const libp2p::peer::PeerId &peer_id, primitives::BlockNumber lower, @@ -331,14 +361,7 @@ namespace kagome::network { return; } - scheduler_->schedule( - [wp = weak_from_this(), peer_id, request_fingerprint] { - if (auto self = wp.lock()) { - self->recent_requests_.erase( - std::tuple(peer_id, request_fingerprint)); - } - }, - kRecentnessDuration); + scheduleRecentRequestRemoval(peer_id, request_fingerprint); auto response_handler = [wp = weak_from_this(), lower, @@ -346,8 +369,8 @@ namespace kagome::network { target = hint, peer_id, handler = std::move(handler), - observed = std::move(observed)]( - auto &&response_res) mutable { + observed = std::move(observed), + request_fingerprint](auto &&response_res) mutable { auto self = wp.lock(); if (not self) { return; @@ -379,6 +402,7 @@ namespace kagome::network { upper - 1, peer_id); handler(Error::EMPTY_RESPONSE); + self->recent_requests_.erase(std::tuple(peer_id, request_fingerprint)); return; } @@ -511,14 +535,7 @@ namespace kagome::network { return; } - scheduler_->schedule( - [wp = weak_from_this(), peer_id, request_fingerprint] { - if (auto self = wp.lock()) { - self->recent_requests_.erase( - std::tuple(peer_id, request_fingerprint)); - } - }, - kRecentnessDuration); + scheduleRecentRequestRemoval(peer_id, request_fingerprint); auto response_handler = [wp = weak_from_this(), from, @@ -712,6 +729,122 @@ namespace kagome::network { protocol->request(peer_id, std::move(request), std::move(response_handler)); } + void SynchronizerImpl::loadJustifications(const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) { + if (node_is_shutting_down_) { + if (handler) handler(Error::SHUTTING_DOWN); + return; + } + + busy_peers_.insert(peer_id); + auto cleanup = gsl::finally([this, peer_id] { + auto peer = busy_peers_.find(peer_id); + if (peer != busy_peers_.end()) { + busy_peers_.erase(peer); + } + }); + + BlocksRequest request{ + BlockAttribute::HEADER | BlockAttribute::JUSTIFICATION, + target_block.hash, + std::nullopt, + Direction::ASCENDING, + limit}; + + auto request_fingerprint = request.fingerprint(); + if (not recent_requests_.emplace(peer_id, request_fingerprint).second) { + SL_ERROR( + log_, + "Can't load justification from {} for block {}: {}", + peer_id, + target_block, + outcome::result(Error::DUPLICATE_REQUEST).error().message()); + if (handler) { + handler(Error::DUPLICATE_REQUEST); + } + return; + } + + scheduleRecentRequestRemoval(peer_id, request_fingerprint); + + auto response_handler = [wp = weak_from_this(), + peer_id, + target_block, + handler = std::move(handler)]( + auto &&response_res) mutable { + auto self = wp.lock(); + if (not self) { + return; + } + + if (response_res.has_error()) { + SL_ERROR(self->log_, + "Can't load justification from {} for block {}: {}", + peer_id, + target_block, + response_res.error().message()); + if (handler) { + handler(response_res.as_failure()); + } + return; + } + + auto &blocks = response_res.value().blocks; + + if (blocks.empty()) { + SL_ERROR(self->log_, + "Can't load block justification from {} for block {}: " + "Response does not have any contents", + peer_id, + target_block); + if (handler) handler(Error::EMPTY_RESPONSE); + return; + } + + bool justification_received = false; + BlockInfo last_justified_block; + for (auto &block : blocks) { + if (not block.header) { + SL_ERROR(self->log_, + "No header was provided from {} for block {} while " + "requesting justifications", + peer_id, + target_block); + if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER); + return; + } + if (block.justification) { + justification_received = true; + last_justified_block = + primitives::BlockInfo{block.header->number, block.hash}; + { + std::lock_guard lock(self->justifications_mutex_); + self->justifications_.emplace(last_justified_block, + *block.justification); + } + } + } + + if (justification_received) { + SL_TRACE(self->log_, "Enqueued new justifications: schedule applying"); + self->scheduler_->schedule([wp] { + if (auto self = wp.lock()) { + self->applyNextJustification(); + } + }); + } + if (handler) { + handler(last_justified_block); + } + }; + + auto protocol = router_->getSyncProtocol(); + BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol"); + protocol->request(peer_id, std::move(request), std::move(response_handler)); + } + void SynchronizerImpl::applyNextBlock() { if (generations_.empty()) { SL_TRACE(log_, "No block for applying"); @@ -818,6 +951,41 @@ namespace kagome::network { }); } + void SynchronizerImpl::applyNextJustification() { + // Operate over the same lock as for the whole blocks application + bool false_val = false; + if (not applying_in_progress_.compare_exchange_strong(false_val, true)) { + SL_TRACE(log_, "Applying justification in progress"); + return; + } + SL_TRACE(log_, "Begin justification applying"); + auto cleanup = gsl::finally([this] { + SL_TRACE(log_, "End justification applying"); + applying_in_progress_ = false; + }); + + std::queue justifications; + { + std::lock_guard lock(justifications_mutex_); + justifications.swap(justifications_); + } + + while (not justifications.empty()) { + auto [block_info, justification] = std::move(justifications.front()); + const auto &block = block_info; // SL_WARN compilation WA + justifications.pop(); + auto res = block_executor_->applyJustification(block_info, justification); + if (res.has_error()) { + SL_WARN(log_, + "Justification for block {} was not applied: {}", + block, + res.error().message()); + } else { + SL_TRACE(log_, "Applied justification for block {}", block); + } + } + } + size_t SynchronizerImpl::discardBlock( const primitives::BlockHash &hash_of_discarding_block) { std::queue queue; @@ -879,6 +1047,18 @@ namespace kagome::network { metric_import_queue_length_->set(known_blocks_.size()); } + void SynchronizerImpl::scheduleRecentRequestRemoval( + const libp2p::peer::PeerId &peer_id, + const BlocksRequest::Fingerprint &fingerprint) { + scheduler_->schedule( + [wp = weak_from_this(), peer_id, fingerprint] { + if (auto self = wp.lock()) { + self->recent_requests_.erase(std::tuple(peer_id, fingerprint)); + } + }, + kRecentnessDuration); + } + void SynchronizerImpl::askNextPortionOfBlocks() { bool false_val = false; if (not asking_blocks_portion_in_progress_.compare_exchange_strong( diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 2b377870fb..01c60d1577 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -8,6 +8,7 @@ #include "network/synchronizer.hpp" +#include #include #include @@ -49,7 +50,7 @@ namespace kagome::network { ALREADY_IN_QUEUE, PEER_BUSY, ARRIVED_TOO_EARLY, - DUPLICATE_REQUEST + DUPLICATE_REQUEST, }; SynchronizerImpl( @@ -78,6 +79,12 @@ namespace kagome::network { const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler) override; + // See loadJustifications + void syncMissingJustifications(const PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) override; + /// Finds best common block with peer {@param peer_id} in provided interval. /// It is using tail-recursive algorithm, till {@param hint} is /// the needed block @@ -100,6 +107,14 @@ namespace kagome::network { primitives::BlockInfo from, SyncResultHandler &&handler); + /// Loads block justification from {@param peer_id} for {@param + /// target_block} or a range of blocks up to {@param upper_bound_block}. + /// Calls {@param handler} when operation finishes + void loadJustifications(const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler); + private: /// Subscribes handler for block with provided {@param block_info} /// {@param handler} will be called When block is received or discarded @@ -117,6 +132,9 @@ namespace kagome::network { /// Pops next block from queue and tries to apply that void applyNextBlock(); + /// Pops next justification from queue and tries to apply it + void applyNextJustification(); + /// Removes block {@param block} and all all dependent on it from the queue /// @returns number of affected blocks size_t discardBlock(const primitives::BlockHash &block); @@ -125,6 +143,12 @@ namespace kagome::network { /// side-branch for provided finalized block {@param finalized_block} void prune(const primitives::BlockInfo &finalized_block); + /// Purges internal cache of recent requests for specified {@param peer_id} + /// and {@param fingerprint} after kRecentnessDuration timeout + void scheduleRecentRequestRemoval( + const libp2p::peer::PeerId &peer_id, + const BlocksRequest::Fingerprint &fingerprint); + std::shared_ptr block_tree_; std::shared_ptr block_executor_; std::shared_ptr router_; @@ -157,6 +181,12 @@ namespace kagome::network { std::unordered_multimap ancestry_; + // Loaded justifications to apply + using JustificationPair = + std::pair; + std::queue justifications_; + std::mutex justifications_mutex_; + // BlockNumber of blocks (aka height) that is potentially best now primitives::BlockNumber watched_blocks_number_{}; @@ -170,7 +200,8 @@ namespace kagome::network { std::atomic_bool asking_blocks_portion_in_progress_ = false; std::set busy_peers_; - std::set> recent_requests_; + std::set> + recent_requests_; }; } // namespace kagome::network diff --git a/core/network/peer_manager.hpp b/core/network/peer_manager.hpp index ca2c911c25..ddd7787ff3 100644 --- a/core/network/peer_manager.hpp +++ b/core/network/peer_manager.hpp @@ -21,8 +21,8 @@ namespace kagome::network { clock::SteadyClock::TimePoint time; Roles roles = 0; BlockInfo best_block = {0, {}}; - RoundNumber round_number = 0; - MembershipCounter set_id = 0; + std::optional round_number = std::nullopt; + std::optional set_id = std::nullopt; BlockNumber last_finalized = 0; }; diff --git a/core/network/synchronizer.hpp b/core/network/synchronizer.hpp index ff90ff4408..b8f9871539 100644 --- a/core/network/synchronizer.hpp +++ b/core/network/synchronizer.hpp @@ -40,6 +40,15 @@ namespace kagome::network { virtual bool syncByBlockHeader(const primitives::BlockHeader &header, const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler) = 0; + + /// Tries to request block justifications from {@param peer_id} for {@param + /// target_block} or a range of blocks up to {@param limit} count. + /// Calls {@param handler} when operation finishes + virtual void syncMissingJustifications( + const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) = 0; }; } // namespace kagome::network diff --git a/core/network/types/grandpa_message.hpp b/core/network/types/grandpa_message.hpp index 7f284f3899..4a3cf787fe 100644 --- a/core/network/types/grandpa_message.hpp +++ b/core/network/types/grandpa_message.hpp @@ -78,6 +78,15 @@ namespace kagome::network { struct CatchUpRequest { RoundNumber round_number; MembershipCounter voter_set_id; + + using Fingerprint = size_t; + + inline Fingerprint fingerprint() const { + auto result = std::hash()(round_number); + + boost::hash_combine(result, std::hash()(voter_set_id)); + return result; + }; }; template applyBlock(primitives::BlockData &&block) override { return applyBlock(block); } + + MOCK_METHOD(outcome::result, + applyJustification, + (const primitives::BlockInfo &block_info, + const primitives::Justification &justification), + (override)); }; } // namespace kagome::consensus diff --git a/test/mock/core/network/synchronizer_mock.hpp b/test/mock/core/network/synchronizer_mock.hpp index 8028256d3a..f91ece9278 100644 --- a/test/mock/core/network/synchronizer_mock.hpp +++ b/test/mock/core/network/synchronizer_mock.hpp @@ -39,6 +39,20 @@ namespace kagome::network { SyncResultHandler &&handler) override { return syncByBlockHeader(block_header, peer_id, handler); }; + + MOCK_METHOD(void, + syncMissingJustifications, + (const libp2p::peer::PeerId &, + primitives::BlockInfo, + std::optional, + const SyncResultHandler &), + ()); + void syncMissingJustifications(const libp2p::peer::PeerId &peer_id, + primitives::BlockInfo target_block, + std::optional limit, + SyncResultHandler &&handler) override { + return syncMissingJustifications(peer_id, target_block, limit, handler); + } }; } // namespace kagome::network