Merge pull request #22769 from redpanda-data/backport-19966-v24.1.x-462
[v24.1.x] Transfer leadership before stepping down after reconfiguration
mmaslankaprv authored Aug 7, 2024
2 parents dde19be + c861c8d commit 9144399
Showing 5 changed files with 236 additions and 33 deletions.
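In short: when a leader discovers during maybe_commit_configuration that it is no longer a member of the group, it previously stepped down immediately, leaving the group leaderless until an election timeout fired. With this change the leader first asks the voter with the longest log to start an election right away (via a timeout_now request) and only then steps down. A new fixture test, leadership_transfer_delay, verifies that an election after such a reconfiguration completes in roughly the same time as an explicit leadership transfer.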
68 changes: 67 additions & 1 deletion src/v/raft/consensus.cc
@@ -57,6 +57,7 @@
#include <chrono>
#include <iterator>
#include <optional>
#include <ranges>
#include <system_error>

template<>
@@ -3025,7 +3026,11 @@ ss::future<> consensus::maybe_commit_configuration(ssx::semaphore_units u) {
vlog(
_ctxlog.trace,
"current node is not longer group member, stepping down");
do_step_down("not_longer_member");
co_await transfer_and_stepdown("no_longer_member");
if (_leader_id) {
_leader_id = std::nullopt;
trigger_leadership_notification();
}
}
}
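Note that on the removed node the step-down is followed by clearing the cached _leader_id and firing a leadership notification, so subscribers local to that node observe the leaderless state immediately.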

@@ -3571,6 +3576,67 @@ consensus::do_transfer_leadership(transfer_leadership_request req) {
return f.finally([this] { _transferring_leadership = false; });
}

ss::future<> consensus::transfer_and_stepdown(std::string_view ctx) {
// select the follower with the longest log
auto voters = _fstats
| std::views::filter(
[](const follower_stats::container_t::value_type& p) {
return !p.second.is_learner;
});
auto it = std::max_element(
voters.begin(), voters.end(), [](const auto& a, const auto& b) {
return a.second.last_dirty_log_index < b.second.last_dirty_log_index;
});

if (unlikely(it == voters.end())) {
vlog(
_ctxlog.warn,
"Unable to find a follower that would be an eligible candidate to "
"take over the leadership");
do_step_down(ctx);
co_return;
}

const auto target = it->first;
vlog(
_ctxlog.info,
"[{}] stepping down as leader in term {}, dirty offset {}, with "
"leadership transfer to {}",
ctx,
_term,
_log->offsets().dirty_offset,
target);

timeout_now_request req{
.target_node_id = target,
.node_id = _self,
.group = _group,
.term = _term,
};
auto timeout = raft::clock_type::now()
+ config::shard_local_cfg().raft_timeout_now_timeout_ms();
auto r = co_await _client_protocol.timeout_now(
target.id(), std::move(req), rpc::client_opts(timeout));

if (r.has_error()) {
vlog(
_ctxlog.warn,
"[{}] stepping down - failed to request timeout_now from {} - "
"{}",
ctx,
target,
r.error().message());
} else {
vlog(
_ctxlog.trace,
"[{}] stepping down - timeout now reply result: {} from node {}",
ctx,
r.value(),
target);
}
do_step_down(ctx);
}

ss::future<> consensus::remove_persistent_state() {
// voted for
co_await _storage.kvs().remove(
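To make the target-selection pattern above concrete, here is a minimal, self-contained sketch of the same idea: filter the per-follower stats down to voters, then take the one with the highest dirty log index. The types below are hypothetical stand-ins for illustration, not the real follower_stats container:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <ranges>

// Hypothetical, simplified stand-in for the per-follower state a leader tracks.
struct follower_info {
    bool is_learner;
    int64_t last_dirty_log_index;
};

int main() {
    // node id -> follower state
    std::map<int, follower_info> fstats{
      {1, {/*is_learner=*/false, /*last_dirty_log_index=*/120}},
      {2, {/*is_learner=*/true, /*last_dirty_log_index=*/200}},
      {3, {/*is_learner=*/false, /*last_dirty_log_index=*/150}},
    };

    // keep only voters (learners are never transfer targets) ...
    auto voters = fstats | std::views::filter([](const auto& p) {
        return !p.second.is_learner;
    });
    // ... and pick the voter with the longest log
    auto it = std::max_element(
      voters.begin(), voters.end(), [](const auto& a, const auto& b) {
          return a.second.last_dirty_log_index < b.second.last_dirty_log_index;
      });

    if (it == voters.end()) {
        std::cout << "no eligible target, fall back to a plain step down\n";
    } else {
        std::cout << "transfer target: node " << it->first << "\n"; // node 3
    }
}

Node 2 is skipped despite having the longest log because it is a learner; only voters are eligible to take over leadership, which is why the real code falls back to do_step_down when no voter is found.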
3 changes: 3 additions & 0 deletions src/v/raft/consensus.h
@@ -548,6 +548,9 @@ class consensus {
// all these private functions assume that we are under exclusive operations
// via the _op_sem
void do_step_down(std::string_view);
// steps down and requests the chosen replica to start leader election
// immediately
ss::future<> transfer_and_stepdown(std::string_view);
ss::future<vote_reply> do_vote(vote_request);
ss::future<append_entries_reply>
do_append_entries(append_entries_request&&);
115 changes: 115 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
@@ -20,7 +20,11 @@
#include "test_utils/async.h"
#include "test_utils/test.h"

#include <seastar/core/circular_buffer.hh>

#include <algorithm>
#include <chrono>
#include <ranges>

using namespace raft;

@@ -713,3 +717,114 @@ TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}

TEST_F_CORO(raft_fixture, leadership_transfer_delay) {
set_election_timeout(1500ms);
co_await create_simple_group(4);
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};
using clock_t = std::chrono::high_resolution_clock;
co_await replicate_some_data();
struct leadership_changed_event {
model::node_id node;
leadership_status status;
clock_t::time_point timestamp;
};
ss::circular_buffer<leadership_changed_event> events;

register_leader_callback([&](model::node_id id, leadership_status status) {
events.push_back(leadership_changed_event{
.node = id,
.status = status,
.timestamp = clock_t::now(),
});
});
auto leader_id = get_leader().value();
auto& leader_node = node(leader_id);
auto current_term = leader_node.raft()->term();
auto r = co_await leader_node.raft()->transfer_leadership(
transfer_leadership_request{.group = leader_node.raft()->group()});
ASSERT_TRUE_CORO(r.success);
// Here we wait for all the replicas to notify about the leadership
// change. Each replica notifies twice: once when there is no leader and
// a second time when the new leader is elected. With 4 replicas we
// expect 8 notifications in total.
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 8; });

// calculate the time needed to transfer leadership; in our case it is
// the time between the first notification reporting no leader and the
// first one reporting the new leader.
auto new_leader_reported_ev = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto transfer_time = new_leader_reported_ev->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"leadership_transfer - new leader reported after: {} ms",
(transfer_time) / 1ms);
events.clear();
// now remove the current leader from the raft group
leader_id = get_leader().value();
auto new_nodes = all_vnodes() | std::views::filter([&](vnode n) {
return n.id() != leader_id;
});
auto& new_leader_node = node(leader_id);
current_term = new_leader_node.raft()->term();
co_await new_leader_node.raft()->replace_configuration(
std::vector<vnode>{new_nodes.begin(), new_nodes.end()},
model::revision_id(2));
// analogously to the previous case, we wait for 6 notifications as the
// group now has only 3 replicas
co_await tests::cooperative_spin_wait_with_timeout(
10s, [&] { return events.size() >= 6; });

auto leader_reported_after_reconfiguration = std::find_if(
events.begin(), events.end(), [&](leadership_changed_event& ev) {
return ev.status.current_leader.has_value()
&& ev.status.term > current_term;
});

auto election_time = leader_reported_after_reconfiguration->timestamp
- events.begin()->timestamp;
vlog(
tstlog.info,
"reconfiguration - new leader reported after: {} ms",
(election_time) / 1ms);

for (auto& vn : all_vnodes()) {
co_await stop_node(vn.id());
}

auto tolerance = 0.15;
/**
 * Validate that the election time after reconfiguration is similar to
 * the time needed for leadership transfer
 */
ASSERT_LE_CORO(election_time, transfer_time * (1.0 + tolerance));
ASSERT_GE_CORO(election_time, transfer_time * (1.0 - tolerance));
}
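With tolerance = 0.15, if the explicit leadership transfer took, say, 200 ms, the election after reconfiguration must complete within 170 ms to 230 ms for the assertions to pass.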
62 changes: 34 additions & 28 deletions src/v/raft/tests/raft_fixture.cc
@@ -362,27 +362,18 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
- bool enable_longest_log_detection)
- : _id(id)
- , _revision(revision)
- , _logger(test_log, fmt::format("[node: {}]", _id))
- , _base_directory(fmt::format(
-     "test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12)))
- , _protocol(ss::make_shared<in_memory_test_protocol>(node_map, _logger))
- , _features(feature_table)
- , _recovery_mem_quota([] {
-     return raft::recovery_memory_quota::configuration{
-       .max_recovery_memory = config::mock_binding<std::optional<size_t>>(
-         200_MiB),
-       .default_read_buffer_size = config::mock_binding<size_t>(128_KiB),
-     };
- })
- , _recovery_scheduler(
-     config::mock_binding<size_t>(64), config::mock_binding(10ms))
- , _leader_clb(std::move(leader_update_clb))
- , _enable_longest_log_detection(enable_longest_log_detection) {
-     config::shard_local_cfg().disable_metrics.set_value(true);
- }
+ bool enable_longest_log_detection,
+ std::chrono::milliseconds election_timeout)
+ : raft_node_instance(
+     id,
+     revision,
+     fmt::format(
+       "test_raft_{}_{}", id, random_generators::gen_alphanum_string(12)),
+     node_map,
+     feature_table,
+     std::move(leader_update_clb),
+     enable_longest_log_detection,
+     election_timeout) {}

raft_node_instance::raft_node_instance(
model::node_id id,
@@ -391,7 +382,8 @@ raft_node_instance::raft_node_instance(
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
- bool enable_longest_log_detection)
+ bool enable_longest_log_detection,
+ std::chrono::milliseconds election_timeout)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
@@ -408,14 +400,16 @@ raft_node_instance::raft_node_instance(
, _recovery_scheduler(
config::mock_binding<size_t>(64), config::mock_binding(10ms))
, _leader_clb(std::move(leader_update_clb))
- , _enable_longest_log_detection(enable_longest_log_detection) {
+ , _enable_longest_log_detection(enable_longest_log_detection)
+ , _election_timeout(
+     config::mock_binding<std::chrono::milliseconds>(election_timeout)) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

ss::future<>
raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
_hb_manager = std::make_unique<heartbeat_manager>(
- config::mock_binding<std::chrono::milliseconds>(50ms),
+ config::mock_binding<std::chrono::milliseconds>(_election_timeout() / 10),
consensus_client_protocol(_protocol),
_id,
config::mock_binding<std::chrono::milliseconds>(1000ms),
@@ -599,8 +593,14 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) {
rev,
*this,
_features,
- [id, this](leadership_status lst) { _leaders_view[id] = lst; },
- _enable_longest_log_detection);
+ [id, this](leadership_status lst) {
+     _leaders_view[id] = lst;
+     if (_leader_clb) {
+         _leader_clb.value()(id, lst);
+     }
+ },
+ _enable_longest_log_detection,
+ _election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
@@ -614,8 +614,14 @@ raft_node_instance& raft_fixture::add_node(
std::move(base_dir),
*this,
_features,
- [id, this](leadership_status lst) { _leaders_view[id] = lst; },
- _enable_longest_log_detection);
+ [id, this](leadership_status lst) {
+     _leaders_view[id] = lst;
+     if (_leader_clb) {
+         _leader_clb.value()(id, lst);
+     }
+ },
+ _enable_longest_log_detection,
+ _election_timeout);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
21 changes: 17 additions & 4 deletions src/v/raft/tests/raft_fixture.h
@@ -168,15 +168,17 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
- bool enable_longest_log_detection);
+ bool enable_longest_log_detection,
+ std::chrono::milliseconds election_timeout);

raft_node_instance(
model::node_id id,
model::revision_id revision,
raft_node_map& node_map,
ss::sharded<features::feature_table>& feature_table,
leader_update_clb_t leader_update_clb,
- bool enable_longest_log_detection);
+ bool enable_longest_log_detection,
+ std::chrono::milliseconds election_timeout);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
@@ -246,8 +248,6 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::sstring _base_directory;
ss::shared_ptr<in_memory_test_protocol> _protocol;
ss::sharded<storage::api> _storage;
- config::binding<std::chrono::milliseconds> _election_timeout
-   = config::mock_binding(500ms);
ss::sharded<features::feature_table>& _features;
ss::sharded<coordinated_recovery_throttle> _recovery_throttle;
recovery_memory_quota _recovery_mem_quota;
@@ -257,12 +257,15 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
ss::lw_shared_ptr<consensus> _raft;
bool started = false;
bool _enable_longest_log_detection;
+ config::binding<std::chrono::milliseconds> _election_timeout;
};

class raft_fixture
: public seastar_test
, public raft_node_map {
public:
using leader_update_clb_t
= ss::noncopyable_function<void(model::node_id, leadership_status)>;
raft_fixture()
: _logger("raft-fixture") {}
using raft_nodes_t = absl::
@@ -534,6 +537,14 @@ class raft_fixture
_enable_longest_log_detection = value;
}

void register_leader_callback(leader_update_clb_t clb) {
_leader_clb = std::move(clb);
}

void set_election_timeout(std::chrono::milliseconds timeout) {
_election_timeout = timeout;
}

private:
void validate_leaders();

@@ -544,6 +555,8 @@

ss::sharded<features::feature_table> _features;
bool _enable_longest_log_detection = true;
std::optional<leader_update_clb_t> _leader_clb;
std::chrono::milliseconds _election_timeout = 500ms;
};

std::ostream& operator<<(std::ostream& o, msg_type type);
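One practical note on the new hooks: _election_timeout is read when a node instance is constructed in raft_fixture::add_node, and the heartbeat interval is derived from it as _election_timeout() / 10, so set_election_timeout appears to affect only nodes added after the call. The leadership_transfer_delay test accordingly sets it before create_simple_group.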
