Skip to content

Commit

Permalink
Merge pull request #1846 from wclaeys/master
Browse files Browse the repository at this point in the history
Do not ignore Consumer.Offsets.AutoCommit.Enable config on Close
  • Loading branch information
bai authored Jan 13, 2021
2 parents 80667b9 + 2b925af commit ceadf4f
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ func (om *offsetManager) Close() error {
om.asyncClosePOMs()

// flush one last time
for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
om.flushToBroker()
if om.releasePOMs(false) == 0 {
break
if om.conf.Consumer.Offsets.AutoCommit.Enable {
for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
om.flushToBroker()
if om.releasePOMs(false) == 0 {
break
}
}
}

Expand Down

0 comments on commit ceadf4f

Please sign in to comment.