Skip to content

Commit

Permalink
r/vote_stm: added detection of a node with longest log
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Apr 17, 2024
1 parent b06b4a9 commit 27f8bd5
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ consensus::consensus(
ss::shared_ptr<storage::log> l,
scheduling_config scheduling_config,
config::binding<std::chrono::milliseconds> disk_timeout,
config::binding<bool> enable_longest_log_detection,
consensus_client_protocol client,
consensus::leader_cb_t cb,
storage::api& storage,
Expand All @@ -116,6 +117,7 @@ consensus::consensus(
, _log(l)
, _scheduling(scheduling_config)
, _disk_timeout(std::move(disk_timeout))
, _enable_longest_log_detection(std::move(enable_longest_log_detection))
, _client_protocol(client)
, _leader_notification(std::move(cb))
, _fstats(
Expand Down
3 changes: 3 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/likely.h"
#include "base/seastarx.h"
#include "config/property.h"
#include "features/feature_table.h"
#include "hashing/crc32c.h"
#include "metrics/metrics.h"
Expand Down Expand Up @@ -99,6 +100,7 @@ class consensus {
ss::shared_ptr<storage::log>,
scheduling_config,
config::binding<std::chrono::milliseconds> disk_timeout,
config::binding<bool> enable_longest_log_detection,
consensus_client_protocol,
leader_cb_t,
storage::api&,
Expand Down Expand Up @@ -778,6 +780,7 @@ class consensus {
ss::shared_ptr<storage::log> _log;
scheduling_config _scheduling;
config::binding<std::chrono::milliseconds> _disk_timeout;
config::binding<bool> _enable_longest_log_detection;
consensus_client_protocol _client_protocol;
leader_cb_t _leader_notification;

Expand Down
26 changes: 26 additions & 0 deletions src/v/raft/group_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <boost/range/join.hpp>

#include <algorithm>
#include <optional>

namespace raft {
Expand Down Expand Up @@ -329,6 +330,12 @@ class group_configuration
template<typename Predicate>
requires std::predicate<Predicate, vnode>
bool majority(Predicate&& f) const;
/**
* Returns true if for all of group_nodes predicate returns true
*/
template<typename Predicate>
requires std::predicate<Predicate, vnode>
bool all_of(Predicate&& f) const;

version_t version() const { return _version; }

Expand Down Expand Up @@ -472,6 +479,15 @@ bool majority(Predicate&& f, Range&& range) {

return cnt >= (range.size() / 2) + 1;
}
template<typename Predicate, typename Range>
bool all_of(Predicate&& f, Range&& range) {
if (range.empty()) {
return true;
}

return std::all_of(
std::cbegin(range), std::cend(range), std::forward<Predicate>(f));
}
} // namespace details

template<typename Func>
Expand Down Expand Up @@ -533,6 +549,16 @@ bool group_configuration::majority(Predicate&& f) const {
&& details::majority(f, _old->voters);
}

template<typename Predicate>
requires std::predicate<Predicate, vnode>
bool group_configuration::all_of(Predicate&& f) const {
if (!_old) {
return details::all_of(std::forward<Predicate>(f), _current.voters);
}
return details::all_of(f, _current.voters)
&& details::all_of(f, _old->voters);
}

template<typename Func>
void group_configuration::for_each_voter(Func&& f) const {
auto ids = unique_voter_ids();
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
log,
scheduling_config(_raft_sg, raft_priority()),
_configuration.raft_io_timeout_ms,
_configuration.enable_longest_log_detection,
_client,
[this](raft::leadership_status st) {
trigger_leadership_notification(std::move(st));
Expand Down
2 changes: 2 additions & 0 deletions src/v/raft/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once
#include "cluster/notification.h"
#include "config/property.h"
#include "metrics/metrics.h"
#include "model/metadata.h"
#include "raft/heartbeat_manager.h"
Expand Down Expand Up @@ -47,6 +48,7 @@ class group_manager {
config::binding<model::write_caching_mode> write_caching;
config::binding<std::chrono::milliseconds> write_caching_flush_ms;
config::binding<std::optional<size_t>> write_caching_flush_bytes;
config::binding<bool> enable_longest_log_detection;
};
using config_provider_fn = ss::noncopyable_function<configuration()>;

Expand Down
49 changes: 49 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ TEST_P_CORO(
* the offsets to appear.
*/
TEST_P_CORO(quorum_acks_fixture, test_progress_on_truncation) {
/**
* Truncation detection test is expected to experience a log truncation,
* hence we disable longest log detection
*/
set_enable_longest_log_detection(false);
co_await create_simple_group(3);
auto leader_id = co_await wait_for_leader(10s);
auto params = GetParam();
Expand Down Expand Up @@ -484,3 +489,47 @@ INSTANTIATE_TEST_SUITE_P(
.c_lvl = consistency_level::quorum_ack, .write_caching = false},
test_parameters{
.c_lvl = consistency_level::quorum_ack, .write_caching = true}));

TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) {
co_await create_simple_group(3);

/**
* Enable write
*/
co_await set_write_caching(true);
auto r = co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
});
ASSERT_TRUE_CORO(r.has_value());
/**
* wait for all nodes
*/
auto visible_offset = r.value().last_offset;
co_await wait_for_visible_offset(visible_offset, 10s);

/**
* Stop all nodes
*/
auto ids_set = all_ids();
std::vector<model::node_id> ids(ids_set.begin(), ids_set.end());
auto survivor = random_generators::random_choice(ids);

for (auto& id : ids) {
auto data_dir = node(id).raft()->log()->config().base_directory();
co_await stop_node(
id, survivor == id ? remove_data_dir::no : remove_data_dir::yes);
add_node(id, model::revision_id(0), std::move(data_dir));
}

for (auto& [id, n] : nodes()) {
co_await n->init_and_start(all_vnodes());
}

auto leader_id = wait_for_leader(10s);

co_await wait_for_visible_offset(visible_offset, 10s);
}
25 changes: 17 additions & 8 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ raft_node_instance::raft_node_instance(
model::revision_id revision,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb)
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
Expand All @@ -376,7 +377,8 @@ raft_node_instance::raft_node_instance(
})
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb)) {
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

Expand All @@ -386,7 +388,8 @@ raft_node_instance::raft_node_instance(
ss::sstring base_directory,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb)
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
Expand All @@ -402,7 +405,8 @@ raft_node_instance::raft_node_instance(
})
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb)) {
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

Expand Down Expand Up @@ -446,6 +450,7 @@ raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
scheduling_config(
ss::default_scheduling_group(), ss::default_priority_class()),
config::mock_binding<std::chrono::milliseconds>(1s),
config::mock_binding<bool>(_enable_longest_log_detection),
consensus_client_protocol(_protocol),
[this](leadership_status ls) { leadership_notification_callback(ls); },
_storage.local(),
Expand Down Expand Up @@ -588,9 +593,12 @@ seastar::future<> raft_fixture::SetUpAsync() {
raft_node_instance&
raft_fixture::add_node(model::node_id id, model::revision_id rev) {
auto instance = std::make_unique<raft_node_instance>(
id, rev, *this, _features, [id, this](leadership_status lst) {
_leaders_view[id] = lst;
});
id,
rev,
*this,
_features,
[id, this](leadership_status lst) { _leaders_view[id] = lst; },
_enable_longest_log_detection);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand All @@ -604,7 +612,8 @@ raft_node_instance& raft_fixture::add_node(
std::move(base_dir),
*this,
_features,
[id, this](leadership_status lst) { _leaders_view[id] = lst; });
[id, this](leadership_status lst) { _leaders_view[id] = lst; },
_enable_longest_log_detection);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand Down
12 changes: 10 additions & 2 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,16 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::sstring base_directory,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb);
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection);

