Skip to content

Commit

Permalink
Merge pull request #30 from prOOrc/fix/rebalancing
Browse files Browse the repository at this point in the history
fix/rebalancing
  • Loading branch information
m110 authored Jun 27, 2024
2 parents ba31b43 + 4138e3a commit 2664bae
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,17 @@ ResendLoop:
select {
case <-msg.Acked():
if sess != nil {
sess.MarkMessage(kafkaMsg, "")
if sess.Context().Err() == nil {
sess.MarkMessage(kafkaMsg, "")
} else {
logFields := receivedMsgLogFields.Add(
watermill.LogFields{
"err": sess.Context().Err().Error(),
},
)
h.logger.Trace("Closing, session ctx cancelled before ack", logFields)
return nil
}
}
h.logger.Trace("Message Acked", receivedMsgLogFields)
break ResendLoop
Expand Down

0 comments on commit 2664bae

Please sign in to comment.