From e2c59b811810fa97b73234b1b8dd5040ee8b68f8 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 31 Jul 2024 14:00:21 -0700 Subject: [PATCH] rm_stm/idempotency: handle epoch bumps --- src/v/cluster/producer_state.cc | 3 +++ src/v/cluster/rm_stm.cc | 4 ++++ src/v/cluster/rm_stm.h | 7 +++++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index 77b4d176a0419..aaf06eadbb8ff 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -377,6 +377,9 @@ void producer_state::apply_data( if (!bid.is_idempotent() || _evicted) { return; } + if (!bid.is_transactional && bid.pid.epoch > _id.epoch) { + reset_with_new_epoch(bid.pid.epoch); + } _requests.stm_apply(bid, header.ctx.term, offset); if (bid.is_transactional) { if (!_transaction_state) { diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 82087b56ea214..fc4992b0de3b4 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -912,6 +912,10 @@ ss::future> rm_stm::do_idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued, ssx::semaphore_units& units) { + // Check if the producer bumped the epoch and reset accordingly. + if (bid.pid.epoch > producer->id().epoch()) { + producer->reset_with_new_epoch(bid.pid.epoch); + } auto request = producer->try_emplace_request(bid, synced_term); if (!request) { co_return request.error(); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 2863bae789350..b33966e55f802 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -64,10 +64,13 @@ namespace cluster { * * 2. Idempotent requests - Provides single session idempotency guarantees. Each * idempotent produce request is associated with a producer_identity - * (producer_id + epoch=0). Idempotency is implemented by tracking sequence + * (producer_id + epoch=N). 
Idempotency is implemented by tracking sequence * numbers of last 5 inflight/processed requests and ensuring that the new * requests maintain the sequence order. Client stamps the record batches with - * sequence numbers. + * sequence numbers. An idempotent producer may choose to increment its epoch + * on the client side to reset the sequence tracking when it deems this safe + * (see KIP-360). The state machine detects such situations and resets the + * tracked sequence number state. * * 3. Transactional requests - Provides EOS semantics across multiple sessions * by implementing fencing as defined in the Kafka protocol. Transactional