Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto selection of sync method #1668

Merged
merged 13 commits into from
Jun 22, 2023
Merged
2 changes: 1 addition & 1 deletion core/application/app_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ namespace kagome::application {
virtual const std::vector<telemetry::TelemetryEndpoint>
&telemetryEndpoints() const = 0;

enum class SyncMethod { Full, Fast, FastWithoutState, Warp };
enum class SyncMethod { Full, Fast, FastWithoutState, Warp, Auto };
/**
* @return enum constant of the chosen sync method
*/
Expand Down
3 changes: 3 additions & 0 deletions core/application/impl/app_configuration_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ namespace {
if (str == "Warp") {
return SM::Warp;
}
if (str == "Auto") {
return SM::Auto;
}
return std::nullopt;
}

Expand Down
188 changes: 151 additions & 37 deletions core/consensus/babe/impl/babe_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace kagome::consensus::babe {
std::shared_ptr<ConsistencyKeeper> consistency_keeper,
std::shared_ptr<storage::trie::TrieStorage> trie_storage,
primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable)
: app_config_(app_config),
: sync_method_(app_config.syncMethod()),
app_state_manager_(app_state_manager),
lottery_{std::move(lottery)},
babe_config_repo_{std::move(babe_config_repo)},
Expand Down Expand Up @@ -144,40 +144,131 @@ namespace kagome::consensus::babe {
}

bool BabeImpl::prepare() {
auto res = getInitialEpochDescriptor();
if (res.has_error()) {
SL_CRITICAL(log_, "Can't get initial epoch descriptor: {}", res.error());
auto initial_epoch_res = getInitialEpochDescriptor();
if (initial_epoch_res.has_error()) {
SL_CRITICAL(log_,
"Can't get initial epoch descriptor: {}",
initial_epoch_res.error());
return false;
}

best_block_ = block_tree_->bestLeaf();

// Check if best block has state for usual sync method
if (app_config_.syncMethod() == SyncMethod::Full) {
auto best_block_header_res =
block_tree_->getBlockHeader(best_block_.hash);
if (best_block_header_res.has_error()) {
SL_CRITICAL(log_,
"Can't get header of best block ({}): {}",
best_block_,
best_block_header_res.error());
return false;
}
const auto &best_block_header = best_block_header_res.value();
auto best_block_header_res = block_tree_->getBlockHeader(best_block_.hash);
if (best_block_header_res.has_error()) {
SL_CRITICAL(log_,
"Can't get header of best block ({}): {}",
best_block_,
best_block_header_res.error());
return false;
}
const auto &best_block_header = best_block_header_res.value();
const auto &state_root = best_block_header.state_root;

std::chrono::microseconds warp_sync_duration;
std::chrono::microseconds fast_sync_duration;
std::chrono::microseconds full_sync_duration;

// Calculate lag our best block by slots
BabeSlotNumber lag_slots = 0;
if (auto babe_digests_res = getBabeDigests(best_block_header);
babe_digests_res.has_value()) {
auto &[seal, babe_header] = babe_digests_res.value();
lag_slots = babe_util_->getCurrentSlot() - babe_header.slot_number;
}

// WARP: n * header_loading / k + state_loading + lag * block_execution
// { catchup }
// FAST: n * header_loading + state_loading + lag' * block_execution
// { catchup' }
// FULL: n * block_execution + lag" * block_execution
// { catchup" }

{
xDimon marked this conversation as resolved.
Show resolved Hide resolved
#ifdef NDEBUG
auto block_execution =
std::chrono::microseconds(650'000); // 0.65s (wavm)
#else
auto block_execution = std::chrono::microseconds(50'000); // 50ms (wavm)
#endif
auto header_loading = std::chrono::microseconds(5'000); // 5ms
auto state_loading = std::chrono::microseconds(1800'000'000); // 0.5hr
auto warp_proportion = 10'000; // ~one set id change for each 10k blocks

auto warp_catchup = lag_slots * header_loading // time of headers loading
/ warp_proportion // part of requesting headers
+ state_loading; // time of state loading
auto fast_catchup = lag_slots * header_loading // time of headers loading
+ 512 * block_execution // execute non-finalized blocks
+ state_loading; // time of state loading
auto full_catchup = lag_slots * block_execution; // execute all blocks

auto warp_lag = warp_catchup / babe_config_repo_->slotDuration();
auto fast_lag = fast_catchup / babe_config_repo_->slotDuration();
auto full_lag = full_catchup / babe_config_repo_->slotDuration();

warp_sync_duration = warp_catchup + warp_lag * block_execution;
fast_sync_duration = fast_catchup + fast_lag * block_execution;
full_sync_duration = full_catchup + full_lag * block_execution;
}

const auto &state_root = best_block_header.state_root;
bool allow_warp_sync_for_auto = false; // should it select warp for auto

// Check if target block has state
if (auto res = trie_storage_->getEphemeralBatchAt(state_root);
res.has_error()) {
// Check if target block does not have state (full sync not available)
bool full_sync_available = true;
if (auto res = trie_storage_->getEphemeralBatchAt(state_root);
not res.has_value()) {
if (sync_method_ == SyncMethod::Full) {
SL_WARN(log_, "Can't get state of best block: {}", res.error());
SL_CRITICAL(log_,
"Try restart at least once with `--sync Fast' CLI arg");
return false;
}
full_sync_available = false;
}

switch (sync_method_) {
case SyncMethod::Auto:
if (full_sync_duration < fast_sync_duration and full_sync_available) {
SL_INFO(log_, "Sync mode auto: decided Full sync");
sync_method_ = SyncMethod::Full;
} else if (fast_sync_duration < warp_sync_duration
or not allow_warp_sync_for_auto) {
SL_INFO(log_, "Sync mode auto: decided Fast sync");
sync_method_ = SyncMethod::Fast;
} else {
SL_INFO(log_, "Sync mode auto: decided Warp sync");
sync_method_ = SyncMethod::Warp;
}
break;

case SyncMethod::Full:
if (fast_sync_duration < full_sync_duration) {
SL_INFO(log_, "Fast sync would be faster then Full such was defined");
xDimon marked this conversation as resolved.
Show resolved Hide resolved
} else if (warp_sync_duration < full_sync_duration) {
SL_INFO(log_, "Warp sync would be faster then Full such was defined");
}
break;

case SyncMethod::FastWithoutState:
break;

case SyncMethod::Fast:
if (full_sync_duration < fast_sync_duration and full_sync_available) {
SL_INFO(log_, "Full sync would be faster then Fast such was defined");
} else if (warp_sync_duration < fast_sync_duration) {
SL_INFO(log_, "Warp sync would be faster then Fast such was defined");
}

case SyncMethod::Warp:
if (full_sync_duration < warp_sync_duration and full_sync_available) {
SL_INFO(log_, "Full sync would be faster then Warp such was defined");
} else if (fast_sync_duration < warp_sync_duration) {
SL_INFO(log_, "Fast sync would be faster then Earp such was defined");
}
}

current_epoch_ = res.value();
current_epoch_ = initial_epoch_res.value();

chain_sub_->subscribe(chain_sub_->generateSubscriptionSetId(),
primitives::events::ChainEventType::kFinalizedHeads);
Expand Down Expand Up @@ -243,7 +334,7 @@ namespace kagome::consensus::babe {
return true;
}

switch (app_config_.syncMethod()) {
switch (sync_method_) {
case SyncMethod::Full:
current_state_ = State::WAIT_REMOTE_STATUS;
break;
Expand All @@ -255,6 +346,9 @@ namespace kagome::consensus::babe {
babe_status_observable_->notify(
primitives::events::BabeStateEventType::kSyncState, current_state_);
} break;

case SyncMethod::Auto:
UNREACHABLE; // It must be rewritten in prepare stage
}

return true;
Expand Down Expand Up @@ -396,7 +490,7 @@ namespace kagome::consensus::babe {
startStateSyncing(peer_id);
} else if (current_state_ == Babe::State::CATCHING_UP
or current_state_ == Babe::State::WAIT_REMOTE_STATUS) {
onSynchronized();
onCaughtUp(current_best_block);
}
return;
}
Expand Down Expand Up @@ -469,20 +563,13 @@ namespace kagome::consensus::babe {
return;
}

// Just synced
// Caught up some block, possible block of current slot
if (self->current_state_ == Babe::State::CATCHING_UP) {
SL_INFO(self->log_, "Catching up is finished on block {}", block);
self->current_state_ = Babe::State::SYNCHRONIZED;
self->was_synchronized_ = true;
self->telemetry_->notifyWasSynchronized();
self->babe_status_observable_->notify(
primitives::events::BabeStateEventType::kSyncState,
self->current_state_);
self->onCaughtUp(block);
}

// Synced
if (self->current_state_ == Babe::State::SYNCHRONIZED) {
self->onSynchronized();
// Set actual block status
announce.state = block == self->block_tree_->bestLeaf()
? network::BlockState::Best
Expand All @@ -501,7 +588,7 @@ namespace kagome::consensus::babe {
if (current_state_ != State::HEADERS_LOADING) {
return false;
}
if (app_config_.syncMethod() != SyncMethod::Warp) {
if (sync_method_ != SyncMethod::Warp) {
return false;
}
auto target = warp_sync_->request();
Expand Down Expand Up @@ -602,7 +689,7 @@ namespace kagome::consensus::babe {
return;
}

if (app_config_.syncMethod() == SyncMethod::FastWithoutState) {
if (sync_method_ == SyncMethod::FastWithoutState) {
if (app_state_manager_->state()
!= application::AppStateManager::State::ShuttingDown) {
SL_INFO(log_,
Expand Down Expand Up @@ -710,10 +797,37 @@ namespace kagome::consensus::babe {
});
}

void BabeImpl::onCaughtUp(const primitives::BlockInfo &block) {
SL_INFO(log_, "Caught up block {}", block);

if (not was_synchronized_) {
auto header_opt = block_tree_->getBlockHeader(block.hash);
BOOST_ASSERT_MSG(header_opt.has_value(), "Just added block; deq");
auto res = getBabeDigests(header_opt.value());
if (res.has_value()) {
auto &[_, babe_header] = res.value();
if (babe_util_->getCurrentSlot() > babe_header.slot_number + 1) {
current_state_ = Babe::State::WAIT_REMOTE_STATUS;
babe_status_observable_->notify(
primitives::events::BabeStateEventType::kSyncState,
current_state_);
return;
}
}
}

onSynchronized();
}

void BabeImpl::onSynchronized() {
current_state_ = State::SYNCHRONIZED;
was_synchronized_ = true;
telemetry_->notifyWasSynchronized();
if (not was_synchronized_) {
was_synchronized_ = true;

telemetry_->notifyWasSynchronized();
}

current_state_ = Babe::State::SYNCHRONIZED;

babe_status_observable_->notify(
primitives::events::BabeStateEventType::kSyncState, current_state_);

Expand Down
10 changes: 6 additions & 4 deletions core/consensus/babe/impl/babe_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "consensus/babe/babe.hpp"

#include "application/app_configuration.hpp"
#include "clock/timer.hpp"
#include "injector/lazy.hpp"
#include "log/logger.hpp"
Expand All @@ -20,7 +21,6 @@
#include "telemetry/service.hpp"

namespace kagome::application {
class AppConfiguration;
class AppStateManager;
} // namespace kagome::application

Expand Down Expand Up @@ -142,8 +142,6 @@ namespace kagome::consensus::babe {
void onBlockAnnounce(const libp2p::peer::PeerId &peer_id,
const network::BlockAnnounce &announce) override;

void onSynchronized() override;

bool wasSynchronized() const override;

private:
Expand All @@ -161,6 +159,10 @@ namespace kagome::consensus::babe {
void startCatchUp(const libp2p::peer::PeerId &peer_id,
const primitives::BlockInfo &target_block);

void onCaughtUp(const primitives::BlockInfo &block);

void onSynchronized();

void startStateSyncing(const libp2p::peer::PeerId &peer_id);

void runSlot();
Expand Down Expand Up @@ -200,7 +202,7 @@ namespace kagome::consensus::babe {
outcome::result<primitives::Seal> sealBlock(
const primitives::Block &block) const;

const application::AppConfiguration &app_config_;
application::AppConfiguration::SyncMethod sync_method_;
std::shared_ptr<application::AppStateManager> app_state_manager_;
std::shared_ptr<BabeLottery> lottery_;
std::shared_ptr<BabeConfigRepository> babe_config_repo_;
Expand Down
6 changes: 0 additions & 6 deletions core/consensus/grandpa/environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ namespace kagome::consensus::grandpa {
using ApplyJustificationCb = JustificationObserver::ApplyJustificationCb;
~Environment() override = default;

/**
* Sets back-link to Grandpa
*/
virtual void setJustificationObserver(
std::weak_ptr<JustificationObserver> justification_observer) = 0;

/**
* Make catch-up-request
*/
Expand Down
8 changes: 4 additions & 4 deletions core/consensus/grandpa/impl/environment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<blockchain::BlockHeaderRepository> header_repository,
std::shared_ptr<AuthorityManager> authority_manager,
std::shared_ptr<network::GrandpaTransmitter> transmitter,
LazySPtr<JustificationObserver> justification_observer,
std::shared_ptr<boost::asio::io_context> main_thread_context)
: block_tree_{std::move(block_tree)},
header_repository_{std::move(header_repository)},
authority_manager_{std::move(authority_manager)},
transmitter_{std::move(transmitter)},
justification_observer_(std::move(justification_observer)),
main_thread_context_{std::move(main_thread_context)},
logger_{log::createLogger("GrandpaEnvironment", "grandpa")} {
BOOST_ASSERT(block_tree_ != nullptr);
Expand Down Expand Up @@ -280,9 +282,6 @@ namespace kagome::consensus::grandpa {
const BlockInfo &block_info,
const primitives::Justification &raw_justification,
ApplyJustificationCb &&cb) {
auto justification_observer = justification_observer_.lock();
BOOST_ASSERT(justification_observer);

auto res = scale::decode<GrandpaJustification>(raw_justification.data);
if (res.has_error()) {
cb(res.as_failure());
Expand All @@ -300,7 +299,8 @@ namespace kagome::consensus::grandpa {
justification.round_number,
justification.block_info);

justification_observer->applyJustification(justification, std::move(cb));
justification_observer_.get()->applyJustification(justification,
std::move(cb));
}

outcome::result<void> EnvironmentImpl::finalize(
Expand Down
Loading