raft_node_instance(
model::node_id id,
model::revision_id revision,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb);
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
Expand Down Expand Up @@ -248,6 +250,7 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
leader_update_clb_t _leader_clb;
ss::lw_shared_ptr<consensus> _raft;
bool started = false;
bool _enable_longest_log_detection;
};

class raft_fixture
Expand Down Expand Up @@ -462,6 +465,10 @@ class raft_fixture
ss::future<> reset_background_flushing() const;
ss::future<> set_write_caching(bool) const;

void set_enable_longest_log_detection(bool value) {
_enable_longest_log_detection = value;
}

private:
void validate_leaders();

Expand All @@ -471,6 +478,7 @@ class raft_fixture
absl::flat_hash_map<model::node_id, leadership_status> _leaders_view;

ss::sharded<features::feature_table> _features;
bool _enable_longest_log_detection = true;
};

std::ostream& operator<<(std::ostream& o, msg_type type);
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/tests/raft_group_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct raft_node {
seastar::default_scheduling_group(),
seastar::default_priority_class()),
config::mock_binding<std::chrono::milliseconds>(10s),
config::mock_binding<bool>(true),
raft::make_rpc_client_protocol(self_id, cache),
[this](raft::leadership_status st) { leader_callback(st); },
storage.local(),
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/tests/replication_monitor_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ TEST_P_CORO(monitor_test_fixture, replication_monitor_wait) {
}

TEST_P_CORO(monitor_test_fixture, truncation_detection) {
set_enable_longest_log_detection(false);
co_await create_simple_group(3);
auto leader = co_await wait_for_leader(10s);
co_await set_write_caching(write_caching());
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/tests/simple_raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ struct simple_raft_fixture {
model::write_caching_mode::default_false),
.write_caching_flush_ms = config::mock_binding(100ms),
.write_caching_flush_bytes
= config::mock_binding<std::optional<size_t>>(std::nullopt)};
= config::mock_binding<std::optional<size_t>>(std::nullopt),
.enable_longest_log_detection = config::mock_binding<bool>(
true)};
},
[] {
return raft::recovery_memory_quota::configuration{
Expand Down
Loading

0 comments on commit 27f8bd5

Please sign in to comment.