Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): Add Ingestion from Kafka in Ingesters #14192

Merged
merged 13 commits into from
Sep 24, 2024
17 changes: 17 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,12 @@ kafka_config:
# CLI flag: -kafka.consumer-group
[consumer_group: <string> | default = ""]

# How frequently a consumer should commit the consumed offset to Kafka. The
# last committed offset is used at startup to continue the consumption from
# where it was left.
# CLI flag: -kafka.consumer-group-offset-commit-interval
[consumer_group_offset_commit_interval: <duration> | default = 1s]

# How long to retry a failed request to get the last produced offset.
# CLI flag: -kafka.last-produced-offset-retry-timeout
[last_produced_offset_retry_timeout: <duration> | default = 10s]
Expand All @@ -809,6 +815,17 @@ kafka_config:
# CLI flag: -kafka.auto-create-topic-enabled
[auto_create_topic_enabled: <boolean> | default = true]

# When auto-creation of Kafka topics is enabled and this value is positive,
# Kafka's num.partitions configuration option is set on the Kafka brokers to
# this value when a Loki component that uses Kafka starts. This configuration
# option specifies the default number of partitions that the Kafka broker uses
# for auto-created topics. Note that this is a Kafka-cluster-wide setting, and
# it applies to any auto-created topic. If setting num.partitions fails,
# Loki proceeds anyway, but auto-created topics could have an incorrect
# number of partitions.
# CLI flag: -kafka.auto-create-topic-default-partitions
[auto_create_topic_default_partitions: <int> | default = 1000]

# The maximum size of a Kafka record data that should be generated by the
# producer. An incoming write request larger than this size is split into
# multiple Kafka records. We strongly recommend to not change this setting
Expand Down
104 changes: 104 additions & 0 deletions pkg/ingester/downscale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package ingester

import (
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"

"github.com/grafana/loki/v3/pkg/util"
)

// PreparePartitionDownscaleHandler is the HTTP endpoint used to prepare (or cancel) the downscaling
// of the partition owned by this ingester. Preparing a downscale moves the partition to the INACTIVE
// (read-only) state.
//
// The behaviour depends on the request method:
//
//   - GET
//     Reports the timestamp at which the partition became INACTIVE, or 0 when the partition is not INACTIVE.
//
//   - POST
//     Moves the partition to INACTIVE (unless it already is) and reports the timestamp of that transition.
//     The request is rejected with 409 Conflict while the partition is still PENDING.
//
//   - DELETE
//     Moves an INACTIVE partition back to ACTIVE and reports 0, signalling the partition is not INACTIVE.
//     Partitions in any other state are left untouched.
//
// Any other method performs no state change and gets the same report as GET.
//
// The handler replies 503 Service Unavailable when the ingester service is not running, and
// 405 Method Not Allowed when Kafka ingestion is disabled.
func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) {
	logger := log.With(i.logger, "partition", i.ingestPartitionID)

	switch {
	case i.State() != services.Running:
		// Refuse to touch the shutdown configuration while the ingester is still
		// starting up or is already shutting down.
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	case !i.cfg.KafkaIngestion.Enabled:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	ctx := r.Context()
	lifecycler := i.partitionRingLifecycler

	switch r.Method {
	case http.MethodPost:
		current, _, err := lifecycler.GetPartitionState(ctx)
		if err != nil {
			level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// Preparing a downscale from the PENDING state is rejected: if the downscale were cancelled
		// later on, there would be no way to tell whether the partition should go back to PENDING or
		// to ACTIVE. A partition is only expected to be PENDING briefly, so the case is simply refused.
		if current == ring.PartitionPending {
			level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state")
			w.WriteHeader(http.StatusConflict)
			return
		}

		if err := lifecycler.ChangePartitionState(ctx, ring.PartitionInactive); err != nil {
			level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

	case http.MethodDelete:
		current, _, err := lifecycler.GetPartitionState(ctx)
		if err != nil {
			level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// Only an INACTIVE partition is reactivated; ACTIVE and (especially) PENDING are left alone.
		if current != ring.PartitionInactive {
			break
		}

		// The partition always goes back to ACTIVE, never to PENDING, even when it does not have enough
		// owners. PENDING partitions are not queried by design, so reverting to PENDING while the
		// partition is still within the lookback period would lose read-path consistency. Reverting to
		// PENDING would only be safe for a partition that became INACTIVE more than a "lookback period"
		// ago, but INACTIVE partitions without owners are deleted after that long anyway, so that edge
		// case is not worth handling.
		if err = lifecycler.ChangePartitionState(ctx, ring.PartitionActive); err != nil {
			level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}

	// Whatever the method was, answer with the partition state as it is now.
	finalState, since, err := lifecycler.GetPartitionState(ctx)
	if err != nil {
		level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if finalState != ring.PartitionInactive {
		util.WriteJSONResponse(w, map[string]any{"timestamp": 0})
		return
	}
	util.WriteJSONResponse(w, map[string]any{"timestamp": since.Unix()})
}
106 changes: 94 additions & 12 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -225,6 +227,7 @@ type Interface interface {
GetOrCreateInstance(instanceID string) (*instance, error)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
PrepareShutdown(w http.ResponseWriter, r *http.Request)
PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request)
}

// Ingester builds chunks for incoming log streams.
Expand Down Expand Up @@ -290,6 +293,10 @@ type Ingester struct {
// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
recalculateOwnedStreams *recalculateOwnedStreams

ingestPartitionID int32
partitionRingLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.Reader
}

// New makes a new Ingester.
Expand Down Expand Up @@ -353,6 +360,34 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.lifecyclerWatcher = services.NewFailureWatcher()
i.lifecyclerWatcher.WatchService(i.lifecycler)

if i.cfg.KafkaIngestion.Enabled {
i.ingestPartitionID, err = partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}
partitionRingKV := cfg.KafkaIngestion.PartitionRingConfig.KVStore.Mock
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(cfg.KafkaIngestion.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
if err != nil {
return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err)
}
}
i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler(
i.cfg.KafkaIngestion.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionID, cfg.LifecyclerConfig.ID),
PartitionRingName,
PartitionRingKey,
partitionRingKV,
logger,
prometheus.WrapRegistererWithPrefix("loki_", registerer))

i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger, registerer), logger, registerer)
if err != nil {
return nil, err
}
i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler)
i.lifecyclerWatcher.WatchService(i.partitionReader)
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
Expand Down Expand Up @@ -461,7 +496,15 @@ func (i *Ingester) setupAutoForget() {
}()
}

