// Patch summary (the text below is a unified git diff whose newlines were
// flattened; "+" marks added lines and "-" marks removed lines).
//
// async_producer.go:
//   - Adds the "math" import.
//   - Adds (*asyncProducer).bumpIdempotentProducerEpoch. It reads the current
//     epoch via p.txnmgr.getProducerID(). When the epoch equals math.MaxInt16
//     it logs that the epoch is exhausted and calls
//     newTransactionManager(p.conf, p.client); on success p.txnmgr is replaced
//     by the new manager, on error the error is logged and the method returns
//     with p.txnmgr unchanged. For any other epoch value it calls
//     p.txnmgr.bumpEpoch().
//   - returnError now calls p.bumpIdempotentProducerEpoch() instead of
//     p.txnmgr.bumpEpoch() when msg.hasSequence is set. Only part of
//     returnError is visible here because the hunk context ends inside it.
//
// async_producer_test.go:
//   - Adds the "math" import.
//   - Adds TestAsyncProducerIdempotentEpochExhaustion. The mock broker first
//     answers InitProducerID with ProducerEpoch set to math.MaxInt16, then
//     fails a produce request with ErrBrokerNotAvailable, then answers a
//     second InitProducerID with newProducerID. After a second failed produce
//     the test expects the last recorded batch to carry newProducerID and
//     ProducerEpoch 0.
//
// NOTE(review): on the newTransactionManager error path the existing manager
// is kept, so its epoch is still at math.MaxInt16 afterwards; presumably a
// later failure retries the request -- confirm this is the intended fallback.
// NOTE(review): the test's `ProducerEpoch <= 1` check only proves the epoch is
// greater than 1; comparing against math.MaxInt16 would match the error text
// more closely -- confirm before tightening.
diff --git a/async_producer.go b/async_producer.go index ad0c789f9..4f7ece588 100644 --- a/async_producer.go +++ b/async_producer.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "math" "sync" "time" @@ -1135,13 +1136,30 @@ func (p *asyncProducer) shutdown() { close(p.successes) } +func (p *asyncProducer) bumpIdempotentProducerEpoch() { + _, epoch := p.txnmgr.getProducerID() + if epoch == math.MaxInt16 { + Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID") + txnmgr, err := newTransactionManager(p.conf, p.client) + if err != nil { + Logger.Println(err) + return + } + + p.txnmgr = txnmgr + } else { + p.txnmgr.bumpEpoch() + } +} + func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { // We need to reset the producer ID epoch if we set a sequence number on it, because the broker // will never see a message with this number, so we can never continue the sequence. if msg.hasSequence { Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition) - p.txnmgr.bumpEpoch() + p.bumpIdempotentProducerEpoch() } + msg.clear() pErr := &ProducerError{Msg: msg, Err: err} if p.conf.Producer.Return.Errors { diff --git a/async_producer_test.go b/async_producer_test.go index c2ae69ec0..b4eaf6755 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "log" + "math" "os" "os/signal" "strconv" @@ -1418,6 +1419,83 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { } } +// TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests +// a new producerID when producerEpoch is exhausted +func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) { + broker := NewMockBroker(t, 1) + defer broker.Close() + + var ( + initialProducerID = int64(1000) + newProducerID = initialProducerID + 1 + ) + + metadataResponse := &MetadataResponse{ + Version: 1, + ControllerID: 1, + } + 
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) + broker.Returns(metadataResponse) + + initProducerID := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: initialProducerID, + ProducerEpoch: math.MaxInt16, // Mock ProducerEpoch at the exhaustion point + } + broker.Returns(initProducerID) + + config := NewTestConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Flush.Frequency = 10 * time.Millisecond + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust + config.Producer.RequiredAcks = WaitForAll + config.Producer.Retry.Backoff = 0 + config.Producer.Idempotent = true + config.Net.MaxOpenRequests = 1 + config.Version = V0_11_0_0 + + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + defer closeProducer(t, producer) + + producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} + prodError := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable) + broker.Returns(prodError) + broker.Returns(&InitProducerIDResponse{ + ProducerID: newProducerID, + }) + + <-producer.Errors() + + lastProduceReqRes := broker.history[len(broker.history)-2] // last is InitProducerIDRequest + lastProduceBatch := lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch + if lastProduceBatch.FirstSequence != 0 { + t.Error("first sequence not zero") + } + if lastProduceBatch.ProducerEpoch <= 1 { + t.Error("first epoch was not at exhaustion point") + } + + // Now we should produce with a new ProducerID + producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} + broker.Returns(prodError) + <-producer.Errors() + + lastProduceReqRes = broker.history[len(broker.history)-1] + 
lastProduceBatch = lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch + if lastProduceBatch.ProducerID != newProducerID || lastProduceBatch.ProducerEpoch != 0 { + t.Error("producer did not requested a new producerID") + } +} + // TestBrokerProducerShutdown ensures that a call to shutdown stops the // brokerProducer run() loop and doesn't leak any goroutines //nolint:paralleltest