Skip to content

Commit

Permalink
Merge pull request #18638 from vbotbuildovich/backport-pr-18626-v24.1…
Browse files Browse the repository at this point in the history
….x-939

[v24.1.x] Fixed `state_machine_manager` stall
  • Loading branch information
piyushredpanda authored May 23, 2024
2 parents 271c533 + 5a66694 commit 5ff21da
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 25 deletions.
76 changes: 53 additions & 23 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ ss::future<> state_machine_manager::apply_snapshot_to_stm(
std::max(model::next_offset(last_offset), stm_entry->stm->next()));
}

ss::future<> state_machine_manager::apply() {
ss::future<> state_machine_manager::try_apply_in_foreground() {
try {
ss::coroutine::switch_to sg_sw(_apply_sg);
// wait until consensus commit index is >= _next
Expand All @@ -315,6 +315,30 @@ ss::future<> state_machine_manager::apply() {
co_return co_await apply_raft_snapshot();
}

// collect STMs which have the same _next offset as the offset in the
// manager and for which there is no background apply taking place
std::vector<entry_ptr> machines;
for (auto& [_, entry] : _machines) {
/**
* We can simply check if a mutex is ready here as calling
* maybe_start_background_apply() will make the mutex underlying
* semaphore immediately not ready as there are no scheduling points
* before calling `get_units`
*/
if (
entry->stm->next() == _next
&& entry->background_apply_mutex.ready()) {
machines.push_back(entry);
}
}
if (machines.empty()) {
vlog(
_log.debug,
"no machines were selected to apply in foreground, current next "
"offset: {}",
_next);
co_return;
}
/**
* Raft make_reader method allows callers reading up to
* last_visible index. In order to make the STMs safe and working
Expand All @@ -334,27 +358,24 @@ ss::future<> state_machine_manager::apply() {
_next, _raft->committed_offset(), ss::default_priority_class());

model::record_batch_reader reader = co_await _raft->make_reader(config);
// collect STMs which have the same _next offset as the offset in the
// manager and for which there is no background apply taking place
std::vector<entry_ptr> machines;
for (auto& [_, entry] : _machines) {
/**
* We can simply check if a mutex is ready here as calling
* maybe_start_background_apply() will make the mutex underlying
* semaphore immediately not ready as there are no scheduling points
* before calling `get_units`
*/
if (
entry->stm->next() == _next
&& entry->background_apply_mutex.ready()) {
machines.push_back(entry);
}
}
auto last_applied = co_await std::move(reader).consume(

auto max_last_applied = co_await std::move(reader).consume(
batch_applicator(default_ctx, machines, _as, _log),
model::no_timeout);

_next = std::max(model::next_offset(last_applied), _next);
if (max_last_applied == model::offset{}) {
vlog(
_log.warn,
"no progress has been made during state machine apply. Current "
"next offset: {}",
_next);
/**
* If no progress has been made, yield to prevent busy looping
*/
co_await ss::sleep_abortable(100ms, _as);
co_return;
}
_next = std::max(model::next_offset(max_last_applied), _next);
vlog(_log.trace, "updating _next offset with: {}", _next);
} catch (const ss::timed_out_error&) {
vlog(_log.debug, "state machine apply timeout");
Expand All @@ -365,6 +386,10 @@ ss::future<> state_machine_manager::apply() {
vlog(
_log.warn, "manager apply exception: {}", std::current_exception());
}
}

ss::future<> state_machine_manager::apply() {
co_await try_apply_in_foreground();
/**
* If any of the state machines is behind, dispatch background apply fibers
*/
Expand Down Expand Up @@ -408,7 +433,9 @@ ss::future<> state_machine_manager::background_apply_fiber(
entry_ptr entry, ssx::semaphore_units units) {
while (!_as.abort_requested() && entry->stm->next() < _next) {
storage::log_reader_config config(
entry->stm->next(), _next, ss::default_priority_class());
entry->stm->next(),
model::prev_offset(_next),
ss::default_priority_class());

vlog(
_log.debug,
Expand All @@ -420,10 +447,13 @@ ss::future<> state_machine_manager::background_apply_fiber(
try {
model::record_batch_reader reader = co_await _raft->make_reader(
config);
co_await std::move(reader).consume(
auto last_applied_before = entry->stm->last_applied_offset();
auto last_applied_after = co_await std::move(reader).consume(
batch_applicator(background_ctx, {entry}, _as, _log),
model::no_timeout);

if (last_applied_before >= last_applied_after) {
error = true;
}
} catch (...) {
error = true;
vlog(
Expand All @@ -433,7 +463,7 @@ ss::future<> state_machine_manager::background_apply_fiber(
std::current_exception());
}
if (error) {
co_await ss::sleep_abortable(1s, _as);
co_await ss::sleep_abortable(100ms, _as);
}
}
units.return_all();
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class state_machine_manager final {
ss::future<> do_apply_raft_snapshot(
raft::snapshot_metadata metadata, storage::snapshot_reader& reader);
ss::future<> apply();
ss::future<> try_apply_in_foreground();

ss::future<std::vector<ssx::semaphore_units>>
acquire_background_apply_mutexes();
Expand Down
129 changes: 129 additions & 0 deletions src/v/raft/tests/stm_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,132 @@ TEST_F_CORO(

ASSERT_EQ_CORO(new_stm->state, partial_expected_state);
}

/**
 * Test state machine built on simple_kv whose apply() can be made to fail on
 * demand.
 *
 * A batch whose last offset is greater than the configured limit is rejected
 * with std::runtime_error; any other batch is forwarded to simple_kv::apply().
 * The limit stays default-constructed until allow_apply_to() is called.
 */
struct controllable_throwing_kv : public simple_kv {
    static constexpr std::string_view name = "controllable_throwing_kv_1";

    explicit controllable_throwing_kv(raft_node_instance& rn)
      : simple_kv(rn) {}

    /// Set the highest batch last offset that apply() will accept.
    void allow_apply_to(model::offset o) { _allow_apply = o; }

    /**
     * Apply a single batch, or throw if the batch ends past the allowed
     * offset.
     *
     * The function is a coroutine, so the exception is thrown from within the
     * coroutine body rather than from a plain synchronous call.
     */
    ss::future<> apply(const model::record_batch& batch) override {
        if (_allow_apply < batch.last_offset()) {
            throw std::runtime_error(fmt::format(
              "not allowed to apply batches with last offset greater than {}. "
              "Current batch last offset: {}",
              _allow_apply,
              batch.last_offset()));
        }
        // An accepted batch must start exactly at the offset this state
        // machine reports as next().
        vassert(
          batch.base_offset() == next(),
          "batch {} base offset is not the next to apply, expected base "
          "offset: {}",
          batch.header(),
          next());
        co_await simple_kv::apply(batch);
    }

    // Highest batch last offset accepted by apply(); public so tests can
    // inspect it.
    model::offset _allow_apply;
};

/// Same behavior as controllable_throwing_kv, exposed as a distinct type with
/// its own `name` constant.
struct controllable_throwing_kv_2 : controllable_throwing_kv {
    static constexpr std::string_view name = "controllable_throwing_kv_2";

    // Reuse the base class constructors unchanged.
    using controllable_throwing_kv::controllable_throwing_kv;
};

/// Same behavior as controllable_throwing_kv, exposed as a distinct type with
/// its own `name` constant.
struct controllable_throwing_kv_3 : controllable_throwing_kv {
    static constexpr std::string_view name = "controllable_throwing_kv_3";

    // Reuse the base class constructors unchanged.
    using controllable_throwing_kv::controllable_throwing_kv;
};
TEST_F_CORO(state_machine_fixture, test_all_machines_throw) {
    /**
     * Scenario: all state machines throw an exception during apply, and then
     * one of the state machines is allowed to make some progress. In the end
     * every state machine must still converge to the expected state.
     */
    create_nodes();

    // Register three independently controllable state machines on each node
    // and remember them (in creation order) for the final state check.
    std::vector<ss::shared_ptr<simple_kv>> stms;
    for (auto& [id, node] : nodes()) {
        raft::state_machine_manager_builder builder;

        auto first = builder.create_stm<controllable_throwing_kv>(*node);
        auto second = builder.create_stm<controllable_throwing_kv_2>(*node);
        auto third = builder.create_stm<controllable_throwing_kv_3>(*node);

        stms.push_back(ss::dynamic_pointer_cast<simple_kv>(first));
        stms.push_back(ss::dynamic_pointer_cast<simple_kv>(second));
        stms.push_back(ss::dynamic_pointer_cast<simple_kv>(third));

        co_await node->init_and_start(all_vnodes(), std::move(builder));
    }

    // Phase 1 limits: the first two machines may apply up to offset 100, the
    // third one up to offset 150.
    for (auto& [id, node] : nodes()) {
        auto& manager = *node->raft()->stm_manager();
        manager.get<controllable_throwing_kv>()->allow_apply_to(
          model::offset(100));
        manager.get<controllable_throwing_kv_2>()->allow_apply_to(
          model::offset(100));
        manager.get<controllable_throwing_kv_3>()->allow_apply_to(
          model::offset(150));
    }

    vlog(logger().info, "Generating state for test");
    // 500 operations with a maximum batch size of 1.
    auto expected = co_await build_random_state(
      500, wait_for_each_batch::no, 1);

    vlog(logger().info, "Waiting for state machines");
    // The third machine has to reach its limit on every node.
    const auto third_reached_150 =
      [](std::unique_ptr<raft_node_instance>& node) {
          auto& manager = *node->raft()->stm_manager();
          return manager.get<controllable_throwing_kv_3>()
                   ->last_applied_offset()
                 >= model::offset(150);
      };
    RPTEST_REQUIRE_EVENTUALLY_CORO(15s, [&] {
        return std::ranges::all_of(
          nodes() | std::views::values, third_reached_150);
    });

    // Phase 2: let only the second machine advance, up to offset 160.
    for (auto& [id, node] : nodes()) {
        auto& manager = *node->raft()->stm_manager();
        manager.get<controllable_throwing_kv_2>()->allow_apply_to(
          model::offset(160));
    }
    const auto second_reached_160 =
      [](std::unique_ptr<raft_node_instance>& node) {
          auto& manager = *node->raft()->stm_manager();
          return manager.get<controllable_throwing_kv_2>()
                   ->last_applied_offset()
                 >= model::offset(160);
      };
    RPTEST_REQUIRE_EVENTUALLY_CORO(15s, [&] {
        return std::ranges::all_of(
          nodes() | std::views::values, second_reached_160);
    });

    // Phase 3: raise every limit to offset 1000 so all machines can catch up.
    for (auto& [id, node] : nodes()) {
        auto& manager = *node->raft()->stm_manager();
        manager.get<controllable_throwing_kv>()->allow_apply_to(
          model::offset(1000));
        manager.get<controllable_throwing_kv_2>()->allow_apply_to(
          model::offset(1000));
        manager.get<controllable_throwing_kv_3>()->allow_apply_to(
          model::offset(1000));
    }

    co_await wait_for_apply();

    // Every state machine on every node must end up with the generated state.
    for (auto& kv : stms) {
        ASSERT_EQ_CORO(kv->state, expected);
    }
}
7 changes: 5 additions & 2 deletions src/v/raft/tests/stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,14 @@ struct state_machine_fixture : raft_fixture {

ss::future<absl::flat_hash_map<ss::sstring, value_entry>>
build_random_state(
int op_cnt, wait_for_each_batch wait_for_each = wait_for_each_batch::no) {
int op_cnt,
wait_for_each_batch wait_for_each = wait_for_each_batch::no,
size_t max_batch_size = 50) {
absl::flat_hash_map<ss::sstring, value_entry> state;

for (int i = 0; i < op_cnt;) {
const auto batch_sz = random_generators::get_int(1, 50);
const auto batch_sz = random_generators::get_int<size_t>(
1, max_batch_size);
std::vector<std::pair<ss::sstring, std::optional<ss::sstring>>> ops;
for (auto n = 0; n < batch_sz; ++n) {
auto k = random_generators::gen_alphanum_string(10);
Expand Down

0 comments on commit 5ff21da

Please sign in to comment.