[v24.1.x] Transfer leadership before stepping down after reconfiguration #22769

Merged
merged 3 commits on Aug 7, 2024
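In outline, the change makes a leader that was removed by a reconfiguration hand off leadership before it steps down: instead of a bare `do_step_down()`, the new `transfer_and_stepdown()` selects the voter with the longest log, asks it to start an election immediately via a `timeout_now` request, and only then steps down. Below is a minimal, self-contained sketch of the selection step; `follower_stat` and `pick_transfer_target` are simplified stand-ins for illustration, not the real raft internals.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>
#include <ranges>
#include <vector>

// simplified stand-in for raft's per-follower bookkeeping
struct follower_stat {
    int node_id;
    bool is_learner;              // learners cannot vote, so they are skipped
    int64_t last_dirty_log_index; // highest offset in the follower's log
};

// returns the voter with the longest log, or nullopt if there is none
std::optional<int>
pick_transfer_target(const std::vector<follower_stat>& fstats) {
    auto voters = fstats | std::views::filter([](const follower_stat& f) {
                      return !f.is_learner;
                  });
    auto it = std::ranges::max_element(
      voters, {}, &follower_stat::last_dirty_log_index);
    if (it == voters.end()) {
        // no eligible candidate, the caller falls back to a plain step down
        return std::nullopt;
    }
    return it->node_id;
}

int main() {
    std::vector<follower_stat> fstats{
      {.node_id = 1, .is_learner = false, .last_dirty_log_index = 100},
      {.node_id = 2, .is_learner = true, .last_dirty_log_index = 250},
      {.node_id = 3, .is_learner = false, .last_dirty_log_index = 200},
    };
    if (auto target = pick_transfer_target(fstats)) {
        std::cout << "transfer leadership to node " << *target << '\n'; // 3
    }
}

If no eligible voter exists, the diff below falls back to `do_step_down()`, so the worst case is no slower than the old behaviour.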
68 changes: 67 additions & 1 deletion src/v/raft/consensus.cc
@@ -57,6 +57,7 @@
#include <chrono>
#include <iterator>
#include <optional>
#include <ranges>
#include <system_error>

template<>
@@ -3025,7 +3026,11 @@ ss::future<> consensus::maybe_commit_configuration(ssx::semaphore_units u) {
vlog(
_ctxlog.trace,
"current node is not longer group member, stepping down");
-        do_step_down("not_longer_member");
+        co_await transfer_and_stepdown("no_longer_member");
+        if (_leader_id) {
+            _leader_id = std::nullopt;
+            trigger_leadership_notification();
+        }
}
}

@@ -3571,6 +3576,67 @@ consensus::do_transfer_leadership(transfer_leadership_request req) {
return f.finally([this] { _transferring_leadership = false; });
}

ss::future<> consensus::transfer_and_stepdown(std::string_view ctx) {
// select a follower with longest log
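    // (the voter with the highest dirty log index needs the least
    // catch-up, so it is the most likely to win the election it starts)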
auto voters = _fstats
| std::views::filter(
[](const follower_stats::container_t::value_type& p) {
return !p.second.is_learner;
});
auto it = std::max_element(
voters.begin(), voters.end(), [](const auto& a, const auto& b) {
return a.second.last_dirty_log_index < b.second.last_dirty_log_index;
});

if (unlikely(it == voters.end())) {
vlog(
_ctxlog.warn,
"Unable to find a follower that would be an eligible candidate to "
"take over the leadership");
do_step_down(ctx);
co_return;
}

const auto target = it->first;
vlog(
_ctxlog.info,
"[{}] stepping down as leader in term {}, dirty offset {}, with "
"leadership transfer to {}",
ctx,
_term,
_log->offsets().dirty_offset,
target);

timeout_now_request req{
.target_node_id = target,
.node_id = _self,
.group = _group,
.term = _term,
};
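    // timeout_now asks the target follower to behave as if its election
    // timeout fired, starting a new election immediately instead of
    // waiting out the randomized timeout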
auto timeout = raft::clock_type::now()
+ config::shard_local_cfg().raft_timeout_now_timeout_ms();
auto r = co_await _client_protocol.timeout_now(
target.id(), std::move(req), rpc::client_opts(timeout));

if (r.has_error()) {
vlog(
_ctxlog.warn,
"[{}] stepping down - failed to request timeout_now from {} - "
"{}",
ctx,
target,
r.error().message());
} else {
vlog(
_ctxlog.trace,
"[{}] stepping down - timeout now reply result: {} from node {}",
ctx,
r.value(),
target);
}
do_step_down(ctx);
}

ss::future<> consensus::remove_persistent_state() {
// voted for
co_await _storage.kvs().remove(
3 changes: 3 additions & 0 deletions src/v/raft/consensus.h
@@ -548,6 +548,9 @@ class consensus {
// all these private functions assume that we are under exclusive operations
// via the _op_sem
void do_step_down(std::string_view);
// steps down and requests the other replica to start leader election
// immediately
ss::future<> transfer_and_stepdown(std::string_view);
ss::future<vote_reply> do_vote(vote_request);
ss::future<append_entries_reply>
do_append_entries(append_entries_request&&);
115 changes: 115 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
@@ -20,7 +20,11 @@
#include "test_utils/async.h"
#include "test_utils/test.h"

#include <seastar/core/circular_buffer.hh>

#include <algorithm>
#include <chrono>
#include <ranges>

using namespace raft;

@@ -713,3 +717,114 @@ TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}

TEST_F_CORO(raft_fixture, leadership_transfer_delay) {
set_election_timeout(1500ms);
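    // a long election timeout makes a natural (timeout-driven) election
    // much slower than one triggered via timeout_now, so the similarity
    // assertion at the end can catch a regression to plain step-down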
co_await create_simple_group(4);
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};
using clock_t = std::chrono::high_resolution_clock;
co_await replicate_some_data();
struct leadership_changed_event {
model::node_id node;
leadership_status status;
clock_t::time_point timestamp;
};
ss::circular_buffer<leadership_changed_event> events;

register_leader_callback([&](model::node_id id, leadership_status status) {
events.push_back(leadership_changed_event{
.node = id,
.status = status,
.timestamp = clock_t::now(),
});
});
auto leader_id = get_leader().value();
auto& leader_node = node(leader_id);
auto current_term = leader_node.raft()->term();
auto r = co_await leader_node.raft()->transfer_leadership(
transfer_leadership_request{.group = leader_node.raft()->group()});
ASSERT_TRUE_CORO(r.success);
    // Here we wait for all replicas to report leadership changes. Each
    // replica notifies twice: once when there is no leader and once when
    // the new leader is elected. With 4 replicas we therefore expect 8
    // notifications in total.
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 8; });

    // Calculate the time needed to transfer leadership; in our case it is
    // the time between the first notification reporting no leader and the
    // first one reporting the new leader.
auto new_leader_reported_ev = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto transfer_time = new_leader_reported_ev->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"leadership_transfer - new leader reported after: {} ms",
(transfer_time) / 1ms);
events.clear();
// now remove the current leader from the raft group
leader_id = get_leader().value();
auto new_nodes = all_vnodes() | std::views::filter([&](vnode n) {
return n.id() != leader_id;
});
auto& new_leader_node = node(leader_id);
current_term = new_leader_node.raft()->term();
co_await new_leader_node.raft()->replace_configuration(
std::vector<vnode>{new_nodes.begin(), new_nodes.end()},
model::revision_id(2));
    // Analogously to the previous case, we wait for 6 notifications, as
    // the group now has only 3 replicas.
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 6; });

auto leader_reported_after_reconfiguration = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto election_time = leader_reported_after_reconfiguration->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"reconfiguration - new leader reported after: {} ms",
(election_time) / 1ms);

for (auto& vn : all_vnodes()) {
co_await stop_node(vn.id());
}

auto tolerance = 0.15;
/**
     * Validate that the election time after reconfiguration is similar to
     * the time needed for leadership transfer
*/
ASSERT_LE_CORO(election_time, transfer_time * (1.0 + tolerance));
ASSERT_GE_CORO(election_time, transfer_time * (1.0 - tolerance));
}
62 changes: 34 additions & 28 deletions src/v/raft/tests/raft_fixture.cc
@@ -362,27 +362,18 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
-  bool enable_longest_log_detection)
-  : _id(id)
-  , _revision(revision)
-  , _logger(test_log, fmt::format("[node: {}]", _id))
-  , _base_directory(fmt::format(
-      "test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12)))
-  , _protocol(ss::make_shared<in_memory_test_protocol>(node_map, _logger))
-  , _features(feature_table)
-  , _recovery_mem_quota([] {
-      return raft::recovery_memory_quota::configuration{
-        .max_recovery_memory = config::mock_binding<std::optional<size_t>>(
-          200_MiB),
-        .default_read_buffer_size = config::mock_binding<size_t>(128_KiB),
-      };
-  })
-  , _recovery_scheduler(
-      config::mock_binding<size_t>(64), config::mock_binding(10ms))
-  , _leader_clb(std::move(leader_update_clb))
-  , _enable_longest_log_detection(enable_longest_log_detection) {
-    config::shard_local_cfg().disable_metrics.set_value(true);
-}
+  bool enable_longest_log_detection,
+  std::chrono::milliseconds election_timeout)
+  : raft_node_instance(
+      id,
+      revision,
+      fmt::format(
+        "test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12)),
+      node_map,
+      feature_table,
+      std::move(leader_update_clb),
+      enable_longest_log_detection,
+      election_timeout) {}

raft_node_instance::raft_node_instance(
model::node_id id,
@@ -391,7 +382,8 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
-  bool enable_longest_log_detection)
+  bool enable_longest_log_detection,
+  std::chrono::milliseconds election_timeout)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
@@ -408,14 +400,16 @@
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb))
-  , _enable_longest_log_detection(enable_longest_log_detection) {
+  , _enable_longest_log_detection(enable_longest_log_detection)
+  , _election_timeout(
+      config::mock_binding<std::chrono::milliseconds>(election_timeout)) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

ss::future<>
raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
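    // the heartbeat interval now tracks the configured election timeout
    // (a tenth of it, preserving the previous 50ms/500ms ratio) so that
    // followers keep receiving heartbeats well inside the timeout,
    // whatever value a test picks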
_hb_manager = std::make_unique<heartbeat_manager>(
-      config::mock_binding<std::chrono::milliseconds>(50ms),
+      config::mock_binding<std::chrono::milliseconds>(_election_timeout() / 10),
consensus_client_protocol(_protocol),
_id,
config::mock_binding<std::chrono::milliseconds>(1000ms),
@@ -599,8 +593,14 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) {
rev,
*this,
_features,
-      [id, this](leadership_status lst) { _leaders_view[id] = lst; },
-      _enable_longest_log_detection);
+      [id, this](leadership_status lst) {
+          _leaders_view[id] = lst;
+          if (_leader_clb) {
+              _leader_clb.value()(id, lst);
+          }
+      },
+      _enable_longest_log_detection,
+      _election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
@@ -614,8 +614,14 @@ raft_node_instance& raft_fixture::add_node(
std::move(base_dir),
*this,
_features,
-      [id, this](leadership_status lst) { _leaders_view[id] = lst; },
-      _enable_longest_log_detection);
+      [id, this](leadership_status lst) {
+          _leaders_view[id] = lst;
+          if (_leader_clb) {
+              _leader_clb.value()(id, lst);
+          }
+      },
+      _enable_longest_log_detection,
+      _election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
21 changes: 17 additions & 4 deletions src/v/raft/tests/raft_fixture.h
@@ -168,15 +168,17 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
-      bool enable_longest_log_detection);
+      bool enable_longest_log_detection,
+      std::chrono::milliseconds election_timeout);

raft_node_instance(
model::node_id id,
model::revision_id revision,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
-      bool enable_longest_log_detection);
+      bool enable_longest_log_detection,
+      std::chrono::milliseconds election_timeout);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
@@ -246,8 +248,6 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::sstring _base_directory;
ss::shared_ptr<in_memory_test_protocol> _protocol;
ss::sharded<storage::api> _storage;
-    config::binding<std::chrono::milliseconds> _election_timeout
-      = config::mock_binding(500ms);
ss::sharded<features::feature_table>& _features;
ss::sharded<coordinated_recovery_throttle> _recovery_throttle;
recovery_memory_quota _recovery_mem_quota;
@@ -257,12 +257,15 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::lw_shared_ptr<consensus> _raft;
bool started = false;
bool _enable_longest_log_detection;
config::binding<std::chrono::milliseconds> _election_timeout;
};

class raft_fixture
: public seastar_test
, public raft_node_map {
public:
using leader_update_clb_t
= ss::noncopyable_function<void(model::node_id, leadership_status)>;
raft_fixture()
: _logger("raft-fixture") {}
using raft_nodes_t = absl::
@@ -534,6 +537,14 @@ class raft_fixture
_enable_longest_log_detection = value;
}

void register_leader_callback(leader_update_clb_t clb) {
_leader_clb = std::move(clb);
}

void set_election_timeout(std::chrono::milliseconds timeout) {
_election_timeout = timeout;
}

private:
void validate_leaders();

Expand All @@ -544,6 +555,8 @@ class raft_fixture

ss::sharded<features::feature_table> _features;
bool _enable_longest_log_detection = true;
std::optional<leader_update_clb_t> _leader_clb;
std::chrono::milliseconds _election_timeout = 500ms;
};

std::ostream& operator<<(std::ostream& o, msg_type type);