
Commit 0a12cab

Authored Nov 28, 2024
feat: add phase to log lines in kafka_consumer.go (#15176)
1 parent abf681d commit 0a12cab
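
The commit threads a go-kit logger through the Kafka consumer path so that every log line emitted while catching up or consuming carries a phase label ("starting" or "running"). The mechanism is plain log.With decoration; the sketch below is illustrative only (not code from this commit) and assumes the github.com/go-kit/log packages the diff already uses — base stands in for the service logger s.logger, and the string literals stand in for the phaseStarting/phaseRunning constants:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	base := log.NewLogfmtLogger(os.Stdout)

	// Decorate once; every later Log call on the wrapped logger carries phase=<value>.
	starting := log.With(base, "phase", "starting")
	running := log.With(base, "phase", "running")

	level.Debug(starting).Log("msg", "consuming from offset", "offset", 42)
	level.Info(running).Log("msg", "reader service running")

	// Logfmt output:
	//   level=debug phase=starting msg="consuming from offset" offset=42
	//   level=info phase=running msg="reader service running"
}

Because the label is attached to the logger rather than repeated at each call site, a per-phase logger can be passed down (for example into the consumer factory below) and every downstream log line is tagged automatically.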

File tree

5 files changed, +24 -22 lines changed

 

pkg/ingester/ingester.go

+1-1
@@ -384,7 +384,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		cfg.KafkaIngestion.KafkaConfig,
 		i.ingestPartitionID,
 		cfg.LifecyclerConfig.ID,
-		NewKafkaConsumerFactory(i, logger, registerer),
+		NewKafkaConsumerFactory(i, registerer),
 		logger,
 		registerer,
 	)

pkg/ingester/kafka_consumer.go

+2-2
@@ -39,9 +39,9 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
 	}
 }

-func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory {
+func NewKafkaConsumerFactory(pusher logproto.PusherServer, reg prometheus.Registerer) partition.ConsumerFactory {
 	metrics := newConsumerMetrics(reg)
-	return func(committer partition.Committer) (partition.Consumer, error) {
+	return func(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
 		decoder, err := kafka.NewDecoder()
 		if err != nil {
 			return nil, err
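
With the new signature the logger is supplied when the factory is invoked, not when it is constructed, which is what lets the reader service bind a different phase per call. A hedged usage sketch of the call shape (it mirrors the updated test further down; the helper name and its placement inside the ingester package are mine, and the import paths are assumed from the Loki v3 module layout):

package ingester

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/kafka/partition"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// buildRunningConsumer is illustrative only; it is not part of the commit.
func buildRunningConsumer(
	pusher logproto.PusherServer,
	reg prometheus.Registerer,
	committer partition.Committer,
	base log.Logger,
) (partition.Consumer, error) {
	factory := NewKafkaConsumerFactory(pusher, reg)               // logger no longer passed at construction
	return factory(committer, log.With(base, "phase", "running")) // it is bound per invocation instead
}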

pkg/ingester/kafka_consumer_test.go

+1-1
@@ -85,7 +85,7 @@ func TestConsumer(t *testing.T) {
 		pusher = &fakePusher{t: t}
 	)

-	consumer, err := NewKafkaConsumerFactory(pusher, log.NewLogfmtLogger(os.Stdout), prometheus.NewRegistry())(&noopCommitter{})
+	consumer, err := NewKafkaConsumerFactory(pusher, prometheus.NewRegistry())(&noopCommitter{}, log.NewLogfmtLogger(os.Stdout))
 	require.NoError(t, err)

 	records, err := kafka.Encode(0, tenantID, streamBar, 10000)

pkg/kafka/partition/reader_service.go

+15-13
@@ -24,7 +24,7 @@ const (
 	phaseRunning = "running"
 )

-type ConsumerFactory func(committer Committer) (Consumer, error)
+type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)

 type Consumer interface {
 	Start(ctx context.Context, recordsChan <-chan []Record) func()
@@ -126,27 +126,29 @@ func (s *ReaderService) starting(ctx context.Context) error {
 	s.metrics.reportOwnerOfPartition(s.reader.Partition())
 	s.metrics.reportStarting()

+	logger := log.With(s.logger, "phase", phaseStarting)
+
 	// Fetch the last committed offset to determine where to start reading
 	lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
 	if err != nil {
 		return fmt.Errorf("fetching last committed offset: %w", err)
 	}

 	if lastCommittedOffset == int64(KafkaEndOffset) {
-		level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
+		level.Warn(logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
 	} else {
-		level.Debug(s.logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
+		level.Debug(logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
 	}

 	consumeOffset := int64(kafkaStartOffset)
 	if lastCommittedOffset >= 0 {
 		// Read from the next offset.
 		consumeOffset = lastCommittedOffset + 1
 	}
-	level.Debug(s.logger).Log("msg", "consuming from offset", "offset", consumeOffset)
+	level.Debug(logger).Log("msg", "consuming from offset", "offset", consumeOffset)
 	s.reader.SetOffsetForConsumption(consumeOffset)

-	if err = s.processConsumerLagAtStartup(ctx); err != nil {
+	if err = s.processConsumerLagAtStartup(ctx, logger); err != nil {
 		return fmt.Errorf("failed to process consumer lag at startup: %w", err)
 	}

@@ -157,7 +159,7 @@ func (s *ReaderService) running(ctx context.Context) error {
 	level.Info(s.logger).Log("msg", "reader service running")
 	s.metrics.reportRunning()

-	consumer, err := s.consumerFactory(s.committer)
+	consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning))
 	if err != nil {
 		return fmt.Errorf("creating consumer: %w", err)
 	}
@@ -172,13 +174,13 @@ func (s *ReaderService) running(ctx context.Context) error {
 	return nil
 }

-func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
+func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context, logger log.Logger) error {
 	if s.cfg.MaxConsumerLagAtStartup <= 0 {
-		level.Debug(s.logger).Log("msg", "processing consumer lag at startup is disabled")
+		level.Debug(logger).Log("msg", "processing consumer lag at startup is disabled")
 		return nil
 	}

-	consumer, err := s.consumerFactory(s.committer)
+	consumer, err := s.consumerFactory(s.committer, logger)
 	if err != nil {
 		return fmt.Errorf("failed to create consumer: %w", err)
 	}
@@ -192,13 +194,13 @@ func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
 		wait()
 	}()

-	level.Debug(s.logger).Log("msg", "processing consumer lag at startup")
-	_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, s.logger, recordsCh, time.Since)
+	level.Debug(logger).Log("msg", "processing consumer lag at startup")
+	_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, logger, recordsCh, time.Since)
 	if err != nil {
-		level.Error(s.logger).Log("msg", "failed to catch up", "err", err)
+		level.Error(logger).Log("msg", "failed to catch up", "err", err)
 		return err
 	}
-	level.Debug(s.logger).Log("msg", "processing consumer lag at startup finished")
+	level.Debug(logger).Log("msg", "processing consumer lag at startup finished")

 	return nil
 }
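
On the implementing side, the extra parameter means a Consumer built by the factory can keep the injected, phase-scoped logger and use it for everything it emits. The sketch below is an assumption-laden illustration, not code from the commit: stubConsumer and newStubFactory are hypothetical names, Consumer is assumed to have only the Start method shown in the first hunk above, and the code would sit inside the partition package with context and the go-kit log and level packages imported:

// stubConsumer is illustrative only; it is not part of the commit.
type stubConsumer struct {
	logger log.Logger
}

// Start drains the records channel until the context is cancelled or the
// channel closes, and returns a wait function, mirroring how the service
// calls wait() after Start in processConsumerLagAtStartup.
func (c *stubConsumer) Start(ctx context.Context, recordsChan <-chan []Record) func() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case records, ok := <-recordsChan:
				if !ok {
					return
				}
				// phase=starting or phase=running is already attached by the caller.
				level.Debug(c.logger).Log("msg", "received records", "count", len(records))
			}
		}
	}()
	return func() { <-done }
}

func newStubFactory() ConsumerFactory {
	return func(_ Committer, logger log.Logger) (Consumer, error) {
		return &stubConsumer{logger: logger}, nil
	}
}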

pkg/kafka/partition/reader_test.go

+5-5
@@ -82,7 +82,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
 	consumer := newMockConsumer()

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -136,7 +136,7 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
 	var consumerStarting *mockConsumer

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		// Return two consumers to ensure we are processing requests during service `start()` and not during `run()`.
 		if consumerStarting == nil {
 			consumerStarting = newMockConsumer()
@@ -198,7 +198,7 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
 	consumer := newMockConsumer()

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -267,7 +267,7 @@ func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
 	consumer := newMockConsumer()

 	kaf.CurrentNode()
-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -329,7 +329,7 @@ func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
 	consumer := newMockConsumer()

 	kaf.CurrentNode()
-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

0 commit comments
