From 000dd9c4955f4b3c3e2f71b35d30138c7d1b9770 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 8 Jun 2021 16:57:41 -0400 Subject: [PATCH] Only run InitOffset once after the adapter has started (#695) --- pkg/source/adapter/adapter.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/source/adapter/adapter.go b/pkg/source/adapter/adapter.go index 1884a6e56a..634c2b1376 100644 --- a/pkg/source/adapter/adapter.go +++ b/pkg/source/adapter/adapter.go @@ -67,9 +67,10 @@ func NewEnvConfig() adapter.EnvConfigAccessor { } type Adapter struct { - config *AdapterConfig - controlServer *ctrlnetwork.ControlServer - saramaConfig *sarama.Config + config *AdapterConfig + controlServer *ctrlnetwork.ControlServer + saramaConfig *sarama.Config + offsetInitialized bool httpMessageSender *kncloudevents.HTTPMessageSender reporter pkgsource.StatsReporter @@ -242,7 +243,7 @@ func (a *Adapter) Cleanup(sess sarama.ConsumerGroupSession) { // InitOffsets makes sure all consumer group offsets are set. func (a *Adapter) InitOffsets(session sarama.ConsumerGroupSession) error { - if a.saramaConfig.Consumer.Offsets.Initial == sarama.OffsetNewest { + if !a.offsetInitialized && a.saramaConfig.Consumer.Offsets.Initial == sarama.OffsetNewest { // We want to make sure that ALL consumer group offsets are set to avoid // losing events in case the consumer group session is closed before at least one message is // consumed from ALL partitions. @@ -302,6 +303,8 @@ func (a *Adapter) InitOffsets(session sarama.ConsumerGroupSession) error { a.logger.Infow("consumer group offsets committed", zap.String("consumergroup", a.config.ConsumerGroup)) } + + a.offsetInitialized = true } // At this stage the KafkaSource instance is considered Ready (TODO: update KafkaSource status)