Skip to content

Commit

Permalink
Improves in grandpa (#1302)
Browse files Browse the repository at this point in the history
* fix: initial neighbor message
* fix: establishing outgoing grandpa stream
* fix: condition establishing outgoing grandpa stream

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
  • Loading branch information
xDimon authored Aug 4, 2022
1 parent 60cd8b4 commit 9c9021c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 12 deletions.
22 changes: 20 additions & 2 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,16 @@ namespace kagome::network {
block_announce_protocol->newOutgoingStream(
peer_info,
[wp = weak_from_this(),
peer_id = peer_info.id,
peer_info,
protocol = block_announce_protocol,
connection](auto &&stream_res) {
auto self = wp.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;

self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
if (not stream_res.has_value()) {
self->log_->warn("Unable to create stream {} with {}: {}",
Expand Down Expand Up @@ -617,6 +619,22 @@ namespace kagome::network {

self->reserveStreams(peer_id);
self->startPingingPeer(peer_id);

// Establish outgoing grandpa stream if node synced
auto r_info_opt = self->getPeerState(peer_id);
auto o_info_opt = self->getPeerState(self->own_peer_info_.id);
if (r_info_opt.has_value() and o_info_opt.has_value()) {
auto &r_info = r_info_opt.value();
auto &o_info = o_info_opt.value();

if (r_info.best_block.number <= o_info.best_block.number) {
auto grandpa_protocol = self->router_->getGrandpaProtocol();
BOOST_ASSERT_MSG(grandpa_protocol,
"Router did not provide grandpa protocol");
grandpa_protocol->newOutgoingStream(peer_info,
[](const auto &...) {});
}
}
});
} else {
SL_DEBUG(log_,
Expand All @@ -630,7 +648,7 @@ namespace kagome::network {
host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
if (addresses_res.has_value()) {
auto &addresses = addresses_res.value();
PeerInfo peer_info{.id = peer_id, .addresses = std::move(addresses)};
peer_info.addresses = std::move(addresses);
kademlia_->addPeer(peer_info, false);
}
}
Expand Down
44 changes: 36 additions & 8 deletions core/network/impl/protocols/grandpa_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,21 @@ namespace kagome::network {
auto own_peer_state =
self->peer_manager_->getPeerState(self->own_info_.id);
if (own_peer_state.has_value()) {
self->neighbor(GrandpaNeighborMessage{
GrandpaNeighborMessage msg{
.round_number = own_peer_state->round_number.value_or(1),
.voter_set_id = own_peer_state->set_id.value_or(0),
.last_finalized = own_peer_state->last_finalized});
.last_finalized = own_peer_state->last_finalized};

SL_DEBUG(self->log_,
"Send initial neighbor message: grandpa round number {}",
msg.round_number);

auto shared_msg =
KAGOME_EXTRACT_SHARED_CACHE(GrandpaProtocol, GrandpaMessage);
(*shared_msg) = GrandpaMessage(std::move(msg));

self->stream_engine_->send(
stream->remotePeerId().value(), self, std::move(shared_msg));
}

cb(std::move(stream));
Expand Down Expand Up @@ -585,11 +596,28 @@ namespace kagome::network {
return;
}

auto fingerprint = 0;//catch_up_request.fingerprint();
auto [_, ok] = recent_catchup_requests_.emplace(peer_id, fingerprint);
auto round_id = std::tuple(info.round_number.value(), info.set_id.value());

auto [iter_by_round, ok_by_round] =
recent_catchup_requests_by_round_.emplace(round_id);

if (not ok_by_round) {
SL_DEBUG(log_,
"Catch-up-request with set_id={} in round={} "
"has not been sent to {}: "
"the same catch-up request had sent to another peer",
catch_up_request.voter_set_id,
catch_up_request.round_number,
peer_id);
return;
}

auto [iter_by_peer, ok_by_peer] =
recent_catchup_requests_by_peer_.emplace(peer_id);

// It is impolite to replay a catch-up request
if (not ok) {
if (not ok_by_peer) {
recent_catchup_requests_by_round_.erase(iter_by_round);
SL_DEBUG(log_,
"Catch-up-request with set_id={} in round={} "
"has not been sent to {}: impolite to replay catch-up request",
Expand All @@ -600,10 +628,10 @@ namespace kagome::network {
}

scheduler_->schedule(
[wp = weak_from_this(), peer_id, fingerprint] {
[wp = weak_from_this(), round_id, peer_id] {
if (auto self = wp.lock()) {
self->recent_catchup_requests_.erase(
std::tuple(peer_id, fingerprint));
self->recent_catchup_requests_by_round_.erase(round_id);
self->recent_catchup_requests_by_peer_.erase(peer_id);
}
},
kRecentnessDuration);
Expand Down
7 changes: 5 additions & 2 deletions core/network/impl/protocols/grandpa_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ namespace kagome::network {
std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
const libp2p::peer::Protocol protocol_;

std::set<std::tuple<libp2p::peer::PeerId, CatchUpRequest::Fingerprint>>
recent_catchup_requests_;
std::set<std::tuple<consensus::grandpa::RoundNumber,
consensus::grandpa::MembershipCounter>>
recent_catchup_requests_by_round_;

std::set<libp2p::peer::PeerId> recent_catchup_requests_by_peer_;

log::Logger log_ = log::createLogger("GrandpaProtocol", "grandpa_protocol");
};
Expand Down

0 comments on commit 9c9021c

Please sign in to comment.