From 9df3038ad4f9579970464498bc8b349f4169e5a5 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Thu, 9 Apr 2020 16:48:09 +1000
Subject: [PATCH 1/2] Fix "broker received out of order sequence" when brokers die

When the following three conditions are satisfied, the producer code
can skip message sequence numbers and cause the broker to complain that
the sequences are out of order:

* config.Producer.Idempotent is set
* The producer loses, and then regains, its connection to a broker
* The client code continues to attempt to produce messages whilst the
  broker is unavailable.

For every message the client attempts to send while the broker is
unavailable, the transaction manager sequence number is incremented;
however, these messages eventually fail and return an error to the
caller.

When the broker re-appears and another message is published, its
sequence number is higher than the last one the broker remembered: the
sequence numbers consumed while it was down were never seen by the
broker. Thus, from the broker's perspective, it's seeing out-of-order
sequence numbers.

The fix has a few parts:

* Don't obtain a sequence number from the transaction manager until
  we're sure we want to try publishing the message
* Affix the producer ID and epoch to the message once the sequence is
  generated
* Increment the transaction manager epoch (and reset all sequence
  numbers to zero) when we permanently fail to publish a message. That
  represents a sequence number that the broker will never see, so the
  only safe thing to do is to roll over the epoch number.
* Ensure we don't publish message sets that contain messages from
  multiple transaction manager epochs.
---
 async_producer.go      | 60 +++++++++++++++++++++++++++++-------
 async_producer_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++
 produce_set.go         | 24 ++++++++++-----
 3 files changed, 135 insertions(+), 18 deletions(-)

diff --git a/async_producer.go b/async_producer.go
index 5c57a8137..d0ce01b66 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -60,13 +60,28 @@ const (
 	noProducerEpoch = -1
 )
 
-func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
 	key := fmt.Sprintf("%s-%d", topic, partition)
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
 	sequence := t.sequenceNumbers[key]
 	t.sequenceNumbers[key] = sequence + 1
-	return sequence
+	return sequence, t.producerEpoch
+}
+
+func (t *transactionManager) bumpEpoch() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.producerEpoch++
+	for k := range t.sequenceNumbers {
+		t.sequenceNumbers[k] = 0
+	}
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.producerID, t.producerEpoch
 }
 
 func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@ type ProducerMessage struct {
 	flags          flagSet
 	expectation    chan *ProducerError
 	sequenceNumber int32
+	producerEpoch  int16
+	hasSequence    bool
 }
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
+	m.sequenceNumber = 0
+	m.producerEpoch = 0
+	m.hasSequence = false
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
 				continue
 			}
 		}
-		// All messages being retried (sent or not) have already had their retry count updated
-		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
-			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
-		}
 
 		handler := tp.handlers[msg.Partition]
 		if handler == nil {
@@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
+		// Now that we know we have a broker to actually try and send this message to, generate the sequence
+		// number for it.
+		// All messages being retried (sent or not) have already had their retry count updated
+		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
+		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
+			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+			msg.hasSequence = true
+		}
+
 		pp.brokerProducer.input <- msg
 	}
 }
@@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
 			}
 
 			if bp.buffer.wouldOverflow(msg) {
-				if err := bp.waitForSpace(msg); err != nil {
+				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, false); err != nil {
 					bp.parent.retryMessage(msg, err)
 					continue
 				}
 			}
 
+			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
+				// The epoch was reset, need to roll the buffer over
+				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, true); err != nil {
+					bp.parent.retryMessage(msg, err)
+					continue
+				}
+			}
 			if err := bp.buffer.add(msg); err != nil {
 				bp.parent.returnError(msg, err)
 				continue
@@ -809,9 +843,7 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
 	return bp.currentRetries[msg.Topic][msg.Partition]
 }
 
