Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent idempotent producer epoch exhaustion #2178

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -1135,13 +1136,30 @@ func (p *asyncProducer) shutdown() {
close(p.successes)
}

func (p *asyncProducer) bumpIdempotentProducerEpoch() {
_, epoch := p.txnmgr.getProducerID()
if epoch == math.MaxInt16 {
Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID")
txnmgr, err := newTransactionManager(p.conf, p.client)
if err != nil {
Logger.Println(err)
return
Comment on lines +1144 to +1146
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, where we failed to send our InitProducerIDRequest to any broker and got ErrOutOfBrokers returned, do we need to do something more serious than just logging the error? Or are we happy just to leave p.txnmgr as the existing transaction manager with epoch left at MaxInt16 and we think the producer will eventually recover itself?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ladislavmacoun I'm assuming the latter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnwe Apologies for the late reply.

Looking at the Java AK client, the producer only transitions to an uninitialized state; I believe we will achieve the same thing by resetting the transaction manager. If that fails, I don't think there is anything else we can do, and it should get initialised with another request eventually.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ladislavmacoun no worries, thanks for your great contribution!

}

p.txnmgr = txnmgr
} else {
p.txnmgr.bumpEpoch()
}
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.txnmgr.bumpEpoch()
p.bumpIdempotentProducerEpoch()
}

msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
Expand Down
78 changes: 78 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"log"
"math"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -1418,6 +1419,83 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
}
}

// TestAsyncProducerIdempotentEpochExhaustion ensures that the producer requests
// a new producer ID once the producer epoch is exhausted.
//
// The mock broker hands out an initial producer ID whose epoch is already
// math.MaxInt16. A failed produce then forces an epoch bump, which must result
// in a fresh InitProducerIDRequest, and the next produce request must carry the
// new producer ID with epoch 0.
func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) {
	broker := NewMockBroker(t, 1)
	defer broker.Close()

	var (
		initialProducerID = int64(1000)
		newProducerID     = initialProducerID + 1
	)

	// Metadata advertising the mock broker and partition 0 of my_topic.
	metadataResponse := &MetadataResponse{
		Version:      1,
		ControllerID: 1,
	}
	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
	broker.Returns(metadataResponse)

	initProducerID := &InitProducerIDResponse{
		ThrottleTime:  0,
		ProducerID:    initialProducerID,
		ProducerEpoch: math.MaxInt16, // Mock ProducerEpoch at the exhaustion point
	}
	broker.Returns(initProducerID)

	config := NewTestConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Flush.Frequency = 10 * time.Millisecond
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
	config.Producer.RequiredAcks = WaitForAll
	config.Producer.Retry.Backoff = 0
	config.Producer.Idempotent = true
	config.Net.MaxOpenRequests = 1
	config.Version = V0_11_0_0

	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	defer closeProducer(t, producer)

	// First message: the produce request fails, and the broker then answers the
	// follow-up InitProducerIDRequest with a new producer ID.
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
	prodError := &ProduceResponse{
		Version:      3,
		ThrottleTime: 0,
	}
	prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
	broker.Returns(prodError)
	broker.Returns(&InitProducerIDResponse{
		ProducerID: newProducerID,
	})

	<-producer.Errors()

	lastProduceReqRes := broker.history[len(broker.history)-2] // last is InitProducerIDRequest
	lastProduceBatch := lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
	if lastProduceBatch.FirstSequence != 0 {
		t.Errorf("first sequence not zero: got %d", lastProduceBatch.FirstSequence)
	}
	// The first batch must carry exactly the exhausted epoch handed out by the
	// mocked InitProducerIDResponse; a looser bound would let any epoch >= 2 pass.
	if lastProduceBatch.ProducerEpoch != math.MaxInt16 {
		t.Errorf("first epoch was not at exhaustion point: got %d, want %d",
			lastProduceBatch.ProducerEpoch, math.MaxInt16)
	}

	// Now we should produce with a new ProducerID
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
	broker.Returns(prodError)
	<-producer.Errors()

	lastProduceReqRes = broker.history[len(broker.history)-1]
	lastProduceBatch = lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
	if lastProduceBatch.ProducerID != newProducerID || lastProduceBatch.ProducerEpoch != 0 {
		t.Errorf("producer did not request a new producerID: got ID %d epoch %d, want ID %d epoch 0",
			lastProduceBatch.ProducerID, lastProduceBatch.ProducerEpoch, newProducerID)
	}
}

// TestBrokerProducerShutdown ensures that a call to shutdown stops the
// brokerProducer run() loop and doesn't leak any goroutines
//nolint:paralleltest
Expand Down