Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/deffered send #1833

Merged
merged 18 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ endif()

if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
link_libraries(atomic)
elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
link_libraries(atomic)
endif()

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
Expand Down
7 changes: 5 additions & 2 deletions core/consensus/grandpa/catch_up_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <libp2p/peer/peer_id.hpp>

#include "consensus/grandpa/grandpa_context.hpp"
#include "network/peer_manager.hpp"
#include "network/types/grandpa_message.hpp"

namespace kagome::consensus::grandpa {
Expand All @@ -24,8 +25,10 @@ namespace kagome::consensus::grandpa {
* Handler of grandpa catch-up-request messages
* @param msg catch-up-request messages
*/
virtual void onCatchUpRequest(const libp2p::peer::PeerId &peer_id,
network::CatchUpRequest &&msg) = 0;
virtual void onCatchUpRequest(
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info,
network::CatchUpRequest &&msg) = 0;

/**
* Handler of grandpa catch-up-response messages
Expand Down
54 changes: 34 additions & 20 deletions core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,15 @@ namespace kagome::consensus::grandpa {
}
}

void GrandpaImpl::onNeighborMessage(const libp2p::peer::PeerId &peer_id,
network::GrandpaNeighborMessage &&msg) {
REINVOKE(
*internal_thread_context_, onNeighborMessage, peer_id, std::move(msg));
void GrandpaImpl::onNeighborMessage(
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info,
network::GrandpaNeighborMessage &&msg) {
REINVOKE(*internal_thread_context_,
onNeighborMessage,
peer_id,
std::move(info),
std::move(msg));

BOOST_ASSERT(internal_thread_context_->isInCurrentThread());
SL_DEBUG(logger_,
Expand All @@ -431,13 +436,13 @@ namespace kagome::consensus::grandpa {
msg.last_finalized,
peer_id);

auto info = peer_manager_->getPeerState(peer_id);
std::optional<VoterSetId> info_set;
std::optional<RoundNumber> info_round;
// copy values before `updatePeerState`

if (info) {
info_set = info->get().set_id;
info_round = info->get().round_number;
info_set = info->set_id;
info_round = info->round_number;
}

bool reputation_changed = false;
Expand Down Expand Up @@ -524,7 +529,7 @@ namespace kagome::consensus::grandpa {
return;
}

if (info->get().last_finalized > block_tree_->getLastFinalized().number) {
if (msg.last_finalized > block_tree_->getLastFinalized().number) {
// Trying to substitute with justifications' request only
main_thread_context_.execute([wself{weak_from_this()},
peer_id,
Expand Down Expand Up @@ -560,14 +565,18 @@ namespace kagome::consensus::grandpa {
}
}

void GrandpaImpl::onCatchUpRequest(const libp2p::peer::PeerId &peer_id,
network::CatchUpRequest &&msg) {
REINVOKE(
*internal_thread_context_, onCatchUpRequest, peer_id, std::move(msg));
void GrandpaImpl::onCatchUpRequest(
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info_opt,
network::CatchUpRequest &&msg) {
REINVOKE(*internal_thread_context_,
onCatchUpRequest,
peer_id,
std::move(info_opt),
std::move(msg));

auto info_opt = peer_manager_->getPeerState(peer_id);
if (not info_opt.has_value() or not info_opt->get().set_id.has_value()
or not info_opt->get().round_number.has_value()) {
if (not info_opt.has_value() or not info_opt->set_id.has_value()
or not info_opt->round_number.has_value()) {
SL_DEBUG(logger_,
"Catch-up request to round #{} received from {} was rejected: "
"we are not have our view about remote peer",
Expand All @@ -577,7 +586,7 @@ namespace kagome::consensus::grandpa {
peer_id, network::reputation::cost::OUT_OF_SCOPE_MESSAGE);
return;
}
const auto &info = info_opt->get();
const auto &info = *info_opt;

// Check if request is corresponding our view about remote peer by set id
if (msg.voter_set_id != info.set_id.value()) {
Expand Down Expand Up @@ -934,16 +943,17 @@ namespace kagome::consensus::grandpa {
void GrandpaImpl::onVoteMessage(
std::optional<std::shared_ptr<GrandpaContext>> &&existed_context,
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info,
const VoteMessage &msg) {
REINVOKE(*internal_thread_context_,
onVoteMessage,
std::move(existed_context),
peer_id,
std::move(info),
msg);

auto info = peer_manager_->getPeerState(peer_id);
if (not info.has_value() or not info->get().set_id.has_value()
or not info->get().round_number.has_value()) {
if (not info.has_value() or not info->set_id.has_value()
or not info->round_number.has_value()) {
SL_DEBUG(
logger_,
"{} signed by {} with set_id={} in round={} has received from {} "
Expand Down Expand Up @@ -1502,7 +1512,11 @@ namespace kagome::consensus::grandpa {
if (grandpa_context->vote.has_value()) {
auto const &peer_id = grandpa_context->peer_id.value();
auto const &vote = grandpa_context->vote.value();
self->onVoteMessage(std::move(grandpa_context), peer_id, vote);
auto info = self->peer_manager_->getPeerState(peer_id);
self->onVoteMessage(std::move(grandpa_context),
peer_id,
compactFromRefToOwn(info),
vote);
} else if (grandpa_context->catch_up_response.has_value()) {
auto const &peer_id = grandpa_context->peer_id.value();
auto const &catch_up_response =
Expand Down
3 changes: 3 additions & 0 deletions core/consensus/grandpa/impl/grandpa_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ namespace kagome::consensus::grandpa {
* @param msg received grandpa neighbour message
*/
void onNeighborMessage(const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info_opt,
network::GrandpaNeighborMessage &&msg) override;

// Catch-up methods
Expand All @@ -155,6 +156,7 @@ namespace kagome::consensus::grandpa {
* @param msg network message containing catch up request
*/
void onCatchUpRequest(const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info,
network::CatchUpRequest &&msg) override;

/**
Expand Down Expand Up @@ -190,6 +192,7 @@ namespace kagome::consensus::grandpa {
void onVoteMessage(
std::optional<std::shared_ptr<GrandpaContext>> &&existed_context,
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info_opt,
const network::VoteMessage &msg) override;

/**
Expand Down
6 changes: 4 additions & 2 deletions core/consensus/grandpa/neighbor_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ namespace kagome::consensus::grandpa {
* Handler of grandpa neighbor messages
* @param msg neighbor messages
*/
virtual void onNeighborMessage(const libp2p::peer::PeerId &peer_id,
network::GrandpaNeighborMessage &&msg) = 0;
virtual void onNeighborMessage(
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info_opt,
network::GrandpaNeighborMessage &&msg) = 0;
};

} // namespace kagome::consensus::grandpa
2 changes: 2 additions & 0 deletions core/consensus/grandpa/round_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include "consensus/grandpa/grandpa_context.hpp"
#include "network/peer_manager.hpp"

namespace libp2p::peer {
class PeerId;
Expand Down Expand Up @@ -37,6 +38,7 @@ namespace kagome::consensus::grandpa {
virtual void onVoteMessage(
std::optional<std::shared_ptr<GrandpaContext>> &&existed_context,
const libp2p::peer::PeerId &peer_id,
std::optional<network::PeerStateCompact> &&info_opt,
const VoteMessage &msg) = 0;

/**
Expand Down
92 changes: 90 additions & 2 deletions core/network/helpers/stream_read_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,102 @@ namespace libp2p::connection {
} // namespace libp2p::connection

namespace kagome::network {
struct StreamWrapper final : libp2p::connection::Stream {
std::shared_ptr<libp2p::connection::StreamReadBuffer> stream_;
log::Logger logger_ = log::createLogger("Stream", "network");
const std::thread::id this_id_{std::this_thread::get_id()};

void check() const {
BOOST_ASSERT(this_id_ == std::this_thread::get_id());
}

StreamWrapper(std::shared_ptr<libp2p::connection::StreamReadBuffer> stream)
: stream_{std::move(stream)} {}

bool isClosedForRead() const {
return stream_->isClosedForRead();
}

bool isClosedForWrite() const {
return stream_->isClosedForWrite();
}

bool isClosed() const {
return stream_->isClosed();
}

void close(VoidResultHandlerFunc cb) {
check();
stream_->close(std::move(cb));
}

void reset() {
check();
stream_->reset();
}

void adjustWindowSize(uint32_t new_size, VoidResultHandlerFunc cb) {
stream_->adjustWindowSize(new_size, std::move(cb));
}

outcome::result<bool> isInitiator() const {
return stream_->isInitiator();
}

outcome::result<libp2p::peer::PeerId> remotePeerId() const {
return stream_->remotePeerId();
}

outcome::result<libp2p::multi::Multiaddress> localMultiaddr() const {
return stream_->localMultiaddr();
}

outcome::result<libp2p::multi::Multiaddress> remoteMultiaddr() const {
return stream_->remoteMultiaddr();
}

void read(gsl::span<uint8_t> out, size_t bytes, ReadCallbackFunc cb) {
check();
stream_->read(out, bytes, std::move(cb));
}

void readSome(gsl::span<uint8_t> out, size_t bytes, ReadCallbackFunc cb) {
check();
stream_->readSome(out, bytes, std::move(cb));
}

void deferReadCallback(outcome::result<size_t> res, ReadCallbackFunc cb) {
stream_->deferReadCallback(std::move(res), std::move(cb));
}

void write(gsl::span<const uint8_t> in,
size_t bytes,
WriteCallbackFunc cb) {
check();
stream_->write(in, bytes, std::move(cb));
}

void writeSome(gsl::span<const uint8_t> in,
size_t bytes,
WriteCallbackFunc cb) {
check();
stream_->writeSome(in, bytes, std::move(cb));
}

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) {
stream_->deferWriteCallback(ec, std::move(cb));
}
};

/**
* Wrap stream from `setProtocolHandler`.
* Makes reading from stream buffered.
*/
inline void streamReadBuffer(libp2p::StreamAndProtocol &result) {
constexpr size_t kBuffer{1 << 16};
result.stream = std::make_shared<libp2p::connection::StreamReadBuffer>(
std::move(result.stream), kBuffer);
result.stream = std::make_shared<StreamWrapper>(
std::make_shared<libp2p::connection::StreamReadBuffer>(
std::move(result.stream), kBuffer));
}

/**
Expand Down
11 changes: 8 additions & 3 deletions core/network/impl/protocols/grandpa_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ namespace kagome::network {
[&](network::GrandpaVote &&vote_message) {
SL_VERBOSE(
base_.logger(), "VoteMessage has received from {}", peer_id);
grandpa_observer_->onVoteMessage(std::nullopt, peer_id, vote_message);
auto info = peer_manager_->getPeerState(peer_id);
grandpa_observer_->onVoteMessage(
std::nullopt, peer_id, compactFromRefToOwn(info), vote_message);
addKnown(peer_id, hash);
},
[&](FullCommitMessage &&commit_message) {
Expand All @@ -146,15 +148,18 @@ namespace kagome::network {
SL_VERBOSE(base_.logger(),
"NeighborMessage has received from {}",
peer_id);
auto info = peer_manager_->getPeerState(peer_id);
grandpa_observer_->onNeighborMessage(peer_id,
compactFromRefToOwn(info),
std::move(neighbor_message));
}
},
[&](network::CatchUpRequest &&catch_up_request) {
SL_VERBOSE(
base_.logger(), "CatchUpRequest has received from {}", peer_id);
grandpa_observer_->onCatchUpRequest(peer_id,
std::move(catch_up_request));
auto info = peer_manager_->getPeerState(peer_id);
grandpa_observer_->onCatchUpRequest(
peer_id, compactFromRefToOwn(info), std::move(catch_up_request));
},
[&](network::CatchUpResponse &&catch_up_response) {
SL_VERBOSE(
Expand Down
1 change: 1 addition & 0 deletions core/network/notifications/connect_and_handshake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace kagome::network::notifications {
notifications::handshake(
std::move(stream), std::move(frame_stream), handshake, std::move(cb));
};

base.host().newStream(peer, base.protocolIds(), std::move(cb));
}
} // namespace kagome::network::notifications
22 changes: 22 additions & 0 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ namespace kagome::network {

using OurView = network::View;

struct PeerStateCompact {
std::optional<RoundNumber> round_number;
std::optional<VoterSetId> set_id;
BlockNumber last_finalized;
};

struct PeerState {
clock::SteadyClock::TimePoint time;
Roles roles = 0;
Expand All @@ -57,8 +63,24 @@ namespace kagome::network {
LruSet<common::Hash256> known_grandpa_messages{
kPeerStateMaxKnownGrandpaMessages,
};

PeerStateCompact compact() const {
return PeerStateCompact{
.round_number = round_number,
.set_id = set_id,
.last_finalized = last_finalized,
};
}
};

inline std::optional<PeerStateCompact> compactFromRefToOwn(
const std::optional<std::reference_wrapper<PeerState>> &opt_ref) {
if (opt_ref) {
return opt_ref->get().compact();
}
return std::nullopt;
}

struct StreamEngine;

/**
Expand Down
Loading