Skip to content

Commit

Permalink
updated fetch size overflow check
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xinran committed May 21, 2019
1 parent fd1d413 commit 25f71e5
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -591,6 +592,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
// int32 overflow
if child.fetchSize < 0 {
child.fetchSize = math.MaxInt32
}
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
Expand Down

0 comments on commit 25f71e5

Please sign in to comment.