diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 5d3d3179a9821..1295b69818f1c 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error { // We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from. lastCommittedOffset := p.fetchLastCommittedOffset(ctx) + if lastCommittedOffset > 0 { + lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset. + } p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)}, }) @@ -349,6 +352,8 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t continue } + consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx) + // Send a direct request to the Kafka backend to fetch the last produced offset. // We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further // latency. @@ -371,6 +376,11 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t return 0, nil } + if consumerGroupLastCommittedOffset == lastProducedOffset { + level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset) + return 0, nil + } + // This message is NOT expected to be logged with a very high rate. In this log we display the last measured // lag. If we don't have it (lag is zero value), then it will not be logged. level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset) @@ -380,9 +390,13 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t if lastProducedOffset <= p.lastProcessedOffset { break } + if time.Since(lastProducedOffsetRequestedAt) > time.Minute { + level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset) + } - records := p.poll(ctx) - recordsChan <- records + timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + recordsChan <- p.poll(timedCtx) + cancel() } if boff.Err() != nil { return 0, boff.ErrCause() diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index dfd653de78e3d..191c0fc304eaa 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/grafana/loki/v3/pkg/kafka" @@ -232,3 +233,111 @@ func TestPartitionReader_ProcessCommits(t *testing.T) { // We expect to have processed all the records, including initial + one per iteration. assert.Equal(t, iterations+1, recordsCount) } + +func TestPartitionReader_StartsAtNextOffset(t *testing.T) { + kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test") + consumer := newMockConsumer() + + kaf.CurrentNode() + consumerFactory := func(_ Committer) (Consumer, error) { + return consumer, nil + } + + // Produce some records + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + stream := logproto.Stream{ + Labels: labels.FromStrings("foo", "bar").String(), + } + for i := 0; i < 5; i++ { + stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}} + records, err := kafka.Encode(0, "test-tenant", stream, 10<<20) + require.NoError(t, err) + require.Len(t, records, 1) + + producer.ProduceSync(context.Background(), records...) + } + + // Set our offset part way through the records we just produced + offset := int64(1) + kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger()) + require.NoError(t, err) + admClient := kadm.NewClient(kafkaClient) + toCommit := kadm.Offsets{} + toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1) + resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit) + require.NoError(t, err) + require.NoError(t, resp.Error()) + + // Start reading + partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + err = services.StartAndAwaitRunning(context.Background(), partitionReader) + require.NoError(t, err) + + // Wait for records to be processed + require.Eventually(t, func() bool { + return len(consumer.recordsChan) == 1 // All pending messages will be received in one batch + }, 10*time.Second, 10*time.Millisecond) + + // Check we only received records from the last commit onwards, and the last committed offset is not reprocessed. + receivedRecords := <-consumer.recordsChan + require.Len(t, receivedRecords, 3) // Offsets are 0 based, so we should read offsets 2,3,4 + for _, record := range receivedRecords { + assert.NotContainsf(t, record.Content, "test-0", "record %q should not contain test-0", record.Content) + assert.NotContainsf(t, record.Content, "test-1", "record %q should not contain test-1", record.Content) + } + + err = services.StopAndAwaitTerminated(context.Background(), partitionReader) + require.NoError(t, err) +} + +func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) { + kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test") + consumer := newMockConsumer() + + kaf.CurrentNode() + consumerFactory := func(_ Committer) (Consumer, error) { + return consumer, nil + } + + // Produce some records + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + stream := logproto.Stream{ + Labels: labels.FromStrings("foo", "bar").String(), + } + for i := 0; i < 5; i++ { + stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}} + records, err := kafka.Encode(0, "test-tenant", stream, 10<<20) + require.NoError(t, err) + require.Len(t, records, 1) + + producer.ProduceSync(context.Background(), records...) + } + + // Set our offset to the last record produced + offset := int64(4) + kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger()) + require.NoError(t, err) + admClient := kadm.NewClient(kafkaClient) + toCommit := kadm.Offsets{} + toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1) + resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit) + require.NoError(t, err) + require.NoError(t, resp.Error()) + + // Start reading + partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = services.StartAndAwaitRunning(ctx, partitionReader) + require.NoError(t, err) + + // Check we didn't receive any records: This is a sanity check. We shouldn't get this far if we deadlock during startup. + require.Len(t, consumer.recordsChan, 0) + + err = services.StopAndAwaitTerminated(context.Background(), partitionReader) + require.NoError(t, err) +}