Skip to content

Commit

Permalink
check is channel is closed
Browse files Browse the repository at this point in the history
Signed-off-by: Ioan Zicu <ioan.zicu@nokia.com>
  • Loading branch information
Ioan Zicu committed Jul 31, 2023
1 parent c1f290a commit bb8803d
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,13 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message := <-claim.Messages():
// message can be nill if offset is invalid, for example messages were removed by retention policy
if message != nil {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
break
}
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
Expand Down

0 comments on commit bb8803d

Please sign in to comment.