diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5f3b1de117868..43a0fa5ab5f17 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -974,19 +974,12 @@ rm_stm::do_mark_expired(model::producer_identity pid) { co_return std::error_code(co_await do_try_abort_old_tx(pid)); } -ss::future> rm_stm::do_sync_and_transactional_replicate( +ss::future> rm_stm::transactional_replicate( + model::term_id synced_term, producer_ptr producer, model::batch_identity bid, model::record_batch_reader rdr, ssx::semaphore_units units) { - if (!co_await sync(_sync_timeout)) { - vlog( - _ctx_log.trace, - "processing name:replicate_tx pid:{} => stale leader", - bid.pid); - co_return errc::not_leader; - } - auto synced_term = _insync_term; auto result = co_await do_transactional_replicate( synced_term, producer, bid, std::move(rdr)); if (!result) { @@ -1105,18 +1098,27 @@ ss::future> rm_stm::transactional_replicate( if (!check_tx_permitted()) { co_return errc::generic_tx_error; } + if (!co_await sync(_sync_timeout)) { + vlog( + _ctx_log.trace, + "processing name:replicate_tx pid:{} => stale leader", + bid.pid); + co_return errc::not_leader; + } + auto synced_term = _insync_term; auto [producer, _] = maybe_create_producer(bid.pid); co_return co_await producer ->run_with_lock([&](ssx::semaphore_units units) { - return do_sync_and_transactional_replicate( - producer, bid, std::move(rdr), std::move(units)); + return transactional_replicate( + synced_term, producer, bid, std::move(rdr), std::move(units)); }) .finally([this, producer] { _producer_state_manager.local().touch(*producer, _vcluster_id); }); } -ss::future> rm_stm::do_sync_and_idempotent_replicate( +ss::future> rm_stm::idempotent_replicate( + model::term_id synced_term, producer_ptr producer, model::batch_identity bid, model::record_batch_reader br, @@ -1124,12 +1126,6 @@ ss::future> rm_stm::do_sync_and_idempotent_replicate( ss::lw_shared_ptr> enqueued, ssx::semaphore_units units, producer_previously_known known_producer) { - if (!co_await sync(_sync_timeout)) { - // it's ok not to set enqueued on early return because - // the safety check in replicate_in_stages sets it automatically - co_return errc::not_leader; - } - auto synced_term = _insync_term; auto result = co_await do_idempotent_replicate( synced_term, producer, @@ -1256,10 +1252,17 @@ ss::future> rm_stm::idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { try { + if (!co_await sync(_sync_timeout)) { + // it's ok not to set enqueued on early return because + // the safety check in replicate_in_stages sets it automatically + co_return errc::not_leader; + } + auto synced_term = _insync_term; auto [producer, known_producer] = maybe_create_producer(bid.pid); co_return co_await producer ->run_with_lock([&, known_producer](ssx::semaphore_units units) { - return do_sync_and_idempotent_replicate( + return idempotent_replicate( + synced_term, producer, bid, std::move(br), diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 64ab3f0e21b73..6e879c848ca91 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -313,7 +313,8 @@ class rm_stm final : public raft::persisted_stm<> { ss::future> transactional_replicate( model::batch_identity, model::record_batch_reader); - ss::future> do_sync_and_transactional_replicate( + ss::future> transactional_replicate( + model::term_id, producer_ptr, model::batch_identity, model::record_batch_reader, @@ -331,23 +332,24 @@ class rm_stm final : public raft::persisted_stm<> { raft::replicate_options, ss::lw_shared_ptr>); - ss::future> do_idempotent_replicate( + ss::future> idempotent_replicate( model::term_id, producer_ptr, model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>, - ssx::semaphore_units&, + ssx::semaphore_units, producer_previously_known); - ss::future> do_sync_and_idempotent_replicate( + ss::future> do_idempotent_replicate( + model::term_id, producer_ptr, model::batch_identity, model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>, - ssx::semaphore_units, + ssx::semaphore_units&, producer_previously_known); ss::future> replicate_msg(