Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always use the latest configuration for followers metadata #19964

Merged
merged 4 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,7 @@ ss::future<> consensus::hydrate_snapshot() {
co_await truncate_to_latest_snapshot(truncate_cfg.value());
}
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
update_follower_stats(_configuration_manager.get_latest());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we should even hydrate a snapshot if the included_index < start offset, just return success and make it idempotent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at this again, I think the fix makes sense but I think the issue though is L2241 already updates follower stats from snapshot config, so we are effectively doing it twice? Perhaps we should just swap L2242 & L2241 and make update_offset_from_snapshot() use latest config?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i've removed the stats update from update_offset_from_snapshot. I am not sure if the stats should be updated from within that method as its name suggest updating offsets.

}

std::optional<storage::truncate_prefix_config>
Expand Down Expand Up @@ -2331,7 +2332,6 @@ void consensus::update_offset_from_snapshot(
_last_snapshot_index = metadata.last_included_index;
_last_snapshot_term = metadata.last_included_term;

// TODO: add applying snapshot content to state machine
auto prev_commit_index = _commit_index;
_commit_index = std::max(_last_snapshot_index, _commit_index);
maybe_update_last_visible_index(_commit_index);
Expand All @@ -2340,8 +2340,6 @@ void consensus::update_offset_from_snapshot(
_replication_monitor.notify_committed();
_event_manager.notify_commit_index();
}

update_follower_stats(metadata.latest_configuration);
}

ss::future<install_snapshot_reply>
Expand All @@ -2366,10 +2364,10 @@ consensus::do_install_snapshot(install_snapshot_request r) {
_hbeat = clock_type::now();

// request received from new leader
do_step_down("install_snapshot_received");
if (r.term > _term) {
_term = r.term;
_voted_for = {};
do_step_down("install_snapshot_term_greater");
maybe_update_leader(r.source_node());
co_return co_await do_install_snapshot(std::move(r));
}
Expand Down
163 changes: 163 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,166 @@ TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) {

co_await wait_for_visible_offset(visible_offset, 10s);
}

TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
co_await create_simple_group(3);
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};

co_await replicate_some_data();

co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()[0]}, model::revision_id{1});
});
// wait for reconfiguration
auto wait_for_reconfiguration = [&](int expected_nodes) {
return tests::cooperative_spin_wait_with_timeout(
10s, [&, expected_nodes] {
return std::all_of(
nodes().begin(),
nodes().end(),
[expected_nodes](const auto& p) {
return p.second->raft()->config().all_nodes().size()
== expected_nodes
&& p.second->raft()->config().get_state()
== configuration_state::simple;
});
});
};

co_await wait_for_reconfiguration(1);

auto leader_node_id = get_leader();
ASSERT_TRUE_CORO(leader_node_id.has_value());
auto& leader_node = node(leader_node_id.value());
ASSERT_EQ_CORO(leader_node.raft()->config().all_nodes().size(), 1);

co_await replicate_some_data();

co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()}, model::revision_id{2});
});

// wait for reconfiguration
co_await wait_for_reconfiguration(3);

co_await replicate_some_data();

auto new_leader_node_id = get_leader();
ASSERT_TRUE_CORO(new_leader_node_id.has_value());
auto& new_leader_node = node(new_leader_node_id.value());
ASSERT_EQ_CORO(new_leader_node.raft()->config().all_nodes().size(), 3);

const auto& p = std::find_if(nodes().begin(), nodes().end(), [&](auto& p) {
return p.second->get_vnode() != new_leader_node.get_vnode();
});
auto& follower_node = p->second;
auto leader_proto = new_leader_node.get_protocol();
// simulate delayed install snapshot request send to follower
install_snapshot_request request;
request.target_node_id = follower_node->get_vnode();
request.node_id = leader_node.get_vnode();
request.group = follower_node->raft()->group();

/**
* A snapshot request represent a state from the point in time when group
* had only one member. Currently the follower is already using
* configuration with 3 members
*/
auto last_included = model::offset(random_generators::get_int(105, 199));
request.last_included_index = last_included;
request.dirty_offset = leader_node.raft()->dirty_offset();
request.term = leader_node.raft()->term();

snapshot_metadata metadata{
.last_included_index = request.last_included_index,
.last_included_term = leader_node.raft()->term(),
.latest_configuration = raft::group_configuration(
{all_vnodes()[0]}, model::revision_id(1)),
.log_start_delta = offset_translator_delta(2),
};

iobuf snapshot;
// using snapshot writer to populate all relevant snapshot metadata i.e.
// header and crc
storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot));

co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata)));
co_await write_iobuf_to_output_stream(iobuf{}, writer.output());
co_await writer.close();
request.chunk = snapshot.copy();
request.file_offset = 0;
request.done = true;

auto reply = co_await leader_proto->install_snapshot(
follower_node->get_vnode().id(),
std::move(request),
rpc::client_opts(10s));
ASSERT_TRUE_CORO(reply.has_value());
vlog(tstlog.info, "snapshot reply from follower: {}", reply.value());

// the snapshot contains a configuration with one node which is older than
// the current one the follower has. latest configuration MUST remain
// unchanged

ASSERT_EQ_CORO(follower_node->raft()->config().all_nodes().size(), 3);
EXPECT_EQ(follower_node->raft()->get_follower_stats().size(), 2);
// entries in follower log should be truncated.
ASSERT_EQ_CORO(
follower_node->raft()->start_offset(), model::next_offset(last_included));

/**
* Make sure the leader steps down when it receives an install snapshot
* request
*/

auto follower_proto = follower_node->get_protocol();
install_snapshot_request request_for_leader;

request_for_leader.group = follower_node->raft()->group();
request_for_leader.target_node_id = new_leader_node.get_vnode();
request_for_leader.node_id = follower_node->get_vnode();
request_for_leader.last_included_index = model::offset(
random_generators::get_int(105, 199));
request_for_leader.dirty_offset = leader_node.raft()->dirty_offset();
request_for_leader.term = leader_node.raft()->term();
request_for_leader.chunk = std::move(snapshot);
request_for_leader.done = true;
auto term_snapshot = leader_node.raft()->term();
auto leader_reply = co_await follower_proto->install_snapshot(
new_leader_node.get_vnode().id(),
std::move(request_for_leader),
rpc::client_opts(10s));

ASSERT_TRUE_CORO(leader_reply.has_value());
vlog(tstlog.info, "snapshot reply from leader: {}", leader_reply.value());
co_await tests::cooperative_spin_wait_with_timeout(10s, [&] {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}
2 changes: 2 additions & 0 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
/// dispatched.
void on_dispatch(dispatch_callback_t);

ss::shared_ptr<in_memory_test_protocol> get_protocol() { return _protocol; }

private:
model::node_id _id;
model::revision_id _revision;
Expand Down