Skip to content

Commit

Permalink
Merge pull request #1203 from edoardocomar/idempotent_deadlock
Browse files Browse the repository at this point in the history
Revert to individual msg retries for non-idempotent
  • Loading branch information
bai authored Dec 11, 2018
2 parents 861a752 + 587c5f6 commit 97315fe
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
14 changes: 10 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,11 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
})

if len(retryTopics) > 0 {
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
if bp.parent.conf.Producer.Idempotent {
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}
}

sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
Expand All @@ -858,9 +860,13 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, block.Err)
} else {
bp.parent.retryMessages(pSet.msgs, block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
bp.parent.retryBatch(topic, partition, pSet, block.Err)
}
})
}
Expand Down
3 changes: 0 additions & 3 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
leader2.Returns(metadataLeader2)
leader2.Returns(prodSuccess)
expectResults(t, producer, 10, 0)

Expand Down Expand Up @@ -468,7 +467,6 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
seedBroker.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
seedBroker.Returns(metadataLeader2)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down Expand Up @@ -654,7 +652,6 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {

// succeed this time
expectResults(t, producer, 5, 0)
seedBroker.Returns(metadataResponse)

// put five more through
for i := 0; i < 5; i++ {
Expand Down

0 comments on commit 97315fe

Please sign in to comment.