Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(libp2p) thread safe scheduler #2055

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/Hunter/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ hunter_config(

hunter_config(
libp2p
VERSION 0.1.19
VERSION 0.1.20
KEEP_PACKAGE_SOURCES
)

Expand Down
4 changes: 2 additions & 2 deletions cmake/Hunter/hunter-gate-url.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
HunterGate(
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm7.zip
SHA1 be5869134ef7448fe2420d60dbb9706596b1b8bd
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm8.zip
SHA1 dc0af42b358dc0bcab304a455e80681c12d52e0f
LOCAL
)
44 changes: 23 additions & 21 deletions core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,24 +175,7 @@ namespace kagome::consensus::grandpa {
}

// Timer to send neighbor message if round does not change long time (1 min)
fallback_timer_handle_ = scheduler_->scheduleWithHandle(
[wp{weak_from_this()}] {
auto self = wp.lock();
if (not self) {
return;
}
BOOST_ASSERT_MSG(self->current_round_,
"Current round must be defiled anytime after start");
auto round =
std::dynamic_pointer_cast<VotingRoundImpl>(self->current_round_);
if (round) {
round->sendNeighborMessage();
}

std::ignore =
self->fallback_timer_handle_.reschedule(std::chrono::minutes(1));
},
std::chrono::minutes(1));
setTimerFallback();

tryExecuteNextRound(current_round_);

Expand All @@ -206,7 +189,7 @@ namespace kagome::consensus::grandpa {
}

void GrandpaImpl::stop() {
fallback_timer_handle_.cancel();
fallback_timer_handle_.reset();
}

std::shared_ptr<VotingRound> GrandpaImpl::makeInitialRound(
Expand Down Expand Up @@ -387,7 +370,7 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(res.value() != nullptr);
current_round_ = std::move(res.value());

std::ignore = fallback_timer_handle_.reschedule(std::chrono::minutes(1));
setTimerFallback();

// Truncate chain of rounds
size_t i = 0;
Expand Down Expand Up @@ -743,7 +726,7 @@ namespace kagome::consensus::grandpa {

::libp2p::common::FinalAction cleanup([&] {
if (need_cleanup_when_exiting_scope) {
catchup_request_timer_handle_.cancel();
catchup_request_timer_handle_.reset();
pending_catchup_request_.reset();
}
});
Expand Down Expand Up @@ -1472,4 +1455,23 @@ namespace kagome::consensus::grandpa {
}
update.update();
}

void GrandpaImpl::setTimerFallback() {
fallback_timer_handle_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
BOOST_ASSERT_MSG(self->current_round_,
"Current round must be defiled anytime after start");
auto round =
std::dynamic_pointer_cast<VotingRoundImpl>(self->current_round_);
if (round) {
round->sendNeighborMessage();
}
self->setTimerFallback();
},
std::chrono::minutes(1));
}
} // namespace kagome::consensus::grandpa
2 changes: 2 additions & 0 deletions core/consensus/grandpa/impl/grandpa_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ namespace kagome::consensus::grandpa {
void saveCachedVotes();
void applyCachedVotes(VotingRound &round);

void setTimerFallback();

log::Logger logger_ = log::createLogger("Grandpa", "grandpa");

const size_t kVotesCacheSize = 5;
Expand Down
10 changes: 5 additions & 5 deletions core/consensus/grandpa/impl/voting_round_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ namespace kagome::consensus::grandpa {
}
BOOST_ASSERT(stage_ == Stage::PREVOTE_RUNS);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();
on_complete_handler_ = nullptr;

stage_ = Stage::END_PREVOTE;
Expand Down Expand Up @@ -356,7 +356,7 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(stage_ == Stage::PRECOMMIT_RUNS
|| stage_ == Stage::PRECOMMIT_WAITS_FOR_PREVOTES);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();

// https://github.com/paritytech/finality-grandpa/blob/8c45a664c05657f0c71057158d3ba555ba7d20de/src/voter/voting_round.rs#L630-L633
if (not prevote_ghost_) {
Expand Down Expand Up @@ -438,7 +438,7 @@ namespace kagome::consensus::grandpa {
}
BOOST_ASSERT(stage_ == Stage::WAITING_RUNS);

stage_timer_handle_.cancel();
stage_timer_handle_.reset();
on_complete_handler_ = nullptr;

// Final attempt to finalize round what should be success
Expand All @@ -452,8 +452,8 @@ namespace kagome::consensus::grandpa {
if (stage_ != Stage::COMPLETED) {
SL_DEBUG(logger_, "Round #{}: End round", round_number_);
on_complete_handler_ = nullptr;
stage_timer_handle_.cancel();
pending_timer_handle_.cancel();
stage_timer_handle_.reset();
pending_timer_handle_.reset();
stage_ = Stage::COMPLETED;
}
}
Expand Down
1 change: 0 additions & 1 deletion core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,6 @@ namespace kagome::dispute {

void DisputeCoordinatorImpl::process_portion_incoming_disputes() {
if (rate_limit_timer_) {
rate_limit_timer_->cancel();
rate_limit_timer_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ namespace kagome::network {
+ app_config_.outPeers();
const auto peer_ttl = app_config_.peeringConfig().peerTtl;

align_timer_.cancel();
align_timer_.reset();

clearClosedPingingConnections();

Expand Down
13 changes: 10 additions & 3 deletions core/network/impl/reputation_repository_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace kagome::network {
void ReputationRepositoryImpl::start() {
main_thread_->execute([weak{weak_from_this()}] {
if (auto self = weak.lock()) {
self->tick_handler_ =
self->scheduler_->scheduleWithHandle([self] { self->tick(); }, 1s);
self->tick();
}
});
}
Expand Down Expand Up @@ -137,7 +136,15 @@ namespace kagome::network {
reputation_table_.erase(cit);
}
}
std::ignore = tick_handler_.reschedule(1s);
tick_handler_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
self->tick();
},
1s);
}

} // namespace kagome::network
8 changes: 4 additions & 4 deletions core/telemetry/impl/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ namespace kagome::telemetry {

void TelemetryServiceImpl::stop() {
shutdown_requested_ = true;
frequent_timer_.cancel();
delayed_timer_.cancel();
frequent_timer_.reset();
delayed_timer_.reset();
for (auto &connection : connections_) {
connection->shutdown();
}
Expand Down Expand Up @@ -180,7 +180,7 @@ namespace kagome::telemetry {
}

void TelemetryServiceImpl::frequentNotificationsRoutine() {
frequent_timer_.cancel();
frequent_timer_.reset();
if (shutdown_requested_) {
return;
}
Expand Down Expand Up @@ -216,7 +216,7 @@ namespace kagome::telemetry {
}

void TelemetryServiceImpl::delayedNotificationsRoutine() {
delayed_timer_.cancel();
delayed_timer_.reset();
if (shutdown_requested_) {
return;
}
Expand Down
11 changes: 0 additions & 11 deletions test/core/consensus/beefy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ using testing::_;
using testing::Return;

struct Timer : libp2p::basic::Scheduler {
void pulse(std::chrono::milliseconds current_clock) noexcept override {
abort();
}
std::chrono::milliseconds now() const noexcept override {
abort();
}
Expand All @@ -83,14 +80,6 @@ struct Timer : libp2p::basic::Scheduler {
cb_.emplace(std::move(cb));
return Handle{};
}
void cancel(Handle::Ticket ticket) noexcept override {
abort();
}
outcome::result<Handle::Ticket> reschedule(
Handle::Ticket ticket,
std::chrono::milliseconds delay_from_now) noexcept override {
abort();
}

void call() {
if (cb_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ class VotingRoundTest : public testing::Test,
vote_graph_ = std::make_shared<VoteGraphImpl>(base, config.voters, env_);

scheduler_ = std::make_shared<libp2p::basic::SchedulerMock>();
EXPECT_CALL(*scheduler_, scheduleImplMockCall(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler_, nowMockCall()).Times(AnyNumber());
EXPECT_CALL(*scheduler_, scheduleImpl(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler_, now()).Times(AnyNumber());

previous_round_ = std::make_shared<VotingRoundMock>();
ON_CALL(*previous_round_, lastFinalizedBlock())
Expand Down
12 changes: 6 additions & 6 deletions test/core/consensus/timeline/timeline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ TEST_F(TimelineTest, SingleValidator) {
.WillRepeatedly(Return(ValidatorStatus::SingleValidator));
EXPECT_CALL(*production_consensus, processSlot(_, best_block)).Times(0);
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot = std::move(cb);
return SchedulerMock::Handle{};
Expand All @@ -285,7 +285,7 @@ TEST_F(TimelineTest, SingleValidator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(outcome::success()));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(
Invoke([&](auto cb) { return SchedulerMock::Handle{}; })));

Expand Down Expand Up @@ -324,7 +324,7 @@ TEST_F(TimelineTest, Validator) {
// - don't process slot, because node is not synchronized
EXPECT_CALL(*production_consensus, processSlot(_, best_block)).Times(0);
// - don't wait time to run slot, because node is not synchronized
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, _)).Times(0);
EXPECT_CALL(*scheduler, scheduleImpl(_, _, _)).Times(0);

timeline->start();

Expand All @@ -349,7 +349,7 @@ TEST_F(TimelineTest, Validator) {
// - process slot won't start, because slot is not changed
EXPECT_CALL(*production_consensus, processSlot(_, _)).Times(0);
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_2 = std::move(cb);
return SchedulerMock::Handle{};
Expand Down Expand Up @@ -380,7 +380,7 @@ TEST_F(TimelineTest, Validator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(SlotLeadershipError::NO_SLOT_LEADER));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_3 = std::move(cb);
return SchedulerMock::Handle{};
Expand All @@ -406,7 +406,7 @@ TEST_F(TimelineTest, Validator) {
EXPECT_CALL(*production_consensus, processSlot(current_slot, best_block))
.WillOnce(Return(outcome::success()));
// - start to wait for end of current slot
EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, false))
EXPECT_CALL(*scheduler, scheduleImpl(_, _, false))
.WillOnce(WithArg<0>(Invoke([&](auto cb) {
on_run_slot_3 = std::move(cb);
return SchedulerMock::Handle{};
Expand Down
2 changes: 1 addition & 1 deletion test/core/network/synchronizer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class SynchronizerTest
EXPECT_CALL(*router, getSyncProtocol())
.WillRepeatedly(Return(sync_protocol));

EXPECT_CALL(*scheduler, scheduleImplMockCall(_, _, _)).Times(AnyNumber());
EXPECT_CALL(*scheduler, scheduleImpl(_, _, _)).Times(AnyNumber());

EXPECT_CALL(app_config, syncMethod())
.WillOnce(Return(application::SyncMethod::Full));
Expand Down
Loading