Skip to content

Commit

Permalink
feature/Ellastic-scaling + priority nodes + cluster integr (#2102)
Browse files Browse the repository at this point in the history
* send my view and inc use count

Signed-off-by: iceseer <iceseer@gmail.com>

* undisconnectable peers

Signed-off-by: iceseer <iceseer@gmail.com>

* statements store test

Signed-off-by: iceseer <iceseer@gmail.com>

* request statements from peer transmitted us a manifest

Signed-off-by: iceseer <iceseer@gmail.com>

* BackedCandidate fixes

Signed-off-by: iceseer <iceseer@gmail.com>

* - ParachainHost_node_features
- inject_core_index usage

Signed-off-by: iceseer <iceseer@gmail.com>

* cluster integration

Signed-off-by: iceseer <iceseer@gmail.com>

* remove `need_to_process` check based on active_leaf

Signed-off-by: iceseer <iceseer@gmail.com>

* Always return true in handle_import_statements

* kick_off_seconding

Signed-off-by: iceseer <iceseer@gmail.com>

* dequeue_next_collation_and_fetch

Signed-off-by: iceseer <iceseer@gmail.com>

---------

Signed-off-by: iceseer <iceseer@gmail.com>
  • Loading branch information
iceseer authored Jun 20, 2024
1 parent 76c3e31 commit 7e5bd47
Show file tree
Hide file tree
Showing 38 changed files with 2,890 additions and 1,392 deletions.
6 changes: 6 additions & 0 deletions cmake/Hunter/init.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@ set(HUNTER_CACHE_SERVERS
CACHE STRING "Binary cache server"
)

# https://hunter.readthedocs.io/en/latest/reference/user-variables.html#hunter-use-cache-servers
# set(
# HUNTER_USE_CACHE_SERVERS NO
# CACHE STRING "Disable binary cache"
# )

include(${CMAKE_CURRENT_LIST_DIR}/HunterGate.cmake)
include(${CMAKE_CURRENT_LIST_DIR}/hunter-gate-url.cmake)
15 changes: 15 additions & 0 deletions core/api/jrpc/value_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ namespace kagome::api {
inline jsonrpc::Value makeValue(const primitives::Version &);
inline jsonrpc::Value makeValue(const primitives::Justification &);
inline jsonrpc::Value makeValue(const primitives::RpcMethods &);
inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams &val);
inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams::HeaderInfo &val);

inline jsonrpc::Value makeValue(const uint32_t &val) {
return static_cast<int64_t>(val);
Expand Down Expand Up @@ -150,6 +154,17 @@ namespace kagome::api {
return value;
}

inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams &val) {
return makeValue(val.removed);
}

inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams::HeaderInfo
&val) {
return makeValue(val.hash);
}

template <size_t N>
inline jsonrpc::Value makeValue(const common::Blob<N> &val) {
return makeValue(BufferView{val});
Expand Down
66 changes: 39 additions & 27 deletions core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,9 +813,13 @@ namespace kagome::blockchain {

OUTCOME_TRY(p.storage_->putJustification(justification, block_hash));

std::vector<primitives::BlockHash> retired_hashes;
std::vector<
primitives::events::RemoveAfterFinalizationParams::HeaderInfo>
retired_hashes;
for (auto parent = node->parent(); parent; parent = parent->parent()) {
retired_hashes.emplace_back(parent->info.hash);
retired_hashes.emplace_back(
primitives::events::RemoveAfterFinalizationParams::HeaderInfo{
parent->info.hash, parent->info.number});
}

auto changes = p.tree_->finalize(node);
Expand Down Expand Up @@ -846,12 +850,13 @@ namespace kagome::blockchain {

main_pool_handler_->execute(
[weak{weak_from_this()},
retired_hashes{std::move(retired_hashes)}] {
retired{primitives::events::RemoveAfterFinalizationParams{
std::move(retired_hashes), header.number}}] {
if (auto self = weak.lock()) {
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired_hashes);
retired);
}
});

Expand Down Expand Up @@ -1301,7 +1306,8 @@ namespace kagome::blockchain {
}

std::vector<primitives::Extrinsic> extrinsics;
std::vector<primitives::BlockHash> retired_hashes;
std::vector<primitives::events::RemoveAfterFinalizationParams::HeaderInfo>
retired_hashes;

// remove from storage
retired_hashes.reserve(changes.prune.size());
Expand Down Expand Up @@ -1329,33 +1335,39 @@ namespace kagome::blockchain {
BOOST_ASSERT(block_header_opt.has_value());
OUTCOME_TRY(p.state_pruner_->pruneDiscarded(block_header_opt.value()));
}
retired_hashes.emplace_back(block.hash);
retired_hashes.emplace_back(
primitives::events::RemoveAfterFinalizationParams::HeaderInfo{
block.hash, block.number});
OUTCOME_TRY(p.storage_->removeBlock(block.hash));
}

