From e141de7a179b9c302b546b42e175d2f2409dab63 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 18 Aug 2021 15:01:31 -0400 Subject: [PATCH 1/2] handle replaying unordered WAL into ordered configs. --- pkg/chunkenc/memchunk.go | 14 ++++++++++++ pkg/ingester/checkpoint.go | 8 +++---- pkg/ingester/flush_test.go | 1 - pkg/ingester/ingester.go | 11 +++++----- pkg/ingester/instance.go | 2 +- pkg/ingester/limiter.go | 11 +++++++++- pkg/ingester/recovery.go | 40 +++++++++++++++++++++++++++++++++-- pkg/ingester/stream.go | 13 +++++++----- pkg/ingester/transfer_test.go | 1 - 9 files changed, 80 insertions(+), 21 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 1f2f3044d5556..e9353afd92b7e 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -724,6 +724,20 @@ func (c *MemChunk) reorder() error { return nil } +func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error { + + if c.head != nil && c.head.Format() != desired { + newH, err := c.head.Convert(desired) + if err != nil { + return err + } + + c.head = newH + } + c.headFmt = desired + return nil +} + // cut a new block and add it to finished blocks. func (c *MemChunk) cut() error { if c.head.IsEmpty() { diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 87c0b1e09eda6..d2f404a6d0296 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -100,10 +100,10 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) { lastUpdated: c.LastUpdated, } - hbType := chunkenc.OrderedHeadBlockFmt - if conf.UnorderedWrites { - hbType = chunkenc.UnorderedHeadBlockFmt - } + // Always use Unordered headblocks during replay + // to ensure Loki can effectively replay an unordered-friendly + // WAL into a new configuration that disables unordered writes. + hbType := chunkenc.UnorderedHeadBlockFmt mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize) if err != nil { return nil, err diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index b91cca1076345..52ed4bfc66f4d 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -291,7 +291,6 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg.LifecyclerConfig.MinReadyDuration = 0 cfg.BlockSize = 256 * 1024 cfg.TargetChunkSize = 1500 * 1024 - cfg.UnorderedWrites = true return cfg } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5de0021743505..9f6e1b782a994 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -82,8 +82,6 @@ type Config struct { ChunkFilterer storage.RequestChunkFilterer `yaml:"-"` - UnorderedWrites bool `yaml:"unordered_writes_enabled"` - IndexShards int `yaml:"index_shards"` } @@ -107,7 +105,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.") f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.") f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`") - f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.") f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.") } @@ -328,9 +325,9 @@ func (i *Ingester) starting(ctx context.Context) error { i.cfg.RetainPeriod = old }() - // Disable the in process stream limit checks while replaying the WAL - i.limiter.Disable() - defer i.limiter.Enable() + // Disable the in process stream limit checks while replaying the WAL. + // It is re-enabled in the recover's Close() method. + i.limiter.DisableForWALReplay() recoverer := newIngesterRecoverer(i) defer recoverer.Close() @@ -381,6 +378,8 @@ func (i *Ingester) starting(ctx context.Context) error { "errors", segmentRecoveryErr != nil, ) + level.Info(util_log.Logger).Log("msg", "closing recoverer") + recoverer.Close() elapsed := time.Since(start) i.metrics.walReplayDuration.Set(elapsed.Seconds()) level.Info(util_log.Logger).Log("msg", "recovery finished", "time", elapsed.String()) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 9e83fcc068901..351277b43efc9 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo if !ok { sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp) - stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streamsByFP[fp] = stream i.streams[stream.labelsString] = stream i.streamsCreatedTotal.Inc() diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index c4841ea27d646..32867647e077a 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -29,7 +29,7 @@ type Limiter struct { disabled bool } -func (l *Limiter) Disable() { +func (l *Limiter) DisableForWALReplay() { l.mtx.Lock() defer l.mtx.Unlock() l.disabled = true @@ -50,6 +50,15 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor } } +func (l *Limiter) UnorderedWrites(userID string) bool { + // WAL replay should not discard previously ack'd writes, + // so allow out of order writes while the limiter is disabled. + if l.disabled { + return true + } + return l.limits.UnorderedWrites(userID) +} + // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current // number of streams in input and returns an error if so. func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 3d0c5c4ed29b7..17c61f18cb5f5 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -99,6 +99,7 @@ type ingesterRecoverer struct { } func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { + return &ingesterRecoverer{ ing: i, done: make(chan struct{}), @@ -127,6 +128,9 @@ func (r *ingesterRecoverer) Series(series *Series) error { stream.lastLine.content = series.LastLine stream.entryCt = series.EntryCt stream.highestTs = series.HighestTs + // Always set during replay, then reset to desired value afterward. + // This allows replaying unordered WALs into ordered configurations. + stream.unorderedWrites = true if err != nil { return err @@ -202,14 +206,46 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { } func (r *ingesterRecoverer) Close() { - // reset all the incrementing stream counters after a successful WAL replay. + // Ensure this is only run once. + select { + case <-r.done: + return + default: + } + + close(r.done) + + // Enable the limiter here to accurately reflect tenant limits after recovery. + r.ing.limiter.Enable() + for _, inst := range r.ing.getInstances() { inst.forAllStreams(context.Background(), func(s *stream) error { + // reset all the incrementing stream counters after a successful WAL replay. s.resetCounter() + + // If we've replayed a WAL with unordered writes, but the new + // configuration disables them, convert all streams/head blocks + // to ensure unordered writes are disabled after the replay, + // but without dropping any previously accepted data. + isAllowed := r.ing.limiter.limits.UnorderedWrites(s.tenant) + old := s.unorderedWrites + s.unorderedWrites = isAllowed + + if !isAllowed && old { + + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() + if len(s.chunks) > 0 { + err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed)) + if err != nil { + return err + } + } + } + return nil }) } - close(r.done) } func (r *ingesterRecoverer) Done() <-chan struct{} { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 5323eb6f9b3cf..6411d50a660d5 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -161,11 +161,7 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er } func (s *stream) NewChunk() *chunkenc.MemChunk { - hbType := chunkenc.OrderedHeadBlockFmt - if s.unorderedWrites { - hbType = chunkenc.UnorderedHeadBlockFmt - } - return chunkenc.NewMemChunk(s.cfg.parsedEncoding, hbType, s.cfg.BlockSize, s.cfg.TargetChunkSize) + return chunkenc.NewMemChunk(s.cfg.parsedEncoding, headBlockType(s.unorderedWrites), s.cfg.BlockSize, s.cfg.TargetChunkSize) } func (s *stream) Push( @@ -480,3 +476,10 @@ func (s *stream) addTailer(t *tailer) { func (s *stream) resetCounter() { s.entryCt = 0 } + +func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt { + if unorderedWrites { + return chunkenc.UnorderedHeadBlockFmt + } + return chunkenc.OrderedHeadBlockFmt +} diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index abf1657db6acd..28ec897282b0e 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -30,7 +30,6 @@ func TestTransferOut(t *testing.T) { f := newTestIngesterFactory(t) ing := f.getIngester(time.Duration(0), t) - ing.cfg.UnorderedWrites = false // enforce ordered writes on old testware (transfers are deprecated). // Push some data into our original ingester ctx := user.InjectOrgID(context.Background(), "test") From 5f69b0addc320c661b6a7dd088d49f10cbb80fb5 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 18 Aug 2021 18:06:09 -0400 Subject: [PATCH 2/2] ingester limiter metric, testware, correctly replayed unordered WAL into ordered config --- pkg/ingester/checkpoint_test.go | 108 +++++++++++++++++++++++++++++++- pkg/ingester/ingester.go | 2 +- pkg/ingester/instance.go | 2 +- pkg/ingester/instance_test.go | 16 ++--- pkg/ingester/limiter.go | 6 +- pkg/ingester/limiter_test.go | 4 +- pkg/ingester/metrics.go | 6 ++ pkg/ingester/recovery.go | 2 +- 8 files changed, 130 insertions(+), 16 deletions(-) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 5ee0000a24b05..0626e083ebbd7 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -450,7 +450,7 @@ func Test_SeriesIterator(t *testing.T) { IngestionBurstSizeMB: 1e4, }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) @@ -500,7 +500,7 @@ func Benchmark_SeriesIterator(b *testing.B) { IngestionBurstSizeMB: 1e4, }, nil) require.NoError(b, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) for i := range instances { inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil) @@ -575,3 +575,107 @@ func buildChunks(t testing.TB, size int) []Chunk { } return chks } + +func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) { + for _, waitForCheckpoint := range []bool{false, true} { + t.Run(fmt.Sprintf("checkpoint-%v", waitForCheckpoint), func(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + + // First launch the ingester with unordered writes enabled + dft := defaultLimitsTestConfig() + dft.UnorderedWrites = true + limits, err := validation.NewOverrides(dft, nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + }, + } + + start := time.Now() + steps := 10 + end := start.Add(time.Second * time.Duration(steps)) + + // Write data out of order + for i := steps - 1; i >= 0; i-- { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + } + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + if waitForCheckpoint { + // Ensure we have checkpointed now + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*2) // give a bit of buffer + + // Add some more data after the checkpoint + tmp := end + end = end.Add(time.Second * time.Duration(steps)) + req.Streams[0].Entries = nil + req.Streams[1].Entries = nil + // Write data out of order again + for i := steps - 1; i >= 0; i-- { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: tmp.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", steps+i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: tmp.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", steps+i), + }) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + } + + ensureIngesterData(ctx, t, start, end, i) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Now disable unordered writes + limitCfg := defaultLimitsTestConfig() + limitCfg.UnorderedWrites = false + limits, err = validation.NewOverrides(limitCfg, nil) + require.NoError(t, err) + + // restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // ensure we've recovered data from wal segments + ensureIngesterData(ctx, t, start, end, i) + }) + } +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9f6e1b782a994..8e2dc7dc9ca68 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -228,7 +228,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid // Now that the lifecycler has been created, we can create the limiter // which depends on it. - i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor) + i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor) i.Service = services.NewBasicService(i.starting, i.running, i.stopping) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 351277b43efc9..35e2f7746e15b 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp) - stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streams[pushReqStream.Labels] = stream i.streamsByFP[fp] = stream diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 0f261d72f4dd3..e9591fd83280a 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -39,7 +39,7 @@ var NilMetrics = newIngesterMetrics(nil) func TestLabelsCollisions(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil) @@ -66,7 +66,7 @@ func TestLabelsCollisions(t *testing.T) { func TestConcurrentPushes(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) @@ -117,7 +117,7 @@ func TestConcurrentPushes(t *testing.T) { func TestSyncPeriod(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) const ( syncPeriod = 1 * time.Minute @@ -159,7 +159,7 @@ func TestSyncPeriod(t *testing.T) { func Test_SeriesQuery(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(t, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) // just some random values cfg := defaultConfig() @@ -274,7 +274,7 @@ func makeRandomLabels() labels.Labels { func Benchmark_PushInstance(b *testing.B) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(b, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil) ctx := context.Background() @@ -314,7 +314,7 @@ func Benchmark_PushInstance(b *testing.B) { func Benchmark_instance_addNewTailer(b *testing.B) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil) require.NoError(b, err) - limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) ctx := context.Background() @@ -368,7 +368,7 @@ func Test_Iterator(t *testing.T) { defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) + instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) @@ -450,7 +450,7 @@ func Test_ChunkFilter(t *testing.T) { overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) instance := newInstance( - &ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{}) + &ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{}) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 32867647e077a..0f222d667a290 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -24,6 +24,7 @@ type Limiter struct { limits *validation.Overrides ring RingCount replicationFactor int + metrics *ingesterMetrics mtx sync.RWMutex disabled bool @@ -33,20 +34,23 @@ func (l *Limiter) DisableForWALReplay() { l.mtx.Lock() defer l.mtx.Unlock() l.disabled = true + l.metrics.limiterEnabled.Set(0) } func (l *Limiter) Enable() { l.mtx.Lock() defer l.mtx.Unlock() l.disabled = false + l.metrics.limiterEnabled.Set(1) } // NewLimiter makes a new limiter -func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter { +func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter { return &Limiter{ limits: limits, ring: ring, replicationFactor: replicationFactor, + metrics: metrics, } } diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index da5e3a999af53..3be6c20cce3b1 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -108,7 +108,7 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { }, nil) require.NoError(t, err) - limiter := NewLimiter(limits, ring, testData.ringReplicationFactor) + limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor) actual := limiter.AssertMaxStreamsPerUser("test", testData.streams) assert.Equal(t, testData.expected, actual) @@ -155,7 +155,7 @@ func TestLimiter_minNonZero(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - limiter := NewLimiter(nil, nil, 0) + limiter := NewLimiter(nil, NilMetrics, nil, 0) assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second)) }) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 9e57a6c513fd7..bf31ecd63302b 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -27,6 +27,8 @@ type ingesterMetrics struct { recoveryBytesInUse prometheus.Gauge recoveryIsFlushing prometheus.Gauge + limiterEnabled prometheus.Gauge + autoForgetUnhealthyIngestersTotal prometheus.Counter } @@ -119,6 +121,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { Name: "loki_ingester_wal_replay_flushing", Help: "Whether the wal replay is in a flushing phase due to backpressure", }), + limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_limiter_enabled", + Help: "Whether the ingester's limiter is enabled", + }), autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "loki_ingester_autoforget_unhealthy_ingesters_total", Help: "Total number of ingesters automatically forgotten", diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 17c61f18cb5f5..96c63c6881f29 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -227,7 +227,7 @@ func (r *ingesterRecoverer) Close() { // configuration disables them, convert all streams/head blocks // to ensure unordered writes are disabled after the replay, // but without dropping any previously accepted data. - isAllowed := r.ing.limiter.limits.UnorderedWrites(s.tenant) + isAllowed := r.ing.limiter.UnorderedWrites(s.tenant) old := s.unorderedWrites s.unorderedWrites = isAllowed