diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go
index f577d0b8851f6..4ab36713d3920 100644
--- a/pkg/ingester-rf1/flush.go
+++ b/pkg/ingester-rf1/flush.go
@@ -2,9 +2,9 @@ package ingesterrf1
 
 import (
 	"crypto/rand"
+	"errors"
 	"fmt"
 	"net/http"
-	"strconv"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -17,7 +17,6 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/grafana/loki/v3/pkg/storage/wal"
-	"github.com/grafana/loki/v3/pkg/util"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
 
@@ -38,11 +37,10 @@ const (
 
 // Note: this is called both during the WAL replay (zero or more times)
 // and then after replay as well.
-func (i *Ingester) InitFlushQueues() {
-	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
+func (i *Ingester) InitFlushWorkers() {
+	i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
 	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
-		i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueues)
-		go i.flushLoop(j)
+		go i.flushWorker(j)
 	}
 }
 
@@ -50,7 +48,8 @@ func (i *Ingester) InitFlushQueues() {
 // Flush triggers a flush of all the chunks and closes the flush queues.
 // Called from the Lifecycler as part of the ingester shutdown.
 func (i *Ingester) Flush() {
-	i.flush()
+	i.wal.Close()
+	i.flushWorkersDone.Wait()
 }
 
 // TransferOut implements ring.FlushTransferer
@@ -60,57 +59,38 @@ func (i *Ingester) TransferOut(_ context.Context) error {
 	return ring.ErrTransferDisabled
 }
 
-func (i *Ingester) flush() {
-	// TODO: Flush the last chunks
-	// Close the flush queues, to unblock waiting workers.
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-
-	i.flushQueuesDone.Wait()
-	level.Debug(i.logger).Log("msg", "flush queues have drained")
-}
-
 // FlushHandler triggers a flush of all in memory chunks. Mainly used for
 // local testing.
 func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
-type flushOp struct {
-	it  *wal.PendingItem
-	num int64
-}
-
-func (o *flushOp) Key() string {
-	return strconv.Itoa(int(o.num))
-}
-
-func (o *flushOp) Priority() int64 {
-	return -o.num
-}
-
-func (i *Ingester) flushLoop(j int) {
-	l := log.With(i.logger, "loop", j)
+func (i *Ingester) flushWorker(j int) {
+	l := log.With(i.logger, "worker", j)
 	defer func() {
-		level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
-		i.flushQueuesDone.Done()
+		level.Debug(l).Log("msg", "Ingester.flushWorker() exited")
+		i.flushWorkersDone.Done()
 	}()
 
 	for {
-		o := i.flushQueues[j].Dequeue()
-		if o == nil {
+		it, err := i.wal.NextPending()
+		if errors.Is(err, wal.ErrClosed) {
 			return
 		}
-		op := o.(*flushOp)
+
+		if it == nil {
+			// TODO: Do something more clever here instead.
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
 
 		start := time.Now()
 
 		// We'll use this to log the size of the segment that was flushed.
-		n := op.it.Writer.InputSize()
+		n := it.Writer.InputSize()
 		humanized := humanize.Bytes(uint64(n))
 
-		err := i.flushOp(l, op)
+		err = i.flushItem(l, it)
 		d := time.Since(start)
 		if err != nil {
 			level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
@@ -118,18 +98,18 @@ func (i *Ingester) flushLoop(j int) {
 			level.Debug(l).Log("msg", "flushed", "size", humanized, "duration", d)
 		}
 
-		op.it.Result.SetDone(err)
-		i.wal.Put(op.it)
+		it.Result.SetDone(err)
+		i.wal.Put(it)
 	}
 }
 
-func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 
 	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
 	for b.Ongoing() {
-		err := i.flushSegment(ctx, op.it.Writer)
+		err := i.flushSegment(ctx, it.Writer)
 		if err == nil {
 			break
 		}
diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go
index 3df47c598c133..c3e6afd052d5b 100644
--- a/pkg/ingester-rf1/ingester.go
+++ b/pkg/ingester-rf1/ingester.go
@@ -5,7 +5,6 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"math/rand"
 	"net/http"
 	"os"
 	"path"
@@ -202,15 +201,11 @@ type Ingester struct {
 	store           Storage
 	periodicConfigs []config.PeriodConfig
 
-	loopDone    sync.WaitGroup
-	loopQuit    chan struct{}
 	tailersQuit chan struct{}
 
 	// One queue per flush thread. Fingerprint is used to
 	// pick a queue.
-	numOps          int64
-	flushQueues     []*util.PriorityQueue
-	flushQueuesDone sync.WaitGroup
+	flushWorkersDone sync.WaitGroup
 
 	wal *wal.Manager
 
@@ -270,17 +265,16 @@ func New(cfg Config, clientConfig client.Config,
 	}
 
 	i := &Ingester{
-		cfg:             cfg,
-		logger:          logger,
-		clientConfig:    clientConfig,
-		tenantConfigs:   configs,
-		instances:       map[string]*instance{},
-		store:           storage,
-		periodicConfigs: periodConfigs,
-		loopQuit:        make(chan struct{}),
-		flushQueues:     make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
-		tailersQuit:     make(chan struct{}),
-		metrics:         metrics,
+		cfg:              cfg,
+		logger:           logger,
+		clientConfig:     clientConfig,
+		tenantConfigs:    configs,
+		instances:        map[string]*instance{},
+		store:            storage,
+		periodicConfigs:  periodConfigs,
+		flushWorkersDone: sync.WaitGroup{},
+		tailersQuit:      make(chan struct{}),
+		metrics:          metrics,
 		// flushOnShutdownSwitch: &OnceSwitch{},
 		terminateOnShutdown:  false,
 		streamRateCalculator: NewStreamRateCalculator(),
@@ -401,7 +395,7 @@ func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func (i *Ingester) starting(ctx context.Context) error {
-	i.InitFlushQueues()
+	i.InitFlushWorkers()
 
 	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
 	err := i.lifecycler.StartAsync(context.Background())
@@ -435,9 +429,6 @@ func (i *Ingester) starting(ctx context.Context) error {
 		return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
 	}
 
-	// start our loop
-	i.loopDone.Add(1)
-	go i.loop()
 	return nil
 }
 
@@ -457,8 +448,6 @@ func (i *Ingester) running(ctx context.Context) error {
 	//	instance.closeTailers()
 	//}
 
-	close(i.loopQuit)
-	i.loopDone.Wait()
 	return serviceError
 }
 
@@ -474,10 +463,7 @@ func (i *Ingester) stopping(_ error) error {
 	//}
 	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
 
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-	i.flushQueuesDone.Wait()
+	i.flushWorkersDone.Wait()
 
 	// i.streamRateCalculator.Stop()
 
@@ -518,60 +504,6 @@ func (i *Ingester) removeShutdownMarkerFile() {
 	}
 }
 
-func (i *Ingester) loop() {
-	defer i.loopDone.Done()
-
-	// Delay the first flush operation by up to 0.8x the flush time period.
-	// This will ensure that multiple ingesters started at the same time do not
-	// flush at the same time. Flushing at the same time can cause concurrently
-	// writing the same chunk to object storage, which in AWS S3 leads to being
-	// rate limited.
-	jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
-	initialDelay := time.NewTimer(jitter)
-	defer initialDelay.Stop()
-
-	level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)
-
-	select {
-	case <-initialDelay.C:
-		// do nothing and continue with flush loop
-	case <-i.loopQuit:
-		// ingester stopped while waiting for initial delay
-		return
-	}
-
-	// Add +/- 20% of flush interval as jitter.
-	// The default flush check period is 30s so max jitter will be 6s.
-	j := i.cfg.FlushCheckPeriod / 5
-	flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
-	defer flushTicker.Stop()
-
-	for {
-		select {
-		case <-flushTicker.C:
-			i.doFlushTick()
-		case <-i.loopQuit:
-			return
-		}
-	}
-}
-
-func (i *Ingester) doFlushTick() {
-	for {
-		// Keep adding ops to the queue until there are no more.
-		it := i.wal.NextPending()
-		if it == nil {
-			break
-		}
-		i.numOps++
-		flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
-			num: i.numOps,
-			it:  it,
-		})
-	}
-}
-
 // PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
 //
 // Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
diff --git a/pkg/storage/wal/manager.go b/pkg/storage/wal/manager.go
index d1359f80601eb..cfc89605cb7c3 100644
--- a/pkg/storage/wal/manager.go
+++ b/pkg/storage/wal/manager.go
@@ -25,6 +25,9 @@ const (
 )
 
 var (
+	// ErrClosed is returned when the WAL is closed.
+	ErrClosed = errors.New("WAL is closed")
+
 	// ErrFull is returned when an append fails because the WAL is full. This
 	// happens when all segments are either in the pending list waiting to be
 	// flushed, or in the process of being flushed.
@@ -111,9 +114,10 @@ type Manager struct {
 	// pending is a list of segments that are waiting to be flushed. Once
 	// flushed, the segment is reset and moved to the back of the available
 	// list to accept writes again.
-	pending  *list.List
-	shutdown chan struct{}
-	mu       sync.Mutex
+	pending *list.List
+
+	closed bool
+	mu     sync.Mutex
 }
 
 // item is similar to PendingItem, but it is an internal struct used in the
@@ -141,7 +145,6 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 		metrics:   metrics.ManagerMetrics,
 		available: list.New(),
 		pending:   list.New(),
-		shutdown:  make(chan struct{}),
 	}
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
@@ -162,6 +165,9 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	if m.closed {
+		return nil, ErrClosed
+	}
 	if m.available.Len() == 0 {
 		return nil, ErrFull
 	}
@@ -185,9 +191,25 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	return it.r, nil
 }
 
+func (m *Manager) Close() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.closed = true
+	if m.available.Len() > 0 {
+		el := m.available.Front()
+		it := el.Value.(*item)
+		if it.w.InputSize() > 0 {
+			m.pending.PushBack(it)
+			m.metrics.NumPending.Inc()
+			m.available.Remove(el)
+			m.metrics.NumAvailable.Dec()
+		}
+	}
+}
+
 // NextPending returns the next segment to be flushed. It returns nil if the
-// pending list is empty.
-func (m *Manager) NextPending() *PendingItem {
+// pending list is empty, and ErrClosed if the WAL is closed.
+func (m *Manager) NextPending() (*PendingItem, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	if m.pending.Len() == 0 {
@@ -205,7 +227,10 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 		}
 		// If the pending list is still empty return nil.
 		if m.pending.Len() == 0 {
-			return nil
+			if m.closed {
+				return nil, ErrClosed
+			}
+			return nil, nil
 		}
 	}
 	el := m.pending.Front()
@@ -213,7 +238,7 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	m.pending.Remove(el)
 	m.metrics.NumPending.Dec()
 	m.metrics.NumFlushing.Inc()
-	return &PendingItem{Result: it.r, Writer: it.w}
+	return &PendingItem{Result: it.r, Writer: it.w}, nil
 }
 
 // Put resets the segment and puts it back in the available list to accept
diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go
index 64c1056b395ec..f5fee51ff1666 100644
--- a/pkg/storage/wal/manager_test.go
+++ b/pkg/storage/wal/manager_test.go
@@ -171,6 +171,40 @@ func TestManager_AppendMaxSize(t *testing.T) {
 	require.Equal(t, 1, m.pending.Len())
 }
 
+func TestManager_AppendWALClosed(t *testing.T) {
+	m, err := NewManager(Config{
+		MaxAge:         30 * time.Second,
+		MaxSegments:    10,
+		MaxSegmentSize: 1024, // 1KB
+	}, NewMetrics(nil))
+	require.NoError(t, err)
+
+	// Append some data.
+	lbs := labels.Labels{{Name: "a", Value: "b"}}
+	entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
+	res, err := m.Append(AppendRequest{
+		TenantID:  "1",
+		Labels:    lbs,
+		LabelsStr: lbs.String(),
+		Entries:   entries,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, res)
+
+	// Close the WAL.
+	m.Close()
+
+	// Should not be able to append more data as the WAL is closed.
+	res, err = m.Append(AppendRequest{
+		TenantID:  "1",
+		Labels:    lbs,
+		LabelsStr: lbs.String(),
+		Entries:   entries,
+	})
+	require.Nil(t, res)
+	require.ErrorIs(t, err, ErrClosed)
+}
+
 func TestManager_AppendWALFull(t *testing.T) {
 	m, err := NewManager(Config{
 		MaxAge:         30 * time.Second,
@@ -216,7 +250,8 @@ func TestManager_NextPending(t *testing.T) {
 
 	// There should be no segments waiting to be flushed as no data has been
 	// written.
-	it := m.NextPending()
+	it, err := m.NextPending()
+	require.NoError(t, err)
 	require.Nil(t, it)
 
 	// Append 1KB of data.
@@ -231,11 +266,54 @@ func TestManager_NextPending(t *testing.T) {
 	require.NoError(t, err)
 
 	// There should be a segment waiting to be flushed.
-	it = m.NextPending()
+	it, err = m.NextPending()
+	require.NoError(t, err)
 	require.NotNil(t, it)
 
 	// There should be no more segments waiting to be flushed.
-	it = m.NextPending()
+	it, err = m.NextPending()
+	require.NoError(t, err)
+	require.Nil(t, it)
+}
+
+func TestManager_NextPendingClosed(t *testing.T) {
+	m, err := NewManager(Config{
+		MaxAge:         30 * time.Second,
+		MaxSegments:    10,
+		MaxSegmentSize: 1024, // 1KB
+	}, NewMetrics(nil))
+	require.NoError(t, err)
+
+	// Append some data.
+	lbs := labels.Labels{{Name: "a", Value: "b"}}
+	entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
+	res, err := m.Append(AppendRequest{
+		TenantID:  "1",
+		Labels:    lbs,
+		LabelsStr: lbs.String(),
+		Entries:   entries,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, res)
+
+	// There should be no segments waiting to be flushed as neither the maximum
+	// age nor maximum size has been exceeded.
+	it, err := m.NextPending()
+	require.NoError(t, err)
+	require.Nil(t, it)
+
+	// Close the WAL.
+	m.Close()
+
+	// There should be one segment waiting to be flushed.
+	it, err = m.NextPending()
+	require.NoError(t, err)
+	require.NotNil(t, it)
+
+	// There are no more segments waiting to be flushed, and since the WAL is
+	// closed, successive calls should return ErrClosed.
+	it, err = m.NextPending()
+	require.ErrorIs(t, err, ErrClosed)
 	require.Nil(t, it)
 }
 
@@ -261,7 +339,8 @@ func TestManager_NexPendingMaxAge(t *testing.T) {
 
 	// The segment that was just appended to has neither reached the maximum
 	// age nor maximum size to be flushed.
-	it := m.NextPending()
+	it, err := m.NextPending()
+	require.NoError(t, err)
 	require.Nil(t, it)
 	require.Equal(t, 1, m.available.Len())
 	require.Equal(t, 0, m.pending.Len())
@@ -269,7 +348,8 @@ func TestManager_NexPendingMaxAge(t *testing.T) {
 	// Wait 100ms. The segment that was just appended to should have reached
 	// the maximum age.
 	time.Sleep(100 * time.Millisecond)
-	it = m.NextPending()
+	it, err = m.NextPending()
+	require.NoError(t, err)
 	require.NotNil(t, it)
 	require.Equal(t, 0, m.available.Len())
 	require.Equal(t, 0, m.pending.Len())
@@ -304,7 +384,8 @@ func TestManager_Put(t *testing.T) {
 	require.Equal(t, 1, m.pending.Len())
 
 	// Getting the pending segment should remove it from the list.
-	it := m.NextPending()
+	it, err := m.NextPending()
+	require.NoError(t, err)
 	require.NotNil(t, it)
 	require.Equal(t, 0, m.available.Len())
 	require.Equal(t, 0, m.pending.Len())
@@ -373,7 +454,8 @@ wal_segments_pending 1
 	require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
 
 	// Get the segment from the pending list.
-	it := m.NextPending()
+	it, err := m.NextPending()
+	require.NoError(t, err)
 	require.NotNil(t, it)
 	expected = `
 # HELP wal_segments_available The number of WAL segments accepting writes.