Skip to content

Commit

Permalink
producer_state: add reset_with_new_epoch() utility
Browse files Browse the repository at this point in the history
To be used to reset the producer state with new epoch for idempotent
producers that decide to bump the epoch on the client side (which is
totally fine as the idempotency is per session and client can
independently decide to bump the epoch on it's side).
  • Loading branch information
bharathv committed Aug 5, 2024
1 parent 7ac5acf commit 2f5b9cb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,22 @@ bool producer_state::can_evict() {
return true;
}

void producer_state::reset_with_new_epoch(model::producer_epoch new_epoch) {
vassert(
new_epoch > _id.get_epoch(),
"Invalid epoch bump to {} for producer {}",
new_epoch,
*this);
vassert(
!_transaction_state,
"Invalid epoch bump to {} for a non idempotent producer: {}",
new_epoch,
*this);
vlog(_logger.info, "[{}] Reseting epoch to {}", *this, new_epoch);
_requests.reset(errc::timeout);
_id = model::producer_identity(_id.id, new_epoch);
}

result<request_ptr> producer_state::try_emplace_request(
const model::batch_identity& bid, model::term_id current_term, bool reset) {
if (bid.first_seq > bid.last_seq) {
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ class producer_state {
ss::lowres_system_clock::now() - _last_updated_ts);
}

// Resets the producer to use a new epoch. The new epoch should be strictly
// larger than the current epoch. This is only used by the idempotent
// producers trying to bump epoch of the existing producer based on the
// incoming request with a higher epoch. Transactions follow a separate
// fencing based approach to bump epochs as it requires aborting any in
// progress transactions with older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

private:
prefix_logger& _logger;

Expand Down

0 comments on commit 2f5b9cb

Please sign in to comment.