diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 2f29e1a3e04ed..56bd7ae553e3e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -801,6 +801,12 @@ kafka_config: # CLI flag: -kafka.consumer-group [consumer_group: | default = ""] + # How frequently a consumer should commit the consumed offset to Kafka. The + # last committed offset is used at startup to continue the consumption from + # where it was left. + # CLI flag: -kafka.consumer-group-offset-commit-interval + [consumer_group_offset_commit_interval: | default = 1s] + # How long to retry a failed request to get the last produced offset. # CLI flag: -kafka.last-produced-offset-retry-timeout [last_produced_offset_retry_timeout: | default = 10s] @@ -809,6 +815,17 @@ kafka_config: # CLI flag: -kafka.auto-create-topic-enabled [auto_create_topic_enabled: | default = true] + # When auto-creation of Kafka topic is enabled and this value is positive, + # Kafka's num.partitions configuration option is set on Kafka brokers with + # this value when Loki component that uses Kafka starts. This configuration + # option specifies the default number of partitions that the Kafka broker uses + # for auto-created topics. Note that this is a Kafka-cluster wide setting, and + # applies to any auto-created topic. If the setting of num.partitions fails, + # Loki proceeds anyways, but auto-created topics could have an incorrect + # number of partitions. + # CLI flag: -kafka.auto-create-topic-default-partitions + [auto_create_topic_default_partitions: | default = 1000] + # The maximum size of a Kafka record data that should be generated by the # producer. An incoming write request larger than this size is split into # multiple Kafka records. We strongly recommend to not change this setting diff --git a/pkg/ingester/downscale.go b/pkg/ingester/downscale.go new file mode 100644 index 0000000000000..55b3ee2d0ae9b --- /dev/null +++ b/pkg/ingester/downscale.go @@ -0,0 +1,104 @@ +package ingester + +import ( + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + + "github.com/grafana/loki/v3/pkg/util" +) + +// PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the +// ingester will switch to INACTIVE state (read-only). +// +// Following methods are supported: +// +// - GET +// Returns timestamp when partition was switched to INACTIVE state, or 0, if partition is not in INACTIVE state. +// +// - POST +// Switches the partition to INACTIVE state (if not yet), and returns the timestamp when the switch to +// INACTIVE state happened. +// +// - DELETE +// Sets partition back from INACTIVE to ACTIVE state, and returns 0 signalling the partition is not in INACTIVE state +func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { + logger := log.With(i.logger, "partition", i.ingestPartitionID) + + // Don't allow callers to change the shutdown configuration while we're in the middle + // of starting or shutting down. + if i.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + if !i.cfg.KafkaIngestion.Enabled { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + switch r.Method { + case http.MethodPost: + // It's not allowed to prepare the downscale while in PENDING state. Why? 
Because if the downscale + // is later cancelled, we don't know whether it was requested in PENDING or ACTIVE state, so we + // don't know which state to revert to. Given a partition is expected to stay in PENDING state + // only for a short period, we simply don't allow this case. + state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if state == ring.PartitionPending { + level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state") + w.WriteHeader(http.StatusConflict) + return + } + + if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + case http.MethodDelete: + state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // If the partition is INACTIVE, make it ACTIVE. We ignore the other states, ACTIVE and especially PENDING. + if state == ring.PartitionInactive { + // We don't switch it back to PENDING state if there are not enough owners, because we want to guarantee consistency + // in the read path. If the partition is within the lookback period, we need to guarantee that the partition will be queried. + // Moving back to PENDING would cause us to lose consistency, because PENDING partitions are not queried by design. + // We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than + // the "lookback period" ago, but since we delete inactive partitions with no owners that moved to INACTIVE longer + // than the "lookback period" ago, it looks like an edge case not worth addressing. 
+ if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil { + level.Error(logger).Log("msg", "failed to change partition state to active", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + } + + state, stateTimestamp, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) + if err != nil { + level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if state == ring.PartitionInactive { + util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()}) + } else { + util.WriteJSONResponse(w, map[string]any{"timestamp": 0}) + } +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4e7ac11f515c0..3a18000af271d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -18,6 +18,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/ring" @@ -37,6 +38,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" @@ -225,6 +227,7 @@ type Interface interface { GetOrCreateInstance(instanceID string) (*instance, error) ShutdownHandler(w http.ResponseWriter, r *http.Request) PrepareShutdown(w http.ResponseWriter, r *http.Request) + PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) } // Ingester builds chunks for incoming log streams. @@ -290,6 +293,10 @@ type Ingester struct { // recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance. readRing ring.ReadRing recalculateOwnedStreams *recalculateOwnedStreams + + ingestPartitionID int32 + partitionRingLifecycler *ring.PartitionInstanceLifecycler + partitionReader *partition.Reader } // New makes a new Ingester. 
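Example (not part of the patch): a minimal sketch of how external automation might drive the new prepare-partition-downscale endpoint registered below in pkg/loki/modules.go. The ingester address used here is a placeholder; the endpoint path and the {"timestamp": ...} response shape come from PreparePartitionDownscaleHandler above.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumed ingester HTTP address; adjust for your deployment.
	const url = "http://localhost:3100/ingester/prepare_partition_downscale"

	// POST switches the ingester's partition to INACTIVE (read-only); it is a no-op if already INACTIVE.
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler returns the time the partition became INACTIVE, or 0 if it is not INACTIVE.
	var body struct {
		Timestamp int64 `json:"timestamp"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}
	fmt.Println("partition inactive since (unix seconds):", body.Timestamp)

	// DELETE reverts the partition back to ACTIVE if the downscale is cancelled.
	req, err := http.NewRequest(http.MethodDelete, url, nil)
	if err != nil {
		panic(err)
	}
	if _, err := http.DefaultClient.Do(req); err != nil {
		panic(err)
	}
}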
@@ -353,6 +360,34 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con i.lifecyclerWatcher = services.NewFailureWatcher() i.lifecyclerWatcher.WatchService(i.lifecycler) + if i.cfg.KafkaIngestion.Enabled { + i.ingestPartitionID, err = partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID) + if err != nil { + return nil, fmt.Errorf("calculating ingester partition ID: %w", err) + } + partitionRingKV := cfg.KafkaIngestion.PartitionRingConfig.KVStore.Mock + if partitionRingKV == nil { + partitionRingKV, err = kv.NewClient(cfg.KafkaIngestion.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger) + if err != nil { + return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err) + } + } + i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler( + i.cfg.KafkaIngestion.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionID, cfg.LifecyclerConfig.ID), + PartitionRingName, + PartitionRingKey, + partitionRingKV, + logger, + prometheus.WrapRegistererWithPrefix("loki_", registerer)) + + i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger, registerer), logger, registerer) + if err != nil { + return nil, err + } + i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler) + i.lifecyclerWatcher.WatchService(i.partitionReader) + } + // Now that the lifecycler has been created, we can create the limiter // which depends on it. i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor) @@ -461,7 +496,15 @@ func (i *Ingester) setupAutoForget() { }() } -func (i *Ingester) starting(ctx context.Context) error { +func (i *Ingester) starting(ctx context.Context) (err error) { + defer func() { + if err != nil { + // if starting() fails for any reason (e.g., context canceled), + // the lifecycler must be stopped. + _ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler) + } + }() + if i.cfg.WAL.Enabled { start := time.Now() @@ -546,17 +589,6 @@ func (i *Ingester) starting(ctx context.Context) error { i.InitFlushQueues() - // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done - err := i.lifecycler.StartAsync(context.Background()) - if err != nil { - return err - } - - err = i.lifecycler.AwaitRunning(ctx) - if err != nil { - return err - } - shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename) shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath) if err != nil { @@ -568,6 +600,26 @@ func (i *Ingester) starting(ctx context.Context) error { i.setPrepareShutdown() } + // When kafka ingestion is enabled, we have to make sure that reader catches up replaying the partition + // BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler will start + // it will switch the ingester state in the ring to ACTIVE. 
+ if i.partitionReader != nil { + if err := services.StartAndAwaitRunning(ctx, i.partitionReader); err != nil { + return fmt.Errorf("failed to start partition reader: %w", err) + } + } + + // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done + err = i.lifecycler.StartAsync(context.Background()) + if err != nil { + return err + } + + err = i.lifecycler.AwaitRunning(ctx) + if err != nil { + return err + } + err = i.recalculateOwnedStreams.StartAsync(ctx) if err != nil { return fmt.Errorf("can not start recalculate owned streams service: %w", err) @@ -578,6 +630,11 @@ func (i *Ingester) starting(ctx context.Context) error { return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err) } + if i.partitionRingLifecycler != nil { + if err := services.StartAndAwaitRunning(ctx, i.partitionRingLifecycler); err != nil { + return fmt.Errorf("failed to start partition ring lifecycler: %w", err) + } + } // start our loop i.loopDone.Add(1) go i.loop() @@ -610,6 +667,19 @@ func (i *Ingester) running(ctx context.Context) error { // At this point, loop no longer runs, but flushers are still running. func (i *Ingester) stopping(_ error) error { i.stopIncomingRequests() + + if i.partitionReader != nil { + if err := services.StopAndAwaitTerminated(context.Background(), i.partitionReader); err != nil { + level.Warn(i.logger).Log("msg", "failed to stop partition reader", "err", err) + } + } + + if i.partitionRingLifecycler != nil { + if err := services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler); err != nil { + level.Warn(i.logger).Log("msg", "failed to stop partition ring lifecycler", "err", err) + } + } + var errs util.MultiError errs.Add(i.wal.Stop()) @@ -766,6 +836,18 @@ func (i *Ingester) setPrepareShutdown() { i.lifecycler.SetUnregisterOnShutdown(true) i.terminateOnShutdown = true i.metrics.shutdownMarker.Set(1) + + if i.partitionRingLifecycler != nil { + // When the prepare shutdown endpoint is called there are two changes in the partitions ring behavior: + // + // 1. If setPrepareShutdown() is called at startup, because of the shutdown marker found on disk, + // the ingester shouldn't create the partition if doesn't exist, because we expect the ingester will + // be scaled down shortly after. + // 2. When the ingester will shutdown we'll have to remove the ingester from the partition owners, + // because we expect the ingester to be scaled down. 
+ i.partitionRingLifecycler.SetCreatePartitionOnStartup(false) + i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true) + } } func (i *Ingester) unsetPrepareShutdown() { diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go new file mode 100644 index 0000000000000..52c5ba96a661e --- /dev/null +++ b/pkg/ingester/kafka_consumer.go @@ -0,0 +1,113 @@ +package ingester + +import ( + "context" + math "math" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/logproto" +) + +type consumerMetrics struct { + consumeLatency prometheus.Histogram + currentOffset prometheus.Gauge +} + +// newConsumerMetrics initializes and returns a new consumerMetrics instance +func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { + return &consumerMetrics{ + consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "loki_ingester_partition_records_batch_process_duration_seconds", + Help: "How long a kafka ingester consumer spent processing a batch of records from Kafka.", + NativeHistogramBucketFactor: 1.1, + }), + currentOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_partition_current_offset", + Help: "The current offset of the Kafka ingester consumer.", + }), + } +} + +func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory { + metrics := newConsumerMetrics(reg) + return func(_ partition.Committer) (partition.Consumer, error) { + decoder, err := kafka.NewDecoder() + if err != nil { + return nil, err + } + return &kafkaConsumer{ + pusher: pusher, + logger: logger, + decoder: decoder, + metrics: metrics, + }, nil + } +} + +type kafkaConsumer struct { + pusher logproto.PusherServer + logger log.Logger + decoder *kafka.Decoder + + metrics *consumerMetrics +} + +func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + level.Info(kc.logger).Log("msg", "shutting down kafka consumer") + return + case records := <-recordsChan: + kc.consume(records) + } + } + }() + return wg.Wait +} + +func (kc *kafkaConsumer) consume(records []partition.Record) { + if len(records) == 0 { + return + } + var ( + minOffset = int64(math.MaxInt64) + maxOffset = int64(0) + consumeStart = time.Now() + ) + + for _, record := range records { + minOffset = min(minOffset, record.Offset) + maxOffset = max(maxOffset, record.Offset) + } + + level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) + for _, record := range records { + stream, err := kc.decoder.DecodeWithoutLabels(record.Content) + if err != nil { + level.Error(kc.logger).Log("msg", "failed to decode record", "error", err) + continue + } + ctx := user.InjectOrgID(record.Ctx, record.TenantID) + if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{ + Streams: []logproto.Stream{stream}, + }); err != nil { + level.Error(kc.logger).Log("msg", "failed to push records", "error", err) + } + } + kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) + kc.metrics.currentOffset.Set(float64(maxOffset)) +} diff --git 
a/pkg/ingester/kafka_consumer_test.go b/pkg/ingester/kafka_consumer_test.go new file mode 100644 index 0000000000000..7a2ba5887d08e --- /dev/null +++ b/pkg/ingester/kafka_consumer_test.go @@ -0,0 +1,130 @@ +package ingester + +import ( + "context" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/tenant" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" + "github.com/grafana/loki/v3/pkg/logproto" + + "github.com/grafana/loki/pkg/push" +) + +var ( + tenantID = "foo" + streamBar = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1).UTC(), + Line: "1", + }, + { + Timestamp: time.Unix(0, 2).UTC(), + Line: "2", + }, + }, + } + streamFoo = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1).UTC(), + Line: "3", + }, + { + Timestamp: time.Unix(0, 2).UTC(), + Line: "4", + }, + }, + } +) + +type fakePusher struct { + pushes []*logproto.PushRequest + t *testing.T +} + +func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logproto.PushResponse, error) { + tenant, err := tenant.TenantID(ctx) + require.NoError(f.t, err) + require.Equal(f.t, tenant, tenant) + // we need to copy in as it will be reused by the decoder. + req := &logproto.PushRequest{} + for _, s := range in.Streams { + newStream := push.Stream{ + Labels: s.Labels, + Entries: make([]push.Entry, len(s.Entries)), + } + copy(newStream.Entries, s.Entries) + req.Streams = append(req.Streams, newStream) + } + f.pushes = append(f.pushes, req) + return nil, nil +} + +type noopCommitter struct{} + +func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil } + +func TestConsumer(t *testing.T) { + var ( + toPush []partition.Record + offset = int64(0) + pusher = &fakePusher{t: t} + ) + + consumer, err := NewKafkaConsumerFactory(pusher, log.NewLogfmtLogger(os.Stdout), prometheus.NewRegistry())(&noopCommitter{}) + require.NoError(t, err) + + records, err := kafka.Encode(0, tenantID, streamBar, 10000) + require.NoError(t, err) + + for _, record := range records { + toPush = append(toPush, partition.Record{ + Ctx: context.Background(), + TenantID: tenantID, + Content: record.Value, + Offset: offset, + }) + offset++ + } + records, err = kafka.Encode(0, "foo", streamFoo, 10000) + require.NoError(t, err) + for _, record := range records { + toPush = append(toPush, partition.Record{ + Ctx: context.Background(), + TenantID: tenantID, + Content: record.Value, + Offset: offset, + }) + offset++ + } + + ctx, cancel := context.WithCancel(context.Background()) + recordChan := make(chan []partition.Record) + wait := consumer.Start(ctx, recordChan) + + recordChan <- toPush + + cancel() + wait() + + require.Equal(t, []*logproto.PushRequest{ + { + Streams: []logproto.Stream{streamBar}, + }, + { + Streams: []logproto.Stream{streamFoo}, + }, + }, pusher.pushes) +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index f916b145f0084..7f981b7b5e739 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -1,12 +1,18 @@ package kafka import ( + "context" "errors" "flag" "fmt" "strconv" "strings" "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/twmb/franz-go/pkg/kadm" + 
"github.com/twmb/franz-go/pkg/kgo" ) const ( @@ -52,12 +58,13 @@ type Config struct { DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` - ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroup string `yaml:"consumer_group"` + ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` - AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` - // AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` + AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` + AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` @@ -75,11 +82,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.") f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.") + f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.") f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.") f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.") - // f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") + f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 1000, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Loki component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. 
If the setting of num.partitions fails, Loki proceeds anyways, but auto-created topics could have an incorrect number of partitions.") f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.") f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") @@ -107,3 +115,35 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string return strings.ReplaceAll(cfg.ConsumerGroup, "", strconv.Itoa(int(partitionID))) } + +// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers. +// This is best-effort, if setting the option fails, error is logged, but not returned. +func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) { + if cfg.AutoCreateTopicDefaultPartitions <= 0 { + return + } + + cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...) + if err != nil { + level.Error(logger).Log("msg", "failed to create kafka client", "err", err) + return + } + + adm := kadm.NewClient(cl) + defer adm.Close() + + defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) + _, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ + { + Op: kadm.SetConfig, + Name: "num.partitions", + Value: &defaultNumberOfPartitions, + }, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) + return + } + + level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) +} diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go new file mode 100644 index 0000000000000..7c21e38fd141e --- /dev/null +++ b/pkg/kafka/config_test.go @@ -0,0 +1,41 @@ +package kafka + +import ( + "testing" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { + cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) + require.NoError(t, err) + t.Cleanup(cluster.Close) + + addrs := cluster.ListenAddrs() + require.Len(t, addrs, 1) + + cfg := Config{ + Address: addrs[0], + AutoCreateTopicDefaultPartitions: 100, + } + + cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { + r := request.(*kmsg.AlterConfigsRequest) + + require.Len(t, r.Resources, 1) + res := r.Resources[0] + require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) + require.Len(t, res.Configs, 1) + cfg := res.Configs[0] + require.Equal(t, "num.partitions", cfg.Name) + require.NotNil(t, *cfg.Value) + require.Equal(t, "100", *cfg.Value) + + return &kmsg.AlterConfigsResponse{}, nil, true + }) + + cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger()) +} diff --git a/pkg/kafka/encoding.go b/pkg/kafka/encoding.go index c4977054f32f6..65daf59c25e77 100644 --- 
a/pkg/kafka/encoding.go +++ b/pkg/kafka/encoding.go @@ -167,6 +167,15 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) { return *d.stream, ls, nil } +// DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels. +func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) { + d.stream.Entries = d.stream.Entries[:0] + if err := d.stream.Unmarshal(data); err != nil { + return logproto.Stream{}, fmt.Errorf("failed to unmarshal stream: %w", err) + } + return *d.stream, nil +} + // sovPush calculates the size of varint-encoded uint64. // It is used to determine the number of bytes needed to encode a uint64 value // in Protocol Buffers' variable-length integer format. diff --git a/pkg/kafka/ingester/consumer.go b/pkg/kafka/ingester/consumer.go index d011ae16517d2..57abb2b00ff3f 100644 --- a/pkg/kafka/ingester/consumer.go +++ b/pkg/kafka/ingester/consumer.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/wal" ) @@ -36,17 +37,12 @@ type MetadataStore interface { AddBlock(ctx context.Context, in *metastorepb.AddBlockRequest, opts ...grpc.CallOption) (*metastorepb.AddBlockResponse, error) } -// Committer defines an interface for committing offsets -type Committer interface { - Commit(ctx context.Context, offset int64) error -} - // consumer represents a Kafka consumer that processes and stores log entries type consumer struct { metastoreClient MetadataStore storage ObjectStorage writer *wal.SegmentWriter - committer Committer + committer partition.Committer flushInterval time.Duration maxFlushSize int64 lastOffset int64 @@ -67,8 +63,8 @@ func NewConsumerFactory( maxFlushSize int64, logger log.Logger, reg prometheus.Registerer, -) ConsumerFactory { - return func(committer Committer) (Consumer, error) { +) partition.ConsumerFactory { + return func(committer partition.Committer) (partition.Consumer, error) { writer, err := wal.NewWalSegmentWriter() if err != nil { return nil, err @@ -95,7 +91,7 @@ func NewConsumerFactory( // Start starts the consumer and returns a function to wait for it to finish // It consumes records from the recordsChan, and flushes them to storage periodically. 
-func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func() { +func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { var wg sync.WaitGroup wg.Add(1) go func() { @@ -127,7 +123,7 @@ func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func( } // consume processes a batch of Kafka records, decoding and storing them -func (c *consumer) consume(records []record) error { +func (c *consumer) consume(records []partition.Record) error { if len(records) == 0 { return nil } @@ -136,8 +132,8 @@ func (c *consumer) consume(records []record) error { maxOffset = int64(0) ) for _, record := range records { - minOffset = min(minOffset, record.offset) - maxOffset = max(maxOffset, record.offset) + minOffset = min(minOffset, record.Offset) + maxOffset = max(maxOffset, record.Offset) } level.Debug(c.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) return c.retryWithBackoff(context.Background(), backoff.Config{ @@ -163,9 +159,9 @@ func (c *consumer) consume(records []record) error { }) } -func (c *consumer) appendRecords(records []record) error { +func (c *consumer) appendRecords(records []partition.Record) error { for _, record := range records { - stream, labels, err := c.decoder.Decode(record.content) + stream, labels, err := c.decoder.Decode(record.Content) if err != nil { return fmt.Errorf("failed to decode record: %w", err) } @@ -184,7 +180,7 @@ func (c *consumer) appendRecords(records []record) error { Parsed: entry.Parsed, }) } - c.writer.Append(record.tenantID, stream.Labels, labels, c.toStore, time.Now()) + c.writer.Append(record.TenantID, stream.Labels, labels, c.toStore, time.Now()) } return nil } diff --git a/pkg/kafka/ingester/consumer_test.go b/pkg/kafka/ingester/consumer_test.go index 3f0adcce6247d..c6e14ddebeca8 100644 --- a/pkg/kafka/ingester/consumer_test.go +++ b/pkg/kafka/ingester/consumer_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -50,7 +51,7 @@ func TestConsumer_PeriodicFlush(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) _ = consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -63,10 +64,10 @@ func TestConsumer_PeriodicFlush(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 0, }} recordsChan <- records @@ -103,7 +104,7 @@ func TestConsumer_ShutdownFlush(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) wait := consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -116,10 +117,10 @@ func TestConsumer_ShutdownFlush(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 
0, }} recordsChan <- records @@ -157,7 +158,7 @@ func TestConsumer_MaxFlushSize(t *testing.T) { consumer, err := consumerFactory(committer) require.NoError(t, err) - recordsChan := make(chan []record) + recordsChan := make(chan []partition.Record) _ = consumer.Start(ctx, recordsChan) stream := logproto.Stream{ @@ -170,10 +171,10 @@ func TestConsumer_MaxFlushSize(t *testing.T) { encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) require.NoError(t, err) - records := []record{{ - tenantID: "tenant1", - content: encodedRecords[0].Value, - offset: 0, + records := []partition.Record{{ + TenantID: "tenant1", + Content: encodedRecords[0].Value, + Offset: 0, }} recordsChan <- records diff --git a/pkg/kafka/ingester/ingester.go b/pkg/kafka/ingester/ingester.go index ef356778e4390..39595df142ba7 100644 --- a/pkg/kafka/ingester/ingester.go +++ b/pkg/kafka/ingester/ingester.go @@ -6,9 +6,6 @@ import ( "flag" "fmt" "net/http" - "regexp" - "strconv" - "strings" "time" "github.com/go-kit/log" @@ -21,6 +18,7 @@ import ( "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/kafka/ingester/shutdownmarker" + "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -33,7 +31,6 @@ const ( ) var ( - ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$") defaultFlushInterval = 15 * time.Second defaultFlushSize int64 = 300 << 20 // 300 MB ) @@ -98,19 +95,19 @@ type Ingester struct { lifecyclerWatcher *services.FailureWatcher ingesterPartitionID int32 partitionRingLifecycler *ring.PartitionInstanceLifecycler - partitionReader *PartitionReader + partitionReader *partition.Reader } // New makes a new Ingester. func New(cfg Config, - consumerFactory ConsumerFactory, + consumerFactory partition.ConsumerFactory, logger log.Logger, metricsNamespace string, registerer prometheus.Registerer, ) (*Ingester, error) { metrics := newIngesterMetrics(registerer) - ingesterPartitionID, err := extractIngesterPartitionID(cfg.LifecyclerConfig.ID) + ingesterPartitionID, err := partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID) if err != nil { return nil, fmt.Errorf("calculating ingester partition ID: %w", err) } @@ -142,7 +139,7 @@ func New(cfg Config, if err != nil { return nil, err } - i.partitionReader, err = NewPartitionReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer) + i.partitionReader, err = partition.NewReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer) if err != nil { return nil, err } @@ -157,25 +154,6 @@ func New(cfg Config, return i, nil } -// ingesterPartitionID returns the partition ID owner the the given ingester. -func extractIngesterPartitionID(ingesterID string) (int32, error) { - if strings.Contains(ingesterID, "local") { - return 0, nil - } - - match := ingesterIDRegexp.FindStringSubmatch(ingesterID) - if len(match) == 0 { - return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIDRegexp.String()) - } - // Parse the ingester sequence number. - ingesterSeq, err := strconv.Atoi(match[1]) - if err != nil { - return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID) - } - - return int32(ingesterSeq), nil -} - // ServeHTTP implements the pattern ring status page. 
func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) { i.lifecycler.ServeHTTP(w, r) diff --git a/pkg/kafka/ingester/ingester_test.go b/pkg/kafka/ingester/ingester_test.go index a3bcca72ca3d8..c7d62b9593a4c 100644 --- a/pkg/kafka/ingester/ingester_test.go +++ b/pkg/kafka/ingester/ingester_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/go-kit/log" - gokitlog "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" @@ -28,8 +27,8 @@ func TestPreparePartitionDownscaleHandler(t *testing.T) { storage, err := objstore.NewTestStorage(t) require.NoError(t, err) ing, err := New(cfg, - NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, gokitlog.NewNopLogger(), prometheus.NewRegistry()), - gokitlog.NewNopLogger(), "test", prometheus.NewRegistry()) + NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, log.NewNopLogger(), prometheus.NewRegistry()), + log.NewNopLogger(), "test", prometheus.NewRegistry()) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), ing) require.NoError(t, err) @@ -99,53 +98,6 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } -func TestExtractIngesterPartitionID(t *testing.T) { - tests := []struct { - name string - ingesterID string - want int32 - wantErr bool - }{ - { - name: "Valid ingester ID", - ingesterID: "ingester-5", - want: 5, - wantErr: false, - }, - { - name: "Local ingester ID", - ingesterID: "ingester-local", - want: 0, - wantErr: false, - }, - { - name: "Invalid ingester ID format", - ingesterID: "invalid-format", - want: 0, - wantErr: true, - }, - { - name: "Invalid sequence number", - ingesterID: "ingester-abc", - want: 0, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := extractIngesterPartitionID(tt.ingesterID) - if (err != nil) != tt.wantErr { - t.Errorf("extractIngesterPartitionID() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("extractIngesterPartitionID() = %v, want %v", got, tt.want) - } - }) - } -} - // TestMetastore is a simple in-memory metastore for testing type TestMetastore struct { blocks map[string][]*metastorepb.BlockMeta diff --git a/pkg/kafka/ingester/partition_committer.go b/pkg/kafka/partition/committer.go similarity index 70% rename from pkg/kafka/ingester/partition_committer.go rename to pkg/kafka/partition/committer.go index a76e363a64e4b..c3a1f796e0e41 100644 --- a/pkg/kafka/ingester/partition_committer.go +++ b/pkg/kafka/partition/committer.go @@ -1,8 +1,9 @@ -package ingester +package partition import ( "context" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -10,10 +11,16 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" + "go.uber.org/atomic" "github.com/grafana/loki/v3/pkg/kafka" ) +// Committer defines an interface for committing offsets +type Committer interface { + Commit(ctx context.Context, offset int64) error +} + // partitionCommitter is responsible for committing offsets for a specific Kafka partition // to the Kafka broker. It also tracks metrics related to the commit process. 
type partitionCommitter struct { @@ -28,11 +35,15 @@ type partitionCommitter struct { kafkaCfg kafka.Config partitionID int32 consumerGroup string + + toCommit *atomic.Int64 + wg sync.WaitGroup + cancel context.CancelFunc } -// newPartitionCommitter creates and initializes a new partitionCommitter. +// newCommitter creates and initializes a new Committer. // It sets up the necessary metrics and initializes the committer with the provided configuration. -func newPartitionCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { +func newCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter { c := &partitionCommitter{ logger: logger, kafkaCfg: kafkaCfg, @@ -63,14 +74,51 @@ func newPartitionCommitter(kafkaCfg kafka.Config, admClient *kadm.Client, partit Help: "The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.", ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, }), + toCommit: atomic.NewInt64(-1), } // Initialise the last committed offset metric to -1 to signal no offset has been committed yet (0 is a valid offset). c.lastCommittedOffset.Set(-1) + if kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 { + c.wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + go c.autoCommitLoop(ctx) + } + return c } +func (r *partitionCommitter) autoCommitLoop(ctx context.Context) { + defer r.wg.Done() + commitTicker := time.NewTicker(r.kafkaCfg.ConsumerGroupOffsetCommitInterval) + defer commitTicker.Stop() + + previousOffset := r.toCommit.Load() + for { + select { + case <-ctx.Done(): + return + case <-commitTicker.C: + currOffset := r.toCommit.Load() + if currOffset == previousOffset { + continue + } + + if err := r.Commit(ctx, currOffset); err == nil { + previousOffset = currOffset + } + } + } +} + +func (r *partitionCommitter) enqueueOffset(o int64) { + if r.kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 { + r.toCommit.Store(o) + } +} + // commit attempts to commit the given offset to Kafka for the partition this committer is responsible for. // It updates relevant metrics and logs the result of the commit operation. func (r *partitionCommitter) Commit(ctx context.Context, offset int64) (returnErr error) { @@ -101,3 +149,18 @@ func (r *partitionCommitter) Commit(ctx context.Context, offset int64) (returnEr r.lastCommittedOffset.Set(float64(committedOffset.At)) return nil } + +func (r *partitionCommitter) Stop() { + if r.kafkaCfg.ConsumerGroupOffsetCommitInterval <= 0 { + return + } + r.cancel() + r.wg.Wait() + + offset := r.toCommit.Load() + if offset < 0 { + return + } + // Commit has internal timeouts, so this call shouldn't block for too long. 
+ _ = r.Commit(context.Background(), offset) +} diff --git a/pkg/kafka/ingester/partition_committer_test.go b/pkg/kafka/partition/committer_test.go similarity index 95% rename from pkg/kafka/ingester/partition_committer_test.go rename to pkg/kafka/partition/committer_test.go index 8fb823e3f2ed5..9ef02f910e5d0 100644 --- a/pkg/kafka/ingester/partition_committer_test.go +++ b/pkg/kafka/partition/committer_test.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) { reg := prometheus.NewRegistry() partitionID := int32(1) consumerGroup := "test-consumer-group" - committer := newPartitionCommitter(kafkaCfg, admClient, partitionID, consumerGroup, logger, reg) + committer := newCommitter(kafkaCfg, admClient, partitionID, consumerGroup, logger, reg) // Test committing an offset ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/pkg/kafka/ingester/partition_reader.go b/pkg/kafka/partition/reader.go similarity index 84% rename from pkg/kafka/ingester/partition_reader.go rename to pkg/kafka/partition/reader.go index 5ed70412d9e0c..9720e059ae566 100644 --- a/pkg/kafka/ingester/partition_reader.go +++ b/pkg/kafka/partition/reader.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -20,10 +20,10 @@ import ( "github.com/grafana/loki/v3/pkg/kafka" ) -// PartitionReader is responsible for reading data from a specific Kafka partition +// Reader is responsible for reading data from a specific Kafka partition // and passing it to the consumer for processing. It is a core component of the // Loki ingester's Kafka-based ingestion pipeline. -type PartitionReader struct { +type Reader struct { services.Service kafkaCfg kafka.Config @@ -39,31 +39,31 @@ type PartitionReader struct { reg prometheus.Registerer } -type record struct { +type Record struct { // Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka. - ctx context.Context - tenantID string - content []byte - offset int64 + Ctx context.Context + TenantID string + Content []byte + Offset int64 } type ConsumerFactory func(committer Committer) (Consumer, error) type Consumer interface { - Start(ctx context.Context, recordsChan <-chan []record) func() + Start(ctx context.Context, recordsChan <-chan []Record) func() } -// NewPartitionReader creates and initializes a new PartitionReader. +// NewReader creates and initializes a new PartitionReader. // It sets up the basic service and initializes the reader with the provided configuration. -func NewPartitionReader( +func NewReader( kafkaCfg kafka.Config, partitionID int32, instanceID string, consumerFactory ConsumerFactory, logger log.Logger, reg prometheus.Registerer, -) (*PartitionReader, error) { - r := &PartitionReader{ +) (*Reader, error) { + r := &Reader{ kafkaCfg: kafkaCfg, partitionID: partitionID, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), @@ -79,7 +79,7 @@ func NewPartitionReader( // start initializes the Kafka client and committer for the PartitionReader. // This method is called when the PartitionReader service starts. 
-func (p *PartitionReader) start(_ context.Context) error { +func (p *Reader) start(_ context.Context) error { var err error p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger, kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ @@ -89,14 +89,14 @@ func (p *PartitionReader) start(_ context.Context) error { if err != nil { return errors.Wrap(err, "creating kafka reader client") } - p.committer = newPartitionCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg) - + p.committer = newCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg) + // todo: attempt to ensure max lag timestamp on startup. return nil } // run is the main loop of the PartitionReader. It continuously fetches and processes // data from Kafka, and send it to the consumer. -func (p *PartitionReader) run(ctx context.Context) error { +func (p *Reader) run(ctx context.Context) error { level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -110,11 +110,12 @@ func (p *PartitionReader) run(ctx context.Context) error { wait := consumer.Start(ctx, recordsChan) wait() + p.committer.Stop() return nil } -func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { - records := make(chan []record) +func (p *Reader) startFetchLoop(ctx context.Context) <-chan []Record { + records := make(chan []Record) go func() { for { select { @@ -122,6 +123,7 @@ func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { return default: records <- p.poll(ctx) + p.committer.enqueueOffset(p.lastProcessedOffset) } } }() @@ -129,7 +131,7 @@ func (p *PartitionReader) startFetchLoop(ctx context.Context) <-chan []record { } // logFetchErrors logs any errors encountered during the fetch operation. -func (p *PartitionReader) logFetchErrors(fetches kgo.Fetches) { +func (p *Reader) logFetchErrors(fetches kgo.Fetches) { mErr := multierror.New() fetches.EachError(func(topic string, partition int32, err error) { if errors.Is(err, context.Canceled) { @@ -148,7 +150,7 @@ func (p *PartitionReader) logFetchErrors(fetches kgo.Fetches) { } // pollFetches retrieves the next batch of records from Kafka and measures the fetch duration. -func (p *PartitionReader) poll(ctx context.Context) []record { +func (p *Reader) poll(ctx context.Context) []Record { defer func(start time.Time) { p.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) @@ -159,23 +161,27 @@ func (p *PartitionReader) poll(ctx context.Context) []record { if fetches.NumRecords() == 0 { return nil } - records := make([]record, 0, fetches.NumRecords()) + records := make([]Record, 0, fetches.NumRecords()) fetches.EachRecord(func(rec *kgo.Record) { - records = append(records, record{ + if rec.Partition != p.partitionID { + level.Error(p.logger).Log("msg", "wrong partition record received", "partition", rec.Partition, "expected_partition", p.partitionID) + return + } + records = append(records, Record{ // This context carries the tracing data for this individual record; // kotel populates this data when it fetches the messages. 
- ctx: rec.Context, - tenantID: string(rec.Key), - content: rec.Value, - offset: rec.Offset, + Ctx: rec.Context, + TenantID: string(rec.Key), + Content: rec.Value, + Offset: rec.Offset, }) }) - p.lastProcessedOffset = records[len(records)-1].offset + p.lastProcessedOffset = records[len(records)-1].Offset return records } // recordFetchesMetrics updates various metrics related to the fetch operation. -func (p *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) { +func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) { var ( now = time.Now() numRecords = 0 diff --git a/pkg/kafka/ingester/partition_reader_test.go b/pkg/kafka/partition/reader_test.go similarity index 86% rename from pkg/kafka/ingester/partition_reader_test.go rename to pkg/kafka/partition/reader_test.go index de71dc53b7691..addc5779bb6a8 100644 --- a/pkg/kafka/ingester/partition_reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -1,4 +1,4 @@ -package ingester +package partition import ( "context" @@ -21,17 +21,17 @@ import ( type mockConsumer struct { mock.Mock - recordsChan chan []record + recordsChan chan []Record wg sync.WaitGroup } func newMockConsumer() *mockConsumer { return &mockConsumer{ - recordsChan: make(chan []record, 100), + recordsChan: make(chan []Record, 100), } } -func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []record) func() { +func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []Record) func() { m.wg.Add(1) go func() { defer m.wg.Done() @@ -60,7 +60,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { return consumer, nil } - partitionReader, err := NewPartitionReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) + partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) @@ -90,8 +90,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { select { case receivedRecords := <-consumer.recordsChan: require.Len(t, receivedRecords, 1) - assert.Equal(t, "test-tenant", receivedRecords[0].tenantID) - assert.Equal(t, records[0].Value, receivedRecords[0].content) + assert.Equal(t, "test-tenant", receivedRecords[0].TenantID) + assert.Equal(t, records[0].Value, receivedRecords[0].Content) case <-time.After(1 * time.Second): t.Fatal("Timeout waiting for records") } diff --git a/pkg/kafka/partitionring/parition_ring_test.go b/pkg/kafka/partitionring/parition_ring_test.go new file mode 100644 index 0000000000000..ad24e0c4ff1d7 --- /dev/null +++ b/pkg/kafka/partitionring/parition_ring_test.go @@ -0,0 +1,50 @@ +package partitionring + +import "testing" + +func TestExtractIngesterPartitionID(t *testing.T) { + tests := []struct { + name string + ingesterID string + want int32 + wantErr bool + }{ + { + name: "Valid ingester ID", + ingesterID: "ingester-5", + want: 5, + wantErr: false, + }, + { + name: "Local ingester ID", + ingesterID: "ingester-local", + want: 0, + wantErr: false, + }, + { + name: "Invalid ingester ID format", + ingesterID: "invalid-format", + want: 0, + wantErr: true, + }, + { + name: "Invalid sequence number", + ingesterID: "ingester-abc", + want: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ExtractIngesterPartitionID(tt.ingesterID) + if (err != nil) != tt.wantErr { + 
t.Errorf("extractIngesterPartitionID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("extractIngesterPartitionID() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kafka/partitionring/partition_ring.go b/pkg/kafka/partitionring/partition_ring.go index cd2b027784bae..15dad003dd931 100644 --- a/pkg/kafka/partitionring/partition_ring.go +++ b/pkg/kafka/partitionring/partition_ring.go @@ -2,12 +2,18 @@ package partitionring import ( "flag" + "fmt" + "regexp" + "strconv" + "strings" "time" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" ) +var ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$") + type Config struct { KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set on ingesters, distributors, queriers, and rulers when running in microservices mode."` @@ -45,3 +51,22 @@ func (cfg *Config) ToLifecyclerConfig(partitionID int32, instanceID string) ring PollingInterval: cfg.lifecyclerPollingInterval, } } + +// ExtractIngesterPartitionID returns the partition ID owned by the given ingester. +func ExtractIngesterPartitionID(ingesterID string) (int32, error) { + if strings.Contains(ingesterID, "local") { + return 0, nil + } + + match := ingesterIDRegexp.FindStringSubmatch(ingesterID) + if len(match) == 0 { + return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIDRegexp.String()) + } + // Parse the ingester sequence number. + ingesterSeq, err := strconv.Atoi(match[1]) + if err != nil { + return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID) + } + + return int32(ingesterSeq), nil +} diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/reader_client.go index 1b8c6b3bc1dc5..9237686fee609 100644 --- a/pkg/kafka/reader_client.go +++ b/pkg/kafka/reader_client.go @@ -13,10 +13,10 @@ import ( ) // NewReaderClient returns the kgo.Client that should be used by the Reader. -func NewReaderClient(cfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { +func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) { const fetchMaxBytes = 100_000_000 - opts = append(opts, commonKafkaClientOptions(cfg, metrics, logger)...) + opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...) opts = append(opts, kgo.FetchMinBytes(1), kgo.FetchMaxBytes(fetchMaxBytes), @@ -32,7 +32,9 @@ func NewReaderClient(cfg Config, metrics *kprom.Metrics, logger log.Logger, opts if err != nil { return nil, errors.Wrap(err, "creating kafka client") } - + if kafkaCfg.AutoCreateTopicEnabled { + kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + } return client, nil } diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/writer_client.go index 8f65679c01c17..ddd12a646d692 100644 --- a/pkg/kafka/writer_client.go +++ b/pkg/kafka/writer_client.go @@ -79,7 +79,9 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". kgo.MaxBufferedBytes(0), ) - + if kafkaCfg.AutoCreateTopicEnabled { + kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + } return kgo.NewClient(opts...) 
} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1d7a99ec066c8..76b03fce77371 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -616,6 +616,9 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_shutdown").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PrepareShutdown)), ) + t.Server.HTTP.Methods("POST", "GET", "DELETE").Path("/ingester/prepare_partition_downscale").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.PreparePartitionDownscaleHandler)), + ) t.Server.HTTP.Methods("POST", "GET").Path("/ingester/shutdown").Handler( httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)), )
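Example (not part of the patch): a minimal sketch of how the two new Kafka settings might be wired up programmatically, mirroring what NewReaderClient and NewWriterClient now do when topic auto-creation is enabled. The broker address is a placeholder; in a real deployment these values come from the -kafka.* CLI flags or the YAML shown in the configuration.md hunk above.

package main

import (
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	cfg := kafka.Config{
		Address:                           "localhost:9092", // placeholder broker address
		ConsumerGroupOffsetCommitInterval: time.Second,      // -kafka.consumer-group-offset-commit-interval
		AutoCreateTopicEnabled:            true,             // -kafka.auto-create-topic-enabled
		AutoCreateTopicDefaultPartitions:  1000,             // -kafka.auto-create-topic-default-partitions
	}

	// Best-effort: sets the Kafka-wide num.partitions option on the brokers so that
	// auto-created topics get the desired partition count. Failures are only logged.
	if cfg.AutoCreateTopicEnabled {
		cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger())
	}
}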