From c1bf4d64f0843e3135136d3a0064ab5f8545ae2c Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 5 Jun 2024 12:01:22 +0100 Subject: [PATCH] Add backoff to flush op This commit adds a configurable backoff to flush ops in the ingester. This is to prevent situations where the store put operation fails fast (i.e. 401 Unauthorized) and can cause ingesters to be rate limited. --- docs/sources/shared/configuration.md | 18 ++++- pkg/ingester/flush.go | 34 +++++++-- pkg/ingester/flush_test.go | 66 +++++++++++++++++ pkg/ingester/ingester.go | 16 ++++- pkg/ingester/ingester_test.go | 101 ++++++++++++++++++++++----- pkg/ingester/instance_test.go | 13 +++- 6 files changed, 217 insertions(+), 31 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index cae0094873a84..b287bdea5f37f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2752,7 +2752,23 @@ lifecycler: # CLI flag: -ingester.flush-check-period [flush_check_period: | default = 30s] -# The timeout before a flush is cancelled. +flush_op_backoff: + # Minimum backoff period when a flush fails. Each concurrent flush has its own + # backoff, see `ingester.concurrent-flushes`. + # CLI flag: -ingester.flush-op-backoff-min-period + [min_period: | default = 10s] + + # Maximum backoff period when a flush fails. Each concurrent flush has its own + # backoff, see `ingester.concurrent-flushes`. + # CLI flag: -ingester.flush-op-backoff-max-period + [max_period: | default = 1m] + + # Maximum retries for failed flushes. + # CLI flag: -ingester.flush-op-backoff-retries + [max_retries: | default = 10] + +# The timeout for an individual flush. Will be retried up to +# `flush-op-backoff-retries` times. # CLI flag: -ingester.flush-op-timeout [flush_op_timeout: | default = 10m] diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 00aad05475495..81407abcb2e25 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -7,7 +7,9 @@ import ( "sync" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" @@ -135,8 +137,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo } func (i *Ingester) flushLoop(j int) { + l := log.With(i.logger, "loop", j) defer func() { - level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited") + level.Debug(l).Log("msg", "Ingester.flushLoop() exited") i.flushQueuesDone.Done() }() @@ -147,9 +150,10 @@ func (i *Ingester) flushLoop(j int) { } op := o.(*flushOp) - err := i.flushUserSeries(op.userID, op.fp, op.immediate) + m := util_log.WithUserID(op.userID, l) + err := i.flushOp(m, op) if err != nil { - level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err) + level.Error(m).Log("msg", "failed to flush", "err", err) } // If we're exiting & we failed to flush, put the failed operation @@ -161,7 +165,23 @@ func (i *Ingester) flushLoop(j int) { } } -func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { +func (i *Ingester) flushOp(l log.Logger, op *flushOp) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + b := backoff.New(ctx, i.cfg.FlushOpBackoff) + for b.Ongoing() { + err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate) + if err == nil { + break + } + level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err) + b.Wait() + } + return b.Err() +} + +func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error { instance, ok := i.getInstanceByID(userID) if !ok { return nil @@ -175,9 +195,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat lbs := labels.String() level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs) - ctx := user.InjectOrgID(context.Background(), userID) - ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) - defer cancel() + ctx = user.InjectOrgID(ctx, userID) + ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) + defer cancelFunc() err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) if err != nil { return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 6fd52bafa066f..edd6084a2741b 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -1,6 +1,7 @@ package ingester import ( + "errors" "fmt" "os" "sort" @@ -102,6 +103,67 @@ func Benchmark_FlushLoop(b *testing.B) { } } +func Test_FlushOp(t *testing.T) { + t.Run("no error", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + _, ing := newTestStore(t, cfg, nil) + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + })) + }) + + t.Run("max retries exceeded", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + store, ing := newTestStore(t, cfg, nil) + store.onPut = func(_ context.Context, _ []chunk.Chunk) error { + return errors.New("failed to write chunks") + } + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + }), "terminated after 1 retries") + }) +} + func Test_Flush(t *testing.T) { var ( store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil) @@ -297,6 +359,10 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg := Config{} flagext.DefaultValues(&cfg) + cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushOpTimeout = 15 * time.Second cfg.FlushCheckPeriod = 99999 * time.Hour cfg.MaxChunkIdle = 99999 * time.Hour cfg.ConcurrentFlushes = 1 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 41b358906e0a1..87c71afaeef06 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" @@ -82,6 +83,7 @@ type Config struct { ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` @@ -127,7 +129,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") - f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.") + f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") + f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") + f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.") + f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.") f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.") f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") @@ -155,6 +160,15 @@ func (cfg *Config) Validate() error { return err } + if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff { + return errors.New("invalid flush op min backoff: cannot be larger than max backoff") + } + if cfg.FlushOpBackoff.MaxRetries <= 0 { + return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries) + } + if cfg.FlushOpTimeout <= 0 { + return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout) + } if cfg.IndexShards <= 0 { return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1c438bd6bf2c0..6bb27ad645cc9 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/middleware" @@ -676,57 +677,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) { func TestValidate(t *testing.T) { for i, tc := range []struct { - in Config - err bool - expected Config + in Config + expected Config + expectedErr string }{ { in: Config{ - MaxChunkAge: time.Minute, ChunkEncoding: chunkenc.EncGZIP.String(), - IndexShards: index.DefaultIndexShards, + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, + IndexShards: index.DefaultIndexShards, + MaxChunkAge: time.Minute, }, expected: Config{ + ChunkEncoding: chunkenc.EncGZIP.String(), + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, + IndexShards: index.DefaultIndexShards, MaxChunkAge: time.Minute, - ChunkEncoding: chunkenc.EncGZIP.String(), parsedEncoding: chunkenc.EncGZIP, - IndexShards: index.DefaultIndexShards, }, }, { in: Config{ ChunkEncoding: chunkenc.EncSnappy.String(), - IndexShards: index.DefaultIndexShards, + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, + IndexShards: index.DefaultIndexShards, }, expected: Config{ - ChunkEncoding: chunkenc.EncSnappy.String(), - parsedEncoding: chunkenc.EncSnappy, + ChunkEncoding: chunkenc.EncSnappy.String(), + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, IndexShards: index.DefaultIndexShards, + parsedEncoding: chunkenc.EncSnappy, }, }, { in: Config{ - IndexShards: index.DefaultIndexShards, ChunkEncoding: "bad-enc", + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, + IndexShards: index.DefaultIndexShards, + }, + expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd", + }, + { + in: Config{ + ChunkEncoding: chunkenc.EncGZIP.String(), + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + }, + FlushOpTimeout: 15 * time.Second, + IndexShards: index.DefaultIndexShards, + MaxChunkAge: time.Minute, + }, + expectedErr: "invalid flush op max retries: 0", + }, + { + in: Config{ + ChunkEncoding: chunkenc.EncGZIP.String(), + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + IndexShards: index.DefaultIndexShards, + MaxChunkAge: time.Minute, }, - err: true, + expectedErr: "invalid flush op timeout: 0s", }, { in: Config{ - MaxChunkAge: time.Minute, ChunkEncoding: chunkenc.EncGZIP.String(), + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, + FlushOpTimeout: 15 * time.Second, + MaxChunkAge: time.Minute, }, - err: true, + expectedErr: "invalid ingester index shard factor: 0", }, } { t.Run(fmt.Sprint(i), func(t *testing.T) { err := tc.in.Validate() - if tc.err { - require.NotNil(t, err) - return + if tc.expectedErr != "" { + require.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, tc.in) } - require.Nil(t, err) - require.Equal(t, tc.expected, tc.in) }) } } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 7f7dc30361d6a..3055a7fb0c5b7 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -40,9 +41,15 @@ import ( func defaultConfig() *Config { cfg := Config{ - BlockSize: 512, - ChunkEncoding: "gzip", - IndexShards: 32, + BlockSize: 512, + ChunkEncoding: "gzip", + IndexShards: 32, + FlushOpTimeout: 15 * time.Second, + FlushOpBackoff: backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 1, + }, } if err := cfg.Validate(); err != nil { panic(errors.Wrap(err, "error building default test config"))