func (i *Ingester) starting(ctx context.Context) error {
func (i *Ingester) starting(ctx context.Context) (err error) {
defer func() {
if err != nil {
// if starting() fails for any reason (e.g., context canceled),
// the lifecycler must be stopped.
_ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}
}()

if i.cfg.WAL.Enabled {
start := time.Now()

Expand Down Expand Up @@ -546,17 +589,6 @@ func (i *Ingester) starting(ctx context.Context) error {

i.InitFlushQueues()

// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err := i.lifecycler.StartAsync(context.Background())
if err != nil {
return err
}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return err
}

shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath)
if err != nil {
Expand All @@ -568,6 +600,26 @@ func (i *Ingester) starting(ctx context.Context) error {
i.setPrepareShutdown()
}

// When kafka ingestion is enabled, we have to make sure that reader catches up replaying the partition
// BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler will start
// it will switch the ingester state in the ring to ACTIVE.
if i.partitionReader != nil {
if err := services.StartAndAwaitRunning(ctx, i.partitionReader); err != nil {
return fmt.Errorf("failed to start partition reader: %w", err)
}
}

// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err = i.lifecycler.StartAsync(context.Background())
if err != nil {
return err
}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return err
}

err = i.recalculateOwnedStreams.StartAsync(ctx)
if err != nil {
return fmt.Errorf("can not start recalculate owned streams service: %w", err)
Expand All @@ -578,6 +630,11 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
}

if i.partitionRingLifecycler != nil {
if err := services.StartAndAwaitRunning(ctx, i.partitionRingLifecycler); err != nil {
return fmt.Errorf("failed to start partition ring lifecycler: %w", err)
}
}
// start our loop
i.loopDone.Add(1)
go i.loop()
Expand Down Expand Up @@ -610,6 +667,19 @@ func (i *Ingester) running(ctx context.Context) error {
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()

if i.partitionReader != nil {
if err := services.StopAndAwaitTerminated(context.Background(), i.partitionReader); err != nil {
level.Warn(i.logger).Log("msg", "failed to stop partition reader", "err", err)
}
}

if i.partitionRingLifecycler != nil {
if err := services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler); err != nil {
level.Warn(i.logger).Log("msg", "failed to stop partition ring lifecycler", "err", err)
}
}

var errs util.MultiError
errs.Add(i.wal.Stop())

Expand Down Expand Up @@ -766,6 +836,18 @@ func (i *Ingester) setPrepareShutdown() {
i.lifecycler.SetUnregisterOnShutdown(true)
i.terminateOnShutdown = true
i.metrics.shutdownMarker.Set(1)

if i.partitionRingLifecycler != nil {
// When the prepare shutdown endpoint is called there are two changes in the partitions ring behavior:
//
// 1. If setPrepareShutdown() is called at startup, because the shutdown marker was found on disk,
// the ingester shouldn't create the partition if it doesn't exist, because we expect the ingester
// to be scaled down shortly after.
// 2. When the ingester will shutdown we'll have to remove the ingester from the partition owners,
// because we expect the ingester to be scaled down.
i.partitionRingLifecycler.SetCreatePartitionOnStartup(false)
i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true)
}
}

func (i *Ingester) unsetPrepareShutdown() {
Expand Down
Loading
Loading