Skip to content

Commit

Permalink
r/stm_manager: do not read batches if no stm accepts foreground apply
Browse files Browse the repository at this point in the history
If neither of state machines managed by the state machine manager can
accept the foreground apply there is no need to read the range of
batches from the log. In this case we can skip the foreground apply read
and move on directly to background apply dispatch.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv authored and bharathv committed May 22, 2024
1 parent 5ef2018 commit 08af2b2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
51 changes: 32 additions & 19 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ ss::future<> state_machine_manager::apply_snapshot_to_stm(
std::max(model::next_offset(last_offset), stm_entry->stm->next()));
}

ss::future<> state_machine_manager::apply() {
ss::future<> state_machine_manager::try_apply_in_foreground() {
try {
ss::coroutine::switch_to sg_sw(_apply_sg);
// wait until consensus commit index is >= _next
Expand All @@ -315,6 +315,30 @@ ss::future<> state_machine_manager::apply() {
co_return co_await apply_raft_snapshot();
}

// collect STMs which has the same _next offset as the offset in
// manager and there is no background apply taking place
std::vector<entry_ptr> machines;
for (auto& [_, entry] : _machines) {
/**
* We can simply check if a mutex is ready here as calling
* maybe_start_background_apply() will make the mutex underlying
* semaphore immediately not ready as there are no scheduling points
* before calling `get_units`
*/
if (
entry->stm->next() == _next
&& entry->background_apply_mutex.ready()) {
machines.push_back(entry);
}
}
if (machines.empty()) {
vlog(
_log.debug,
"no machines were selected to apply in foreground, current next "
"offset: {}",
_next);
co_return;
}
/**
* Raft make_reader method allows callers reading up to
* last_visible index. In order to make the STMs safe and working
Expand All @@ -334,27 +358,12 @@ ss::future<> state_machine_manager::apply() {
_next, _raft->committed_offset(), ss::default_priority_class());

model::record_batch_reader reader = co_await _raft->make_reader(config);
// collect STMs which has the same _next offset as the offset in
// manager and there is no background apply taking place
std::vector<entry_ptr> machines;
for (auto& [_, entry] : _machines) {
/**
* We can simply check if a mutex is ready here as calling
* maybe_start_background_apply() will make the mutex underlying
* semaphore immediately not ready as there are no scheduling points
* before calling `get_units`
*/
if (
entry->stm->next() == _next
&& entry->background_apply_mutex.ready()) {
machines.push_back(entry);
}
}
auto last_applied = co_await std::move(reader).consume(

auto max_last_applied = co_await std::move(reader).consume(
batch_applicator(default_ctx, machines, _as, _log),
model::no_timeout);

_next = std::max(model::next_offset(last_applied), _next);
_next = std::max(model::next_offset(max_last_applied), _next);
vlog(_log.trace, "updating _next offset with: {}", _next);
} catch (const ss::timed_out_error&) {
vlog(_log.debug, "state machine apply timeout");
Expand All @@ -365,6 +374,10 @@ ss::future<> state_machine_manager::apply() {
vlog(
_log.warn, "manager apply exception: {}", std::current_exception());
}
}

ss::future<> state_machine_manager::apply() {
co_await try_apply_in_foreground();
/**
* If any of the state machine is behind, dispatch background apply fibers
*/
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class state_machine_manager final {
ss::future<> do_apply_raft_snapshot(
raft::snapshot_metadata metadata, storage::snapshot_reader& reader);
ss::future<> apply();
ss::future<> try_apply_in_foreground();

ss::future<std::vector<ssx::semaphore_units>>
acquire_background_apply_mutexes();
Expand Down

0 comments on commit 08af2b2

Please sign in to comment.