diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 8ab069f4e020d..51851a1705f1b 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -57,6 +57,7 @@ #include #include #include +#include #include template<> @@ -3025,7 +3026,11 @@ ss::future<> consensus::maybe_commit_configuration(ssx::semaphore_units u) { vlog( _ctxlog.trace, "current node is not longer group member, stepping down"); - do_step_down("not_longer_member"); + co_await transfer_and_stepdown("no_longer_member"); + if (_leader_id) { + _leader_id = std::nullopt; + trigger_leadership_notification(); + } } } @@ -3571,6 +3576,67 @@ consensus::do_transfer_leadership(transfer_leadership_request req) { return f.finally([this] { _transferring_leadership = false; }); } +ss::future<> consensus::transfer_and_stepdown(std::string_view ctx) { + // select a follower with longest log + auto voters = _fstats + | std::views::filter( + [](const follower_stats::container_t::value_type& p) { + return !p.second.is_learner; + }); + auto it = std::max_element( + voters.begin(), voters.end(), [](const auto& a, const auto& b) { + return a.second.last_dirty_log_index < b.second.last_dirty_log_index; + }); + + if (unlikely(it == voters.end())) { + vlog( + _ctxlog.warn, + "Unable to find a follower that would be an eligible candidate to " + "take over the leadership"); + do_step_down(ctx); + co_return; + } + + const auto target = it->first; + vlog( + _ctxlog.info, + "[{}] stepping down as leader in term {}, dirty offset {}, with " + "leadership transfer to {}", + ctx, + _term, + _log->offsets().dirty_offset, + target); + + timeout_now_request req{ + .target_node_id = target, + .node_id = _self, + .group = _group, + .term = _term, + }; + auto timeout = raft::clock_type::now() + + config::shard_local_cfg().raft_timeout_now_timeout_ms(); + auto r = co_await _client_protocol.timeout_now( + target.id(), std::move(req), rpc::client_opts(timeout)); + + if (r.has_error()) { + vlog( + _ctxlog.warn, + "[{}] 
stepping down - failed to request timeout_now from {} - " + "{}", + ctx, + target, + r.error().message()); + } else { + vlog( + _ctxlog.trace, + "[{}] stepping down - timeout now reply result: {} from node {}", + ctx, + r.value(), + target); + } + do_step_down(ctx); +} + ss::future<> consensus::remove_persistent_state() { // voted for co_await _storage.kvs().remove( diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 1e606c1a86148..e3dda31953c3c 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -548,6 +548,9 @@ class consensus { // all these private functions assume that we are under exclusive operations // via the _op_sem void do_step_down(std::string_view); + // steps down and requests the other replica to start leader election + // immediately + ss::future<> transfer_and_stepdown(std::string_view); ss::future do_vote(vote_request); ss::future do_append_entries(append_entries_request&&); diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index ff93a22b8d1c3..c1a216f880820 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -20,7 +20,11 @@ #include "test_utils/async.h" #include "test_utils/test.h" +#include + #include +#include +#include using namespace raft; @@ -713,3 +717,114 @@ TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) { return nodes().begin()->second->raft()->term() > term_snapshot; }); } + +TEST_F_CORO(raft_fixture, leadership_transfer_delay) { + set_election_timeout(1500ms); + co_await create_simple_group(4); + auto replicate_some_data = [&] { + return retry_with_leader( + 10s + model::timeout_clock::now(), + [this](raft_node_instance& leader_node) { + return leader_node.raft()->replicate( + make_batches(10, 10, 128), + replicate_options(consistency_level::quorum_ack)); + }) + .then([&](result result) { + if (result) { + vlog( + tstlog.info, + "replication result last offset: {}", + 
result.value().last_offset); + } else { + vlog( + tstlog.info, + "replication error: {}", + result.error().message()); + } + }); + }; + using clock_t = std::chrono::high_resolution_clock; + co_await replicate_some_data(); + struct leadership_changed_event { + model::node_id node; + leadership_status status; + clock_t::time_point timestamp; + }; + ss::circular_buffer events; + + register_leader_callback([&](model::node_id id, leadership_status status) { + events.push_back(leadership_changed_event{ + .node = id, + .status = status, + .timestamp = clock_t::now(), + }); + }); + auto leader_id = get_leader().value(); + auto& leader_node = node(leader_id); + auto current_term = leader_node.raft()->term(); + auto r = co_await leader_node.raft()->transfer_leadership( + transfer_leadership_request{.group = leader_node.raft()->group()}); + ASSERT_TRUE_CORO(r.success); + // here we wait for all the replicas to notify about the leadership changes, + // each replica will notify two times, one when there is no leader, second + // time when the leader is elected. We have 4 replicas so in total we expect + // 8 notifications to be fired. + co_await tests::cooperative_spin_wait_with_timeout( + 10s, [&] { return events.size() >= 8; }); + + // calculate the time needed to transfer leadership, in our case it is the + // time between first notification reporting no leader and first reporting + // new leader. 
+ auto new_leader_reported_ev = std::find_if( + events.begin(), events.end(), [&](leadership_changed_event& ev) { + return ev.status.current_leader.has_value() + && ev.status.term > current_term; + }); + + auto transfer_time = new_leader_reported_ev->timestamp + - events.begin()->timestamp; + vlog( + tstlog.info, + "leadership_transfer - new leader reported after: {} ms", + (transfer_time) / 1ms); + events.clear(); + // now remove the current leader from the raft group + leader_id = get_leader().value(); + auto new_nodes = all_vnodes() | std::views::filter([&](vnode n) { + return n.id() != leader_id; + }); + auto& new_leader_node = node(leader_id); + current_term = new_leader_node.raft()->term(); + co_await new_leader_node.raft()->replace_configuration( + std::vector{new_nodes.begin(), new_nodes.end()}, + model::revision_id(2)); + // analogously to the previous case we wait for 6 notifications as + // currently the group has only 3 replicas + co_await tests::cooperative_spin_wait_with_timeout( + 10s, [&] { return events.size() >= 6; }); + + auto leader_reported_after_reconfiguration = std::find_if( + events.begin(), events.end(), [&](leadership_changed_event& ev) { + return ev.status.current_leader.has_value() + && ev.status.term > current_term; + }); + + auto election_time = leader_reported_after_reconfiguration->timestamp + - events.begin()->timestamp; + vlog( + tstlog.info, + "reconfiguration - new leader reported after: {} ms", + (election_time) / 1ms); + + for (auto& vn : all_vnodes()) { + co_await stop_node(vn.id()); + } + + auto tolerance = 0.15; + /** + * Validate that election time after reconfiguration is similar to the + * time needed for leadership transfer + */ + ASSERT_LE_CORO(election_time, transfer_time * (1.0 + tolerance)); + ASSERT_GE_CORO(election_time, transfer_time * (1.0 - tolerance)); +} diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index 121a28f519cc9..bf9cdf64b2a64 100644 --- 
a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -362,27 +362,18 @@ raft_node_instance::raft_node_instance( raft_node_map& node_map, ss::sharded& feature_table, leader_update_clb_t leader_update_clb, - bool enable_longest_log_detection) - : _id(id) - , _revision(revision) - , _logger(test_log, fmt::format("[node: {}]", _id)) - , _base_directory(fmt::format( - "test_raft_{}_{}", _id, random_generators::gen_alphanum_string(12))) - , _protocol(ss::make_shared(node_map, _logger)) - , _features(feature_table) - , _recovery_mem_quota([] { - return raft::recovery_memory_quota::configuration{ - .max_recovery_memory = config::mock_binding>( - 200_MiB), - .default_read_buffer_size = config::mock_binding(128_KiB), - }; - }) - , _recovery_scheduler( - config::mock_binding(64), config::mock_binding(10ms)) - , _leader_clb(std::move(leader_update_clb)) - , _enable_longest_log_detection(enable_longest_log_detection) { - config::shard_local_cfg().disable_metrics.set_value(true); -} + bool enable_longest_log_detection, + std::chrono::milliseconds election_timeout) + : raft_node_instance( + id, + revision, + fmt::format( + "test_raft_{}_{}", id, random_generators::gen_alphanum_string(12)), + node_map, + feature_table, + std::move(leader_update_clb), + enable_longest_log_detection, + election_timeout) {} raft_node_instance::raft_node_instance( model::node_id id, @@ -391,7 +382,8 @@ raft_node_instance::raft_node_instance( raft_node_map& node_map, ss::sharded& feature_table, leader_update_clb_t leader_update_clb, - bool enable_longest_log_detection) + bool enable_longest_log_detection, + std::chrono::milliseconds election_timeout) : _id(id) , _revision(revision) , _logger(test_log, fmt::format("[node: {}]", _id)) @@ -408,14 +400,16 @@ raft_node_instance::raft_node_instance( , _recovery_scheduler( config::mock_binding(64), config::mock_binding(10ms)) , _leader_clb(std::move(leader_update_clb)) - , _enable_longest_log_detection(enable_longest_log_detection) { 
+ , _enable_longest_log_detection(enable_longest_log_detection) + , _election_timeout( + config::mock_binding(election_timeout)) { config::shard_local_cfg().disable_metrics.set_value(true); } ss::future<> raft_node_instance::initialise(std::vector initial_nodes) { _hb_manager = std::make_unique( - config::mock_binding(50ms), + config::mock_binding(_election_timeout() / 10), consensus_client_protocol(_protocol), _id, config::mock_binding(1000ms), @@ -599,8 +593,14 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) { rev, *this, _features, - [id, this](leadership_status lst) { _leaders_view[id] = lst; }, - _enable_longest_log_detection); + [id, this](leadership_status lst) { + _leaders_view[id] = lst; + if (_leader_clb) { + _leader_clb.value()(id, lst); + } + }, + _enable_longest_log_detection, + _election_timeout); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; @@ -614,8 +614,14 @@ raft_node_instance& raft_fixture::add_node( std::move(base_dir), *this, _features, - [id, this](leadership_status lst) { _leaders_view[id] = lst; }, - _enable_longest_log_detection); + [id, this](leadership_status lst) { + _leaders_view[id] = lst; + if (_leader_clb) { + _leader_clb.value()(id, lst); + } + }, + _enable_longest_log_detection, + _election_timeout); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index a134efb69b85c..a85af5f19c623 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -168,7 +168,8 @@ class raft_node_instance : public ss::weakly_referencable { raft_node_map& node_map, ss::sharded& feature_table, leader_update_clb_t leader_update_clb, - bool enable_longest_log_detection); + bool enable_longest_log_detection, + std::chrono::milliseconds election_timeout); raft_node_instance( model::node_id id, @@ -176,7 +177,8 @@ class raft_node_instance : public 
ss::weakly_referencable { raft_node_map& node_map, ss::sharded& feature_table, leader_update_clb_t leader_update_clb, - bool enable_longest_log_detection); + bool enable_longest_log_detection, + std::chrono::milliseconds election_timeout); raft_node_instance(const raft_node_instance&) = delete; raft_node_instance(raft_node_instance&&) noexcept = delete; @@ -246,8 +248,6 @@ class raft_node_instance : public ss::weakly_referencable { ss::sstring _base_directory; ss::shared_ptr _protocol; ss::sharded _storage; - config::binding _election_timeout - = config::mock_binding(500ms); ss::sharded& _features; ss::sharded _recovery_throttle; recovery_memory_quota _recovery_mem_quota; @@ -257,12 +257,15 @@ class raft_node_instance : public ss::weakly_referencable { ss::lw_shared_ptr _raft; bool started = false; bool _enable_longest_log_detection; + config::binding _election_timeout; }; class raft_fixture : public seastar_test , public raft_node_map { public: + using leader_update_clb_t + = ss::noncopyable_function; raft_fixture() : _logger("raft-fixture") {} using raft_nodes_t = absl:: @@ -534,6 +537,14 @@ class raft_fixture _enable_longest_log_detection = value; } + void register_leader_callback(leader_update_clb_t clb) { + _leader_clb = std::move(clb); + } + + void set_election_timeout(std::chrono::milliseconds timeout) { + _election_timeout = timeout; + } + private: void validate_leaders(); @@ -544,6 +555,8 @@ class raft_fixture ss::sharded _features; bool _enable_longest_log_detection = true; + std::optional _leader_clb; + std::chrono::milliseconds _election_timeout = 500ms; }; std::ostream& operator<<(std::ostream& o, msg_type type);