Skip to content

Commit

Permalink
r/tests: added test validating time to elect a new leader
Browse files Browse the repository at this point in the history
Added a test validating if a leader election caused by removing leader
from the replica set takes a comparable amount of time to the leadership
transfer.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
(cherry picked from commit 9c58109)
  • Loading branch information
mmaslankaprv committed Aug 7, 2024
1 parent 51ccd59 commit c861c8d
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 32 deletions.
115 changes: 115 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
#include "test_utils/async.h"
#include "test_utils/test.h"

#include <seastar/core/circular_buffer.hh>

#include <algorithm>
#include <chrono>
#include <ranges>

using namespace raft;

Expand Down Expand Up @@ -713,3 +717,114 @@ TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}

TEST_F_CORO(raft_fixture, leadership_transfer_delay) {
set_election_timeout(1500ms);
co_await create_simple_group(4);
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};
using clock_t = std::chrono::high_resolution_clock;
co_await replicate_some_data();
struct leadership_changed_event {
model::node_id node;
leadership_status status;
clock_t::time_point timestamp;
};
ss::circular_buffer<leadership_changed_event> events;

register_leader_callback([&](model::node_id id, leadership_status status) {
events.push_back(leadership_changed_event{
.node = id,
.status = status,
.timestamp = clock_t::now(),
});
});
auto leader_id = get_leader().value();
auto& leader_node = node(leader_id);
auto current_term = leader_node.raft()->term();
auto r = co_await leader_node.raft()->transfer_leadership(
transfer_leadership_request{.group = leader_node.raft()->group()});
ASSERT_TRUE_CORO(r.success);
// here we wait for all the replicas to notify about the leadership changes,
// each replica will notify two times, one when there is no leader, second
// time when the leader is elected. We have 4 replicas so in total we expect
// 8 notifications to be fired.
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 8; });

// calculate the time needed to transfer leadership, in our case it is the
// time between first notification reporting no leader and first reporting
// new leader.
auto new_leader_reported_ev = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto transfer_time = new_leader_reported_ev->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"leadership_transfer - new leader reported after: {} ms",
(transfer_time) / 1ms);
events.clear();
// now remove the current leader from the raft group
leader_id = get_leader().value();
auto new_nodes = all_vnodes() | std::views::filter([&](vnode n) {
return n.id() != leader_id;
});
auto& new_leader_node = node(leader_id);
current_term = new_leader_node.raft()->term();
co_await new_leader_node.raft()->replace_configuration(
std::vector<vnode>{new_nodes.begin(), new_nodes.end()},
model::revision_id(2));
// analogically to the previous case we wait for 6 notifications as
// currently the group has only 3 replicas
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 6; });

auto leader_reported_after_reconfiguration = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto election_time = leader_reported_after_reconfiguration->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"reconfiguration - new leader reported after: {} ms",
(election_time) / 1ms);

for (auto& vn : all_vnodes()) {
co_await stop_node(vn.id());
}

