From 2f287e296ac64465d119e36df261eb1976c4a130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 24 Jun 2024 06:13:48 +0000 Subject: [PATCH 1/4] r/consensus: removed obsolete comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removed comment mentioning that we should add applying snapshot to stm as this is already being done Signed-off-by: Michał Maślanka --- src/v/raft/consensus.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index b9f5c9a94f13..97c1d5374ccc 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2331,7 +2331,6 @@ void consensus::update_offset_from_snapshot( _last_snapshot_index = metadata.last_included_index; _last_snapshot_term = metadata.last_included_term; - // TODO: add applying snapshot content to state machine auto prev_commit_index = _commit_index; _commit_index = std::max(_last_snapshot_index, _commit_index); maybe_update_last_visible_index(_commit_index); From c69709f4ab6a9ce049946610b43b818b3bcbe3e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 24 Jun 2024 08:35:19 +0000 Subject: [PATCH 2/4] r/consensus: always use the latest configuration for followers metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When install snapshot request in processed by the follower it may not always replace the follower log content. If the snapshot last included offset is smaller than the follower dirty offset the follower should prefix truncate all the data up to the snapshot last included offset but keep all the entries which offset is greater than snapshot last included offset. Fixed the update of follower metadata state as it was always using the snapshot configuration instead the latest from the configuration manager. Fixes: #core-internal/issues/1310 Signed-off-by: Michał Maślanka --- src/v/raft/consensus.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 97c1d5374ccc..57c5abfe2493 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2247,6 +2247,7 @@ ss::future<> consensus::hydrate_snapshot() { co_await truncate_to_latest_snapshot(truncate_cfg.value()); } _snapshot_size = co_await _snapshot_mgr.get_snapshot_size(); + update_follower_stats(_configuration_manager.get_latest()); } std::optional @@ -2339,8 +2340,6 @@ void consensus::update_offset_from_snapshot( _replication_monitor.notify_committed(); _event_manager.notify_commit_index(); } - - update_follower_stats(metadata.latest_configuration); } ss::future From 571aa19d3182c64275deb584d1d9be3c01d09210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 24 Jun 2024 08:40:24 +0000 Subject: [PATCH 3/4] r/consensus: always step down when install snapshot request is received MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When install snapshot request is received by the current leader it must unconditionally step down. This is the same behavior as in the case of receiving an append entries request. Signed-off-by: Michał Maślanka --- src/v/raft/consensus.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 57c5abfe2493..7cce37aca1a0 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2364,10 +2364,10 @@ consensus::do_install_snapshot(install_snapshot_request r) { _hbeat = clock_type::now(); // request received from new leader + do_step_down("install_snapshot_received"); if (r.term > _term) { _term = r.term; _voted_for = {}; - do_step_down("install_snapshot_term_greater"); maybe_update_leader(r.source_node()); co_return co_await do_install_snapshot(std::move(r)); } From e73aa28ad7fe07293752a7cffaad5b0d67056857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 24 Jun 2024 08:42:20 +0000 Subject: [PATCH 4/4] r/tests: added test validating delayed install snapshot request delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When install snapshot request is delayed and deliver to the node after it already made progress it can not lead to state inconsistencies. Added test validating handing of delayed `install_snapshot` requests. The test is validating behavior of both follower and the leader. Signed-off-by: Michał Maślanka --- src/v/raft/tests/basic_raft_fixture_test.cc | 163 ++++++++++++++++++++ src/v/raft/tests/raft_fixture.h | 2 + 2 files changed, 165 insertions(+) diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index d44ba1fa6af8..30787ec2e9e3 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -533,3 +533,166 @@ TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) { co_await wait_for_visible_offset(visible_offset, 10s); } + +TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) { + co_await create_simple_group(3); + auto replicate_some_data = [&] { + return retry_with_leader( + 10s + model::timeout_clock::now(), + [this](raft_node_instance& leader_node) { + return leader_node.raft()->replicate( + make_batches(10, 10, 128), + replicate_options(consistency_level::quorum_ack)); + }) + .then([&](result result) { + if (result) { + vlog( + tstlog.info, + "replication result last offset: {}", + result.value().last_offset); + } else { + vlog( + tstlog.info, + "replication error: {}", + result.error().message()); + } + }); + }; + + co_await replicate_some_data(); + + co_await retry_with_leader( + 10s + model::timeout_clock::now(), + [this](raft_node_instance& leader_node) { + return leader_node.raft()->replace_configuration( + {all_vnodes()[0]}, model::revision_id{1}); + }); + // wait for reconfiguration + auto wait_for_reconfiguration = [&](int expected_nodes) { + return tests::cooperative_spin_wait_with_timeout( + 10s, [&, expected_nodes] { + return std::all_of( + nodes().begin(), + nodes().end(), + [expected_nodes](const auto& p) { + return p.second->raft()->config().all_nodes().size() + == expected_nodes + && p.second->raft()->config().get_state() + == configuration_state::simple; + }); + }); + }; + + co_await wait_for_reconfiguration(1); + + auto leader_node_id = get_leader(); + ASSERT_TRUE_CORO(leader_node_id.has_value()); + auto& leader_node = node(leader_node_id.value()); + ASSERT_EQ_CORO(leader_node.raft()->config().all_nodes().size(), 1); + + co_await replicate_some_data(); + + co_await retry_with_leader( + 10s + model::timeout_clock::now(), + [this](raft_node_instance& leader_node) { + return leader_node.raft()->replace_configuration( + {all_vnodes()}, model::revision_id{2}); + }); + + // wait for reconfiguration + co_await wait_for_reconfiguration(3); + + co_await replicate_some_data(); + + auto new_leader_node_id = get_leader(); + ASSERT_TRUE_CORO(new_leader_node_id.has_value()); + auto& new_leader_node = node(new_leader_node_id.value()); + ASSERT_EQ_CORO(new_leader_node.raft()->config().all_nodes().size(), 3); + + const auto& p = std::find_if(nodes().begin(), nodes().end(), [&](auto& p) { + return p.second->get_vnode() != new_leader_node.get_vnode(); + }); + auto& follower_node = p->second; + auto leader_proto = new_leader_node.get_protocol(); + // simulate delayed install snapshot request send to follower + install_snapshot_request request; + request.target_node_id = follower_node->get_vnode(); + request.node_id = leader_node.get_vnode(); + request.group = follower_node->raft()->group(); + + /** + * A snapshot request represent a state from the point in time when group + * had only one member. Currently the follower is already using + * configuration with 3 members + */ + auto last_included = model::offset(random_generators::get_int(105, 199)); + request.last_included_index = last_included; + request.dirty_offset = leader_node.raft()->dirty_offset(); + request.term = leader_node.raft()->term(); + + snapshot_metadata metadata{ + .last_included_index = request.last_included_index, + .last_included_term = leader_node.raft()->term(), + .latest_configuration = raft::group_configuration( + {all_vnodes()[0]}, model::revision_id(1)), + .log_start_delta = offset_translator_delta(2), + }; + + iobuf snapshot; + // using snapshot writer to populate all relevant snapshot metadata i.e. + // header and crc + storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot)); + + co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata))); + co_await write_iobuf_to_output_stream(iobuf{}, writer.output()); + co_await writer.close(); + request.chunk = snapshot.copy(); + request.file_offset = 0; + request.done = true; + + auto reply = co_await leader_proto->install_snapshot( + follower_node->get_vnode().id(), + std::move(request), + rpc::client_opts(10s)); + ASSERT_TRUE_CORO(reply.has_value()); + vlog(tstlog.info, "snapshot reply from follower: {}", reply.value()); + + // the snapshot contains a configuration with one node which is older than + // the current one the follower has. latest configuration MUST remain + // unchanged + + ASSERT_EQ_CORO(follower_node->raft()->config().all_nodes().size(), 3); + EXPECT_EQ(follower_node->raft()->get_follower_stats().size(), 2); + // entries in follower log should be truncated. + ASSERT_EQ_CORO( + follower_node->raft()->start_offset(), model::next_offset(last_included)); + + /** + * Make sure the leader steps down when it receives an install snapshot + * request + */ + + auto follower_proto = follower_node->get_protocol(); + install_snapshot_request request_for_leader; + + request_for_leader.group = follower_node->raft()->group(); + request_for_leader.target_node_id = new_leader_node.get_vnode(); + request_for_leader.node_id = follower_node->get_vnode(); + request_for_leader.last_included_index = model::offset( + random_generators::get_int(105, 199)); + request_for_leader.dirty_offset = leader_node.raft()->dirty_offset(); + request_for_leader.term = leader_node.raft()->term(); + request_for_leader.chunk = std::move(snapshot); + request_for_leader.done = true; + auto term_snapshot = leader_node.raft()->term(); + auto leader_reply = co_await follower_proto->install_snapshot( + new_leader_node.get_vnode().id(), + std::move(request_for_leader), + rpc::client_opts(10s)); + + ASSERT_TRUE_CORO(leader_reply.has_value()); + vlog(tstlog.info, "snapshot reply from leader: {}", leader_reply.value()); + co_await tests::cooperative_spin_wait_with_timeout(10s, [&] { + return nodes().begin()->second->raft()->term() > term_snapshot; + }); +} diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index c6d5efeda980..cd33915ebbe1 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -240,6 +240,8 @@ class raft_node_instance : public ss::weakly_referencable { /// dispatched. void on_dispatch(dispatch_callback_t); + ss::shared_ptr get_protocol() { return _protocol; } + private: model::node_id _id; model::revision_id _revision;