Skip to content

Commit

Permalink
rm_stm/idempotency: handle epoch bumps
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Aug 5, 2024
1 parent 2f5b9cb commit e2c59b8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ void producer_state::apply_data(
if (!bid.is_idempotent() || _evicted) {
return;
}
if (!bid.is_transactional && bid.pid.epoch > _id.epoch) {
reset_with_new_epoch(bid.pid.epoch);
}
_requests.stm_apply(bid, header.ctx.term, offset);
if (bid.is_transactional) {
if (!_transaction_state) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,10 @@ ss::future<result<kafka_result>> rm_stm::do_idempotent_replicate(
raft::replicate_options opts,
ss::lw_shared_ptr<available_promise<>> enqueued,
ssx::semaphore_units& units) {
// Check if the producer bumped the epoch and reset accordingly.
if (bid.pid.epoch > producer->id().epoch()) {
producer->reset_with_new_epoch(bid.pid.epoch);
}
auto request = producer->try_emplace_request(bid, synced_term);
if (!request) {
co_return request.error();
Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ namespace cluster {
*
* 2. Idempotent requests - Provides single session idempotency guarantees. Each
* idempotent produce request is associated with a producer_identity
* (producer_id + epoch=0). Idempotency is implemented by tracking sequence
* (producer_id + epoch=N). Idempotency is implemented by tracking sequence
* numbers of last 5 inflight/processed requests and ensuring that the new
* requests maintain the sequence order. Client stamps the record batches with
* sequence numbers.
* sequence numbers. Idempotent producer may choose to increment epoch on the
* client side to reset the sequence tracking when it deems safe
* (check kip-360). The state machine detects such situations and resets the
* tracked sequence number state.
*
* 3. Transactional requests - Provides EOS semantics across multiple sessions
* by implementing fencing as defined in the Kafka protocol. Transactional
Expand Down

0 comments on commit e2c59b8

Please sign in to comment.