Skip to content

Commit

Permalink
producer_state: add request::set_error and make it idempotent
Browse files Browse the repository at this point in the history
A request can be marked as errored multiple times, consider the example
below.

replicate_f : waiting for replication
term change
requests from old terms gc-ed: set_err(ec)
replicate_f: set_err(ec)

Current assert assumes that a request can be set only once, which is
true for setting a successful result but not for errors. This commit
splits set_value() into set_value() and set_error() and adjusts
assert conditions accordingly.
  • Loading branch information
bharathv committed Jun 13, 2024
1 parent 7579cf1 commit 7719140
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
27 changes: 25 additions & 2 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ result_promise_t::future_type request::result() const {
return _result.get_shared_future();
}

void request::set_value(request_result_t::value_type value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
_result.set_value(value);
_state = request_state::completed;
}

void request::set_error(request_result_t::error_type error) {
// This is idempotent as different fibers can mark the result error
// at different times in some edge cases.
if (_state != request_state::completed) {
_result.set_value(error);
_state = request_state::completed;
return;
}
vassert(
_result.available() && result().get0().has_error(),
"Invalid result state, expected to be available and errored out: {}",
*this);
}

bool request::operator==(const request& other) const {
bool compare = _first_sequence == other._first_sequence
&& _last_sequence == other._last_sequence
Expand Down Expand Up @@ -119,7 +142,7 @@ result<request_ptr> requests::try_emplace(
// checks for sequence tracking.
while (!_inflight_requests.empty()) {
if (!_inflight_requests.front()->has_completed()) {
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand Down Expand Up @@ -194,7 +217,7 @@ void requests::gc_requests_from_older_terms(model::term_id current_term) {
if (!_inflight_requests.front()->has_completed()) {
// Here we know for sure the term change, these in flight
// requests are going to fail anyway, mark them so.
_inflight_requests.front()->set_value(errc::timeout);
_inflight_requests.front()->set_error(errc::timeout);
}
_inflight_requests.pop_front();
}
Expand Down
14 changes: 4 additions & 10 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ concept AcceptsUnits = requires(Func f, ssx::semaphore_units units) {
};

using producer_ptr = ss::lw_shared_ptr<producer_state>;
using result_promise_t = ss::shared_promise<result<kafka_result>>;
using request_result_t = result<kafka_result>;
using result_promise_t = ss::shared_promise<request_result_t>;
using request_ptr = ss::lw_shared_ptr<request>;
using seq_t = int32_t;

Expand Down Expand Up @@ -69,15 +70,8 @@ class request {
}
}

template<class ValueType>
void set_value(ValueType&& value) {
vassert(
_state <= request_state::in_progress && !_result.available(),
"unexpected request state during result set: {}",
*this);
_result.set_value(std::forward<ValueType>(value));
_state = request_state::completed;
}
void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
request_state state() const { return _state; }
result_promise_t::future_type result() const;
Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
"got {} on replicating tx data batch for pid:{}",
r.error(),
bid.pid);
req_ptr->set_value(r.error());
req_ptr->set_error(r.error());
co_return r.error();
}
if (!co_await wait_no_throw(
Expand All @@ -810,7 +810,7 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
_ctx_log.warn,
"application of the replicated tx batch has timed out pid:{}",
bid.pid);
req_ptr->set_value(errc::timeout);
req_ptr->set_error(errc::timeout);
co_return tx_errc::timeout;
}
auto result = kafka_result{
Expand Down Expand Up @@ -900,7 +900,6 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued,
ssx::semaphore_units& units) {
using ret_t = result<kafka_result>;
auto request = producer->try_emplace_request(bid, synced_term);
if (!request) {
co_return request.error();
Expand All @@ -920,7 +919,7 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
_ctx_log.warn,
"replication failed, request enqueue returned error: {}",
req_enqueued.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
units.return_all();
Expand All @@ -930,13 +929,13 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
if (replicated.failed()) {
vlog(
_ctx_log.warn, "replication failed: {}", replicated.get_exception());
req_ptr->set_value<ret_t>(errc::replication_error);
req_ptr->set_error(errc::replication_error);
co_return errc::replication_error;
}
auto result = replicated.get0();
if (result.has_error()) {
vlog(_ctx_log.warn, "replication failed: {}", result.error());
req_ptr->set_value<ret_t>(result.error());
req_ptr->set_error(result.error());
co_return result.error();
}
// translate to kafka offset.
Expand Down

0 comments on commit 7719140

Please sign in to comment.