Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix can disconnect #2149

Merged
merged 11 commits into from
Jul 22, 2024
Merged
9 changes: 6 additions & 3 deletions core/crypto/sr25519/sr25519_provider_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "crypto/sr25519/sr25519_provider_impl.hpp"

#include "crypto/sr25519_types.hpp"
#include "utils/non_null_dangling.hpp"

namespace kagome::crypto {
outcome::result<Sr25519Keypair> Sr25519ProviderImpl::generateKeypair(
Expand Down Expand Up @@ -39,7 +40,7 @@ namespace kagome::crypto {
sr25519_sign(signature.data(),
keypair.public_key.data(),
keypair.secret_key.unsafeBytes().data(),
message.data(),
nonNullDangling(message),
message.size());
} catch (...) {
return Sr25519ProviderError::SIGN_UNKNOWN_ERROR;
Expand Down Expand Up @@ -68,8 +69,10 @@ namespace kagome::crypto {
const Sr25519PublicKey &public_key) const {
bool result = false;
try {
result = sr25519_verify(
signature.data(), message.data(), message.size(), public_key.data());
result = sr25519_verify(signature.data(),
nonNullDangling(message),
message.size(),
public_key.data());
} catch (...) {
return Sr25519ProviderError::SIGN_UNKNOWN_ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion core/host_api/impl/elliptic_curves_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace kagome::host_api {
std::shared_ptr<const runtime::MemoryProvider> memory_provider,
std::shared_ptr<const crypto::EllipticCurves> elliptic_curves)
: logger_(log::createLogger("EllipticCurvesExtension",
"ecliptic_curves_extension")),
"elliptic_curves_extension")),
memory_provider_(std::move(memory_provider)),
elliptic_curves_(std::move(elliptic_curves)) {
BOOST_ASSERT(memory_provider_ != nullptr);
Expand Down
2 changes: 1 addition & 1 deletion core/host_api/impl/elliptic_curves_extension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace kagome::crypto {

namespace kagome::host_api {
/**
* Implements extension functions related to ecliptic curves
* Implements extension functions related to elliptic curves
*/
class EllipticCurvesExtension {
public:
Expand Down
1 change: 1 addition & 0 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ namespace {
di::bind<parachain::BitfieldStore>.template to<parachain::BitfieldStoreImpl>(),
di::bind<parachain::BackingStore>.template to<parachain::BackingStoreImpl>(),
di::bind<parachain::BackedCandidatesSource>.template to<parachain::ParachainProcessorImpl>(),
di::bind<network::CanDisconnect>.template to<parachain::ParachainProcessorImpl>(),
di::bind<parachain::Pvf>.template to<parachain::PvfImpl>(),
di::bind<network::CollationObserver>.template to<parachain::ParachainObserverImpl>(),
di::bind<network::ValidationObserver>.template to<parachain::ParachainObserverImpl>(),
Expand Down
1 change: 1 addition & 0 deletions core/log/configurator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace kagome::log {
- name: runtime_api
- name: host_api
children:
- name: elliptic_curves_extension
- name: memory_extension
- name: io_extension
- name: crypto_extension
Expand Down
23 changes: 23 additions & 0 deletions core/network/can_disconnect.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

namespace libp2p::peer {
class PeerId;
} // namespace libp2p::peer
namespace libp2p {
using peer::PeerId;
}

namespace kagome::network {
class CanDisconnect {
public:
virtual ~CanDisconnect() = default;

virtual bool canDisconnect(const libp2p::PeerId &) const = 0;
};
} // namespace kagome::network
9 changes: 5 additions & 4 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <libp2p/protocol/ping.hpp>

#include "common/main_thread_pool.hpp"
#include "network/can_disconnect.hpp"
#include "network/impl/protocols/beefy_protocol_impl.hpp"
#include "network/impl/protocols/grandpa_protocol.hpp"
#include "network/impl/protocols/parachain_protocols.hpp"
Expand Down Expand Up @@ -90,6 +91,7 @@ namespace kagome::network {
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view)
: log_{log::createLogger("PeerManager", "network")},
host_(host),
Expand All @@ -107,6 +109,7 @@ namespace kagome::network {
storage_{storage->getSpace(storage::Space::kDefault)},
hasher_{std::move(hasher)},
reputation_repository_{std::move(reputation_repository)},
can_disconnect_{std::move(can_disconnect)},
peer_view_{std::move(peer_view)} {
BOOST_ASSERT(identify_ != nullptr);
BOOST_ASSERT(kademlia_ != nullptr);
Expand Down Expand Up @@ -327,10 +330,8 @@ namespace kagome::network {

for (const auto &[peer_id, desc] : active_peers_) {
// Skip peer having immunity
if (auto it = peer_states_.find(peer_id); it != peer_states_.end()) {
if (not it->second.can_be_disconnected()) {
continue;
}
if (not can_disconnect_.get()->canDisconnect(peer_id)) {
continue;
}

const uint64_t last_activity_ms =
Expand Down
4 changes: 4 additions & 0 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "clock/clock.hpp"
#include "consensus/grandpa/voting_round.hpp"
#include "crypto/hasher.hpp"
#include "injector/lazy.hpp"
#include "log/logger.hpp"
#include "metrics/metrics.hpp"
#include "network/impl/protocols/block_announce_protocol.hpp"
Expand All @@ -45,6 +46,7 @@ namespace kagome {
}

namespace kagome::network {
class CanDisconnect;

enum class PeerType { PEER_TYPE_IN = 0, PEER_TYPE_OUT };

Expand Down Expand Up @@ -76,6 +78,7 @@ namespace kagome::network {
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ReputationRepository> reputation_repository,
LazySPtr<CanDisconnect> can_disconnect,
std::shared_ptr<PeerView> peer_view);

/** @see poolHandlerReadyMake */
Expand Down Expand Up @@ -204,6 +207,7 @@ namespace kagome::network {
std::shared_ptr<storage::BufferStorage> storage_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<ReputationRepository> reputation_repository_;
LazySPtr<CanDisconnect> can_disconnect_;
std::shared_ptr<network::PeerView> peer_view_;

libp2p::event::Handle add_peer_handle_;
Expand Down
15 changes: 0 additions & 15 deletions core/network/peer_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ namespace kagome::network {
LruSet<common::Hash256> known_grandpa_messages{
kPeerStateMaxKnownGrandpaMessages,
};
uint32_t use_count = 0;

/// @brief parachain peer state
std::optional<CollatingPeerState> collator_state = std::nullopt;
Expand Down Expand Up @@ -86,20 +85,6 @@ namespace kagome::network {
return fresh_implicit;
}

/**
* Set of functions to manipulate in-parachain set of nodes.
*/
bool can_be_disconnected() const {
return 0 == use_count;
}
void inc_use_count() {
++use_count;
}
void dec_use_count() {
BOOST_ASSERT(use_count > 0);
--use_count;
}

/// Whether we know that a peer knows a relay-parent. The peer knows the
/// relay-parent if it is either implicit or explicit in their view.
/// However, if it is implicit via an active-leaf we don't recognize, we
Expand Down
3 changes: 2 additions & 1 deletion core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "parachain/approval/state.hpp"
#include "primitives/math.hpp"
#include "runtime/runtime_api/parachain_host_types.hpp"
#include "utils/non_null_dangling.hpp"
#include "utils/pool_handler_ready_make.hpp"

static constexpr size_t kMaxAssignmentBatchSize = 200ull;
Expand Down Expand Up @@ -104,7 +105,7 @@ namespace {
rvm_sample,
config.n_cores,
&relay_vrf_story,
lc.data(),
nonNullDangling(lc),
lc.size(),
&cert_output,
&cert_proof,
Expand Down
3 changes: 2 additions & 1 deletion core/parachain/pvf/kagome_pvf_worker_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "crypto/bip39/impl/bip39_provider_impl.hpp"
#include "crypto/ecdsa/ecdsa_provider_impl.hpp"
#include "crypto/ed25519/ed25519_provider_impl.hpp"
#include "crypto/elliptic_curves/elliptic_curves_impl.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp"
#include "crypto/secp256k1/secp256k1_provider_impl.hpp"
Expand Down Expand Up @@ -82,7 +83,7 @@ namespace kagome::parachain {
di::bind<crypto::Bip39Provider>.to<crypto::Bip39ProviderImpl>(),
di::bind<crypto::Pbkdf2Provider>.to<crypto::Pbkdf2ProviderImpl>(),
di::bind<crypto::Secp256k1Provider>.to<crypto::Secp256k1ProviderImpl>(),
bind_null<crypto::EllipticCurves>(),
di::bind<crypto::EllipticCurves>.template to<crypto::EllipticCurvesImpl>(),
bind_null<crypto::KeyStore>(),
bind_null<offchain::OffchainPersistentStorage>(),
bind_null<offchain::OffchainWorkerPool>(),
Expand Down
4 changes: 1 addition & 3 deletions core/parachain/pvf/pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ namespace kagome::parachain {
const Hash256 &code_hash,
BufferView code_zstd,
const runtime::RuntimeContext::ContextParams &config) const {
auto make_timer = [] { return metric_pvf_preparation_time().timer(); };
decltype(make_timer()) timer;
auto timer = metric_pvf_preparation_time().timer();
return pool_->precompile(
code_hash,
[&]() mutable -> runtime::RuntimeCodeProvider::Result {
timer.emplace(make_timer().value());
OUTCOME_TRY(code, runtime::uncompressCodeIfNeeded(code_zstd));
metric_code_size.observe(code.size());
return std::make_shared<Buffer>(code);
Expand Down
103 changes: 65 additions & 38 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "utils/map.hpp"
#include "utils/pool_handler.hpp"
#include "utils/profiler.hpp"
#include "utils/weak_macro.hpp"

#ifndef TRY_GET_OR_RET
#define TRY_GET_OR_RET(name, op) \
Expand Down Expand Up @@ -175,6 +176,8 @@ namespace kagome::parachain {
sync_state_observable_(std::move(sync_state_observable)),
query_audi_{std::move(query_audi)},
per_session_(RefCache<SessionIndex, PerSessionState>::create()),
peer_use_count_(
std::make_shared<decltype(peer_use_count_)::element_type>()),
slots_util_(std::move(slots_util)),
babe_config_repo_(std::move(babe_config_repo)),
chain_sub_{std::move(chain_sub_engine)},
Expand Down Expand Up @@ -908,11 +911,8 @@ namespace kagome::parachain {
tryOpenOutgoingValidationStream(
peer->id,
network::CollationVersion::VStaging,
[wptr{weak_from_this()}, peer_id{peer->id}](auto &&stream) {
TRY_GET_OR_RET(self, wptr.lock());
auto ps = self->pm_->getPeerState(peer_id);

ps->get().inc_use_count();
[WEAK_SELF, peer_id{peer->id}](auto &&stream) {
WEAK_LOCK(self);
self->sendMyView(peer_id,
stream,
self->router_->getValidationProtocolVStaging());
Expand All @@ -926,48 +926,66 @@ namespace kagome::parachain {
Groups &&_groups,
grid::Views &&_grid_view,
ValidatorIndex _our_index,
const std::shared_ptr<network::PeerManager> &_pm,
const std::shared_ptr<authority_discovery::Query> &_query_audi)
std::shared_ptr<PeerUseCount> peers)
: session{_session},
session_info{_session_info},
groups{std::move(_groups)},
grid_view{std::move(_grid_view)},
our_index{_our_index},
pm{_pm},
query_audi{_query_audi} {}
peers{std::move(peers)} {
if (our_index) {
our_group = groups.byValidatorIndex(*our_index);
}
if (our_group) {
BOOST_ASSERT(*our_group < session_info.validator_groups.size());
if (grid_view) {
BOOST_ASSERT(*our_group < grid_view->size());
}
}
updatePeers(true);
}

ParachainProcessorImpl::PerSessionState::~PerSessionState() {
if (our_index && grid_view) {
if (auto our_group = groups.byValidatorIndex(*our_index)) {
BOOST_ASSERT(*our_group < session_info.validator_groups.size());
const auto &group = session_info.validator_groups[*our_group];

auto dec_use_count_for_peer = [&](ValidatorIndex vi) {
if (auto peer = query_audi->get(session_info.discovery_keys[vi])) {
auto ps = pm->getPeerState(peer->id);
BOOST_ASSERT(ps);
ps->get().dec_use_count();
}
};

/// update peers of our group
for (const auto vi : group) {
dec_use_count_for_peer(vi);
}
updatePeers(false);
}

/// update peers in grid view
if (grid_view) {
BOOST_ASSERT(*our_group < grid_view->size());
const auto &view = (*grid_view)[*our_group];
for (const auto vi : view.sending) {
dec_use_count_for_peer(vi);
void ParachainProcessorImpl::PerSessionState::updatePeers(bool add) const {
if (not our_index or not our_group or not this->peers) {
return;
}
auto &peers = *this->peers;
SAFE_UNIQUE(peers) {
auto f = [&](ValidatorIndex i) {
auto &id = session_info.discovery_keys[i];
auto it = peers.find(id);
if (add) {
if (it == peers.end()) {
it = peers.emplace(id, 0).first;
}
++it->second;
} else {
if (it == peers.end()) {
throw std::logic_error{"inconsistent PeerUseCount"};
}
for (const auto vi : view.receiving) {
dec_use_count_for_peer(vi);
--it->second;
if (it->second == 0) {
peers.erase(it);
}
}
};
for (auto &i : session_info.validator_groups[*our_group]) {
f(i);
}
}
if (grid_view) {
auto &view = grid_view->at(*our_group);
for (auto &i : view.sending) {
f(i);
}
for (auto &i : view.receiving) {
f(i);
}
}
};
}

outcome::result<std::optional<runtime::ClaimQueueSnapshot>>
Expand Down Expand Up @@ -1064,7 +1082,7 @@ namespace kagome::parachain {
relay_parent);
}

auto per_session_state = per_session_->get_or_insert(session_index, [&]() {
auto per_session_state = per_session_->get_or_insert(session_index, [&] {
const auto validator_index{validator->validatorIndex()};
SL_TRACE(
logger_, "===> Grid build (validator_index={})", validator_index);
Expand All @@ -1080,8 +1098,7 @@ namespace kagome::parachain {
Groups{session_info->validator_groups, minimum_backing_votes},
std::move(grid_view),
validator_index,
pm_,
query_audi_);
peer_use_count_);
});

if (auto our_group = per_session_state->value().groups.byValidatorIndex(
Expand Down Expand Up @@ -5759,4 +5776,14 @@ namespace kagome::parachain {
return outcome::success();
}

bool ParachainProcessorImpl::canDisconnect(const libp2p::PeerId &peer) const {
auto audi = query_audi_->get(peer);
if (not audi) {
return true;
}
auto &peers = *peer_use_count_;
return SAFE_SHARED(peers) {
return peers.contains(*audi);
};
}
} // namespace kagome::parachain
Loading
Loading