Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Only run InitOffset once after the adapter has started (#695)
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard authored Jun 8, 2021
1 parent 2f12463 commit 000dd9c
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions pkg/source/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
}

type Adapter struct {
config *AdapterConfig
controlServer *ctrlnetwork.ControlServer
saramaConfig *sarama.Config
config *AdapterConfig
controlServer *ctrlnetwork.ControlServer
saramaConfig *sarama.Config
offsetInitialized bool

httpMessageSender *kncloudevents.HTTPMessageSender
reporter pkgsource.StatsReporter
Expand Down Expand Up @@ -242,7 +243,7 @@ func (a *Adapter) Cleanup(sess sarama.ConsumerGroupSession) {

// InitOffsets makes sure all consumer group offsets are set.
func (a *Adapter) InitOffsets(session sarama.ConsumerGroupSession) error {
if a.saramaConfig.Consumer.Offsets.Initial == sarama.OffsetNewest {
if !a.offsetInitialized && a.saramaConfig.Consumer.Offsets.Initial == sarama.OffsetNewest {
// We want to make sure that ALL consumer group offsets are set to avoid
// losing events in case the consumer group session is closed before at least one message is
// consumed from ALL partitions.
Expand Down Expand Up @@ -302,6 +303,8 @@ func (a *Adapter) InitOffsets(session sarama.ConsumerGroupSession) error {

a.logger.Infow("consumer group offsets committed", zap.String("consumergroup", a.config.ConsumerGroup))
}

a.offsetInitialized = true
}

// At this stage the KafkaSource instance is considered Ready (TODO: update KafkaSource status)
Expand Down

0 comments on commit 000dd9c

Please sign in to comment.