Skip to content

Commit

Permalink
fix: Ensure partition-reader starts up correctly (#14845)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Nov 8, 2024
1 parent 4bfa380 commit b2f3d2e
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
18 changes: 16 additions & 2 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error {

// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
if lastCommittedOffset > 0 {
lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
}
p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
})
Expand Down Expand Up @@ -349,6 +352,8 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
continue
}

consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx)

// Send a direct request to the Kafka backend to fetch the last produced offset.
// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
// latency.
Expand All @@ -371,6 +376,11 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
return 0, nil
}

if consumerGroupLastCommittedOffset == lastProducedOffset {
level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset)
return 0, nil
}

// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
// lag. If we don't have it (lag is zero value), then it will not be logged.
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
Expand All @@ -380,9 +390,13 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
if lastProducedOffset <= p.lastProcessedOffset {
break
}
if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
}

records := p.poll(ctx)
recordsChan <- records
timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
recordsChan <- p.poll(timedCtx)
cancel()
}
if boff.Err() != nil {
return 0, boff.ErrCause()
Expand Down
109 changes: 109 additions & 0 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka"
Expand Down Expand Up @@ -232,3 +233,111 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
// We expect to have processed all the records, including initial + one per iteration.
assert.Equal(t, iterations+1, recordsCount)
}

func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset part way through the records we just produced
offset := int64(1)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), partitionReader)
require.NoError(t, err)

// Wait for records to be processed
require.Eventually(t, func() bool {
return len(consumer.recordsChan) == 1 // All pending messages will be received in one batch
}, 10*time.Second, 10*time.Millisecond)

// Check we only received records from the last commit onwards, and the last committed offset is not reprocessed.
receivedRecords := <-consumer.recordsChan
require.Len(t, receivedRecords, 3) // Offsets are 0 based, so we should read offsets 2,3,4
for _, record := range receivedRecords {
assert.NotContainsf(t, record.Content, "test-0", "record %q should not contain test-0", record.Content)
assert.NotContainsf(t, record.Content, "test-1", "record %q should not contain test-1", record.Content)
}

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset to the last record produced
offset := int64(4)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = services.StartAndAwaitRunning(ctx, partitionReader)
require.NoError(t, err)

// Check we didn't receive any records: This is a sanity check. We shouldn't get this far if we deadlock during startup.
require.Len(t, consumer.recordsChan, 0)

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

0 comments on commit b2f3d2e

Please sign in to comment.