diff --git a/core/network/impl/protocols/grandpa_protocol.cpp b/core/network/impl/protocols/grandpa_protocol.cpp index f729b1818c..abe1af8cfd 100644 --- a/core/network/impl/protocols/grandpa_protocol.cpp +++ b/core/network/impl/protocols/grandpa_protocol.cpp @@ -53,6 +53,10 @@ namespace kagome::network { return base_.start(weak_from_this()); } + void GrandpaProtocol::stop() { + stream_engine_->del(own_info_.id, shared_from_this()); + } + const ProtocolName &GrandpaProtocol::protocolName() const { return base_.protocolName(); } diff --git a/core/network/impl/protocols/grandpa_protocol.hpp b/core/network/impl/protocols/grandpa_protocol.hpp index fa16595cfa..877a365963 100644 --- a/core/network/impl/protocols/grandpa_protocol.hpp +++ b/core/network/impl/protocols/grandpa_protocol.hpp @@ -53,11 +53,8 @@ namespace kagome::network { const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr scheduler); - /** - * Sets handler for `parytytech/grandpa/1` protocol - * @return true if handler set successfully - */ bool start() override; + void stop(); const std::string &protocolName() const override; @@ -78,7 +75,7 @@ namespace kagome::network { CatchUpResponse &&catch_up_response); private: - const static inline auto kGrandpaProtocolName = "GrandpaProtocol"s; + inline static const auto kGrandpaProtocolName = "GrandpaProtocol"s; enum class Direction { INCOMING, OUTGOING }; void readHandshake(std::shared_ptr stream, Direction direction, diff --git a/core/network/impl/router_libp2p.cpp b/core/network/impl/router_libp2p.cpp index eb192ec1e6..b1f4038c1b 100644 --- a/core/network/impl/router_libp2p.cpp +++ b/core/network/impl/router_libp2p.cpp @@ -68,23 +68,23 @@ namespace kagome::network { } bool RouterLibp2p::prepare() { - block_announce_protocol_.get()->start(); - grandpa_protocol_.get()->start(); - - sync_protocol_.get()->start(); - state_protocol_.get()->start(); - warp_protocol_.get()->start(); - light_protocol_.get()->start(); - - propagate_transactions_protocol_.get()->start(); - - collation_protocol_.get()->start(); - validation_protocol_.get()->start(); - req_collation_protocol_.get()->start(); - req_pov_protocol_.get()->start(); - fetch_chunk_protocol_.get()->start(); - fetch_available_data_protocol_.get()->start(); - statement_fetching_protocol_.get()->start(); + app_state_manager_->takeControl(*block_announce_protocol_.get()); + app_state_manager_->takeControl(*grandpa_protocol_.get()); + + app_state_manager_->takeControl(*sync_protocol_.get()); + app_state_manager_->takeControl(*state_protocol_.get()); + app_state_manager_->takeControl(*warp_protocol_.get()); + app_state_manager_->takeControl(*light_protocol_.get()); + + app_state_manager_->takeControl(*propagate_transactions_protocol_.get()); + + app_state_manager_->takeControl(*collation_protocol_.get()); + app_state_manager_->takeControl(*validation_protocol_.get()); + app_state_manager_->takeControl(*req_collation_protocol_.get()); + app_state_manager_->takeControl(*req_pov_protocol_.get()); + app_state_manager_->takeControl(*fetch_chunk_protocol_.get()); + app_state_manager_->takeControl(*fetch_available_data_protocol_.get()); + app_state_manager_->takeControl(*statement_fetching_protocol_.get()); host_.setProtocolHandler( {ping_protocol_.get()->getProtocolId()}, diff --git a/core/network/impl/stream_engine.cpp b/core/network/impl/stream_engine.cpp index 8f4239c5d6..920ec83838 100644 --- a/core/network/impl/stream_engine.cpp +++ b/core/network/impl/stream_engine.cpp @@ -95,6 +95,37 @@ namespace kagome::network { }); } + void StreamEngine::del(const PeerId &peer_id, + const std::shared_ptr &protocol) { + SL_TRACE(logger_, + "Remove {} streams from peer.(peer={})", + protocol->protocolName(), + peer_id); + streams_.exclusiveAccess([&](auto &streams) { + if (auto it = streams.find(peer_id); it != streams.end()) { + auto &protocols = it->second; + for (auto protocol_it = protocols.begin(); + protocol_it != protocols.end(); + ++protocol_it) { + if (protocol_it->first == protocol) { + auto &descr = protocol_it->second; + if (descr.incoming.stream) { + descr.incoming.stream->reset(); + } + if (descr.outgoing.stream) { + descr.outgoing.stream->reset(); + } + protocols.erase(protocol_it); + break; + } + } + if (protocols.empty()) { + streams.erase(it); + } + } + }); + } + bool StreamEngine::reserveOutgoing( PeerId const &peer_id, std::shared_ptr const &protocol) { BOOST_ASSERT(protocol); diff --git a/core/network/impl/stream_engine.hpp b/core/network/impl/stream_engine.hpp index f9908f4b8b..c1917ec37f 100644 --- a/core/network/impl/stream_engine.hpp +++ b/core/network/impl/stream_engine.hpp @@ -126,14 +126,17 @@ namespace kagome::network { void del(const PeerId &peer_id); - bool reserveOutgoing(PeerId const &peer_id, - std::shared_ptr const &protocol); + void del(const PeerId &peer_id, + const std::shared_ptr &protocol); - void dropReserveOutgoing(PeerId const &peer_id, - std::shared_ptr const &protocol); + bool reserveOutgoing(const PeerId &peer_id, + const std::shared_ptr &protocol); - bool isAlive(PeerId const &peer_id, - std::shared_ptr const &protocol) const; + void dropReserveOutgoing(const PeerId &peer_id, + const std::shared_ptr &protocol); + + bool isAlive(const PeerId &peer_id, + const std::shared_ptr &protocol) const; template void send(const PeerId &peer_id, @@ -143,7 +146,7 @@ namespace kagome::network { BOOST_ASSERT(protocol != nullptr); bool was_sent = false; - streams_.sharedAccess([&](auto const &streams) { + streams_.sharedAccess([&](const auto &streams) { forPeerProtocol( peer_id, streams, protocol, [&](auto type, auto const &descr) { if (descr.hasActiveOutgoing()) { @@ -223,7 +226,7 @@ namespace kagome::network { template size_t count(F &&filter) const { - return streams_.sharedAccess([&](auto const &streams) { + return streams_.sharedAccess([&](const auto &streams) { size_t result = 0; for (auto const &stream : streams) { if (filter(stream.first)) { @@ -254,7 +257,7 @@ namespace kagome::network { template void forEachPeer(F &&f) const { - streams_.sharedAccess([&](auto const &streams) { + streams_.sharedAccess([&](const auto &streams) { for (auto const &[peer_id, protocol_map] : streams) { std::forward(f)(peer_id, protocol_map); } @@ -332,15 +335,15 @@ namespace kagome::network { using PeerMap = std::map; void uploadStream(std::shared_ptr &dst, - std::shared_ptr const &src, - std::shared_ptr const &protocol, + const std::shared_ptr &src, + const std::shared_ptr &protocol, Direction direction); template - void send(PeerId const &peer_id, - std::shared_ptr const &protocol, + void send(const PeerId &peer_id, + const std::shared_ptr &protocol, std::shared_ptr stream, - std::shared_ptr const &msg) { + const std::shared_ptr &msg) { BOOST_ASSERT(stream != nullptr); auto read_writer = std::make_shared(stream); @@ -376,9 +379,9 @@ namespace kagome::network { } template - static void forPeerProtocol(PeerId const &peer_id, + static void forPeerProtocol(const PeerId &peer_id, PM &streams, - std::shared_ptr const &protocol, + const std::shared_ptr &protocol, F &&f) { if (auto it = streams.find(peer_id); it != streams.end()) { forProtocol(it->second, protocol, [&](auto &descr) { @@ -389,8 +392,8 @@ namespace kagome::network { [[maybe_unused]] void dump(std::string_view msg); - void openOutgoingStream(PeerId const &peer_id, - std::shared_ptr const &protocol, + void openOutgoingStream(const PeerId &peer_id, + const std::shared_ptr &protocol, ProtocolDescr &descr); template