Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport] [v24.1.x] miscellaneous idempotency fixes #22687 #22781

Merged
merged 9 commits into from
Aug 15, 2024
11 changes: 11 additions & 0 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,17 @@ bool producer_state::can_evict() {
return true;
}

void producer_state::reset_with_new_epoch(model::producer_epoch new_epoch) {
vassert(
new_epoch > _id.get_epoch(),
"Invalid epoch bump to {} for producer {}",
new_epoch,
*this);
vlog(clusterlog.info, "[{}] Reseting epoch to {}", *this, new_epoch);
_requests.reset(errc::timeout);
_id = model::producer_identity(_id.id, new_epoch);
}

result<request_ptr> producer_state::try_emplace_request(
const model::batch_identity& bid, model::term_id current_term, bool reset) {
if (bid.first_seq > bid.last_seq) {
Expand Down
11 changes: 11 additions & 0 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,17 @@ class producer_state {
void update_current_txn_start_offset(std::optional<kafka::offset> offset) {
_current_txn_start_offset = offset;
}

model::producer_identity id() const { return _id; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I guess this better fits in a5bb2d2?


// Resets the producer to use a new epoch. The new epoch should be strictly
// larger than the current epoch. This is only used by the idempotent
// producers trying to bump epoch of the existing producer based on the
// incoming request with a higher epoch. Transactions follow a separate
// fencing based approach to bump epochs as it requires aborting any in
// progress transactions with older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

safe_intrusive_list_hook _hook;

private:
Expand Down