Skip to content

Commit

Permalink
check if received message is valid
Browse files Browse the repository at this point in the history
  • Loading branch information
Ioan Zicu authored and dnwe committed Jul 28, 2023
1 parent 849c8b1 commit 303de59
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
for {
select {
case message := <-claim.Messages():
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")

// message can be nill if offset is invalid, for example messages were removed by retention policy
if message != nil {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
Expand Down

0 comments on commit 303de59

Please sign in to comment.