Skip to content

Commit

Permalink
Merge pull request #2076 from Shopify/dnwe/fix-fetch-from-follower
Browse files Browse the repository at this point in the history
fix: only update preferredReadReplica if valid
  • Loading branch information
dnwe authored Dec 4, 2021
2 parents 556ddf7 + cbcd734 commit 635bcf3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
25 changes: 14 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) {

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
child := &partitionConsumer{
consumer: c,
conf: c.conf,
topic: topic,
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
consumer: c,
conf: c.conf,
topic: topic,
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
preferredReadReplica: invalidPreferredReplicaID,
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
}

if err := child.chooseStartingOffset(offset); err != nil {
Expand Down Expand Up @@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

consumerBatchSizeMetric.Update(int64(nRecs))

child.preferredReadReplica = block.PreferredReadReplica
if block.PreferredReadReplica != invalidPreferredReplicaID {
child.preferredReadReplica = block.PreferredReadReplica
}

if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
Expand Down
2 changes: 2 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const invalidPreferredReplicaID = -1

type AbortedTransaction struct {
ProducerID int64
FirstOffset int64
Expand Down

0 comments on commit 635bcf3

Please sign in to comment.