diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 9f5e6c17dd1b6..77b4d176a0419 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -324,6 +324,22 @@ bool producer_state::can_evict() { return true; } +void producer_state::reset_with_new_epoch(model::producer_epoch new_epoch) { + vassert( + new_epoch > _id.get_epoch(), + "Invalid epoch bump to {} for producer {}", + new_epoch, + *this); + vassert( + !_transaction_state, + "Invalid epoch bump to {} for a non idempotent producer: {}", + new_epoch, + *this); + vlog(_logger.info, "[{}] Reseting epoch to {}", *this, new_epoch); + _requests.reset(errc::timeout); + _id = model::producer_identity(_id.id, new_epoch); +} + result producer_state::try_emplace_request( const model::batch_identity& bid, model::term_id current_term, bool reset) { if (bid.first_seq > bid.last_seq) { diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 3ead6114ba90b..14938b02eaa16 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -263,6 +263,14 @@ class producer_state { ss::lowres_system_clock::now() - _last_updated_ts); } + // Resets the producer to use a new epoch. The new epoch should be strictly + // larger than the current epoch. This is only used by the idempotent + // producers trying to bump epoch of the existing producer based on the + // incoming request with a higher epoch. Transactions follow a separate + // fencing based approach to bump epochs as it requires aborting any in + // progress transactions with older epoch. + void reset_with_new_epoch(model::producer_epoch new_epoch); + private: prefix_logger& _logger;