-func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
-	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
-
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
 	for {
 		select {
 		case response := <-bp.responses:
@@ -819,7 +851,7 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
 			// handling a response can change our state, so re-check some things
 			if reason := bp.needsRetry(msg); reason != nil {
 				return reason
-			} else if !bp.buffer.wouldOverflow(msg) {
+			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
 				return nil
 			}
 		case bp.output <- bp.buffer:
@@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
 }
 
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
+	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
+	// will never see a message with this number, so we can never continue the sequence.
+	if msg.hasSequence {
+		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
+		p.txnmgr.bumpEpoch()
+	}
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
 	if p.conf.Producer.Return.Errors {
diff --git a/async_producer_test.go b/async_producer_test.go
index d0f012d24..46b97790a 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+	defer broker.Close()
+
+	metadataResponse := &MetadataResponse{
+		Version:      1,
+		ControllerID: 1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Flush.Frequency = 10 * time.Millisecond
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries are exhausted
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
+	config.Version = V0_11_0_0
+
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer closeProducer(t, producer)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	prodError := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes := broker.history[len(broker.history)-1]
+	lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("first sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch != 1 {
+		t.Error("first epoch was not one")
+	}
+
+	// Now if we produce again, the epoch should have rolled over.
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes = broker.history[len(broker.history)-1]
+	lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("second sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch <= 1 {
+		t.Error("second epoch was not > 1")
+	}
+}
+
 // TestBrokerProducerShutdown ensures that a call to shutdown stops the
 // brokerProducer run() loop and doesn't leak any goroutines
 func TestBrokerProducerShutdown(t *testing.T) {
diff --git a/produce_set.go b/produce_set.go
index 36c43c6a6..9c70f8180 100644
--- a/produce_set.go
+++ b/produce_set.go
@@ -13,17 +13,22 @@ type partitionSet struct {
 }
 
 type produceSet struct {
-	parent *asyncProducer
-	msgs   map[string]map[int32]*partitionSet
+	parent        *asyncProducer
+	msgs          map[string]map[int32]*partitionSet
+	producerID    int64
+	producerEpoch int16
 
 	bufferBytes int
 	bufferCount int
 }
 
 func newProduceSet(parent *asyncProducer) *produceSet {
+	pid, epoch := parent.txnmgr.getProducerID()
 	return &produceSet{
-		msgs:   make(map[string]map[int32]*partitionSet),
-		parent: parent,
+		msgs:          make(map[string]map[int32]*partitionSet),
+		parent:        parent,
+		producerID:    pid,
+		producerEpoch: epoch,
 	}
 }
 
@@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 			Version:          2,
 			Codec:            ps.parent.conf.Producer.Compression,
 			CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-			ProducerID:       ps.parent.txnmgr.producerID,
-			ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+			ProducerID:       ps.producerID,
+			ProducerEpoch:    ps.producerEpoch,
 		}
 		if ps.parent.conf.Producer.Idempotent {
 			batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 		partitions[msg.Partition] = set
 	}
-	set.msgs = append(set.msgs, msg)
 
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
 			return errors.New("assertion failed: message out of sequence added to a batch")
 		}
+	}
+
+	// Past this point we can't return an error, because we've already added the message to the set.
+	set.msgs = append(set.msgs, msg)
+
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		// We are being conservative here to avoid having to prep encode the record
 		size += maximumRecordOverhead
 		rec := &Record{

From ca141913f2a03afc0ca77916c2e7932f284ae950 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Thu, 16 Apr 2020 17:13:44 +1000
Subject: [PATCH 2/2] Add a functional test to cover idempotent production

---
 functional_producer_test.go | 78 +++++++++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index 1fa0ba1c9..e589a8eb8 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -3,6 +3,7 @@ package sarama
 import (
 	"fmt"
 	"os"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 	safeClose(t, producer)
 }
 
+func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	config := NewConfig()
+	config.Producer.Flush.Frequency = 250 * time.Millisecond
+	config.Producer.Idempotent = true
+	config.Producer.Timeout = 500 * time.Millisecond
+	config.Producer.Retry.Max = 1
+	config.Producer.Retry.Backoff = 500 * time.Millisecond
+	config.Producer.Return.Successes = true
+	config.Producer.Return.Errors = true
+	config.Producer.RequiredAcks = WaitForAll
+	config.Net.MaxOpenRequests = 1
+	config.Version = V0_11_0_0
+
+	producer, err := NewSyncProducer(kafkaBrokers, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, producer)
+
+	// Successfully publish a few messages
+	for i := 0; i < 10; i++ {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder(fmt.Sprintf("%d message", i)),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Break the brokers.
+	for proxyName, proxy := range Proxies {
+		if !strings.Contains(proxyName, "kafka") {
+			continue
+		}
+		if err := proxy.Disable(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// This should fail hard now
+	for i := 10; i < 20; i++ {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder(fmt.Sprintf("%d message", i)),
+		})
+		if err == nil {
+			t.Fatal("expected produce to fail while the brokers are unreachable")
+		}
+	}
+
+	// Now bring the proxy back up
+	for proxyName, proxy := range Proxies {
+		if !strings.Contains(proxyName, "kafka") {
+			continue
+		}
+		if err := proxy.Enable(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// We should be able to publish again (once everything calms down)
+	// (otherwise it times out)
+	for {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder("comeback message"),
+		})
+		if err == nil {
+			break
+		}
+	}
+}
+
 func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)