diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 4d7ab7b1e6..1d5a1f5d72 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -19,7 +19,7 @@ -### Possible Drawbacks +### Possible Drawbacks diff --git a/core/network/impl/peer_manager_impl.cpp b/core/network/impl/peer_manager_impl.cpp index d4a7510781..a43a7d4d6f 100644 --- a/core/network/impl/peer_manager_impl.cpp +++ b/core/network/impl/peer_manager_impl.cpp @@ -532,28 +532,29 @@ namespace kagome::network { void PeerManagerImpl::updatePeerState( const PeerId &peer_id, const BlockAnnounceHandshake &handshake) { - auto [it, is_new] = peer_states_.emplace(peer_id, PeerState{}); - it->second.time = clock_->now(); - it->second.roles = handshake.roles; - it->second.best_block = handshake.best_block; + auto &state = peer_states_[peer_id]; + state.time = clock_->now(); + state.roles = handshake.roles; + state.best_block = handshake.best_block; } void PeerManagerImpl::updatePeerState(const PeerId &peer_id, const BlockAnnounce &announce) { auto hash = hasher_->blake2b_256(scale::encode(announce.header).value()); - auto [it, _] = peer_states_.emplace(peer_id, PeerState{}); - it->second.time = clock_->now(); - it->second.best_block = {announce.header.number, hash}; + auto &state = peer_states_[peer_id]; + state.time = clock_->now(); + state.best_block = {announce.header.number, hash}; + state.known_blocks.add(hash); } void PeerManagerImpl::updatePeerState( const PeerId &peer_id, const GrandpaNeighborMessage &neighbor_message) { - auto [it, _] = peer_states_.emplace(peer_id, PeerState{}); - it->second.time = clock_->now(); - it->second.round_number = neighbor_message.round_number; - it->second.set_id = neighbor_message.voter_set_id; - it->second.last_finalized = neighbor_message.last_finalized; + auto &state = peer_states_[peer_id]; + state.time = clock_->now(); + state.round_number = neighbor_message.round_number; + state.set_id = neighbor_message.voter_set_id; + state.last_finalized = neighbor_message.last_finalized; } std::optional> diff --git a/core/network/impl/protocols/block_announce_protocol.cpp b/core/network/impl/protocols/block_announce_protocol.cpp index 87f1958f9b..4c7a19c4b7 100644 --- a/core/network/impl/protocols/block_announce_protocol.cpp +++ b/core/network/impl/protocols/block_announce_protocol.cpp @@ -24,6 +24,7 @@ namespace kagome::network { std::shared_ptr stream_engine, std::shared_ptr block_tree, std::shared_ptr observer, + std::shared_ptr hasher, std::shared_ptr peer_manager) : base_(kBlockAnnounceProtocolName, host, @@ -34,6 +35,7 @@ namespace kagome::network { stream_engine_(std::move(stream_engine)), block_tree_(std::move(block_tree)), observer_(std::move(observer)), + hasher_(std::move(hasher)), peer_manager_(std::move(peer_manager)) { BOOST_ASSERT(stream_engine_ != nullptr); BOOST_ASSERT(block_tree_ != nullptr); @@ -372,6 +374,8 @@ namespace kagome::network { } void BlockAnnounceProtocol::blockAnnounce(BlockAnnounce &&announce) { + auto hash = hasher_->blake2b_256(scale::encode(announce.header).value()); + auto shared_msg = KAGOME_EXTRACT_SHARED_CACHE(BlockAnnounceProtocol, BlockAnnounce); (*shared_msg) = std::move(announce); @@ -380,11 +384,10 @@ namespace kagome::network { base_.logger(), "Send announce of block #{}", announce.header.number); stream_engine_->broadcast( - shared_from_this(), - shared_msg, - RandomGossipStrategy{ - stream_engine_->outgoingStreamsNumber(shared_from_this()), - app_config_.luckyPeers()}); + shared_from_this(), shared_msg, [&](const PeerId &peer) { + auto state = peer_manager_->getPeerState(peer); + return state and state->get().known_blocks.add(hash); + }); } } // namespace kagome::network diff --git a/core/network/impl/protocols/block_announce_protocol.hpp b/core/network/impl/protocols/block_announce_protocol.hpp index 59ac32a888..7c0091215a 100644 --- a/core/network/impl/protocols/block_announce_protocol.hpp +++ b/core/network/impl/protocols/block_announce_protocol.hpp @@ -52,6 +52,7 @@ namespace kagome::network { std::shared_ptr stream_engine, std::shared_ptr block_tree, std::shared_ptr observer, + std::shared_ptr hasher, std::shared_ptr peer_manager); bool start() override; @@ -91,13 +92,14 @@ namespace kagome::network { void readAnnounce(std::shared_ptr stream); - const static inline auto kBlockAnnounceProtocolName = + inline static const auto kBlockAnnounceProtocolName = "BlockAnnounceProtocol"s; ProtocolBaseImpl base_; const application::AppConfiguration &app_config_; std::shared_ptr stream_engine_; std::shared_ptr block_tree_; std::shared_ptr observer_; + std::shared_ptr hasher_; std::shared_ptr peer_manager_; }; diff --git a/core/network/peer_manager.hpp b/core/network/peer_manager.hpp index 27318247eb..1f06b2e024 100644 --- a/core/network/peer_manager.hpp +++ b/core/network/peer_manager.hpp @@ -18,9 +18,11 @@ #include "network/types/grandpa_message.hpp" #include "outcome/outcome.hpp" #include "primitives/common.hpp" +#include "utils/lru.hpp" #include "utils/non_copyable.hpp" namespace kagome::network { + constexpr size_t kPeerStateMaxKnownBlocks = 1024; struct CollatorState { network::ParachainId parachain_id; @@ -50,6 +52,7 @@ namespace kagome::network { BlockNumber last_finalized = 0; std::optional collator_state = std::nullopt; std::optional view; + LruSet known_blocks{kPeerStateMaxKnownBlocks}; }; struct StreamEngine; diff --git a/core/utils/lru.hpp b/core/utils/lru.hpp new file mode 100644 index 0000000000..161e30bf13 --- /dev/null +++ b/core/utils/lru.hpp @@ -0,0 +1,151 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_UTILS_LRU_HPP +#define KAGOME_UTILS_LRU_HPP + +#include +#include + +namespace kagome { + /** + * `std::unordered_map` with LRU. + */ + template + class Lru { + public: + struct ItFwd; + + struct Item { + V v; + ItFwd more, less; + }; + + using Map = std::unordered_map; + using It = typename Map::iterator; + + struct ItFwd : It { + operator bool() const { + return *this != It{}; + } + }; + + Lru(size_t capacity) : capacity_{capacity} { + if (capacity_ == 0) { + throw std::length_error{"Lru(capacity=0)"}; + } + } + + Lru(const Lru &) = delete; + void operator=(const Lru &) = delete; + + size_t capacity() const { + return capacity_; + } + + size_t size() const { + return map_.size(); + } + + std::optional> get(const K &k) { + auto it = map_.find(k); + if (it == map_.end()) { + return std::nullopt; + } + lru_use(it); + return std::ref(it->second.v); + } + + V &put(const K &k, V v) { + return put2(k, std::move(v)).first->second.v; + } + + private: + std::pair put2(const K &k, V &&v) { + auto it = map_.find(k); + if (it == map_.end()) { + if (map_.size() >= capacity_) { + lru_pop(); + } + it = map_.emplace(k, Item{std::move(v), {}, {}}).first; + lru_push(it); + return {it, true}; + } + it->second.v = std::move(v); + lru_use(it); + return {it, false}; + } + + void lru_use(It it) { + if (it == most_) { + return; + } + lru_extract(it->second); + lru_push(it); + } + + void lru_push(It it) { + BOOST_ASSERT(not it->second.less); + BOOST_ASSERT(not it->second.more); + it->second.less = most_; + if (most_) { + most_->second.more = ItFwd{it}; + } + most_ = ItFwd{it}; + if (not least_) { + least_ = most_; + } + } + + void lru_extract(Item &v) { + if (v.more) { + v.more->second.less = v.less; + } else { + most_ = v.less; + } + if (v.less) { + v.less->second.more = v.more; + } else { + least_ = v.more; + } + v.more = {}; + v.less = {}; + } + + void lru_pop() { + auto it = least_; + lru_extract(it->second); + map_.erase(it); + } + + Map map_; + size_t capacity_; + ItFwd most_, least_; + + template + friend class LruSet; + }; + + template + class LruSet { + public: + LruSet(size_t capacity) : lru_{capacity} {} + + bool has(const K &k) { + return lru_.get(k).has_value(); + } + + bool add(const K &k) { + return lru_.put2(k, {}).second; + } + + private: + struct V {}; + + Lru lru_; + }; +} // namespace kagome + +#endif // KAGOME_UTILS_LRU_HPP