diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index da11bb2760680..02f955006a312 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -897,7 +897,7 @@ lifecycler: [query_store_max_look_back_period: | default = 0] -# The ingester WAL records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. +# The ingester WAL (Write Ahead Log) records incoming logs and stores them on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. wal: # Enables writing to WAL. # CLI flag: -ingester.wal-enabled @@ -911,13 +911,17 @@ wal: # CLI flag: -ingester.recover-from-wal [recover: | default = false] -# When WAL is enabled, should chunks be flushed to long-term storage on shutdown. -# CLI flag: -ingester.flush-on-shutdown -[flush_on_shutdown: | default = false] + # When WAL is enabled, should chunks be flushed to long-term storage on shutdown. + # CLI flag: -ingester.flush-on-shutdown + [flush_on_shutdown: | default = false] -# Interval at which checkpoints should be created. -# CLI flag: ingester.checkpoint-duration -[checkpoint_duration: | default = 5m] + # Interval at which checkpoints should be created. + # CLI flag: ingester.checkpoint-duration + [checkpoint_duration: | default = 5m] + + # Maximum memory size the WAL may use during replay. After hitting this it will flush data to storage before continuing. + # A unit suffix (KB, MB, GB) may be applied. + [replay_memory_ceiling: | default = 4GB] ``` ## consul_config diff --git a/docs/sources/operations/storage/wal.md b/docs/sources/operations/storage/wal.md index a5c1e99db99d6..93f5df18d14d4 100644 --- a/docs/sources/operations/storage/wal.md +++ b/docs/sources/operations/storage/wal.md @@ -26,6 +26,11 @@ In the event the underlying WAL disk is full, Loki will not fail incoming writes Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens. + +### Backpressure + +The WAL also includes a backpressure mechanism to allow a large WAL to be replayed within a smaller memory bound. This is helpful after bad scenarios (i.e. an outage) when a WAL has grown past the point it may be recovered in memory. In this case, the ingester will track the amount of data being replayed and once it's passed the `ingester.wal-replay-memory-ceiling` threshold, will flush to storage. When this happens, it's likely that Loki's attempt to deduplicate chunks via content addressable storage will suffer. We deemed this efficiency loss an acceptable tradeoff considering how it simplifies operation and that it should not occur during regular operation (rollouts, rescheduling) where the WAL can be replayed without triggering this threshold. + ### Metrics ## Changes to deployment @@ -38,6 +43,7 @@ Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be * `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. * `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this. * If you are going to enable WAL, it is advisable to always set this to `true`. + * `--ingester.wal-replay-memory-ceiling` (default 4GB) may be set higher/lower depending on your resource settings. 
It handles memory pressure during WAL replays, allowing a WAL many times larger than available memory to be replayed. This is provided to minimize reconciliation time after very bad situations, i.e. an outage, and will likely not impact regular operations/rollouts _at all_. We suggest setting this to a high percentage (~75%) of available memory. ## Changes in lifecycle when WAL is enabled @@ -78,7 +84,7 @@ When scaling down, we must ensure existing data on the leaving ingesters are flu Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters, the ingesters which will be shutdown according to statefulset rules are `ingester-3` and then `ingester-2`. -Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and shut down the ingesters (while also removing itself from the ring). +Hence before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/ingester/flush_shutdown`](../../api#post-ingesterflush_shutdown) endpoint. This will flush the chunks and remove itself from the ring, after which it will register as unready and may be deleted. After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2. diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index ec81e8aa99d40..98d1007d1e6ae 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -269,7 +269,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { blocks: []block{}, head: &headBlock{}, - format: chunkFormatV2, + format: chunkFormatV3, encoding: enc, } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index ce63a3cbea05d..eb23085408bb5 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -920,13 +920,6 @@ func TestCheckpointEncoding(t *testing.T) { cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize) require.Nil(t, err) - // TODO(owen-d): remove once v3+ is the default chunk version - // because that is when we started serializing uncompressed size. - // Until then, nil them out in order to ease equality testing. 
- for i := range c.blocks { - c.blocks[i].uncompressedSize = 0 - } - require.Equal(t, c, cpy) } diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index a0ccc84b40cc9..5a6f504c411b7 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -47,12 +47,10 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time, func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config { ingesterConfig := defaultIngesterTestConfig(t) ingesterConfig.MaxTransferRetries = 0 - ingesterConfig.WAL = WALConfig{ - Enabled: true, - Dir: walDir, - Recover: true, - CheckpointDuration: time.Second, - } + ingesterConfig.WAL.Enabled = true + ingesterConfig.WAL.Dir = walDir + ingesterConfig.WAL.Recover = true + ingesterConfig.WAL.CheckpointDuration = time.Second return ingesterConfig } @@ -113,7 +111,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // ensure we haven't checkpointed yet - expectCheckpoint(t, walDir, false) + expectCheckpoint(t, walDir, false, time.Second) // restart the ingester i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) @@ -124,9 +122,8 @@ func TestIngesterWAL(t *testing.T) { // ensure we've recovered data from wal segments ensureIngesterData(ctx, t, start, end, i) - time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer // ensure we have checkpointed now - expectCheckpoint(t, walDir, true) + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) @@ -244,17 +241,151 @@ func TestUnflushedChunks(t *testing.T) { require.Equal(t, 1, len(unflushedChunks(chks))) } -func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { - fs, err := ioutil.ReadDir(walDir) +func TestIngesterWALBackpressureSegments(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") require.Nil(t, err) - var found bool - for _, f := range fs { - if _, err := checkpointIndex(f.Name(), false); err == nil { - found = true + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + ingesterConfig.WAL.ReplayMemoryCeiling = 1000 + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, } } - require.True(t, found == shouldExist) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + start := time.Now() + // Replay data 5x larger than the ceiling. + totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling) + req, written := mkPush(start, totalSize) + require.Equal(t, totalSize, written) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, req) + require.NoError(t, err) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // ensure we haven't checkpointed yet + expectCheckpoint(t, walDir, false, time.Second) + + // restart the ingester, ensuring we replayed from WAL. 
+ i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) +} + +func TestIngesterWALBackpressureCheckpoint(t *testing.T) { + + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + ingesterConfig.WAL.ReplayMemoryCeiling = 1000 + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + start := time.Now() + // Replay data 5x larger than the ceiling. + totalSize := int(5 * i.cfg.WAL.ReplayMemoryCeiling) + req, written := mkPush(start, totalSize) + require.Equal(t, totalSize, written) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, req) + require.NoError(t, err) + + // ensure we have checkpointed now + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*5) // give a bit of buffer + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // restart the ingester, ensuring we can replay from the checkpoint as well. + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) +} + +func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Duration) { + deadline := time.After(max) + for { + select { + case <-deadline: + require.Fail(t, "timeout while waiting for checkpoint existence:", shouldExist) + default: + <-time.After(max / 10) // check 10x over the duration + } + + fs, err := ioutil.ReadDir(walDir) + require.Nil(t, err) + var found bool + for _, f := range fs { + if _, err := checkpointIndex(f.Name(), false); err == nil { + found = true + } + } + if found == shouldExist { + return + } + } + +} + +// mkPush makes approximately totalSize bytes of log lines across min(500, totalSize) streams +func mkPush(start time.Time, totalSize int) (*logproto.PushRequest, int) { + var written int + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + }, + } + totalStreams := 500 + if totalStreams > totalSize { + totalStreams = totalSize + } + + for i := 0; i < totalStreams; i++ { + req.Streams = append(req.Streams, logproto.Stream{ + Labels: fmt.Sprintf(`{foo="bar",i="%d"}`, i), + }) + + for j := 0; j < totalSize/totalStreams; j++ { + req.Streams[i].Entries = append(req.Streams[i].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(j) * time.Nanosecond), + Line: string([]byte{1}), + }) + written++ + } + + } + return req, written } type ingesterInstancesFunc func() []*instance diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 8ff11a59560c8..4775f04d835a5 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -42,6 +42,11 @@ func Test_Encoding_Series(t *testing.T) { err := decodeWALRecord(buf, 
decoded) require.Nil(t, err) + + // Since we use a pool, there can be subtle differentiations between nil slices and len(0) slices. + // Both are valid, so check length. + require.Equal(t, 0, len(decoded.RefEntries)) + decoded.RefEntries = nil require.Equal(t, record, decoded) } diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 8322f8745c606..d29a0a50ea3f1 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -107,10 +107,24 @@ const ( flushReasonSynced = "synced" ) +// Note: this is called both during the WAL replay (zero or more times) +// and then after replay as well. +func (i *Ingester) InitFlushQueues() { + i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) + for j := 0; j < i.cfg.ConcurrentFlushes; j++ { + i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) + go i.flushLoop(j) + } +} + // Flush triggers a flush of all the chunks and closes the flush queues. // Called from the Lifecycler as part of the ingester shutdown. func (i *Ingester) Flush() { - i.sweepUsers(true) + i.flush(true) +} + +func (i *Ingester) flush(mayRemoveStreams bool) { + i.sweepUsers(true, mayRemoveStreams) // Close the flush queues, to unblock waiting workers. for _, flushQueue := range i.flushQueues { @@ -118,12 +132,13 @@ func (i *Ingester) Flush() { } i.flushQueuesDone.Wait() + } // FlushHandler triggers a flush of all in memory chunks. Mainly used for // local testing. func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) { - i.sweepUsers(true) + i.sweepUsers(true, true) w.WriteHeader(http.StatusNoContent) } @@ -143,21 +158,21 @@ func (o *flushOp) Priority() int64 { } // sweepUsers periodically schedules series for flushing and garbage collects users with no series -func (i *Ingester) sweepUsers(immediate bool) { +func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) { instances := i.getInstances() for _, instance := range instances { - i.sweepInstance(instance, immediate) + i.sweepInstance(instance, immediate, mayRemoveStreams) } } -func (i *Ingester) sweepInstance(instance *instance, immediate bool) { +func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) { instance.streamsMtx.Lock() defer instance.streamsMtx.Unlock() for _, stream := range instance.streams { i.sweepStream(instance, stream, immediate) - i.removeFlushedChunks(instance, stream) + i.removeFlushedChunks(instance, stream, mayRemoveStreams) } } @@ -287,23 +302,28 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { } // must hold streamsMtx -func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { +func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) { now := time.Now() stream.chunkMtx.Lock() defer stream.chunkMtx.Unlock() prevNumChunks := len(stream.chunks) + var subtracted int for len(stream.chunks) > 0 { if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { break } + subtracted += stream.chunks[0].chunk.UncompressedSize() stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected stream.chunks = stream.chunks[1:] } memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks))) - if len(stream.chunks) == 0 { + // Signal how much data has been flushed to lessen any WAL replay pressure. 
+ i.replayController.Sub(int64(subtracted)) + + if mayRemoveStream && len(stream.chunks) == 0 { delete(instance.streamsByFP, stream.fp) delete(instance.streams, stream.labelsString) instance.index.Delete(stream.labels, stream.fp) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4d2fe2a6f9325..4c0d9f56c02de 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -150,6 +150,9 @@ type Ingester struct { // Currently only used by the WAL to signal when the disk is full. flushOnShutdownSwitch *OnceSwitch + // Only used by WAL & flusher to coordinate backpressure during replay. + replayController *replayController + metrics *ingesterMetrics wal WAL @@ -184,6 +187,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid metrics: metrics, flushOnShutdownSwitch: &OnceSwitch{}, } + i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) if cfg.WAL.Enabled { if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil { @@ -214,7 +218,15 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid } func (i *Ingester) starting(ctx context.Context) error { + if i.cfg.WAL.Recover { + // Ignore retain period during wal replay. + old := i.cfg.RetainPeriod + i.cfg.RetainPeriod = 0 + defer func() { + i.cfg.RetainPeriod = old + }() + // Disable the in process stream limit checks while replaying the WAL i.limiter.Disable() defer i.limiter.Enable() @@ -274,11 +286,7 @@ func (i *Ingester) starting(ctx context.Context) error { } - i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) - for j := 0; j < i.cfg.ConcurrentFlushes; j++ { - i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) - go i.flushLoop(j) - } + i.InitFlushQueues() // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done err := i.lifecycler.StartAsync(context.Background()) @@ -349,7 +357,7 @@ func (i *Ingester) loop() { for { select { case <-flushTicker.C: - i.sweepUsers(false) + i.sweepUsers(false, true) case <-i.loopQuit: return diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 61d9a62fd45c5..a2f83e4f5b993 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -164,7 +164,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } - if err := stream.Push(ctx, s.Entries, record); err != nil { + if _, err := stream.Push(ctx, s.Entries, record); err != nil { appendErr = err continue } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index b907612c6f0cb..2942eed517f67 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -22,6 +22,17 @@ type ingesterMetrics struct { recoveredStreamsTotal prometheus.Counter recoveredChunksTotal prometheus.Counter recoveredEntriesTotal prometheus.Counter + recoveredBytesTotal prometheus.Counter + recoveryBytesInUse prometheus.Gauge +} + +// setRecoveryBytesInUse bounds the bytes reports to >= 0. +// TODO(owen-d): we can gain some efficiency by having the flusher never update this after recovery ends. 
+func (m *ingesterMetrics) setRecoveryBytesInUse(v int64) { + if v < 0 { + v = 0 + } + m.recoveryBytesInUse.Set(float64(v)) } const ( @@ -88,5 +99,13 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { Name: "loki_ingester_wal_recovered_entries_total", Help: "Total number of entries recovered from the WAL.", }), + recoveredBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_recovered_bytes_total", + Help: "Total number of bytes recovered from the WAL.", + }), + recoveryBytesInUse: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_wal_bytes_in_use", + Help: "Total number of bytes in use by the WAL recovery process.", + }), } } diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index fb2b355a687c7..ef5ef062c96d4 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -1,7 +1,6 @@ package ingester import ( - "context" io "io" "runtime" "sync" @@ -12,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wal" + "golang.org/x/net/context" "github.com/grafana/loki/pkg/logproto" ) @@ -87,7 +87,6 @@ type Recoverer interface { Series(series *Series) error SetStream(userID string, series record.RefSeries) error Push(userID string, entries RefEntries) error - Close() Done() <-chan struct{} } @@ -95,7 +94,8 @@ type ingesterRecoverer struct { // basically map[userID]map[fingerprint]*stream users sync.Map ing *Ingester - done chan struct{} + + done chan struct{} } func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { @@ -109,31 +109,37 @@ func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } func (r *ingesterRecoverer) Series(series *Series) error { - inst := r.ing.getOrCreateInstance(series.UserID) + return r.ing.replayController.WithBackPressure(func() error { - // TODO(owen-d): create another fn to avoid unnecessary label type conversions. - stream, err := inst.getOrCreateStream(logproto.Stream{ - Labels: client.FromLabelAdaptersToLabels(series.Labels).String(), - }, true, nil) + inst := r.ing.getOrCreateInstance(series.UserID) - if err != nil { - return err - } + // TODO(owen-d): create another fn to avoid unnecessary label type conversions. + stream, err := inst.getOrCreateStream(logproto.Stream{ + Labels: client.FromLabelAdaptersToLabels(series.Labels).String(), + }, true, nil) - added, err := stream.setChunks(series.Chunks) - if err != nil { - return err - } - r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) - r.ing.metrics.recoveredEntriesTotal.Add(float64(added)) + if err != nil { + return err + } - // now store the stream in the recovery map under the fingerprint originally recorded - // as it's possible the newly mapped fingerprint is different. This is because the WAL records - // will use this original reference. - got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{}) - streamsMap := got.(*sync.Map) - streamsMap.Store(series.Fingerprint, stream) - return nil + bytesAdded, entriesAdded, err := stream.setChunks(series.Chunks) + + if err != nil { + return err + } + r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) + r.ing.metrics.recoveredEntriesTotal.Add(float64(entriesAdded)) + r.ing.replayController.Add(int64(bytesAdded)) + + // now store the stream in the recovery map under the fingerprint originally recorded + // as it's possible the newly mapped fingerprint is different. 
This is because the WAL records + // will use this original reference. + got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{}) + streamsMap := got.(*sync.Map) + streamsMap.Store(series.Fingerprint, stream) + + return nil + }) } // SetStream is responsible for setting the key path for userIDs -> fingerprints -> streams. @@ -170,19 +176,22 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er } func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { - out, ok := r.users.Load(userID) - if !ok { - return errors.Errorf("user (%s) not set during WAL replay", userID) - } + return r.ing.replayController.WithBackPressure(func() error { + out, ok := r.users.Load(userID) + if !ok { + return errors.Errorf("user (%s) not set during WAL replay", userID) + } - s, ok := out.(*sync.Map).Load(entries.Ref) - if !ok { - return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID) - } + s, ok := out.(*sync.Map).Load(entries.Ref) + if !ok { + return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID) + } - // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) - _ = s.(*stream).Push(context.Background(), entries.Entries, nil) - return nil + // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) + bytesAdded, _ := s.(*stream).Push(context.Background(), entries.Entries, nil) + r.ing.replayController.Add(int64(bytesAdded)) + return nil + }) } func (r *ingesterRecoverer) Close() { diff --git a/pkg/ingester/replay_controller.go b/pkg/ingester/replay_controller.go new file mode 100644 index 0000000000000..0b72a618ffcc5 --- /dev/null +++ b/pkg/ingester/replay_controller.go @@ -0,0 +1,104 @@ +package ingester + +import ( + "sync" + + "go.uber.org/atomic" +) + +type replayFlusher struct { + i *Ingester +} + +func (f *replayFlusher) Flush() { + f.i.InitFlushQueues() + f.i.flush(false) // flush data but don't remove streams from the ingesters + + // Similar to sweepUsers with the exception that it will not remove streams + // afterwards to prevent unlinking a stream which may receive later writes from the WAL. + // We have to do this here after the flushQueues have been drained. + instances := f.i.getInstances() + + for _, instance := range instances { + instance.streamsMtx.Lock() + + for _, stream := range instance.streams { + f.i.removeFlushedChunks(instance, stream, false) + } + + instance.streamsMtx.Unlock() + } +} + +type Flusher interface { + Flush() +} + +// replayController handles coordinating backpressure between WAL replays and chunk flushing. 
+type replayController struct { + cfg WALConfig + metrics *ingesterMetrics + currentBytes atomic.Int64 + cond *sync.Cond + isFlushing atomic.Bool + flusher Flusher +} + +// flusher is expected to reduce pressure via calling Sub +func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flusher) *replayController { + return &replayController{ + cfg: cfg, + metrics: metrics, + cond: sync.NewCond(&sync.Mutex{}), + flusher: flusher, + } +} + +func (c *replayController) Add(x int64) { + c.metrics.recoveredBytesTotal.Add(float64(x)) + c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(x)) +} + +func (c *replayController) Sub(x int64) { + c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(x)) + +} + +func (c *replayController) Cur() int { + return int(c.currentBytes.Load()) +} + +func (c *replayController) Flush() { + if c.isFlushing.CAS(false, true) { + c.flusher.Flush() + c.isFlushing.Store(false) + + // Broadcast after lock is acquired to prevent race conditions with cpu scheduling + // where the flush code could finish before the goroutine which initiated it gets to call + // c.cond.Wait() + c.cond.L.Lock() + c.cond.Broadcast() + c.cond.L.Unlock() + } +} + +// WithBackPressure is expected to call replayController.Add in the passed function to increase the managed byte count. +// It will call the function as long as there is expected room before the memory cap and will then flush data intermittently +// when needed. +func (c *replayController) WithBackPressure(fn func() error) error { + // Account for backpressure and wait until there's enough memory to continue replaying the WAL + c.cond.L.Lock() + + // use 90% as a threshold since we'll be adding to it. + for c.Cur() > int(c.cfg.ReplayMemoryCeiling)*9/10 { + // too much backpressure, flush + go c.Flush() + c.cond.Wait() + } + + // Don't hold the lock while executing the provided function. + // This ensures we can run functions concurrently. + c.cond.L.Unlock() + + return fn() +} diff --git a/pkg/ingester/replay_controller_test.go b/pkg/ingester/replay_controller_test.go new file mode 100644 index 0000000000000..b4e1b81af9e1e --- /dev/null +++ b/pkg/ingester/replay_controller_test.go @@ -0,0 +1,77 @@ +package ingester + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type dumbFlusher struct { + onFlush func() +} + +func newDumbFlusher(onFlush func()) *dumbFlusher { + return &dumbFlusher{ + onFlush: onFlush, + } +} + +func (f *dumbFlusher) Flush() { + if f.onFlush != nil { + f.onFlush() + } +} + +func nilMetrics() *ingesterMetrics { return newIngesterMetrics(nil) } + +func TestReplayController(t *testing.T) { + var ops []string + var opLock sync.Mutex + + var rc *replayController + flusher := newDumbFlusher( + func() { + rc.Sub(100) // simulate flushing 100 bytes + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "Flush") + }, + ) + rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher) + + var wg sync.WaitGroup + n := 5 + wg.Add(n) + + for i := 0; i < n; i++ { + // In order to prevent all the goroutines from running before they've added bytes + // to the internal count, introduce a brief sleep. 
+ time.Sleep(time.Millisecond) + + // nolint:errcheck,unparam + go rc.WithBackPressure(func() error { + rc.Add(50) + opLock.Lock() + defer opLock.Unlock() + ops = append(ops, "WithBackPressure") + wg.Done() + return nil + }) + } + + wg.Wait() + + expected := []string{ + "WithBackPressure", // add 50, total 50 + "WithBackPressure", // add 50, total 100 + "Flush", // subtract 100, total 0 + "WithBackPressure", // add 50, total 50 + "WithBackPressure", // add 50, total 100 + "Flush", // subtract 100, total 0 + "WithBackPressure", // add 50, total 50 + } + require.Equal(t, expected, ops) + +} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 2c093ed8f9ad9..dde1c8b8e0c33 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -117,18 +117,19 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { } // setChunks is used during checkpoint recovery -func (s *stream) setChunks(chunks []Chunk) (entriesAdded int, err error) { +func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err error) { s.chunkMtx.Lock() defer s.chunkMtx.Unlock() chks, err := fromWireChunks(s.cfg, chunks) if err != nil { - return 0, err + return 0, 0, err } s.chunks = chks for _, c := range s.chunks { entriesAdded += c.chunk.Size() + bytesAdded += c.chunk.UncompressedSize() } - return entriesAdded, nil + return bytesAdded, entriesAdded, nil } func (s *stream) NewChunk() *chunkenc.MemChunk { @@ -139,9 +140,10 @@ func (s *stream) Push( ctx context.Context, entries []logproto.Entry, record *WALRecord, -) error { +) (int, error) { s.chunkMtx.Lock() defer s.chunkMtx.Unlock() + var bytesAdded int prevNumChunks := len(s.chunks) var lastChunkTimestamp time.Time if prevNumChunks == 0 { @@ -199,6 +201,9 @@ func (s *stream) Push( lastChunkTimestamp = entries[i].Timestamp s.lastLine.ts = lastChunkTimestamp s.lastLine.content = entries[i].Line + + // length of string plus + bytesAdded += len(entries[i].Line) } chunk.lastUpdated = time.Now() } @@ -264,15 +269,15 @@ func (s *stream) Push( fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries)) - return httpgrpc.Errorf(http.StatusBadRequest, buf.String()) + return bytesAdded, httpgrpc.Errorf(http.StatusBadRequest, buf.String()) } - return lastEntryWithErr.e + return bytesAdded, lastEntryWithErr.e } if len(s.chunks) != prevNumChunks { memoryChunks.Add(float64(len(s.chunks) - prevNumChunks)) } - return nil + return bytesAdded, nil } // Returns true, if chunk should be cut before adding new entry. 
This is done to make ingesters diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index a7a405e5764c5..793f6b4ec8645 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -44,7 +44,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NilMetrics, ) - err := s.Push(context.Background(), []logproto.Entry{ + _, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(int64(numLogs), 0), Line: "log"}, }, recordPool.GetRecord()) require.NoError(t, err) @@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs) expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String()) - err = s.Push(context.Background(), newLines, recordPool.GetRecord()) + _, err = s.Push(context.Background(), newLines, recordPool.GetRecord()) require.Error(t, err) require.Equal(t, expectErr.Error(), err.Error()) }) @@ -82,7 +82,7 @@ func TestPushDeduplication(t *testing.T) { NilMetrics, ) - err := s.Push(context.Background(), []logproto.Entry{ + written, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, @@ -91,6 +91,7 @@ func TestPushDeduplication(t *testing.T) { require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + require.Equal(t, len("test"+"newer, better test"), written) } func TestStreamIterator(t *testing.T) { @@ -164,7 +165,8 @@ func Benchmark_PushStream(b *testing.B) { for n := 0; n < b.N; n++ { rec := recordPool.GetRecord() - require.NoError(b, s.Push(ctx, e, rec)) + _, err := s.Push(ctx, e, rec) + require.NoError(b, err) recordPool.PutRecord(rec) } } diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 0cd7fd328d9b5..59bd6037a156b 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util/flagext" ) var ( @@ -20,13 +21,15 @@ var ( ) const walSegmentSize = wal.DefaultSegmentSize * 4 +const defaultCeiling = 4 << 30 // 4GB type WALConfig struct { - Enabled bool `yaml:"enabled"` - Dir string `yaml:"dir"` - Recover bool `yaml:"recover"` - CheckpointDuration time.Duration `yaml:"checkpoint_duration"` - FlushOnShutdown bool `yaml:"flush_on_shutdown"` + Enabled bool `yaml:"enabled"` + Dir string `yaml:"dir"` + Recover bool `yaml:"recover"` + CheckpointDuration time.Duration `yaml:"checkpoint_duration"` + FlushOnShutdown bool `yaml:"flush_on_shutdown"` + ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` } func (cfg *WALConfig) Validate() error { @@ -43,6 +46,10 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.") f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.") f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.") + + // Need to set default here + cfg.ReplayMemoryCeiling = flagext.ByteSize(defaultCeiling) + f.Var(&cfg.ReplayMemoryCeiling, "ingester.wal-replay-memory-ceiling", "How much memory the WAL may use during replay 
before it needs to flush chunks to storage, i.e. 10GB. We suggest setting this to a high percentage (~75%) of available memory.") } // WAL interface allows us to have a no-op WAL when the WAL is disabled. diff --git a/production/ksonnet/loki/wal.libsonnet b/production/ksonnet/loki/wal.libsonnet index 695cbcf9384a9..d3188c68175a0 100644 --- a/production/ksonnet/loki/wal.libsonnet +++ b/production/ksonnet/loki/wal.libsonnet @@ -13,6 +13,7 @@ enabled: true, dir: '/loki/wal', recover: true, + replay_memory_ceiling: '9GB', // between the requests & limits }, }, }),
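
To make the replay backpressure mechanism in this change easier to follow outside the diff, below is a minimal, self-contained sketch of the pattern implemented by `replayController` in `pkg/ingester/replay_controller.go`. It is illustrative only: the names (`controller`, `tryFlush`, `withBackpressure`) are invented for the example, it uses the standard library's `sync/atomic` types (Go 1.19+) in place of `go.uber.org/atomic`, and the flush callback simply pretends to persist everything currently held in memory.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// controller is a stripped-down stand-in for the PR's replayController:
// replaying goroutines account for the bytes they load, and once usage nears
// the ceiling they trigger a flush and wait on a condition variable until the
// flusher has released memory.
type controller struct {
	ceiling    int64
	current    atomic.Int64 // go.uber.org/atomic in the PR; stdlib needs Go 1.19+
	isFlushing atomic.Bool
	cond       *sync.Cond
	flush      func() // expected to call sub() for whatever it persists
}

func (c *controller) add(n int64) { c.current.Add(n) }
func (c *controller) sub(n int64) { c.current.Add(-n) }

func (c *controller) tryFlush() {
	// Only one flush at a time; concurrent callers simply return and keep waiting.
	if c.isFlushing.CompareAndSwap(false, true) {
		c.flush()
		c.isFlushing.Store(false)

		// Broadcast under the lock so the goroutine that launched this flush
		// cannot miss the wakeup before it reaches cond.Wait().
		c.cond.L.Lock()
		c.cond.Broadcast()
		c.cond.L.Unlock()
	}
}

// withBackpressure runs fn once there is headroom below the ceiling.
func (c *controller) withBackpressure(fn func()) {
	c.cond.L.Lock()
	// Trigger at 90% of the ceiling, as the PR does, since fn is about to add more.
	for c.current.Load() > c.ceiling*9/10 {
		go c.tryFlush()
		c.cond.Wait()
	}
	// Run fn outside the lock so replay workers can proceed concurrently.
	c.cond.L.Unlock()
	fn()
}

func main() {
	var flushed atomic.Int64
	c := &controller{ceiling: 1000, cond: sync.NewCond(&sync.Mutex{})}
	c.flush = func() {
		n := c.current.Load() // pretend everything currently held gets persisted
		flushed.Add(n)
		c.sub(n)
	}

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ { // 50 replayers x 50 bytes, well past the 1000-byte ceiling
		wg.Add(1)
		go c.withBackpressure(func() {
			defer wg.Done()
			c.add(50) // bytes of entries just replayed into memory
		})
	}
	wg.Wait()
	fmt.Printf("flushed %d bytes during replay, %d still in memory\n",
		flushed.Load(), c.current.Load())
}
```

As in the PR, the 90% trigger leaves headroom for the work about to be accounted, and the broadcast is taken under the condition's lock so the goroutine that kicked off the flush cannot reach `cond.Wait()` after the wakeup has already fired. The real flusher additionally calls `removeFlushedChunks` with `mayRemoveStream=false` so streams stay in place and later WAL entries can still append to them. Operators tune the threshold with `-ingester.wal-replay-memory-ceiling` (YAML `replay_memory_ceiling`, default 4GB); the docs in this change suggest roughly 75% of available memory.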