// trying to return extrinsics back to transaction pool
main_pool_handler_->execute([extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired_hashes{
std::move(retired_hashes)}]() mutable {
if (auto self = wself.lock()) {
auto eo = self->block_tree_data_.sharedAccess(
[&](const BlockTreeData &p) { return p.extrinsic_observer_; });

for (auto &&extrinsic : extrinsics) {
auto result = eo->onTxMessage(extrinsic);
if (result) {
SL_DEBUG(self->log_, "Tx {} was reapplied", result.value().toHex());
} else {
SL_DEBUG(self->log_, "Tx was skipped: {}", result.error());
}
}
main_pool_handler_->execute(
[extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired{primitives::events::RemoveAfterFinalizationParams{
std::move(retired_hashes),
getLastFinalizedNoLock(p).number}}]() mutable {
if (auto self = wself.lock()) {
auto eo = self->block_tree_data_.sharedAccess(
[&](const BlockTreeData &p) { return p.extrinsic_observer_; });

for (auto &&extrinsic : extrinsics) {
auto result = eo->onTxMessage(extrinsic);
if (result) {
SL_DEBUG(
self->log_, "Tx {} was reapplied", result.value().toHex());
} else {
SL_DEBUG(self->log_, "Tx was skipped: {}", result.error());
}
}

self->chain_events_engine_->notify(
primitives::events::ChainEventType::kDeactivateAfterFinalization,
retired_hashes);
}
});
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired);
}
});

