feat: Remove flush loop and queue from Ingester RF-1
This commit removes the flush loop and flush queue from Ingester RF-1.
Both were carried over from the original ingester and are no longer
needed now that the WAL Manager tracks pending segments itself.
grobinson-grafana committed Jul 16, 2024
1 parent 3ac130b commit 414c32e
Showing 4 changed files with 159 additions and 140 deletions.
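For orientation, here is the shape of the change: instead of a central loop pushing flushOps onto per-worker priority queues, each flush worker now pulls pending segments straight from the WAL manager, and shutdown becomes close-then-wait. The standalone toy below is not Loki code; toyManager and its methods are invented stand-ins for the wal.Manager API shown in the diffs that follow.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errClosed = errors.New("manager is closed")

// toyManager is a hypothetical stand-in for wal.Manager.
type toyManager struct {
	mu      sync.Mutex
	pending []string
	closed  bool
}

// nextPending mirrors the contract of wal.Manager.NextPending: an item if one
// is waiting, ("", nil) if idle, and errClosed once closed and fully drained.
func (m *toyManager) nextPending() (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.pending) == 0 {
		if m.closed {
			return "", errClosed
		}
		return "", nil
	}
	it := m.pending[0]
	m.pending = m.pending[1:]
	return it, nil
}

func (m *toyManager) close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
}

func main() {
	m := &toyManager{pending: []string{"segment-1", "segment-2", "segment-3"}}

	var done sync.WaitGroup
	const workers = 2
	done.Add(workers)
	for j := 0; j < workers; j++ {
		go func(j int) {
			defer done.Done()
			for {
				it, err := m.nextPending()
				if errors.Is(err, errClosed) {
					return // closed and drained: worker exits
				}
				if it == "" {
					time.Sleep(10 * time.Millisecond) // idle: back off briefly
					continue
				}
				fmt.Printf("worker %d flushed %s\n", j, it)
			}
		}(j)
	}

	m.close()   // like Ingester.Flush calling i.wal.Close()
	done.Wait() // like i.flushWorkersDone.Wait()
}
```

The key property is that the closed error doubles as the drain signal: workers keep flushing until the manager is both closed and empty, so nothing buffered at shutdown is dropped.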
68 changes: 24 additions & 44 deletions pkg/ingester-rf1/flush.go
@@ -2,9 +2,9 @@ package ingesterrf1
 
 import (
 	"crypto/rand"
+	"errors"
 	"fmt"
 	"net/http"
-	"strconv"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -17,7 +17,6 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/grafana/loki/v3/pkg/storage/wal"
-	"github.com/grafana/loki/v3/pkg/util"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )

@@ -38,19 +37,19 @@ const (
 
 // Note: this is called both during the WAL replay (zero or more times)
 // and then after replay as well.
-func (i *Ingester) InitFlushQueues() {
-	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
+func (i *Ingester) InitFlushWorkers() {
+	i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
 	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
-		i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueues)
-		go i.flushLoop(j)
+		go i.flushWorker(j)
 	}
 }
 
 // Flush implements ring.FlushTransferer
 // Flush triggers a flush of all the chunks and closes the flush queues.
 // Called from the Lifecycler as part of the ingester shutdown.
 func (i *Ingester) Flush() {
-	i.flush()
+	i.wal.Close()
+	i.flushWorkersDone.Wait()
 }
 
 // TransferOut implements ring.FlushTransferer
@@ -60,76 +59,57 @@ func (i *Ingester) TransferOut(_ context.Context) error {
 	return ring.ErrTransferDisabled
 }
 
-func (i *Ingester) flush() {
-	// TODO: Flush the last chunks
-	// Close the flush queues, to unblock waiting workers.
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-
-	i.flushQueuesDone.Wait()
-	level.Debug(i.logger).Log("msg", "flush queues have drained")
-}
-
 // FlushHandler triggers a flush of all in memory chunks. Mainly used for
 // local testing.
 func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
-type flushOp struct {
-	it  *wal.PendingItem
-	num int64
-}
-
-func (o *flushOp) Key() string {
-	return strconv.Itoa(int(o.num))
-}
-
-func (o *flushOp) Priority() int64 {
-	return -o.num
-}
-
-func (i *Ingester) flushLoop(j int) {
-	l := log.With(i.logger, "loop", j)
+func (i *Ingester) flushWorker(j int) {
+	l := log.With(i.logger, "worker", j)
 	defer func() {
-		level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
-		i.flushQueuesDone.Done()
+		level.Debug(l).Log("msg", "Ingester.flushWorker() exited")
+		i.flushWorkersDone.Done()
 	}()
 
 	for {
-		o := i.flushQueues[j].Dequeue()
-		if o == nil {
+		it, err := i.wal.NextPending()
+		if errors.Is(err, wal.ErrClosed) {
 			return
 		}
-		op := o.(*flushOp)
+
+		if it == nil {
+			// TODO: Do something more clever here instead.
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
 
 		start := time.Now()
 
 		// We'll use this to log the size of the segment that was flushed.
-		n := op.it.Writer.InputSize()
+		n := it.Writer.InputSize()
 		humanized := humanize.Bytes(uint64(n))
 
-		err := i.flushOp(l, op)
+		err = i.flushItem(l, it)
 		d := time.Since(start)
 		if err != nil {
 			level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
 		} else {
 			level.Debug(l).Log("msg", "flushed", "size", humanized, "duration", d)
 		}
 
-		op.it.Result.SetDone(err)
-		i.wal.Put(op.it)
+		it.Result.SetDone(err)
+		i.wal.Put(it)
 	}
 }
 
-func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 
 	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
 	for b.Ongoing() {
-		err := i.flushSegment(ctx, op.it.Writer)
+		err := i.flushSegment(ctx, it.Writer)
 		if err == nil {
 			break
 		}
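The hunk above is truncated just after the retry loop begins. For reference, a dskit backoff retry loop typically has the shape below; this is a sketch under the assumption that flushItem follows the usual backoff Ongoing/Wait/Err pattern, with flushSegment replaced by a stand-in that fails twice before succeeding.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flushSegment is a stand-in; the real method uploads a WAL segment to
// object storage.
func flushSegment(_ context.Context, attempt int) error {
	if attempt < 2 {
		return errors.New("transient storage error")
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Example config values; the ingester reads these from cfg.FlushOpBackoff.
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 1 * time.Second,
		MaxRetries: 5,
	})
	for b.Ongoing() {
		if err := flushSegment(ctx, b.NumRetries()); err == nil {
			break // flushed successfully
		}
		b.Wait() // sleep with exponential backoff before retrying
	}
	// Err() is nil on success, or the reason the retries stopped
	// (context canceled or retries exhausted).
	fmt.Println("result:", b.Err())
}
```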
94 changes: 13 additions & 81 deletions pkg/ingester-rf1/ingester.go
@@ -5,7 +5,6 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"math/rand"
 	"net/http"
 	"os"
 	"path"
@@ -202,15 +201,11 @@ type Ingester struct {
 	store           Storage
 	periodicConfigs []config.PeriodConfig
 
-	loopDone    sync.WaitGroup
-	loopQuit    chan struct{}
 	tailersQuit chan struct{}
 
-	// One queue per flush thread. Fingerprint is used to
-	// pick a queue.
-	numOps          int64
-	flushQueues     []*util.PriorityQueue
-	flushQueuesDone sync.WaitGroup
+	flushWorkersDone sync.WaitGroup
 
 	wal *wal.Manager
@@ -270,17 +265,16 @@ func New(cfg Config, clientConfig client.Config,
 	}
 
 	i := &Ingester{
-		cfg:             cfg,
-		logger:          logger,
-		clientConfig:    clientConfig,
-		tenantConfigs:   configs,
-		instances:       map[string]*instance{},
-		store:           storage,
-		periodicConfigs: periodConfigs,
-		loopQuit:        make(chan struct{}),
-		flushQueues:     make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
-		tailersQuit:     make(chan struct{}),
-		metrics:         metrics,
+		cfg:              cfg,
+		logger:           logger,
+		clientConfig:     clientConfig,
+		tenantConfigs:    configs,
+		instances:        map[string]*instance{},
+		store:            storage,
+		periodicConfigs:  periodConfigs,
+		flushWorkersDone: sync.WaitGroup{},
+		tailersQuit:      make(chan struct{}),
+		metrics:          metrics,
 		// flushOnShutdownSwitch: &OnceSwitch{},
 		terminateOnShutdown: false,
 		streamRateCalculator: NewStreamRateCalculator(),
@@ -401,7 +395,7 @@ func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func (i *Ingester) starting(ctx context.Context) error {
-	i.InitFlushQueues()
+	i.InitFlushWorkers()
 
 	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
 	err := i.lifecycler.StartAsync(context.Background())
@@ -435,9 +429,6 @@ func (i *Ingester) starting(ctx context.Context) error {
 		return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
 	}
 
-	// start our loop
-	i.loopDone.Add(1)
-	go i.loop()
 	return nil
 }

@@ -457,8 +448,6 @@ func (i *Ingester) running(ctx context.Context) error {
 	//	instance.closeTailers()
 	//}
 
-	close(i.loopQuit)
-	i.loopDone.Wait()
 	return serviceError
 }

@@ -474,10 +463,7 @@ func (i *Ingester) stopping(_ error) error {
 	//}
 	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
 
-	for _, flushQueue := range i.flushQueues {
-		flushQueue.Close()
-	}
-	i.flushQueuesDone.Wait()
+	i.flushWorkersDone.Wait()
 
 	// i.streamRateCalculator.Stop()
 
@@ -518,60 +504,6 @@ func (i *Ingester) removeShutdownMarkerFile() {
 	}
 }
 
-func (i *Ingester) loop() {
-	defer i.loopDone.Done()
-
-	// Delay the first flush operation by up to 0.8x the flush time period.
-	// This will ensure that multiple ingesters started at the same time do not
-	// flush at the same time. Flushing at the same time can cause concurrently
-	// writing the same chunk to object storage, which in AWS S3 leads to being
-	// rate limited.
-	jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
-	initialDelay := time.NewTimer(jitter)
-	defer initialDelay.Stop()
-
-	level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)
-
-	select {
-	case <-initialDelay.C:
-		// do nothing and continue with flush loop
-	case <-i.loopQuit:
-		// ingester stopped while waiting for initial delay
-		return
-	}
-
-	// Add +/- 20% of flush interval as jitter.
-	// The default flush check period is 30s so max jitter will be 6s.
-	j := i.cfg.FlushCheckPeriod / 5
-	flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
-	defer flushTicker.Stop()
-
-	for {
-		select {
-		case <-flushTicker.C:
-			i.doFlushTick()
-		case <-i.loopQuit:
-			return
-		}
-	}
-}
-
-func (i *Ingester) doFlushTick() {
-	for {
-		// Keep adding ops to the queue until there are no more.
-		it := i.wal.NextPending()
-		if it == nil {
-			break
-		}
-		i.numOps++
-		flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
-			num: i.numOps,
-			it:  it,
-		})
-	}
-}
-
 // PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
 //
 // Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
41 changes: 33 additions & 8 deletions pkg/storage/wal/manager.go
@@ -25,6 +25,9 @@ const (
 )
 
 var (
+	// ErrClosed is returned when the WAL is closed.
+	ErrClosed = errors.New("WAL is closed")
+
 	// ErrFull is returned when an append fails because the WAL is full. This
 	// happens when all segments are either in the pending list waiting to be
 	// flushed, or in the process of being flushed.
@@ -111,9 +114,10 @@ type Manager struct {
 	// pending is a list of segments that are waiting to be flushed. Once
 	// flushed, the segment is reset and moved to the back of the available
 	// list to accept writes again.
-	pending  *list.List
-	shutdown chan struct{}
-	mu       sync.Mutex
+	pending *list.List
+
+	closed bool
+	mu     sync.Mutex
 }
 
 // item is similar to PendingItem, but it is an internal struct used in the
@@ -141,7 +145,6 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 		metrics:   metrics.ManagerMetrics,
 		available: list.New(),
 		pending:   list.New(),
-		shutdown:  make(chan struct{}),
 	}
 	m.metrics.NumPending.Set(0)
 	m.metrics.NumFlushing.Set(0)
@@ -162,6 +165,9 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
 func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	if m.closed {
+		return nil, ErrClosed
+	}
 	if m.available.Len() == 0 {
 		return nil, ErrFull
 	}
@@ -185,9 +191,25 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
 	return it.r, nil
 }
 
+func (m *Manager) Close() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.closed = true
+	if m.available.Len() > 0 {
+		el := m.available.Front()
+		it := el.Value.(*item)
+		if it.w.InputSize() >= 0 {
+			m.pending.PushBack(it)
+			m.metrics.NumPending.Inc()
+			m.available.Remove(el)
+			m.metrics.NumAvailable.Dec()
+		}
+	}
+}
+
 // NextPending returns the next segment to be flushed. It returns nil if the
-// pending list is empty.
-func (m *Manager) NextPending() *PendingItem {
+// pending list is empty, and ErrClosed if the WAL is closed.
+func (m *Manager) NextPending() (*PendingItem, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	if m.pending.Len() == 0 {
@@ -205,15 +227,18 @@ func (m *Manager) NextPending() *PendingItem {
 		}
 		// If the pending list is still empty return nil.
 		if m.pending.Len() == 0 {
-			return nil
+			if m.closed {
+				return nil, ErrClosed
+			}
+			return nil, nil
 		}
 	}
 	el := m.pending.Front()
 	it := el.Value.(*item)
 	m.pending.Remove(el)
 	m.metrics.NumPending.Dec()
 	m.metrics.NumFlushing.Inc()
-	return &PendingItem{Result: it.r, Writer: it.w}
+	return &PendingItem{Result: it.r, Writer: it.w}, nil
 }
 
 // Put resets the segment and puts it back in the available list to accept
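Taken together, Close and NextPending give callers a simple drain protocol: Close marks the WAL closed and promotes the segment currently accepting writes to the pending list, and NextPending keeps handing out pending segments, only returning ErrClosed once the list is empty. The sketch below is assumed usage, built only from the methods shown in this diff.

```go
package example

import (
	"errors"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

// drain sketches the shutdown contract: after Close, loop on NextPending
// until ErrClosed. Nothing buffered at shutdown is dropped, because the
// partially written segment was moved to the pending list by Close.
func drain(m *wal.Manager) error {
	m.Close()
	for {
		it, err := m.NextPending()
		if errors.Is(err, wal.ErrClosed) {
			return nil // closed and fully drained
		}
		if err != nil {
			return err
		}
		// A real caller would flush it.Writer to object storage here; this
		// sketch just acknowledges the segment and recycles it.
		it.Result.SetDone(nil)
		m.Put(it)
	}
}
```

In the ingester this protocol is split across goroutines: Flush calls i.wal.Close(), each flushWorker runs the NextPending loop until ErrClosed, and flushWorkersDone.Wait() unblocks once every worker has exited.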
