From c122356003fa44027694494ce6e88932a5c0b2d4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 19 Sep 2024 17:37:56 +0200 Subject: [PATCH 01/10] feat(kafka): Add Ingestion from Kafka in Ingesters --- pkg/ingester/ingester.go | 34 ++++++++ pkg/ingester/kafka_consumer.go | 80 +++++++++++++++++++ pkg/kafka/config.go | 48 ++++++++++- pkg/kafka/config_test.go | 41 ++++++++++ pkg/kafka/encoding.go | 9 +++ pkg/kafka/ingester/consumer.go | 26 +++--- pkg/kafka/ingester/consumer_test.go | 31 +++---- pkg/kafka/ingester/ingester.go | 32 ++------ pkg/kafka/ingester/ingester_test.go | 47 ----------- .../committer.go} | 72 +++++++++++++++-- .../committer_test.go} | 4 +- .../reader.go} | 67 +++++++++------- .../reader_test.go} | 14 ++-- pkg/kafka/partitionring/parition_ring_test.go | 50 ++++++++++++ pkg/kafka/partitionring/partition_ring.go | 25 ++++++ pkg/kafka/reader_client.go | 4 +- pkg/kafka/writer_client.go | 4 +- 17 files changed, 433 insertions(+), 155 deletions(-) create mode 100644 pkg/ingester/kafka_consumer.go create mode 100644 pkg/kafka/config_test.go rename pkg/kafka/{ingester/partition_committer.go => partition/committer.go} (70%) rename pkg/kafka/{ingester/partition_committer_test.go => partition/committer_test.go} (95%) rename pkg/kafka/{ingester/partition_reader.go => partition/reader.go} (84%) rename pkg/kafka/{ingester/partition_reader_test.go => partition/reader_test.go} (86%) create mode 100644 pkg/kafka/partitionring/parition_ring_test.go diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9c913f9049f44..71db393e97d85 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -15,6 +15,7 @@ import ( "time" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" @@ -26,6 +27,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/ring" @@ -293,6 +295,10 @@ type Ingester struct { // recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance. readRing ring.ReadRing recalculateOwnedStreams *recalculateOwnedStreams + + ingestPartitionID int32 + partitionRingLifecycler *ring.PartitionInstanceLifecycler + partitionReader *partition.Reader } // New makes a new Ingester. 
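(Illustrative aside, not part of the patch: the New() constructor changed in the next hunk derives the ingester's Kafka partition from its instance name via partitionring.ExtractIngesterPartitionID, introduced later in this series. A minimal sketch of that mapping, assuming the import path shown in this diff:)

package main

import (
    "fmt"

    "github.com/grafana/loki/v3/pkg/kafka/partitionring"
)

func main() {
    // The trailing integer of the instance ID becomes the owned partition ID;
    // instance IDs containing "local" map to partition 0.
    for _, id := range []string{"ingester-5", "ingester-zone-a-12", "ingester-local"} {
        p, err := partitionring.ExtractIngesterPartitionID(id)
        fmt.Println(id, p, err) // ingester-5 -> 5, ingester-zone-a-12 -> 12, ingester-local -> 0
    }
}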
@@ -356,6 +362,34 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con i.lifecyclerWatcher = services.NewFailureWatcher() i.lifecyclerWatcher.WatchService(i.lifecycler) + if i.cfg.KafkaIngestion.Enabled { + i.ingestPartitionID, err = partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID) + if err != nil { + return nil, fmt.Errorf("calculating ingester partition ID: %w", err) + } + partitionRingKV := cfg.KafkaIngestion.PartitionRingConfig.KVStore.Mock + if partitionRingKV == nil { + partitionRingKV, err = kv.NewClient(cfg.KafkaIngestion.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger) + if err != nil { + return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err) + } + } + i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler( + i.cfg.KafkaIngestion.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionID, cfg.LifecyclerConfig.ID), + PartitionRingName, + PartitionRingKey, + partitionRingKV, + logger, + prometheus.WrapRegistererWithPrefix("loki_", registerer)) + + i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger), logger, registerer) + if err != nil { + return nil, err + } + i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler) + i.lifecyclerWatcher.WatchService(i.partitionReader) + } + // Now that the lifecycler has been created, we can create the limiter // which depends on it. i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor) diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go new file mode 100644 index 0000000000000..8ac5e1dc9c314 --- /dev/null +++ b/pkg/ingester/kafka_consumer.go @@ -0,0 +1,80 @@ +package ingester + +import ( + "context" + math "math" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/user" + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/logproto" +) + +func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger) partition.ConsumerFactory { + return func(committer partition.Committer) (partition.Consumer, error) { + decoder, err := kafka.NewDecoder() + if err != nil { + return nil, err + } + return &kafkaConsumer{ + pusher: pusher, + logger: logger, + decoder: decoder, + }, nil + } +} + +type kafkaConsumer struct { + pusher logproto.PusherServer + logger log.Logger + decoder *kafka.Decoder +} + +func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + level.Info(kc.logger).Log("msg", "shutting down kafka consumer") + return + case records := <-recordsChan: + kc.consume(records) + } + } + }() + return wg.Wait +} + +func (kc *kafkaConsumer) consume(records []partition.Record) { + if len(records) == 0 { + return + } + var ( + minOffset = int64(math.MaxInt64) + maxOffset = int64(0) + ) + for _, record := range records { + minOffset = min(minOffset, record.Offset) + maxOffset = max(maxOffset, record.Offset) + } + level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) + for _, record := range records { + stream, err := kc.decoder.DecodeWithoutLabels(record.Content) + if err != nil { + 
level.Error(kc.logger).Log("msg", "failed to decode record", "error", err) + continue + } + ctx := user.InjectOrgID(record.Ctx, record.TenantID) + if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{ + Streams: []logproto.Stream{stream}, + }); err != nil { + level.Error(kc.logger).Log("msg", "failed to push records", "error", err) + } + } +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index f916b145f0084..1b431448ffacd 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -1,12 +1,18 @@ package kafka import ( + "context" "errors" "flag" "fmt" "strconv" "strings" "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" ) const ( @@ -52,12 +58,13 @@ type Config struct { DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` - ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` - AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` - // AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` + AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` + AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` @@ -75,11 +82,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.") f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.") + f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.") f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.") f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.") - // f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. 
If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") + f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 1000, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.") f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") @@ -107,3 +115,35 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string return strings.ReplaceAll(cfg.ConsumerGroup, "<partition>", strconv.Itoa(int(partitionID))) } + +// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers. +// This is best-effort, if setting the option fails, error is logged, but not returned. +func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) { + if cfg.AutoCreateTopicDefaultPartitions <= 0 { + return + } + + cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...) 
+ if err != nil { + level.Error(logger).Log("msg", "failed to create kafka client", "err", err) + return + } + + adm := kadm.NewClient(cl) + defer adm.Close() + + defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) + _, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ + { + Op: kadm.SetConfig, + Name: "num.partitions", + Value: &defaultNumberOfPartitions, + }, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) + return + } + + level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) +} diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go new file mode 100644 index 0000000000000..7c21e38fd141e --- /dev/null +++ b/pkg/kafka/config_test.go @@ -0,0 +1,41 @@ +package kafka + +import ( + "testing" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { + cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) + require.NoError(t, err) + t.Cleanup(cluster.Close) + + addrs := cluster.ListenAddrs() + require.Len(t, addrs, 1) + + cfg := Config{ + Address: addrs[0], + AutoCreateTopicDefaultPartitions: 100, + } + + cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { + r := request.(*kmsg.AlterConfigsRequest) + + require.Len(t, r.Resources, 1) + res := r.Resources[0] + require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) + require.Len(t, res.Configs, 1) + cfg := res.Configs[0] + require.Equal(t, "num.partitions", cfg.Name) + require.NotNil(t, *cfg.Value) + require.Equal(t, "100", *cfg.Value) + + return &kmsg.AlterConfigsResponse{}, nil, true + }) + + cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger()) +} diff --git a/pkg/kafka/encoding.go b/pkg/kafka/encoding.go index c4977054f32f6..65daf59c25e77 100644 --- a/pkg/kafka/encoding.go +++ b/pkg/kafka/encoding.go @@ -167,6 +167,15 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) { return *d.stream, ls, nil } +// DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels. +func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) { + d.stream.Entries = d.stream.Entries[:0] + if err := d.stream.Unmarshal(data); err != nil { + return logproto.Stream{}, fmt.Errorf("failed to unmarshal stream: %w", err) + } + return *d.stream, nil +} + // sovPush calculates the size of varint-encoded uint64. // It is used to determine the number of bytes needed to encode a uint64 value // in Protocol Buffers' variable-length integer format. 
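(Illustrative aside, not part of the patch: a rough round trip through the encoding helpers touched above, showing where the new DecodeWithoutLabels fits. A minimal sketch that assumes kafka.Encode and the record Value field behave as they are used in the tests later in this series:)

package main

import (
    "fmt"
    "time"

    "github.com/grafana/loki/v3/pkg/kafka"
    "github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
    stream := logproto.Stream{
        Labels:  `{app="foo"}`,
        Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "hello"}},
    }

    // Encode may split a large stream into several Kafka records for partition 0.
    records, err := kafka.Encode(0, "tenant-1", stream, 10<<20)
    if err != nil {
        panic(err)
    }

    decoder, err := kafka.NewDecoder()
    if err != nil {
        panic(err)
    }

    // DecodeWithoutLabels unmarshals the stream but skips parsing the label string,
    // which is all the ingester's push path needs.
    for _, rec := range records {
        s, err := decoder.DecodeWithoutLabels(rec.Value)
        if err != nil {
            panic(err)
        }
        fmt.Println(s.Labels, len(s.Entries))
    }
}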
diff --git a/pkg/kafka/ingester/consumer.go b/pkg/kafka/ingester/consumer.go index d011ae16517d2..57abb2b00ff3f 100644 --- a/pkg/kafka/ingester/consumer.go +++ b/pkg/kafka/ingester/consumer.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/wal" ) @@ -36,17 +37,12 @@ type MetadataStore interface { AddBlock(ctx context.Context, in *metastorepb.AddBlockRequest, opts ...grpc.CallOption) (*metastorepb.AddBlockResponse, error) } -// Committer defines an interface for committing offsets -type Committer interface { - Commit(ctx context.Context, offset int64) error -} - // consumer represents a Kafka consumer that processes and stores log entries type consumer struct { metastoreClient MetadataStore storage ObjectStorage writer *wal.SegmentWriter - committer Committer + committer partition.Committer flushInterval time.Duration maxFlushSize int64 lastOffset int64 @@ -67,8 +63,8 @@ func NewConsumerFactory( maxFlushSize int64, logger log.Logger, reg prometheus.Registerer, -) ConsumerFactory { - return func(committer Committer) (Consumer, error) { +) partition.ConsumerFactory { + return func(committer partition.Committer) (partition.Consumer, error) { writer, err := wal.NewWalSegmentWriter() if err != nil { return nil, err @@ -95,7 +91,7 @@ func NewConsumerFactory( // Start starts the consumer and returns a function to wait for it to finish // It consumes records from the recordsChan, and flushes them to storage periodically. -func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func() { +func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { var wg sync.WaitGroup wg.Add(1) go func() { @@ -127,7 +123,7 @@ func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func( } // consume processes a batch of Kafka records, decoding and storing them -func (c *consumer) consume(records []record) error { +func (c *consumer) consume(records []partition.Record) error { if len(records) == 0 { return nil } @@ -136,8 +132,8 @@ func (c *consumer) consume(records []record) error { maxOffset = int64(0) ) for _, record := range records { - minOffset = min(minOffset, record.offset) - maxOffset = max(maxOffset, record.offset) + minOffset = min(minOffset, record.Offset) + maxOffset = max(maxOffset, record.Offset) } level.Debug(c.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) return c.retryWithBackoff(context.Background(), backoff.Config{ @@ -163,9 +159,9 @@ func (c *consumer) consume(records []record) error { }) } -func (c *consumer) appendRecords(records []record) error { +func (c *consumer) appendRecords(records []partition.Record) error { for _, record := range records { - stream, labels, err := c.decoder.Decode(record.content) + stream, labels, err := c.decoder.Decode(record.Content) if err != nil { return fmt.Errorf("failed to decode record: %w", err) } @@ -184,7 +180,7 @@ func (c *consumer) appendRecords(records []record) error { Parsed: entry.Parsed, }) } - c.writer.Append(record.tenantID, stream.Labels, labels, c.toStore, time.Now()) + c.writer.Append(record.TenantID, stream.Labels, labels, c.toStore, time.Now()) } return nil } diff --git a/pkg/kafka/ingester/consumer_test.go b/pkg/kafka/ingester/consumer_test.go index 3f0adcce6247d..c6e14ddebeca8 100644 --- 
a/pkg/kafka/ingester/consumer_test.go +++ b/pkg/kafka/ingester/consumer_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -50,7 +51,7 @@ func TestConsumer_PeriodicFlush(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) _ = consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -63,10 +64,10 @@ func TestConsumer_PeriodicFlush(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 0, }} recordsChan <- records @@ -103,7 +104,7 @@ func TestConsumer_ShutdownFlush(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) wait := consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -116,10 +117,10 @@ func TestConsumer_ShutdownFlush(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 0, }} recordsChan <- records @@ -157,7 +158,7 @@ func TestConsumer_MaxFlushSize(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) _ = consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -170,10 +171,10 @@ func TestConsumer_MaxFlushSize(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 0, }} recordsChan <- records diff --git a/pkg/kafka/ingester/ingester.go b/pkg/kafka/ingester/ingester.go index ef356778e4390..39595df142ba7 100644 --- a/pkg/kafka/ingester/ingester.go +++ b/pkg/kafka/ingester/ingester.go @@ -6,9 +6,6 @@ import ( "flag" "fmt" "net/http" - "regexp" - "strconv" - "strings" "time" "github.com/go-kit/log" @@ -21,6 +18,7 @@ import ( "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/kafka/ingester/shutdownmarker" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -33,7 +31,6 @@ const ( ) var ( - ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$") defaultFlushInterval = 15 * time.Second defaultFlushSize int64 = 300 << 20 // 300 MB ) @@ -98,19 +95,19 @@ type Ingester struct { lifecyclerWatcher *services.FailureWatcher ingesterPartitionID int32 partitionRingLifecycler *ring.PartitionInstanceLifecycler - partitionReader *PartitionReader + partitionReader *partition.Reader } // New makes a new Ingester. 
func New(cfg Config, - consumerFactory ConsumerFactory, + consumerFactory partition.ConsumerFactory, logger log.Logger, metricsNamespace string, registerer prometheus.Registerer, ) (*Ingester, error) { metrics := newIngesterMetrics(registerer) - ingesterPartitionID, err := extractIngesterPartitionID(cfg.LifecyclerConfig.ID) + ingesterPartitionID, err := partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID) if err != nil { return nil, fmt.Errorf("calculating ingester partition ID: %w", err) } @@ -142,7 +139,7 @@ func New(cfg Config, if err != nil { return nil, err } - i.partitionReader, err = NewPartitionReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer) + i.partitionReader, err = partition.NewReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer) if err != nil { return nil, err } @@ -157,25 +154,6 @@ func New(cfg Config, return i, nil } -// ingesterPartitionID returns the partition ID owner the the given ingester. -func extractIngesterPartitionID(ingesterID string) (int32, error) { - if strings.Contains(ingesterID, "local") { - return 0, nil - } - - match := ingesterIDRegexp.FindStringSubmatch(ingesterID) - if len(match) == 0 { - return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIDRegexp.String()) - } - // Parse the ingester sequence number. - ingesterSeq, err := strconv.Atoi(match[1]) - if err != nil { - return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID) - } - - return int32(ingesterSeq), nil -} - // ServeHTTP implements the pattern ring status page. func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) { i.lifecycler.ServeHTTP(w, r) diff --git a/pkg/kafka/ingester/ingester_test.go b/pkg/kafka/ingester/ingester_test.go index a3bcca72ca3d8..41a0c2dd126c7 100644 --- a/pkg/kafka/ingester/ingester_test.go +++ b/pkg/kafka/ingester/ingester_test.go @@ -99,53 +99,6 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } -func TestExtractIngesterPartitionID(t *testing.T) { - tests := []struct { - name string - ingesterID string - want int32 - wantErr bool - }{ - { - name: "Valid ingester ID", - ingesterID: "ingester-5", - want: 5, - wantErr: false, - }, - { - name: "Local ingester ID", - ingesterID: "ingester-local", - want: 0, - wantErr: false, - }, - { - name: "Invalid ingester ID format", - ingesterID: "invalid-format", - want: 0, - wantErr: true, - }, - { - name: "Invalid sequence number", - ingesterID: "ingester-abc", - want: 0, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := extractIngesterPartitionID(tt.ingesterID) - if (err != nil) != tt.wantErr { - t.Errorf("extractIngesterPartitionID() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("extractIngesterPartitionID() = %v, want %v", got, tt.want) - } - }) - } -} - // TestMetastore is a simple in-memory metastore for testing type TestMetastore struct { blocks map[string][]*metastorepb.BlockMeta diff --git a/pkg/kafka/ingester/partition_committer.go b/pkg/kafka/partition/committer.go similarity index 70% rename from pkg/kafka/ingester/partition_committer.go rename to pkg/kafka/partition/committer.go index a76e363a64e4b..444eb50de06b4 100644 --- a/pkg/kafka/ingester/partition_committer.go +++ b/pkg/kafka/partition/committer.go @@ -1,19 +1,25 @@ -package ingester +package partition import ( "context" "strconv" + "sync" "time" 
"github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" - - "github.com/grafana/loki/v3/pkg/kafka" + "go.uber.org/atomic" ) +// Committer defines an interface for committing offsets +type Committer interface { + Commit(ctx context.Context, offset int64) error +} + // partitionCommitter is responsible for committing offsets for a specific Kafka partition // to the Kafka broker. It also tracks metrics related to the commit process. type partitionCommitter struct { @@ -28,11 +34,15 @@ type partitionCommitter struct { kafkaCfg kafka.Config partitionID int32 consumerGroup string + + toCommit *atomic.Int64 + wg sync.WaitGroup + cancel context.CancelFunc } -// newPartitionCommitter creates and initializes a new partitionCommitter. +// newCommitter creates and initializes a new partitionCommitter. // It sets up the necessary metrics and initializes the committer with the provided configuration. -func newPartitionCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { +func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { c := &partitionCommitter{ logger: logger, kafkaCfg: kafkaCfg, @@ -63,14 +73,51 @@ func newPartitionCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partit Help: "The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.", ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, }), + toCommit: atomic.NewInt64(-1), } // Initialise the last committed offset metric to -1 to signal no offset has been committed yet (0 is a valid offset). c.lastCommittedOffset.Set(-1) + if kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 { + c.wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + go c.autoCommitLoop(ctx) + } + return c } +func (r *partitionCommitter) autoCommitLoop(ctx context.Context) { + defer r.wg.Done() + commitTicker := time.NewTicker(r.kafkaCfg.ConsumerGroupOffsetCommitInterval) + defer commitTicker.Stop() + + previousOffset := r.toCommit.Load() + for { + select { + case <-ctx.Done(): + return + case <-commitTicker.C: + currOffset := r.toCommit.Load() + if currOffset == previousOffset { + continue + } + + if err := r.Commit(ctx, currOffset); err == nil { + previousOffset = currOffset + } + } + } +} + +func (r *partitionCommitter) enqueueOffset(o int64) { + if r.kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 { + r.toCommit.Store(o) + } +} + // commit attempts to commit the given offset to Kafka for the partition this committer is responsible for. // It updates relevant metrics and logs the result of the commit operation. func (r *partitionCommitter) Commit(ctx context.Context, offset int64) (returnErr error) { @@ -101,3 +148,18 @@ func (r *partitionCommitter) Commit(ctx context.Context, offset int64) (returnEr r.lastCommittedOffset.Set(float64(committedOffset.At)) return nil } + +func (r *partitionCommitter) Stop() { + if r.kafkaCfg.ConsumerGroupOffsetCommitInterval <= 0 { + return + } + r.cancel() + r.wg.Wait() + + offset := r.toCommit.Load() + if offset < 0 { + return + } + // Commit has internal timeouts, so this call shouldn't block for too long. 
+ _ = r.Commit(context.Background(), offset) +} diff --git a/pkg/kafka/ingester/partition_committer_test.go b/pkg/kafka/partition/committer_test.go similarity index 95% rename from pkg/kafka/ingester/partition_committer_test.go rename to pkg/kafka/partition/committer_test.go index 8fb823e3f2ed5..9ef02f910e5d0 100644 --- a/pkg/kafka/ingester/partition_committer_test.go +++ b/pkg/kafka/partition/committer_test.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) { reg := prometheus.NewRegistry() partitionID := int32(1) consumerGroup := "test-consumer-group" - committer := newPartitionCommitter(kafkaCfg, admClient, partitionID, consumerGroup, logger, reg) + committer := newCommitter(kafkaCfg, admClient, partitionID, consumerGroup, logger, reg) // Test committing an offset ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/pkg/kafka/ingester/partition_reader.go b/pkg/kafka/partition/reader.go similarity index 84% rename from pkg/kafka/ingester/partition_reader.go rename to pkg/kafka/partition/reader.go index 5ed70412d9e0c..d5979fae5a6e9 100644 --- a/pkg/kafka/ingester/partition_reader.go +++ b/pkg/kafka/partition/reader.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -10,20 +10,19 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/plugin/kprom" - - "github.com/grafana/loki/v3/pkg/kafka" ) -// PartitionReader is responsible for reading data from a specific Kafka partition +// Reader is responsible for reading data from a specific Kafka partition // and passing it to the consumer for processing. It is a core component of the // Loki ingester's Kafka-based ingestion pipeline. -type PartitionReader struct { +type Reader struct { services.Service kafkaCfg kafka.Config @@ -39,31 +38,31 @@ type PartitionReader struct { reg prometheus.Registerer } -type record struct { +type Record struct { // Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka. - ctx context.Context - tenantID string - content []byte - offset int64 + Ctx context.Context + TenantID string + Content []byte + Offset int64 } type ConsumerFactory func(committer Committer) (Consumer, error) type Consumer interface { - Start(ctx context.Context, recordsChan <-chan []record) func() + Start(ctx context.Context, recordsChan <-chan []Record) func() } -// NewPartitionReader creates and initializes a new PartitionReader. +// NewReader creates and initializes a new PartitionReader. // It sets up the basic service and initializes the reader with the provided configuration. -func NewPartitionReader( +func NewReader( kafkaCfg kafka.Config, partitionID int32, instanceID string, consumerFactory ConsumerFactory, logger log.Logger, reg prometheus.Registerer, -) (*PartitionReader, error) { - r := &PartitionReader{ +) (*Reader, error) { + r := &Reader{ kafkaCfg: kafkaCfg, partitionID: partitionID, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), @@ -79,7 +78,7 @@ func NewPartitionReader( // start initializes the Kafka client and committer for the PartitionReader. 
// This method is called when the PartitionReader service starts. -func (p *PartitionReader) start(_ context.Context) error { +func (p *Reader) start(_ context.Context) error { var err error p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger, kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ @@ -89,14 +88,14 @@ func (p *PartitionReader) start(_ context.Context) error { if err != nil { return errors.Wrap(err, "creating kafka reader client") } - p.committer = newPartitionCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg) - + p.committer = newCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg) + // todo: attempt to ensure max lag timestamp on startup. return nil } // run is the main loop of the PartitionReader. It continuously fetches and processes // data from Kafka, and send it to the consumer. -func (p *PartitionReader) run(ctx context.Context) error { +func (p *Reader) run(ctx context.Context) error { level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -110,11 +109,12 @@ func (p *PartitionReader) run(ctx context.Context) error { wait := consumer.Start(ctx, recordsChan) wait() + p.committer.Stop() return nil } -func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { - records := make(chan []record) +func (p *Reader) startFetchLoop(ctx context.Context) <-chan []Record { + records := make(chan []Record) go func() { for { select { @@ -122,6 +122,7 @@ func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { return default: records <- p.poll(ctx) + p.committer.enqueueOffset(p.lastProcessedOffset) } } }() @@ -129,7 +130,7 @@ func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { } // logFetchErrors logs any errors encountered during the fetch operation. -func (p *PartitionReader) logFetchErrors(fetches kgo.Fetches) { +func (p *Reader) logFetchErrors(fetches kgo.Fetches) { mErr := multierror.New() fetches.EachError(func(topic string, partition int32, err error) { if errors.Is(err, context.Canceled) { @@ -148,7 +149,7 @@ func (p *PartitionReader) logFetchErrors(fetches kgo.Fetches) { } // pollFetches retrieves the next batch of records from Kafka and measures the fetch duration. -func (p *PartitionReader) poll(ctx context.Context) []record { +func (p *Reader) poll(ctx context.Context) []Record { defer func(start time.Time) { p.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) @@ -159,23 +160,27 @@ func (p *PartitionReader) poll(ctx context.Context) []record { if fetches.NumRecords() == 0 { return nil } - records := make([]record, 0, fetches.NumRecords()) + records := make([]Record, 0, fetches.NumRecords()) fetches.EachRecord(func(rec *kgo.Record) { - records = append(records, record{ + if rec.Partition != p.partitionID { + level.Error(p.logger).Log("msg", "wrong partition record received", "partition", rec.Partition, "expected_partition", p.partitionID) + return + } + records = append(records, Record{ // This context carries the tracing data for this individual record; // kotel populates this data when it fetches the messages. 
- ctx: rec.Context, - tenantID: string(rec.Key), - content: rec.Value, - offset: rec.Offset, + Ctx: rec.Context, + TenantID: string(rec.Key), + Content: rec.Value, + Offset: rec.Offset, }) }) - p.lastProcessedOffset = records[len(records)-1].offset + p.lastProcessedOffset = records[len(records)-1].Offset return records } // recordFetchesMetrics updates various metrics related to the fetch operation. -func (p *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) { +func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) { var ( now = time.Now() numRecords = 0 diff --git a/pkg/kafka/ingester/partition_reader_test.go b/pkg/kafka/partition/reader_test.go similarity index 86% rename from pkg/kafka/ingester/partition_reader_test.go rename to pkg/kafka/partition/reader_test.go index de71dc53b7691..addc5779bb6a8 100644 --- a/pkg/kafka/ingester/partition_reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -21,17 +21,17 @@ import ( type mockConsumer struct { mock.Mock - recordsChan chan []record + recordsChan chan []Record wg sync.WaitGroup } func newMockConsumer() *mockConsumer { return &mockConsumer{ - recordsChan: make(chan []record, 100), + recordsChan: make(chan []Record, 100), } } -func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []record) func() { +func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []Record) func() { m.wg.Add(1) go func() { defer m.wg.Done() @@ -60,7 +60,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { return consumer, nil } - partitionReader, err := NewPartitionReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) + partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) @@ -90,8 +90,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { select { case receivedRecords := <-consumer.recordsChan: require.Len(t, receivedRecords, 1) - assert.Equal(t, "test-tenant", receivedRecords[0].tenantID) - assert.Equal(t, records[0].Value, receivedRecords[0].content) + assert.Equal(t, "test-tenant", receivedRecords[0].TenantID) + assert.Equal(t, records[0].Value, receivedRecords[0].Content) case <-time.After(1 * time.Second): t.Fatal("Timeout waiting for records") } diff --git a/pkg/kafka/partitionring/parition_ring_test.go b/pkg/kafka/partitionring/parition_ring_test.go new file mode 100644 index 0000000000000..ad24e0c4ff1d7 --- /dev/null +++ b/pkg/kafka/partitionring/parition_ring_test.go @@ -0,0 +1,50 @@ +package partitionring + +import "testing" + +func TestExtractIngesterPartitionID(t *testing.T) { + tests := []struct { + name string + ingesterID string + want int32 + wantErr bool + }{ + { + name: "Valid ingester ID", + ingesterID: "ingester-5", + want: 5, + wantErr: false, + }, + { + name: "Local ingester ID", + ingesterID: "ingester-local", + want: 0, + wantErr: false, + }, + { + name: "Invalid ingester ID format", + ingesterID: "invalid-format", + want: 0, + wantErr: true, + }, + { + name: "Invalid sequence number", + ingesterID: "ingester-abc", + want: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ExtractIngesterPartitionID(tt.ingesterID) + if (err != nil) != tt.wantErr { + 
t.Errorf("extractIngesterPartitionID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("extractIngesterPartitionID() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kafka/partitionring/partition_ring.go b/pkg/kafka/partitionring/partition_ring.go index cd2b027784bae..15dad003dd931 100644 --- a/pkg/kafka/partitionring/partition_ring.go +++ b/pkg/kafka/partitionring/partition_ring.go @@ -2,12 +2,18 @@ package partitionring import ( "flag" + "fmt" + "regexp" + "strconv" + "strings" "time" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" ) +var ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$") + type Config struct { KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set on ingesters, distributors, queriers, and rulers when running in microservices mode."` @@ -45,3 +51,22 @@ func (cfg *Config) ToLifecyclerConfig(partitionID int32, instanceID string) ring PollingInterval: cfg.lifecyclerPollingInterval, } } + +// ExtractIngesterPartitionID returns the partition ID owner the the given ingester. +func ExtractIngesterPartitionID(ingesterID string) (int32, error) { + if strings.Contains(ingesterID, "local") { + return 0, nil + } + + match := ingesterIDRegexp.FindStringSubmatch(ingesterID) + if len(match) == 0 { + return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIDRegexp.String()) + } + // Parse the ingester sequence number. + ingesterSeq, err := strconv.Atoi(match[1]) + if err != nil { + return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID) + } + + return int32(ingesterSeq), nil +} diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/reader_client.go index 1b8c6b3bc1dc5..2383326979a03 100644 --- a/pkg/kafka/reader_client.go +++ b/pkg/kafka/reader_client.go @@ -32,7 +32,9 @@ func NewReaderClient(cfg Config, metrics *kprom.Metrics, logger log.Logger, opts if err != nil { return nil, errors.Wrap(err, "creating kafka client") } - + if cfg.AutoCreateTopicEnabled { + cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + } return client, nil } diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/writer_client.go index 8f65679c01c17..ddd12a646d692 100644 --- a/pkg/kafka/writer_client.go +++ b/pkg/kafka/writer_client.go @@ -79,7 +79,9 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". kgo.MaxBufferedBytes(0), ) - + if kafkaCfg.AutoCreateTopicEnabled { + kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + } return kgo.NewClient(opts...) 
} From f508978ff046c9051f09bfa225f8bc8c7882f608 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Sep 2024 11:23:27 +0200 Subject: [PATCH 02/10] Add consumer metrics --- pkg/ingester/ingester.go | 2 +- pkg/ingester/kafka_consumer.go | 38 ++++++++++++++++++++++++++--- pkg/kafka/ingester/ingester_test.go | 5 ++-- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 71db393e97d85..3ac18f2e5b792 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -382,7 +382,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con logger, prometheus.WrapRegistererWithPrefix("loki_", registerer)) - i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger), logger, registerer) + i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger, registerer), logger, registerer) if err != nil { return nil, err } diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index 8ac5e1dc9c314..210c631596558 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -4,6 +4,7 @@ import ( "context" math "math" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -11,9 +12,32 @@ import ( "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) -func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger) partition.ConsumerFactory { +type consumerMetrics struct { + consumeLatency prometheus.Histogram + currentOffset prometheus.Gauge +} + +// newConsumerMetrics initializes and returns a new consumerMetrics instance +func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { + return &consumerMetrics{ + consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "loki_ingester_partition_records_batch_process_duration_seconds", + Help: "How long a kafka ingester consumer spent processing a batch of records from Kafka.", + NativeHistogramBucketFactor: 1.1, + }), + currentOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_partition_current_offset", + Help: "The current offset of the Kafka ingester consumer.", + }), + } +} + +func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory { + metrics := newConsumerMetrics(reg) return func(committer partition.Committer) (partition.Consumer, error) { decoder, err := kafka.NewDecoder() if err != nil { @@ -23,6 +47,7 @@ func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger) pa pusher: pusher, logger: logger, decoder: decoder, + metrics: metrics, }, nil } } @@ -31,6 +56,8 @@ type kafkaConsumer struct { pusher logproto.PusherServer logger log.Logger decoder *kafka.Decoder + + metrics *consumerMetrics } func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { @@ -56,13 +83,16 @@ func (kc *kafkaConsumer) consume(records []partition.Record) { return } var ( - minOffset = int64(math.MaxInt64) - maxOffset = int64(0) + minOffset = int64(math.MaxInt64) + maxOffset = int64(0) + consumeStart = time.Now() ) + for _, record := 
range records { minOffset = min(minOffset, record.Offset) maxOffset = max(maxOffset, record.Offset) } + level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) for _, record := range records { stream, err := kc.decoder.DecodeWithoutLabels(record.Content) @@ -77,4 +107,6 @@ func (kc *kafkaConsumer) consume(records []partition.Record) { level.Error(kc.logger).Log("msg", "failed to push records", "error", err) } } + kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) + kc.metrics.currentOffset.Set(float64(maxOffset)) } diff --git a/pkg/kafka/ingester/ingester_test.go b/pkg/kafka/ingester/ingester_test.go index 41a0c2dd126c7..c7d62b9593a4c 100644 --- a/pkg/kafka/ingester/ingester_test.go +++ b/pkg/kafka/ingester/ingester_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/go-kit/log" - gokitlog "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" @@ -28,8 +27,8 @@ func TestPreparePartitionDownscaleHandler(t *testing.T) { storage, err := objstore.NewTestStorage(t) require.NoError(t, err) ing, err := New(cfg, - NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, gokitlog.NewNopLogger(), prometheus.NewRegistry()), - gokitlog.NewNopLogger(), "test", prometheus.NewRegistry()) + NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, log.NewNopLogger(), prometheus.NewRegistry()), + log.NewNopLogger(), "test", prometheus.NewRegistry()) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), ing) require.NoError(t, err) From e312ffe138cb597db9e1da81e104993823a4231a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Sep 2024 12:24:43 +0200 Subject: [PATCH 03/10] Add consumer tests --- pkg/ingester/kafka_consumer.go | 2 +- pkg/ingester/kafka_consumer_test.go | 123 ++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 pkg/ingester/kafka_consumer_test.go diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index 210c631596558..c106819ef2bcf 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -38,7 +38,7 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory { metrics := newConsumerMetrics(reg) - return func(committer partition.Committer) (partition.Consumer, error) { + return func(_ partition.Committer) (partition.Consumer, error) { decoder, err := kafka.NewDecoder() if err != nil { return nil, err diff --git a/pkg/ingester/kafka_consumer_test.go b/pkg/ingester/kafka_consumer_test.go new file mode 100644 index 0000000000000..b9d9c53fab56d --- /dev/null +++ b/pkg/ingester/kafka_consumer_test.go @@ -0,0 +1,123 @@ +package ingester + +import ( + "context" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +var ( + streamBar = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1).UTC(), + Line: "1", + }, + { + Timestamp: 
time.Unix(0, 2).UTC(), + Line: "2", + }, + }, + } + streamFoo = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1).UTC(), + Line: "3", + }, + { + Timestamp: time.Unix(0, 2).UTC(), + Line: "4", + }, + }, + } +) + +type fakePusher struct { + pushes []*logproto.PushRequest +} + +func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logproto.PushResponse, error) { + // we need to copy in as it will be reused by the decoder. + req := &logproto.PushRequest{} + for _, s := range in.Streams { + newStream := push.Stream{ + Labels: s.Labels, + Entries: make([]push.Entry, len(s.Entries)), + } + copy(newStream.Entries, s.Entries) + req.Streams = append(req.Streams, newStream) + } + f.pushes = append(f.pushes, req) + return nil, nil +} + +type noopCommitter struct{} + +func (noopCommitter) Commit(ctx context.Context, offset int64) error { return nil } + +func TestConsumer(t *testing.T) { + var ( + toPush []partition.Record + offset = int64(0) + pusher = &fakePusher{} + tenantID = "foo" + ) + + consumer, err := NewKafkaConsumerFactory(pusher, log.NewLogfmtLogger(os.Stdout), prometheus.NewRegistry())(&noopCommitter{}) + require.NoError(t, err) + + records, err := kafka.Encode(0, tenantID, streamBar, 10000) + require.NoError(t, err) + + for _, record := range records { + toPush = append(toPush, partition.Record{ + Ctx: context.Background(), + TenantID: tenantID, + Content: record.Value, + Offset: offset, + }) + offset++ + } + records, err = kafka.Encode(0, "foo", streamFoo, 10000) + require.NoError(t, err) + for _, record := range records { + toPush = append(toPush, partition.Record{ + Ctx: context.Background(), + TenantID: tenantID, + Content: record.Value, + Offset: offset, + }) + offset++ + } + + ctx, cancel := context.WithCancel(context.Background()) + recordChan := make(chan []partition.Record) + wait := consumer.Start(ctx, recordChan) + + recordChan <- toPush + + cancel() + wait() + + require.Equal(t, []*logproto.PushRequest{ + { + Streams: []logproto.Stream{streamBar}, + }, + { + Streams: []logproto.Stream{streamFoo}, + }, + }, pusher.pushes) +} From 054c8ef6b21de3bdd556ef2e26743b6763051c6b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Sep 2024 15:01:14 +0200 Subject: [PATCH 04/10] Adds partition downscale and startup --- pkg/ingester/downscale.go | 104 ++++++++++++++++++++++++++++++++++++++ pkg/ingester/ingester.go | 74 ++++++++++++++++++++++----- pkg/loki/modules.go | 3 ++ 3 files changed, 168 insertions(+), 13 deletions(-) create mode 100644 pkg/ingester/downscale.go diff --git a/pkg/ingester/downscale.go b/pkg/ingester/downscale.go new file mode 100644 index 0000000000000..560493765ce37 --- /dev/null +++ b/pkg/ingester/downscale.go @@ -0,0 +1,104 @@ +package ingester + +import ( + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + + "github.com/grafana/loki/v3/pkg/util" +) + +// PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the +// ingester will switch to INACTIVE state (read-only). +// +// Following methods are supported: +// +// - GET +// Returns timestamp when partition was switched to INACTIVE state, or 0, if partition is not in INACTIVE state. +// +// - POST +// Switches the partition to INACTIVE state (if not yet), and returns the timestamp when the switch to +// INACTIVE state happened. 
+// +// - DELETE +// Sets partition back from INACTIVE to ACTIVE state. +func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { + logger := log.With(i.logger, "partition", i.ingestPartitionID) + + // Don't allow callers to change the shutdown configuration while we're in the middle + // of starting or shutting down. + if i.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + if !i.cfg.KafkaIngestion.Enabled { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + switch r.Method { + case http.MethodPost: + // It's not allowed to prepare the downscale while in PENDING state. Why? Because if the downscale + // will be later cancelled, we don't know if it was requested in PENDING or ACTIVE state, so we + // don't know to which state reverting back. Given a partition is expected to stay in PENDING state + // for a short period, we simply don't allow this case. + state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if state == ring.PartitionPending { + level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state") + w.WriteHeader(http.StatusConflict) + return + } + + if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + case http.MethodDelete: + state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // If partition is inactive, make it active. We ignore other states Active and especially Pending. + if state == ring.PartitionInactive { + // We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency + // in the read path. If the partition is within the lookback period we need to guarantee that partition will be queried. + // Moving back to PENDING will cause us loosing consistency, because PENDING partitions are not queried by design. + // We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than + // "lookback period" ago, but since we delete inactive partitions with no owners that moved to inactive since longer + // than "lookback period" ago, it looks to be an edge case not worth to address. 
+ if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to active", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + + state, stateTimestamp, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if state == ring.PartitionInactive { + util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()}) + } else { + util.WriteJSONResponse(w, map[string]any{"timestamp": 0}) + } +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3ac18f2e5b792..72e185e24e5d6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -230,6 +230,7 @@ type Interface interface { GetOrCreateInstance(instanceID string) (*instance, error) ShutdownHandler(w http.ResponseWriter, r *http.Request) PrepareShutdown(w http.ResponseWriter, r *http.Request) + PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) } // Ingester builds chunks for incoming log streams. @@ -498,7 +499,15 @@ func (i *Ingester) setupAutoForget() { }() } -func (i *Ingester) starting(ctx context.Context) error { +func (i *Ingester) starting(ctx context.Context) (err error) { + defer func() { + if err != nil { + // if starting() fails for any reason (e.g., context canceled), + // the lifecycler must be stopped. + _ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler) + } + }() + if i.cfg.WAL.Enabled { start := time.Now() @@ -583,17 +592,6 @@ func (i *Ingester) starting(ctx context.Context) error { i.InitFlushQueues() - // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done - err := i.lifecycler.StartAsync(context.Background()) - if err != nil { - return err - } - - err = i.lifecycler.AwaitRunning(ctx) - if err != nil { - return err - } - shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename) shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath) if err != nil { @@ -605,16 +603,41 @@ func (i *Ingester) starting(ctx context.Context) error { i.setPrepareShutdown() } + // When kafka ingestion is enabled, we have to make sure that reader catches up replaying the partition + // BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler will start + // it will switch the ingester state in the ring to ACTIVE. 
+ if i.partitionReader != nil {
+ if err := services.StartAndAwaitRunning(ctx, i.partitionReader); err != nil {
+ return fmt.Errorf("failed to start partition reader: %w", err)
+ }
+ }
+
+ // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
+ err = i.lifecycler.StartAsync(context.Background())
+ if err != nil {
+ return err
+ }
+
+ err = i.lifecycler.AwaitRunning(ctx)
+ if err != nil {
+ return err
+ }
+
 err = i.recalculateOwnedStreams.StartAsync(ctx)
 if err != nil {
 return fmt.Errorf("can not start recalculate owned streams service: %w", err)
 }
- err = i.lifecycler.AwaitRunning(ctx)
+ err = i.recalculateOwnedStreams.AwaitRunning(ctx)
 if err != nil {
 return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
 }
+ if i.partitionRingLifecycler != nil {
+ if err := services.StartAndAwaitRunning(ctx, i.partitionRingLifecycler); err != nil {
+ return fmt.Errorf("failed to start partition ring lifecycler: %w", err)
+ }
+ }
 // start our loop
 i.loopDone.Add(1)
 go i.loop()
@@ -647,6 +670,19 @@ func (i *Ingester) running(ctx context.Context) error {
 // At this point, loop no longer runs, but flushers are still running.
 func (i *Ingester) stopping(_ error) error {
 i.stopIncomingRequests()
+
+ if i.partitionReader != nil {
+ if err := services.StopAndAwaitTerminated(context.Background(), i.partitionReader); err != nil {
+ level.Warn(i.logger).Log("msg", "failed to stop partition reader", "err", err)
+ }
+ }
+
+ if i.partitionRingLifecycler != nil {
+ if err := services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler); err != nil {
+ level.Warn(i.logger).Log("msg", "failed to stop partition ring lifecycler", "err", err)
+ }
+ }
+
 var errs util.MultiError
 errs.Add(i.wal.Stop())
@@ -803,6 +839,18 @@ func (i *Ingester) setPrepareShutdown() {
 i.lifecycler.SetUnregisterOnShutdown(true)
 i.terminateOnShutdown = true
 i.metrics.shutdownMarker.Set(1)
+
+ if i.partitionRingLifecycler != nil {
+ // When the prepare-shutdown endpoint is called, there are two changes in the partition ring behavior:
+ //
+ // 1. If setPrepareShutdown() is called at startup because of the shutdown marker found on disk,
+ // the ingester shouldn't create the partition if it doesn't exist, because we expect the ingester to
+ // be scaled down shortly after.
+ // 2. When the ingester shuts down, we'll have to remove it from the partition owners,
+ // because we expect the ingester to be scaled down.
+ i.partitionRingLifecycler.SetCreatePartitionOnStartup(false) + i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true) + } } func (i *Ingester) unsetPrepareShutdown() { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1d7a99ec066c8..ed20ab6e6338f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -616,6 +616,9 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_shutdown").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PrepareShutdown)), ) + t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare-partition-downscale").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PreparePartitionDownscaleHandler)), + ) t.Server.HTTP.Methods("POST", "GET").Path("/ingester/shutdown").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)), ) From cc77f724d3ef4935bdbfd2d6be16eb46181a3bd4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Sep 2024 15:27:57 +0200 Subject: [PATCH 05/10] make format --- docs/sources/shared/configuration.md | 17 +++++++++++++++++ pkg/ingester/kafka_consumer.go | 5 +++-- pkg/ingester/kafka_consumer_test.go | 23 +++++++++++++++-------- pkg/kafka/partition/committer.go | 3 ++- pkg/kafka/partition/reader.go | 3 ++- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d50669016356c..e20eca9393a19 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -801,6 +801,12 @@ kafka_config: # CLI flag: -kafka.consumer-group [consumer_group: | default = ""] + # How frequently a consumer should commit the consumed offset to Kafka. The + # last committed offset is used at startup to continue the consumption from + # where it was left. + # CLI flag: -kafka.consumer-group-offset-commit-interval + [consumer_group_offset_commit_interval: | default = 1s] + # How long to retry a failed request to get the last produced offset. # CLI flag: -kafka.last-produced-offset-retry-timeout [last_produced_offset_retry_timeout: | default = 10s] @@ -809,6 +815,17 @@ kafka_config: # CLI flag: -kafka.auto-create-topic-enabled [auto_create_topic_enabled: | default = true] + # When auto-creation of Kafka topic is enabled and this value is positive, + # Kafka's num.partitions configuration option is set on Kafka brokers with + # this value when Mimir component that uses Kafka starts. This configuration + # option specifies the default number of partitions that the Kafka broker uses + # for auto-created topics. Note that this is a Kafka-cluster wide setting, and + # applies to any auto-created topic. If the setting of num.partitions fails, + # Mimir proceeds anyways, but auto-created topics could have an incorrect + # number of partitions. + # CLI flag: -kafka.auto-create-topic-default-partitions + [auto_create_topic_default_partitions: | default = 1000] + # The maximum size of a Kafka record data that should be generated by the # producer. An incoming write request larger than this size is split into # multiple Kafka records. 
We strongly recommend to not change this setting
diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go
index c106819ef2bcf..52c5ba96a661e 100644
--- a/pkg/ingester/kafka_consumer.go
+++ b/pkg/ingester/kafka_consumer.go
@@ -9,11 +9,12 @@ import (
 "github.com/go-kit/log"
 "github.com/go-kit/log/level"
 "github.com/grafana/dskit/user"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
 "github.com/grafana/loki/v3/pkg/kafka"
 "github.com/grafana/loki/v3/pkg/kafka/partition"
 "github.com/grafana/loki/v3/pkg/logproto"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
 )
 type consumerMetrics struct {
diff --git a/pkg/ingester/kafka_consumer_test.go b/pkg/ingester/kafka_consumer_test.go
index b9d9c53fab56d..4df6705c7f72e 100644
--- a/pkg/ingester/kafka_consumer_test.go
+++ b/pkg/ingester/kafka_consumer_test.go
@@ -7,16 +7,20 @@ import (
 "time"
 "github.com/go-kit/log"
- "github.com/grafana/loki/pkg/push"
- "github.com/grafana/loki/v3/pkg/kafka"
- "github.com/grafana/loki/v3/pkg/kafka/partition"
- "github.com/grafana/loki/v3/pkg/logproto"
+ "github.com/grafana/dskit/tenant"
 "github.com/prometheus/client_golang/prometheus"
 "github.com/prometheus/prometheus/model/labels"
 "github.com/stretchr/testify/require"
+
+ "github.com/grafana/loki/v3/pkg/kafka"
+ "github.com/grafana/loki/v3/pkg/kafka/partition"
+ "github.com/grafana/loki/v3/pkg/logproto"
+
+ "github.com/grafana/loki/pkg/push"
 )
 var (
+ tenantID = "foo"
 streamBar = logproto.Stream{
 Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(),
 Entries: []logproto.Entry{
@@ -47,9 +51,13 @@ var (
 type fakePusher struct {
 pushes []*logproto.PushRequest
+ t *testing.T
 }
 func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logproto.PushResponse, error) {
+ tenant, err := tenant.TenantID(ctx)
+ require.NoError(f.t, err)
+ require.Equal(f.t, tenantID, tenant)
 // we need to copy in as it will be reused by the decoder.
req := &logproto.PushRequest{} for _, s := range in.Streams { @@ -70,10 +78,9 @@ func (noopCommitter) Commit(ctx context.Context, offset int64) error { return ni func TestConsumer(t *testing.T) { var ( - toPush []partition.Record - offset = int64(0) - pusher = &fakePusher{} - tenantID = "foo" + toPush []partition.Record + offset = int64(0) + pusher = &fakePusher{t: t} ) consumer, err := NewKafkaConsumerFactory(pusher, log.NewLogfmtLogger(os.Stdout), prometheus.NewRegistry())(&noopCommitter{}) diff --git a/pkg/kafka/partition/committer.go b/pkg/kafka/partition/committer.go index 444eb50de06b4..3a4394bfc1afe 100644 --- a/pkg/kafka/partition/committer.go +++ b/pkg/kafka/partition/committer.go @@ -8,11 +8,12 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/loki/v3/pkg/kafka" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" "go.uber.org/atomic" + + "github.com/grafana/loki/v3/pkg/kafka" ) // Committer defines an interface for committing offsets diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index d5979fae5a6e9..9720e059ae566 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -10,13 +10,14 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" - "github.com/grafana/loki/v3/pkg/kafka" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/plugin/kprom" + + "github.com/grafana/loki/v3/pkg/kafka" ) // Reader is responsible for reading data from a specific Kafka partition From 240f90c1157e4b5fb3684b6f9c47fae6b0f143f3 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 23 Sep 2024 09:24:18 +0200 Subject: [PATCH 06/10] make lint --- pkg/ingester/kafka_consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/kafka_consumer_test.go b/pkg/ingester/kafka_consumer_test.go index 4df6705c7f72e..7a2ba5887d08e 100644 --- a/pkg/ingester/kafka_consumer_test.go +++ b/pkg/ingester/kafka_consumer_test.go @@ -74,7 +74,7 @@ func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logpr type noopCommitter struct{} -func (noopCommitter) Commit(ctx context.Context, offset int64) error { return nil } +func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil } func TestConsumer(t *testing.T) { var ( From 3c42077d033c6f8e594f770c39e75ccd0afa2935 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 23 Sep 2024 12:20:09 +0200 Subject: [PATCH 07/10] Update pkg/kafka/config.go Co-authored-by: Joao Marcal --- pkg/kafka/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 1b431448ffacd..7f981b7b5e739 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -87,7 +87,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.") f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.") - f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, 
prefix+".auto-create-topic-default-partitions", 1000, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") + f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 1000, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Loki component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Loki proceeds anyways, but auto-created topics could have an incorrect number of partitions.") f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.") f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") From 7aba37a09b9125dfad9e0d0706d9d825a0752849 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 23 Sep 2024 13:52:36 +0200 Subject: [PATCH 08/10] review feedback --- docs/sources/shared/configuration.md | 4 ++-- pkg/ingester/downscale.go | 2 +- pkg/kafka/reader_client.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d72999eb0bf86..56bd7ae553e3e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -817,11 +817,11 @@ kafka_config: # When auto-creation of Kafka topic is enabled and this value is positive, # Kafka's num.partitions configuration option is set on Kafka brokers with - # this value when Mimir component that uses Kafka starts. This configuration + # this value when Loki component that uses Kafka starts. This configuration # option specifies the default number of partitions that the Kafka broker uses # for auto-created topics. Note that this is a Kafka-cluster wide setting, and # applies to any auto-created topic. If the setting of num.partitions fails, - # Mimir proceeds anyways, but auto-created topics could have an incorrect + # Loki proceeds anyways, but auto-created topics could have an incorrect # number of partitions. 
# CLI flag: -kafka.auto-create-topic-default-partitions [auto_create_topic_default_partitions: | default = 1000] diff --git a/pkg/ingester/downscale.go b/pkg/ingester/downscale.go index 560493765ce37..55b3ee2d0ae9b 100644 --- a/pkg/ingester/downscale.go +++ b/pkg/ingester/downscale.go @@ -24,7 +24,7 @@ import ( // INACTIVE state happened. // // - DELETE -// Sets partition back from INACTIVE to ACTIVE state. +// Sets partition back from INACTIVE to ACTIVE state, and returns 0 signalling the partition is not in INACTIVE state func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { logger := log.With(i.logger, "partition", i.ingestPartitionID) diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/reader_client.go index 2383326979a03..9237686fee609 100644 --- a/pkg/kafka/reader_client.go +++ b/pkg/kafka/reader_client.go @@ -13,10 +13,10 @@ import ( ) // NewReaderClient returns the kgo.Client that should be used by the Reader. -func NewReaderClient(cfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { +func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { const fetchMaxBytes = 100_000_000 - opts = append(opts, commonKafkaClientOptions(cfg, metrics, logger)...) + opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...) opts = append(opts, kgo.FetchMinBytes(1), kgo.FetchMaxBytes(fetchMaxBytes), @@ -32,8 +32,8 @@ func NewReaderClient(cfg Config, metrics *kprom.Metrics, logger log.Logger, opts if err != nil { return nil, errors.Wrap(err, "creating kafka client") } - if cfg.AutoCreateTopicEnabled { - cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + if kafkaCfg.AutoCreateTopicEnabled { + kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) } return client, nil } From 077b80b7f1cbc01f64e1d952f3ba5af73e4494f5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 23 Sep 2024 15:18:11 +0200 Subject: [PATCH 09/10] Update pkg/kafka/partition/committer.go Co-authored-by: benclive --- pkg/kafka/partition/committer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kafka/partition/committer.go b/pkg/kafka/partition/committer.go index 3a4394bfc1afe..c3a1f796e0e41 100644 --- a/pkg/kafka/partition/committer.go +++ b/pkg/kafka/partition/committer.go @@ -41,7 +41,7 @@ type partitionCommitter struct { cancel context.CancelFunc } -// newCommitter creates and initializes a new partitionCommitter. +// newCommitter creates and initializes a new Committer. // It sets up the necessary metrics and initializes the committer with the provided configuration. 
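// Presumably (judging from the -kafka.consumer-group-offset-commit-interval flag documented above), the committer
// is also expected to flush the last consumed offset to the consumer group on that interval, in addition to explicit Commit calls.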
func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { c := &partitionCommitter{ From bea9573d61a1121c96ee23ed68a03f1948149472 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 23 Sep 2024 15:30:25 +0200 Subject: [PATCH 10/10] Rename downscale endpoint to use underscore --- pkg/loki/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ed20ab6e6338f..76b03fce77371 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -616,7 +616,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_shutdown").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PrepareShutdown)), ) - t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare-partition-downscale").Handler( + t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_partition_downscale").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PreparePartitionDownscaleHandler)), ) t.Server.HTTP.Methods("POST", "GET").Path("/ingester/shutdown").Handler(
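// As a usage sketch for the renamed endpoint (host and port are deployment-specific assumptions; 3100 is Loki's default HTTP port):
//   curl -X POST   http://<ingester>:3100/ingester/prepare_partition_downscale   # switch the owned partition to INACTIVE
//   curl -X GET    http://<ingester>:3100/ingester/prepare_partition_downscale   # read back the INACTIVE timestamp (0 if not INACTIVE)
//   curl -X DELETE http://<ingester>:3100/ingester/prepare_partition_downscale   # switch the partition back to ACTIVE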