diff --git a/src/v/raft/state_machine_manager.cc b/src/v/raft/state_machine_manager.cc index d0f91ef83f2db..0ea076b3d9bba 100644 --- a/src/v/raft/state_machine_manager.cc +++ b/src/v/raft/state_machine_manager.cc @@ -363,6 +363,18 @@ ss::future<> state_machine_manager::try_apply_in_foreground() { batch_applicator(default_ctx, machines, _as, _log), model::no_timeout); + if (max_last_applied == model::offset{}) { + vlog( + _log.warn, + "no progress has been made during state machine apply. Current " + "next offset: {}", + _next); + /** + * If no progress has been made, yield to prevent busy looping + */ + co_await ss::sleep_abortable(100ms, _as); + co_return; + } _next = std::max(model::next_offset(max_last_applied), _next); vlog(_log.trace, "updating _next offset with: {}", _next); } catch (const ss::timed_out_error&) { @@ -433,10 +445,13 @@ ss::future<> state_machine_manager::background_apply_fiber( try { model::record_batch_reader reader = co_await _raft->make_reader( config); - co_await std::move(reader).consume( + auto last_applied_before = entry->stm->last_applied_offset(); + auto last_applied_after = co_await std::move(reader).consume( batch_applicator(background_ctx, {entry}, _as, _log), model::no_timeout); - + if (last_applied_before >= last_applied_after) { + error = true; + } } catch (...) { error = true; vlog( @@ -446,7 +461,7 @@ ss::future<> state_machine_manager::background_apply_fiber( std::current_exception()); } if (error) { - co_await ss::sleep_abortable(1s, _as); + co_await ss::sleep_abortable(100ms, _as); } } units.return_all();