Skip to content

Commit

Permalink
Babe time (#1568)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <turuslan.devbox@gmail.com>
  • Loading branch information
turuslan authored Apr 20, 2023
1 parent 1999292 commit ef9d8ba
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 53 deletions.
16 changes: 12 additions & 4 deletions core/authorship/impl/proposer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ namespace kagome::authorship {

ProposerImpl::ProposerImpl(
std::shared_ptr<BlockBuilderFactory> block_builder_factory,
std::shared_ptr<Clock> clock,
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool,
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine,
std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
extrinsic_event_key_repo)
: block_builder_factory_{std::move(block_builder_factory)},
clock_{std::move(clock)},
transaction_pool_{std::move(transaction_pool)},
ext_sub_engine_{std::move(ext_sub_engine)},
extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)} {
Expand All @@ -39,6 +41,7 @@ namespace kagome::authorship {

outcome::result<primitives::Block> ProposerImpl::propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) {
Expand Down Expand Up @@ -100,10 +103,14 @@ namespace kagome::authorship {
// number of transactions to be pushed to the block

size_t included_tx_count = 0;
std::vector<primitives::Transaction::Hash> included_hashes;
for (const auto &[hash, tx] : ready_txs) {
const auto &tx_ref = tx;
if (deadline && clock_->now() >= deadline) {
break;
}

scale::ScaleEncoderStream s(true);
s << tx_ref->ext;
s << tx->ext;
auto estimate_tx_size = s.size();

if (block_size + estimate_tx_size > block_size_limit) {
Expand All @@ -121,7 +128,7 @@ namespace kagome::authorship {
break;
}

SL_DEBUG(logger_, "Adding extrinsic: {}", tx_ref->ext.data);
SL_DEBUG(logger_, "Adding extrinsic: {}", tx->ext.data);
auto inserted_res = block_builder->pushExtrinsic(tx->ext);
if (not inserted_res) {
if (BlockBuilderError::EXHAUSTS_RESOURCES == inserted_res.error()) {
Expand All @@ -144,6 +151,7 @@ namespace kagome::authorship {
block_size += estimate_tx_size;
transaction_pushed = true;
++included_tx_count;
included_hashes.emplace_back(hash);
}
}
metric_tx_included_in_block_->set(included_tx_count);
Expand All @@ -156,7 +164,7 @@ namespace kagome::authorship {

OUTCOME_TRY(block, block_builder->bake());

for (const auto &[hash, tx] : ready_txs) {
for (const auto &hash : included_hashes) {
auto removed_res = transaction_pool_->removeOne(hash);
if (not removed_res) {
logger_->error(
Expand Down
3 changes: 3 additions & 0 deletions core/authorship/impl/proposer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace kagome::authorship {

ProposerImpl(
std::shared_ptr<BlockBuilderFactory> block_builder_factory,
std::shared_ptr<Clock> clock,
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool,
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine,
Expand All @@ -38,12 +39,14 @@ namespace kagome::authorship {

outcome::result<primitives::Block> propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) override;

private:
std::shared_ptr<BlockBuilderFactory> block_builder_factory_;
std::shared_ptr<Clock> clock_;
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool_;
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine_;
Expand Down
3 changes: 3 additions & 0 deletions core/authorship/proposer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace kagome::authorship {
*/
class Proposer {
public:
using Clock = clock::SystemClock;

virtual ~Proposer() = default;

/**
Expand All @@ -31,6 +33,7 @@ namespace kagome::authorship {
*/
virtual outcome::result<primitives::Block> propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) = 0;
Expand Down
69 changes: 36 additions & 33 deletions core/consensus/babe/impl/babe_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ namespace kagome::consensus::babe {
* @param authority_key authority
* @return index of authority in list of authorities
*/
std::optional<uint64_t> getAuthorityIndex(
std::optional<primitives::AuthorityIndex> getAuthorityIndex(
const primitives::AuthorityList &authorities,
const primitives::BabeSessionKey &authority_key) {
uint64_t n = 0;
primitives::AuthorityIndex n = 0;
for (auto &authority : authorities) {
if (authority.id.id == authority_key) {
return n;
Expand Down Expand Up @@ -665,11 +665,12 @@ namespace kagome::consensus::babe {
bool rewind_slots; // NOLINT
auto slot = current_slot_;

clock::SystemClock::TimePoint now;
do {
// check that we are really in the middle of the slot, as expected; we
// can cooperate with a relatively little (kMaxLatency) latency, as our
// node will be able to retrieve
auto now = clock_->now();
now = clock_->now();

auto finish_time = babe_util_->slotFinishTime(current_slot_);

Expand All @@ -693,9 +694,9 @@ namespace kagome::consensus::babe {
}
} while (rewind_slots);

// Slot processing begins in 1/3 slot time before end
auto finish_time = babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3;
// Slot processing begins in 1/3 slot time after start
auto finish_time = babe_util_->slotStartTime(current_slot_)
+ babe_config_repo_->slotDuration() / 3;

SL_VERBOSE(log_,
"Starting a slot {} in epoch {} (remains {:.2f} sec.)",
Expand All @@ -708,16 +709,16 @@ namespace kagome::consensus::babe {

// everything is OK: wait for the end of the slot
timer_->expiresAt(finish_time);
timer_->asyncWait([&](auto &&ec) {
timer_->asyncWait([this, now](auto &&ec) {
if (ec) {
log_->error("error happened while waiting on the timer: {}", ec);
return;
}
processSlot();
processSlot(now);
});
}

void BabeImpl::processSlot() {
void BabeImpl::processSlot(clock::SystemClock::TimePoint slot_timestamp) {
BOOST_ASSERT(keypair_ != nullptr);

best_block_ = block_tree_->bestLeaf();
Expand Down Expand Up @@ -763,7 +764,7 @@ namespace kagome::consensus::babe {
const auto &authority_index = authority_index_res.value();

if (lottery_->getEpoch() != current_epoch_) {
changeLotteryEpoch(current_epoch_, babe_config);
changeLotteryEpoch(current_epoch_, authority_index, babe_config);
}

auto slot_leadership = lottery_->getSlotLeadership(current_slot_);
Expand All @@ -777,8 +778,10 @@ namespace kagome::consensus::babe {
common::Buffer(vrf_result.output),
common::Buffer(vrf_result.proof));

processSlotLeadership(
SlotType::Primary, std::cref(vrf_result), authority_index);
processSlotLeadership(SlotType::Primary,
slot_timestamp,
std::cref(vrf_result),
authority_index);
} else if (babe_config.allowed_slots
== primitives::AllowedSlots::PrimaryAndSecondaryPlain
or babe_config.allowed_slots
Expand All @@ -800,16 +803,20 @@ namespace kagome::consensus::babe {
common::Buffer(vrf.output),
common::Buffer(vrf.proof));

processSlotLeadership(
SlotType::SecondaryVRF, std::cref(vrf), authority_index);
processSlotLeadership(SlotType::SecondaryVRF,
slot_timestamp,
std::cref(vrf),
authority_index);
} else { // plain secondary slots mode
SL_DEBUG(
log_,
"Babe author {} is block producer in secondary plain slot",
keypair_->public_key);

processSlotLeadership(
SlotType::SecondaryPlain, std::nullopt, authority_index);
processSlotLeadership(SlotType::SecondaryPlain,
slot_timestamp,
std::nullopt,
authority_index);
}
} else {
SL_TRACE(log_,
Expand Down Expand Up @@ -848,7 +855,7 @@ namespace kagome::consensus::babe {

// everything is OK: wait for the end of the slot
timer_->expiresAt(start_time);
timer_->asyncWait([&](auto &&ec) {
timer_->asyncWait([this](auto &&ec) {
if (ec) {
log_->error("error happened while waiting on the timer: {}", ec);
return;
Expand Down Expand Up @@ -912,6 +919,7 @@ namespace kagome::consensus::babe {

void BabeImpl::processSlotLeadership(
SlotType slot_type,
clock::SystemClock::TimePoint slot_timestamp,
std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
primitives::AuthorityIndex authority_index) {
BOOST_ASSERT(keypair_ != nullptr);
Expand All @@ -930,7 +938,7 @@ namespace kagome::consensus::babe {

primitives::InherentData inherent_data;
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
clock_->now().time_since_epoch())
slot_timestamp.time_since_epoch())
.count();

if (auto res = inherent_data.putData<uint64_t>(kTimestampId, now);
Expand Down Expand Up @@ -986,8 +994,13 @@ namespace kagome::consensus::babe {
std::make_shared<storage::changes_trie::StorageChangesTrackerImpl>();

// create new block
auto pre_seal_block_res = proposer_->propose(
best_block_, inherent_data, {babe_pre_digest}, changes_tracker);
auto pre_seal_block_res =
proposer_->propose(best_block_,
babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3,
inherent_data,
{babe_pre_digest},
changes_tracker);
if (!pre_seal_block_res) {
SL_ERROR(log_, "Cannot propose a block: {}", pre_seal_block_res.error());
return;
Expand Down Expand Up @@ -1121,22 +1134,12 @@ namespace kagome::consensus::babe {

void BabeImpl::changeLotteryEpoch(
const EpochDescriptor &epoch,
primitives::AuthorityIndex authority_index,
const primitives::BabeConfiguration &babe_config) const {
BOOST_ASSERT(keypair_ != nullptr);

auto authority_index_res =
getAuthorityIndex(babe_config.authorities, keypair_->public_key);
if (not authority_index_res) {
SL_CRITICAL(log_,
"Block production failed: This node is not in the list of "
"authorities. (public key: {})",
keypair_->public_key);
return;
}

auto threshold = calculateThreshold(babe_config.leadership_rate,
babe_config.authorities,
authority_index_res.value());
auto threshold = calculateThreshold(
babe_config.leadership_rate, babe_config.authorities, authority_index);

lottery_->changeEpoch(epoch, babe_config.randomness, threshold, *keypair_);
}
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/babe/impl/babe_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ namespace kagome::consensus::babe {
/**
* Process the current Babe slot
*/
void processSlot();
void processSlot(clock::SystemClock::TimePoint slot_timestamp);

/**
* Gather block and broadcast it
Expand All @@ -163,6 +163,7 @@ namespace kagome::consensus::babe {
*/
void processSlotLeadership(
SlotType slot_type,
clock::SystemClock::TimePoint slot_timestamp,
std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
primitives::AuthorityIndex authority_index);

Expand All @@ -173,6 +174,7 @@ namespace kagome::consensus::babe {

void changeLotteryEpoch(
const EpochDescriptor &epoch,
primitives::AuthorityIndex authority_index,
const primitives::BabeConfiguration &babe_config) const;

outcome::result<primitives::PreRuntime> babePreDigest(
Expand Down
Loading

0 comments on commit ef9d8ba

Please sign in to comment.