Skip to content

Commit

Permalink
fix graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Dec 10, 2023
1 parent f3b28f1 commit f2ad547
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
2 changes: 2 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (b *batchConsumer) startBatch() {
b.consume(&messages, &commitMessages)
case msg, ok := <-b.incomingMessageStream:
if !ok {
close(b.batchConsumingStream)
close(b.messageProcessedStream)
return
}

Expand Down
2 changes: 2 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (c *consumer) startBatch() {
c.consume(&messages, &commitMessages)
case msg, ok := <-c.incomingMessageStream:
if !ok {
close(c.singleConsumingStream)
close(c.messageProcessedStream)
return
}

Expand Down
4 changes: 1 addition & 3 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,8 @@ func (c *base) Stop() error {
c.cancelFn()
c.quit <- struct{}{}
close(c.incomingMessageStream)
close(c.singleConsumingStream)
close(c.batchConsumingStream)
close(c.messageProcessedStream)
c.wg.Wait()

err = c.r.Close()
})

Expand Down

0 comments on commit f2ad547

Please sign in to comment.