From ab57ba064c9944b291ae1e37ad39ddbc820e6c36 Mon Sep 17 00:00:00 2001 From: Ilya Obukhov Date: Thu, 5 Oct 2023 13:35:10 +0300 Subject: [PATCH 1/3] fix/rebalancing --- pkg/kafka/subscriber.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go index 9c346c7..b158af2 100644 --- a/pkg/kafka/subscriber.go +++ b/pkg/kafka/subscriber.go @@ -563,6 +563,9 @@ func (h messageHandler) processMessage( msg.SetContext(ctx) defer cancelCtx() + // check session has been canceled after rebalancing + sessionContext := sess.Context() + receivedMsgLogFields = receivedMsgLogFields.Add(watermill.LogFields{ "message_uuid": msg.UUID, }) @@ -578,6 +581,9 @@ ResendLoop: case <-ctx.Done(): h.logger.Trace("Closing, ctx cancelled before sent to consumer", receivedMsgLogFields) return nil + case <-sessionContext.Done(): + h.logger.Trace("Closing, session ctx cancelled before sent to consumer", receivedMsgLogFields) + return nil } select { @@ -603,6 +609,9 @@ ResendLoop: case <-ctx.Done(): h.logger.Trace("Closing, ctx cancelled before ack", receivedMsgLogFields) return nil + case <-sessionContext.Done(): + h.logger.Trace("Closing, session ctx cancelled before ack", receivedMsgLogFields) + return nil } } From 8ba4e35e3d84e13d2a50ca60a0432179240e7a8f Mon Sep 17 00:00:00 2001 From: Ilya Obukhov Date: Tue, 21 May 2024 23:58:24 +0300 Subject: [PATCH 2/3] fix/rebalancing check session is not nil --- pkg/kafka/subscriber.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go index b158af2..eebe39c 100644 --- a/pkg/kafka/subscriber.go +++ b/pkg/kafka/subscriber.go @@ -563,9 +563,6 @@ func (h messageHandler) processMessage( msg.SetContext(ctx) defer cancelCtx() - // check session has been canceled after rebalancing - sessionContext := sess.Context() - receivedMsgLogFields = receivedMsgLogFields.Add(watermill.LogFields{ "message_uuid": msg.UUID, }) @@ -581,15 +578,17 @@ ResendLoop: case <-ctx.Done(): h.logger.Trace("Closing, ctx cancelled before sent to consumer", receivedMsgLogFields) return nil - case <-sessionContext.Done(): - h.logger.Trace("Closing, session ctx cancelled before sent to consumer", receivedMsgLogFields) - return nil } select { case <-msg.Acked(): if sess != nil { - sess.MarkMessage(kafkaMsg, "") + if sess.Context().Err() == nil { + sess.MarkMessage(kafkaMsg, "") + } else { + h.logger.Trace("Closing, session ctx cancelled before ack", receivedMsgLogFields) + return nil + } } h.logger.Trace("Message Acked", receivedMsgLogFields) break ResendLoop @@ -609,9 +608,6 @@ ResendLoop: case <-ctx.Done(): h.logger.Trace("Closing, ctx cancelled before ack", receivedMsgLogFields) return nil - case <-sessionContext.Done(): - h.logger.Trace("Closing, session ctx cancelled before ack", receivedMsgLogFields) - return nil } } From 4138e3a989adad7490b658ba16587daad8d2dca0 Mon Sep 17 00:00:00 2001 From: Ilya Obukhov Date: Wed, 22 May 2024 00:06:12 +0300 Subject: [PATCH 3/3] fix/rebalancing add session context error info --- pkg/kafka/subscriber.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go index eebe39c..67aead0 100644 --- a/pkg/kafka/subscriber.go +++ b/pkg/kafka/subscriber.go @@ -586,7 +586,12 @@ ResendLoop: if sess.Context().Err() == nil { sess.MarkMessage(kafkaMsg, "") } else { - h.logger.Trace("Closing, session ctx cancelled before ack", receivedMsgLogFields) + logFields := receivedMsgLogFields.Add( + watermill.LogFields{ + "err": sess.Context().Err().Error(), + }, + ) + h.logger.Trace("Closing, session ctx cancelled before ack", logFields) return nil } }