From 7719140e3ad316d3b8cde523b61f7601a7389bcf Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 11 Jun 2024 13:04:00 -0700 Subject: [PATCH] producer_state: add request::set_error and make it idempotent A request can be marked as errored multiple times, consider the example below. replicate_f : waiting for replication term change requests from old terms gc-ed: set_err(ec) replicate_f: set_err(ec) Current assert assumes that a request can be set only once, which is true for setting a successful result but not for errors. This commit splits set_value() into set_value() and set_error() and adjusts assert conditions accordingly. --- src/v/cluster/producer_state.cc | 27 +++++++++++++++++++++++++-- src/v/cluster/producer_state.h | 14 ++++---------- src/v/cluster/rm_stm.cc | 11 +++++------ 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 964458ecd3a0a..b36c6600700eb 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -31,6 +31,29 @@ result_promise_t::future_type request::result() const { return _result.get_shared_future(); } +void request::set_value(request_result_t::value_type value) { + vassert( + _state <= request_state::in_progress && !_result.available(), + "unexpected request state during result set: {}", + *this); + _result.set_value(value); + _state = request_state::completed; +} + +void request::set_error(request_result_t::error_type error) { + // This is idempotent as different fibers can mark the result error + // at different times in some edge cases. + if (_state != request_state::completed) { + _result.set_value(error); + _state = request_state::completed; + return; + } + vassert( + _result.available() && result().get0().has_error(), + "Invalid result state, expected to be available and errored out: {}", + *this); +} + bool request::operator==(const request& other) const { bool compare = _first_sequence == other._first_sequence && _last_sequence == other._last_sequence @@ -119,7 +142,7 @@ result requests::try_emplace( // checks for sequence tracking. while (!_inflight_requests.empty()) { if (!_inflight_requests.front()->has_completed()) { - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } @@ -194,7 +217,7 @@ void requests::gc_requests_from_older_terms(model::term_id current_term) { if (!_inflight_requests.front()->has_completed()) { // Here we know for sure the term change, these in flight // requests are going to fail anyway, mark them so. - _inflight_requests.front()->set_value(errc::timeout); + _inflight_requests.front()->set_error(errc::timeout); } _inflight_requests.pop_front(); } diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index aac1ba40bff18..d1c5609f78eac 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -40,7 +40,8 @@ concept AcceptsUnits = requires(Func f, ssx::semaphore_units units) { }; using producer_ptr = ss::lw_shared_ptr; -using result_promise_t = ss::shared_promise>; +using request_result_t = result; +using result_promise_t = ss::shared_promise; using request_ptr = ss::lw_shared_ptr; using seq_t = int32_t; @@ -69,15 +70,8 @@ class request { } } - template - void set_value(ValueType&& value) { - vassert( - _state <= request_state::in_progress && !_result.available(), - "unexpected request state during result set: {}", - *this); - _result.set_value(std::forward(value)); - _state = request_state::completed; - } + void set_value(request_result_t::value_type); + void set_error(request_result_t::error_type); void mark_request_in_progress() { _state = request_state::in_progress; } request_state state() const { return _state; } result_promise_t::future_type result() const; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 4c5148c69ef62..c5cd8fd07266a 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -800,7 +800,7 @@ ss::future> rm_stm::do_transactional_replicate( "got {} on replicating tx data batch for pid:{}", r.error(), bid.pid); - req_ptr->set_value(r.error()); + req_ptr->set_error(r.error()); co_return r.error(); } if (!co_await wait_no_throw( @@ -810,7 +810,7 @@ ss::future> rm_stm::do_transactional_replicate( _ctx_log.warn, "application of the replicated tx batch has timed out pid:{}", bid.pid); - req_ptr->set_value(errc::timeout); + req_ptr->set_error(errc::timeout); co_return tx_errc::timeout; } auto result = kafka_result{ @@ -900,7 +900,6 @@ ss::future> rm_stm::do_idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units) { - using ret_t = result; auto request = producer->try_emplace_request(bid, synced_term); if (!request) { co_return request.error(); @@ -920,7 +919,7 @@ ss::future> rm_stm::do_idempotent_replicate( _ctx_log.warn, "replication failed, request enqueue returned error: {}", req_enqueued.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } units.return_all(); @@ -930,13 +929,13 @@ ss::future> rm_stm::do_idempotent_replicate( if (replicated.failed()) { vlog( _ctx_log.warn, "replication failed: {}", replicated.get_exception()); - req_ptr->set_value(errc::replication_error); + req_ptr->set_error(errc::replication_error); co_return errc::replication_error; } auto result = replicated.get0(); if (result.has_error()) { vlog(_ctx_log.warn, "replication failed: {}", result.error()); - req_ptr->set_value(result.error()); + req_ptr->set_error(result.error()); co_return result.error(); } // translate to kafka offset.