diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index e2a175f7abb9b..8f430d4149815 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -100,6 +100,7 @@ consensus::consensus( ss::shared_ptr l, scheduling_config scheduling_config, config::binding disk_timeout, + config::binding enable_longest_log_detection, consensus_client_protocol client, consensus::leader_cb_t cb, storage::api& storage, @@ -116,6 +117,7 @@ consensus::consensus( , _log(l) , _scheduling(scheduling_config) , _disk_timeout(std::move(disk_timeout)) + , _enable_longest_log_detection(std::move(enable_longest_log_detection)) , _client_protocol(client) , _leader_notification(std::move(cb)) , _fstats( diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 752157b9828a5..70879d7383ef6 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -13,6 +13,7 @@ #include "base/likely.h" #include "base/seastarx.h" +#include "config/property.h" #include "features/feature_table.h" #include "hashing/crc32c.h" #include "metrics/metrics.h" @@ -99,6 +100,7 @@ class consensus { ss::shared_ptr, scheduling_config, config::binding disk_timeout, + config::binding enable_longest_log_detection, consensus_client_protocol, leader_cb_t, storage::api&, @@ -778,6 +780,7 @@ class consensus { ss::shared_ptr _log; scheduling_config _scheduling; config::binding _disk_timeout; + config::binding _enable_longest_log_detection; consensus_client_protocol _client_protocol; leader_cb_t _leader_notification; diff --git a/src/v/raft/group_configuration.h b/src/v/raft/group_configuration.h index 990b4a7ed9511..734d9a821dc4b 100644 --- a/src/v/raft/group_configuration.h +++ b/src/v/raft/group_configuration.h @@ -16,6 +16,7 @@ #include +#include #include namespace raft { @@ -329,6 +330,12 @@ class group_configuration template requires std::predicate bool majority(Predicate&& f) const; + /** + * Returns true if the predicate returns true for every voter of the current and, when present, the old configuration + */ + template + requires 
std::predicate + bool all_of(Predicate&& f) const; version_t version() const { return _version; } @@ -472,6 +479,15 @@ bool majority(Predicate&& f, Range&& range) { return cnt >= (range.size() / 2) + 1; } +template +bool all_of(Predicate&& f, Range&& range) { + if (range.empty()) { /* an empty range trivially satisfies the predicate */ + return true; + } + + return std::all_of( + std::cbegin(range), std::cend(range), std::forward(f)); +} } // namespace details template @@ -533,6 +549,16 @@ bool group_configuration::majority(Predicate&& f) const { && details::majority(f, _old->voters); } +template +requires std::predicate +bool group_configuration::all_of(Predicate&& f) const { + if (!_old) { /* no old configuration: only the current voters are checked */ + return details::all_of(std::forward(f), _current.voters); + } + return details::all_of(f, _current.voters) /* f is used twice here, hence passed as an lvalue */ + && details::all_of(f, _old->voters); +} + template void group_configuration::for_each_voter(Func&& f) const { auto ids = unique_voter_ids(); diff --git a/src/v/raft/group_manager.cc b/src/v/raft/group_manager.cc index 9ccc5817b9981..e8c6e83b670ff 100644 --- a/src/v/raft/group_manager.cc +++ b/src/v/raft/group_manager.cc @@ -111,6 +111,7 @@ ss::future> group_manager::create_group( log, scheduling_config(_raft_sg, raft_priority()), _configuration.raft_io_timeout_ms, + _configuration.enable_longest_log_detection, _client, [this](raft::leadership_status st) { trigger_leadership_notification(std::move(st)); diff --git a/src/v/raft/group_manager.h b/src/v/raft/group_manager.h index 859138c27a17b..7389ccad214dc 100644 --- a/src/v/raft/group_manager.h +++ b/src/v/raft/group_manager.h @@ -11,6 +11,7 @@ #pragma once #include "cluster/notification.h" +#include "config/property.h" #include "metrics/metrics.h" #include "model/metadata.h" #include "raft/heartbeat_manager.h" @@ -47,6 +48,7 @@ class group_manager { config::binding write_caching; config::binding write_caching_flush_ms; config::binding> write_caching_flush_bytes; + config::binding enable_longest_log_detection; }; using config_provider_fn = ss::noncopyable_function; diff 
--git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 76c0794cf33c5..d44ba1fa6af8a 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -413,6 +413,11 @@ TEST_P_CORO( * the offsets to appear. */ TEST_P_CORO(quorum_acks_fixture, test_progress_on_truncation) { + /** + * Truncation detection test is expected to experience a log truncation, + * hence we disable longest log detection + */ + set_enable_longest_log_detection(false); co_await create_simple_group(3); auto leader_id = co_await wait_for_leader(10s); auto params = GetParam(); @@ -484,3 +489,47 @@ INSTANTIATE_TEST_SUITE_P( .c_lvl = consistency_level::quorum_ack, .write_caching = false}, test_parameters{ .c_lvl = consistency_level::quorum_ack, .write_caching = true})); + +TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) { + co_await create_simple_group(3); + + /** + * Enable write caching + */ + co_await set_write_caching(true); + auto r = co_await retry_with_leader( + 10s + model::timeout_clock::now(), + [this](raft_node_instance& leader_node) { + return leader_node.raft()->replicate( + make_batches(10, 10, 128), + replicate_options(consistency_level::quorum_ack)); + }); + ASSERT_TRUE_CORO(r.has_value()); + /** + * Wait for all nodes to make the replicated offset visible + */ + auto visible_offset = r.value().last_offset; + co_await wait_for_visible_offset(visible_offset, 10s); + + /** + * Stop all nodes and re-add them, removing the data of every node but one randomly chosen survivor + */ + auto ids_set = all_ids(); + std::vector ids(ids_set.begin(), ids_set.end()); + auto survivor = random_generators::random_choice(ids); + + for (auto& id : ids) { + auto data_dir = node(id).raft()->log()->config().base_directory(); + co_await stop_node( + id, survivor == id ? 
remove_data_dir::no : remove_data_dir::yes); + add_node(id, model::revision_id(0), std::move(data_dir)); + } + + for (auto& [id, n] : nodes()) { + co_await n->init_and_start(all_vnodes()); + } + + co_await wait_for_leader(10s); + + co_await wait_for_visible_offset(visible_offset, 10s); +} diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index aa3e995d9b574..b643bffde157f 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -359,7 +359,8 @@ raft_node_instance::raft_node_instance( model::revision_id revision, raft_node_map& node_map, ss::sharded& feature_table, - leader_update_clb_t leader_update_clb) + leader_update_clb_t leader_update_clb, + bool enable_longest_log_detection) : _id(id) , _revision(revision) , _logger(test_log, fmt::format("[node: {}]", _id)) @@ -376,7 +377,8 @@ raft_node_instance::raft_node_instance( }) , _recovery_scheduler( config::mock_binding(64), config::mock_binding(10ms)) - , _leader_clb(std::move(leader_update_clb)) { + , _leader_clb(std::move(leader_update_clb)) + , _enable_longest_log_detection(enable_longest_log_detection) { config::shard_local_cfg().disable_metrics.set_value(true); } @@ -386,7 +388,8 @@ raft_node_instance::raft_node_instance( ss::sstring base_directory, raft_node_map& node_map, ss::sharded& feature_table, - leader_update_clb_t leader_update_clb) + leader_update_clb_t leader_update_clb, + bool enable_longest_log_detection) : _id(id) , _revision(revision) , _logger(test_log, fmt::format("[node: {}]", _id)) @@ -402,7 +405,8 @@ raft_node_instance::raft_node_instance( }) , _recovery_scheduler( config::mock_binding(64), config::mock_binding(10ms)) - , _leader_clb(std::move(leader_update_clb)) { + , _leader_clb(std::move(leader_update_clb)) + , _enable_longest_log_detection(enable_longest_log_detection) { config::shard_local_cfg().disable_metrics.set_value(true); } @@ -446,6 +450,7 @@ raft_node_instance::initialise(std::vector initial_nodes) { 
scheduling_config( ss::default_scheduling_group(), ss::default_priority_class()), config::mock_binding(1s), + config::mock_binding(_enable_longest_log_detection), consensus_client_protocol(_protocol), [this](leadership_status ls) { leadership_notification_callback(ls); }, _storage.local(), @@ -588,9 +593,12 @@ seastar::future<> raft_fixture::SetUpAsync() { raft_node_instance& raft_fixture::add_node(model::node_id id, model::revision_id rev) { auto instance = std::make_unique( - id, rev, *this, _features, [id, this](leadership_status lst) { - _leaders_view[id] = lst; - }); + id, + rev, + *this, + _features, + [id, this](leadership_status lst) { _leaders_view[id] = lst; }, + _enable_longest_log_detection); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; @@ -604,7 +612,8 @@ raft_node_instance& raft_fixture::add_node( std::move(base_dir), *this, _features, - [id, this](leadership_status lst) { _leaders_view[id] = lst; }); + [id, this](leadership_status lst) { _leaders_view[id] = lst; }, + _enable_longest_log_detection); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index ce27989521031..515ab0e8db9f1 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -163,14 +163,16 @@ class raft_node_instance : public ss::weakly_referencable { ss::sstring base_directory, raft_node_map& node_map, ss::sharded& feature_table, - leader_update_clb_t leader_update_clb); + leader_update_clb_t leader_update_clb, + bool enable_longest_log_detection); raft_node_instance( model::node_id id, model::revision_id revision, raft_node_map& node_map, ss::sharded& feature_table, - leader_update_clb_t leader_update_clb); + leader_update_clb_t leader_update_clb, + bool enable_longest_log_detection); raft_node_instance(const raft_node_instance&) = delete; raft_node_instance(raft_node_instance&&) noexcept = delete; @@ -248,6 
+250,7 @@ class raft_node_instance : public ss::weakly_referencable { leader_update_clb_t _leader_clb; ss::lw_shared_ptr _raft; bool started = false; + bool _enable_longest_log_detection; /* set from the constructor argument, passed to consensus in initialise() */ }; class raft_fixture @@ -462,6 +465,10 @@ class raft_fixture ss::future<> reset_background_flushing() const; ss::future<> set_write_caching(bool) const; + void set_enable_longest_log_detection(bool value) { /* the stored value is handed to nodes created by subsequent add_node() calls */ + _enable_longest_log_detection = value; + } + private: void validate_leaders(); @@ -471,6 +478,7 @@ class raft_fixture absl::flat_hash_map _leaders_view; ss::sharded _features; + bool _enable_longest_log_detection = true; /* longest log detection is enabled unless a test turns it off */ }; std::ostream& operator<<(std::ostream& o, msg_type type); diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index b0cbf1858f601..e8f59aff3c65e 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -177,6 +177,7 @@ struct raft_node { seastar::default_scheduling_group(), seastar::default_priority_class()), config::mock_binding(10s), + config::mock_binding(true), raft::make_rpc_client_protocol(self_id, cache), [this](raft::leadership_status st) { leader_callback(st); }, storage.local(), diff --git a/src/v/raft/tests/replication_monitor_tests.cc b/src/v/raft/tests/replication_monitor_tests.cc index e6914a4111707..abbf4e7dfd4b1 100644 --- a/src/v/raft/tests/replication_monitor_tests.cc +++ b/src/v/raft/tests/replication_monitor_tests.cc @@ -79,6 +79,7 @@ TEST_P_CORO(monitor_test_fixture, replication_monitor_wait) { } TEST_P_CORO(monitor_test_fixture, truncation_detection) { + set_enable_longest_log_detection(false); /* NOTE(review): presumably disabled because this test relies on a log truncation - confirm */ co_await create_simple_group(3); auto leader = co_await wait_for_leader(10s); co_await set_write_caching(write_caching()); diff --git a/src/v/raft/tests/simple_raft_fixture.h b/src/v/raft/tests/simple_raft_fixture.h index be0516a78cee1..a5abc3cc2e2c1 100644 --- a/src/v/raft/tests/simple_raft_fixture.h +++ b/src/v/raft/tests/simple_raft_fixture.h @@ -87,7 +87,9 @@ 
struct simple_raft_fixture { model::write_caching_mode::default_false), .write_caching_flush_ms = config::mock_binding(100ms), .write_caching_flush_bytes - = config::mock_binding>(std::nullopt)}; + = config::mock_binding>(std::nullopt), + .enable_longest_log_detection = config::mock_binding( + true)}; }, [] { return raft::recovery_memory_quota::configuration{ diff --git a/src/v/raft/vote_stm.cc b/src/v/raft/vote_stm.cc index 85dddd127a204..5fd8082a451c3 100644 --- a/src/v/raft/vote_stm.cc +++ b/src/v/raft/vote_stm.cc @@ -17,6 +17,7 @@ #include "rpc/types.h" #include "ssx/semaphore.h" +#include #include #include #include @@ -181,7 +182,7 @@ ss::future vote_stm::vote(bool leadership_transfer) { ss::future vote_stm::do_vote() { // dispatch requests to all voters _config->for_each_voter([this](vnode id) { (void)dispatch_one(id); }); - + _requests_dispatched_ts = clock_type::now(); co_await process_replies(); auto u = co_await _ptr->_op_lock.get_units(); @@ -190,8 +191,59 @@ ss::future vote_stm::do_vote() { co_return election_success(_success); } +bool vote_stm::has_request_in_progress() const { /* true if any entry of _replies is still in the in_progress state */ + return std::any_of( + _replies.begin(), + _replies.end(), + [](const absl::flat_hash_map::value_type& p) { + return p.second.get_state() == vmeta::state::in_progress; + }); +} + +bool vote_stm::can_wait_for_all() const { /* true while less than _jit.base_duration() has passed since the requests were dispatched */ + return clock_type::now() + < _requests_dispatched_ts + _ptr->_jit.base_duration(); +} + ss::future<> vote_stm::process_replies() { return ss::repeat([this] { + const bool request_in_progress = has_request_in_progress(); + + /** + * Try to wait for vote replies from all of the nodes or use information + * from all replies if it is already available. Waiting for all the + * replies allows the candidate to verify whether any other protocol + * participant has a longer log. + * + * If any replica's log is longer than the current candidate's, the + * election fails, allowing the other node to become a leader. 
+ */ + if ( + (!request_in_progress || can_wait_for_all()) + && _ptr->_enable_longest_log_detection()) { + if (request_in_progress) { + return wait_for_next_reply().then( + [] { return ss::stop_iteration::no; }); + } + auto longer_log_it = std::find_if( + _replies.begin(), + _replies.end(), + [](const absl::flat_hash_map::value_type& p) { + return p.second.has_longer_log(); + }); + + if (longer_log_it != _replies.end()) { + vlog( + _ctxlog.info, + "[pre-vote {}] vote failed - node {} has longer log than " + "current candidate", + _prevote, + longer_log_it->first); + _success = false; + return ss::make_ready_future( + ss::stop_iteration::yes); + } + } /** * The check if the election is successful is different for prevote, for * prevote phase to be successful it is enough that followers reported @@ -228,12 +280,7 @@ ss::future<> vote_stm::process_replies() { // neither majority votes granted nor failed, check if we have all // replies (is there any vote request in progress) - auto has_request_in_progress = std::any_of( - std::cbegin(_replies), std::cend(_replies), [](const auto& p) { - return p.second.get_state() == vmeta::state::in_progress; - }); - - if (!has_request_in_progress) { + if (!request_in_progress) { _success = false; return ss::make_ready_future( ss::stop_iteration::yes); @@ -245,6 +292,35 @@ ss::future<> vote_stm::process_replies() { }); } +ss::future<> vote_stm::wait_for_next_reply() { + vlog( + _ctxlog.debug, + "[pre-vote {}] trying to wait for vote replies from all of " + "the nodes", + _prevote); + auto h = _vote_bg.hold(); + const auto timeout = std::max( + clock_type::duration(0), + _ptr->_jit.base_duration() + - (clock_type::now() - _requests_dispatched_ts)); + + /** + * Return early if the wait budget (_jit.base_duration() counted from request dispatch) is already used up + */ + if (timeout == clock_type::duration(0)) { + co_return; + } + + try { + co_await _sem.wait(timeout, 1); + } catch (const ss::semaphore_timed_out&) { + vlog( + _ctxlog.debug, + "[pre-vote {}] timed out waiting for all the replies", 
+ _prevote); + } +} + ss::future<> vote_stm::wait() { return _vote_bg.close(); } ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) { diff --git a/src/v/raft/vote_stm.h b/src/v/raft/vote_stm.h index ee1644cfc02dc..5a8960d6b7e6c 100644 --- a/src/v/raft/vote_stm.h +++ b/src/v/raft/vote_stm.h @@ -95,9 +95,19 @@ class vote_stm { // it is an error return state::error; } + + bool has_longer_log() const { /* true only when a reply is present, holds a value and reports log_ok == false */ + return value && value->has_value() && !value->value().log_ok; + } std::unique_ptr> value; }; + bool has_request_in_progress() const; + + bool can_wait_for_all() const; + + ss::future<> wait_for_next_reply(); + friend std::ostream& operator<<(std::ostream&, const vmeta&); ss::future do_vote(); @@ -121,6 +131,7 @@ class vote_stm { ss::gate _vote_bg; absl::flat_hash_map _replies; ctx_log _ctxlog; + clock_type::time_point _requests_dispatched_ts; /* assigned in do_vote() right after the vote requests are dispatched */ }; } // namespace raft diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 5447d5510f18d..acf7d1b732f43 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1340,7 +1340,11 @@ void application::wire_up_redpanda_services( .raft_replica_max_flush_delay_ms.bind(), .write_caching_flush_bytes = config::shard_local_cfg() - .raft_replica_max_pending_flush_bytes.bind()}; + .raft_replica_max_pending_flush_bytes.bind(), + .enable_longest_log_detection + = config::shard_local_cfg() + .raft_enable_longest_log_detection.bind(), + }; }, [] { return raft::recovery_memory_quota::configuration{