Skip to content

Commit

Permalink
r/tests: added test validating visible offset invariant
Browse files Browse the repository at this point in the history
Added test checking if an offset that is visible on the leader is not
truncated on the followers.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Feb 5, 2024
1 parent f4ecf1a commit 71b489c
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "raft/tests/raft_fixture.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/types.h"
#include "random/generators.h"
#include "serde/serde.h"
#include "storage/record_batch_builder.h"
#include "test_utils/async.h"
Expand Down Expand Up @@ -172,3 +173,132 @@ TEST_F_CORO(
[](auto& n) { return n.raft()->maybe_flush_log(0); });
co_await wait_for_committed_offset(result.value().last_offset, 10s);
}

/**
* This tests validates if visible offset moves backward. The invariant that the
* last visible offset does not move backward should be guaranteed by Raft even
* if using relaxed consistency level.
*
* This is possible as the protocol waits for the majority of nodes to
* acknowledge receiving the message before making it visible.
*/
TEST_F_CORO(
raft_fixture, validate_relaxed_consistency_visible_offset_advancement) {
co_await create_simple_group(3);
// wait for leader
co_await wait_for_leader(10s);
for (auto& [_, node] : nodes()) {
node->on_dispatch([](raft::msg_type t) {
if (
t == raft::msg_type::append_entries
&& random_generators::get_int(1000) > 800) {
return ss::sleep(1s);
}

return ss::now();
});
}
bool stop = false;

auto produce_fiber = ss::do_until(
[&stop] { return stop; },
[this] {
ss::lw_shared_ptr<consensus> raft;
for (auto& n : nodes()) {
if (n.second->raft()->is_leader()) {
raft = n.second->raft();
break;
}
}

if (!raft) {
return ss::sleep(100ms);
}
return raft
->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::leader_ack))
.then([this](result<replicate_result> result) {
if (result.has_error()) {
vlog(
logger().info,
"error(replicating): {}",
result.error().message());
// "error(replicating): {}\n", result.error().message());
}
});
});
int transfers = 200;
auto l_transfer_fiber = ss::do_until(
[&transfers, &stop] { return transfers-- <= 0 || stop; },
[this] {
std::vector<raft::vnode> not_leaders;
ss::lw_shared_ptr<consensus> raft;
for (auto& n : nodes()) {
if (n.second->raft()->is_leader()) {
raft = n.second->raft();
} else {
not_leaders.push_back(n.second->get_vnode());
}
}

if (!raft) {
return ss::sleep(100ms);
}
auto target = random_generators::random_choice(not_leaders);
return raft
->transfer_leadership(transfer_leadership_request{
.group = raft->group(),
.target = target.id(),
.timeout = 25ms,
})
.then([this](transfer_leadership_reply r) {
if (r.result != raft::errc::success) {
vlog(logger().info, "error(transferring): {}", r);
}
})
.then([] { return ss::sleep(200ms); });
});

absl::node_hash_map<model::node_id, model::offset> last_visible;
auto validator_fiber = ss::do_until(
[&stop] { return stop; },
[this, &last_visible] {
for (auto& [id, node] : nodes()) {
auto o = node->raft()->last_visible_index();
vassert(
last_visible[id] <= o,
"Visible offset moved back on node {}, current visible offset: "
"{}, last visible offset: {}",
id,
o,
last_visible[id]);
vassert(
o <= node->raft()->dirty_offset(),
"last visible offset can not be larger than log end offset");
last_visible[id] = o;
}
return ss::sleep(10ms);
});

co_await ss::sleep(30s);
stop = true;
co_await std::move(produce_fiber);
co_await std::move(l_transfer_fiber);
co_await std::move(validator_fiber);

for (auto& n : nodes()) {
auto r = n.second->raft();
vlog(
logger().info,
"leader: {} log_end: {}, visible: {} \n",
r->is_leader(),
r->dirty_offset(),
r->last_visible_index());
if (r->is_leader()) {
for (auto& fs : r->get_follower_stats()) {
vlog(logger().info, "follower: {}", fs.second);
}
}
}
}

0 comments on commit 71b489c

Please sign in to comment.