auto tolerance = 0.15;
/**
* Validate that election time after reconfiguration is simillar to the
* time needed for leadership transfer
*/
ASSERT_LE_CORO(election_time, transfer_time * (1.0 + tolerance));
ASSERT_GE_CORO(election_time, transfer_time * (1.0 - tolerance));
}
62 changes: 34 additions & 28 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,27 +362,18 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
, _base_directory(fmt::format(
"test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12)))
, _protocol(ss::make_shared<in_memory_test_protocol>(node_map, _logger))
, _features(feature_table)
, _recovery_mem_quota([] {
return raft::recovery_memory_quota::configuration{
.max_recovery_memory = config::mock_binding<std::optional<size_t>>(
200_MiB),
.default_read_buffer_size = config::mock_binding<size_t>(128_KiB),
};
})
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection) {
config::shard_local_cfg().disable_metrics.set_value(true);
}
bool enable_longest_log_detection,
std::chrono::milliseconds election_timeout)
: raft_node_instance(
id,
revision,
fmt::format(
"test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12)),
node_map,
feature_table,
std::move(leader_update_clb),
enable_longest_log_detection,
election_timeout) {}

raft_node_instance::raft_node_instance(
model::node_id id,
Expand All @@ -391,7 +382,8 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection)
bool enable_longest_log_detection,
std::chrono::milliseconds election_timeout)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
Expand All @@ -408,14 +400,16 @@ raft_node_instance::raft_node_instance(
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection) {
, _enable_longest_log_detection(enable_longest_log_detection)
, _election_timeout(
config::mock_binding<std::chrono::milliseconds>(election_timeout)) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

ss::future<>
raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
_hb_manager = std::make_unique<heartbeat_manager>(
config::mock_binding<std::chrono::milliseconds>(50ms),
config::mock_binding<std::chrono::milliseconds>(_election_timeout() / 10),
consensus_client_protocol(_protocol),
_id,
config::mock_binding<std::chrono::milliseconds>(1000ms),
Expand Down Expand Up @@ -599,8 +593,14 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) {
rev,
*this,
_features,
[id, this](leadership_status lst) { _leaders_view[id] = lst; },
_enable_longest_log_detection);
[id, this](leadership_status lst) {
_leaders_view[id] = lst;
if (_leader_clb) {
_leader_clb.value()(id, lst);
}
},
_enable_longest_log_detection,
_election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand All @@ -614,8 +614,14 @@ raft_node_instance& raft_fixture::add_node(
std::move(base_dir),
*this,
_features,
[id, this](leadership_status lst) { _leaders_view[id] = lst; },
_enable_longest_log_detection);
[id, this](leadership_status lst) {
_leaders_view[id] = lst;
if (_leader_clb) {
_leader_clb.value()(id, lst);
}
},
_enable_longest_log_detection,
_election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand Down
21 changes: 17 additions & 4 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,17 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection);
bool enable_longest_log_detection,
std::chrono::milliseconds election_timeout);

raft_node_instance(
model::node_id id,
model::revision_id revision,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection);
bool enable_longest_log_detection,
std::chrono::milliseconds election_timeout);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
Expand Down Expand Up @@ -246,8 +248,6 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::sstring _base_directory;
ss::shared_ptr<in_memory_test_protocol> _protocol;
ss::sharded<storage::api> _storage;
config::binding<std::chrono::milliseconds> _election_timeout
= config::mock_binding(500ms);
ss::sharded<features::feature_table>& _features;
ss::sharded<coordinated_recovery_throttle> _recovery_throttle;
recovery_memory_quota _recovery_mem_quota;
Expand All @@ -257,12 +257,15 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::lw_shared_ptr<consensus> _raft;
bool started = false;
bool _enable_longest_log_detection;
config::binding<std::chrono::milliseconds> _election_timeout;
};

class raft_fixture
: public seastar_test
, public raft_node_map {
public:
using leader_update_clb_t
= ss::noncopyable_function<void(model::node_id, leadership_status)>;
raft_fixture()
: _logger("raft-fixture") {}
using raft_nodes_t = absl::
Expand Down Expand Up @@ -534,6 +537,14 @@ class raft_fixture
_enable_longest_log_detection = value;
}

void register_leader_callback(leader_update_clb_t clb) {
_leader_clb = std::move(clb);
}

void set_election_timeout(std::chrono::milliseconds timeout) {
_election_timeout = timeout;
}

private:
void validate_leaders();

Expand All @@ -544,6 +555,8 @@ class raft_fixture

ss::sharded<features::feature_table> _features;
bool _enable_longest_log_detection = true;
std::optional<leader_update_clb_t> _leader_clb;
std::chrono::milliseconds _election_timeout = 500ms;
};

std::ostream& operator<<(std::ostream& o, msg_type type);
Expand Down

0 comments on commit c861c8d

Please sign in to comment.