return outcome::success();
}
Expand Down
5 changes: 3 additions & 2 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ namespace kagome::network {

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType request, std::shared_ptr<Stream> /*stream*/) override {
RequestType request, std::shared_ptr<Stream> stream) override {
base().logger()->info(
"Fetching attested candidate request.(candidate={})",
request.candidate_hash);
auto res = pp_->OnFetchAttestedCandidateRequest(std::move(request));
auto res = pp_->OnFetchAttestedCandidateRequest(
std::move(request), stream->remotePeerId().value());
if (res.has_error()) {
base().logger()->error(
"Fetching attested candidate response failed.(error={})",
Expand Down
44 changes: 34 additions & 10 deletions core/network/peer_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ namespace kagome::network {
std::chrono::system_clock::time_point last_active;
};

struct CollationEvent {
CollatorId collator_id;
struct {
RelayHash relay_parent;
network::ParachainId para_id;
libp2p::peer::PeerId peer_id;
std::optional<Hash> commitments_hash;
} pending_collation;
};

using OurView = network::View;

struct PeerStateCompact {
Expand All @@ -66,6 +56,7 @@ namespace kagome::network {
LruSet<common::Hash256> known_grandpa_messages{
kPeerStateMaxKnownGrandpaMessages,
};
uint32_t use_count = 0;

/// @brief parachain peer state
std::optional<CollatingPeerState> collator_state = std::nullopt;
Expand Down Expand Up @@ -95,6 +86,20 @@ namespace kagome::network {
return fresh_implicit;
}

/**
* Set of functions to manipulate in-parachain set of nodes.
*/
bool can_be_disconnected() const {
return 0 == use_count;
}
void inc_use_count() {
++use_count;
}
void dec_use_count() {
BOOST_ASSERT(use_count > 0);
--use_count;
}

/// Whether we know that a peer knows a relay-parent. The peer knows the
/// relay-parent if it is either implicit or explicit in their view.
/// However, if it is implicit via an active-leaf we don't recognize, we
Expand Down Expand Up @@ -162,3 +167,22 @@ namespace kagome::network {
}

} // namespace kagome::network

template <>
struct std::hash<kagome::network::FetchedCollation> {
size_t operator()(
const kagome::network::FetchedCollation &value) const noexcept {
using CollatorId = kagome::parachain::CollatorId;
using CandidateHash = kagome::parachain::CandidateHash;
using RelayHash = kagome::parachain::RelayHash;
using ParachainId = kagome::parachain::ParachainId;

size_t result = std::hash<RelayHash>()(value.relay_parent);
boost::hash_combine(result, std::hash<ParachainId>()(value.para_id));
boost::hash_combine(result,
std::hash<CandidateHash>()(value.candidate_hash));
boost::hash_combine(result, std::hash<CollatorId>()(value.collator_id));

return result;
}
};
73 changes: 70 additions & 3 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include <boost/variant.hpp>
#include <libp2p/peer/peer_info.hpp>
#include <scale/bitvec.hpp>
#include <tuple>
#include <type_traits>
Expand Down Expand Up @@ -83,6 +84,7 @@ namespace kagome::network {
common::Buffer payload;
};

using PoV = ParachainBlock;
using RequestPov = CandidateHash;
using ResponsePov = boost::variant<ParachainBlock, Empty>;

Expand Down Expand Up @@ -124,11 +126,28 @@ namespace kagome::network {
struct CollationResponse {
SCALE_TIE(2);

CandidateReceipt receipt; /// Candidate receipt
ParachainBlock pov; /// PoV block
/// Candidate receipt
CandidateReceipt receipt;

/// PoV block
ParachainBlock pov;
};

using ReqCollationResponseData = boost::variant<CollationResponse>;
struct CollationWithParentHeadData {
SCALE_TIE(3);
/// The receipt of the candidate.
CandidateReceipt receipt;

/// Candidate's proof of validity.
ParachainBlock pov;

/// The head data of the candidate's parent.
/// This is needed for elastic scaling to work.
HeadData parent_head_data;
};

using ReqCollationResponseData =
boost::variant<CollationResponse, CollationWithParentHeadData>;

/**
* Sent by clients who want to retrieve the advertised collation at the
Expand Down Expand Up @@ -181,6 +200,14 @@ namespace kagome::network {
CandidateCommitments commitments; /// commitments retrieved from validation
/// result and produced by the execution
/// and validation parachain candidate

CandidateReceipt to_plain(const crypto::Hasher &hasher) const {
CandidateReceipt receipt;
receipt.descriptor = descriptor,
receipt.commitments_hash =
hasher.blake2b_256(scale::encode(commitments).value());
return receipt;
}
};

struct FetchStatementRequest {
Expand All @@ -206,6 +233,38 @@ namespace kagome::network {
CommittedCandidateReceipt candidate;
std::vector<ValidityAttestation> validity_votes;
scale::BitVec validator_indices;

/// Creates `BackedCandidate` from args.
static BackedCandidate from(
CommittedCandidateReceipt candidate_,
std::vector<ValidityAttestation> validity_votes_,
scale::BitVec validator_indices_,
std::optional<CoreIndex> core_index_) {
BackedCandidate backed{
.candidate = std::move(candidate_),
.validity_votes = std::move(validity_votes_),
.validator_indices = std::move(validator_indices_),
};

if (core_index_) {
backed.inject_core_index(*core_index_);
}

return backed;
}

void inject_core_index(CoreIndex core_index) {
scale::BitVec core_index_to_inject;
core_index_to_inject.bits.assign(8, false);

auto val = uint8_t(core_index);
for (size_t i = 0; i < 8; ++i) {
core_index_to_inject.bits[i] = (val >> i) & 1;
}
validator_indices.bits.insert(validator_indices.bits.end(),
core_index_to_inject.bits.begin(),
core_index_to_inject.bits.end());
}
};

using CandidateState =
Expand Down Expand Up @@ -432,6 +491,14 @@ namespace kagome::network {
std::optional<CollatorId> collator;
};

inline const CandidateHash &candidateHash(const CompactStatement &val) {
auto p = visit_in_place(
val, [&](const auto &v) -> std::reference_wrapper<const CandidateHash> {
return v;
});
return p.get();
}

inline CandidateHash candidateHash(const crypto::Hasher &hasher,
const CommittedCandidateReceipt &receipt) {
auto commitments_hash =
Expand Down
Loading

0 comments on commit 7e5bd47

Please sign in to comment.