Skip to content

Commit

Permalink
fix: prevent idempotent producer epoch exhaustion
Browse files Browse the repository at this point in the history
Add check for idempotent producer epoch overflow, as it is required by the
KIP-360[1], the producer should request a new producer ID when the epoch
is exhausted. Otherwise, the producer might get desynchronized with the broker.

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820

Signed-off-by: Ladislav Macoun <ladislavmacoun@gmail.com>
  • Loading branch information
ladislavmacoun committed Apr 13, 2022
1 parent 8f8d8da commit 5c6b581
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 1 deletion.
20 changes: 19 additions & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -1135,13 +1136,30 @@ func (p *asyncProducer) shutdown() {
close(p.successes)
}

func (p *asyncProducer) bumpIdempotentProducerEpoch() {
_, epoch := p.txnmgr.getProducerID()
if epoch == math.MaxInt16 {
Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID")
txnmgr, err := newTransactionManager(p.conf, p.client)
if err != nil {
Logger.Println(err)
return
}

p.txnmgr = txnmgr
} else {
p.txnmgr.bumpEpoch()
}
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.txnmgr.bumpEpoch()
p.bumpIdempotentProducerEpoch()
}

msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
Expand Down
78 changes: 78 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"log"
"math"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -1418,6 +1419,83 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
}
}

// TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests
// a new producerID when producerEpoch is exhausted
func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

var (
initialProducerID = int64(1000)
newProducerID = initialProducerID + 1
)

metadataResponse := &MetadataResponse{
Version: 1,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: initialProducerID,
ProducerEpoch: math.MaxInt16, // Mock ProducerEpoch at the exhaustion point
}
broker.Returns(initProducerID)

config := NewTestConfig()
config.Producer.Flush.Messages = 10
config.Producer.Flush.Frequency = 10 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer closeProducer(t, producer)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
prodError := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
broker.Returns(prodError)
broker.Returns(&InitProducerIDResponse{
ProducerID: newProducerID,
})

<-producer.Errors()

lastProduceReqRes := broker.history[len(broker.history)-2] // last is InitProducerIDRequest
lastProduceBatch := lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("first sequence not zero")
}
if lastProduceBatch.ProducerEpoch <= 1 {
t.Error("first epoch was not at exhaustion point")
}

// Now we should produce with a new ProducerID
producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
broker.Returns(prodError)
<-producer.Errors()

lastProduceReqRes = broker.history[len(broker.history)-1]
lastProduceBatch = lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.ProducerID != newProducerID || lastProduceBatch.ProducerEpoch != 0 {
t.Error("producer did not requested a new producerID")
}
}

// TestBrokerProducerShutdown ensures that a call to shutdown stops the
// brokerProducer run() loop and doesn't leak any goroutines
//nolint:paralleltest
Expand Down

0 comments on commit 5c6b581

Please sign in to comment.