Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove flush loop and queue from Ingester RF-1 #13538

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 24 additions & 44 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ingesterrf1

import (
"crypto/rand"
"errors"
"fmt"
"net/http"
"strconv"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -17,7 +17,6 @@ import (
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand All @@ -38,19 +37,19 @@ const (

// Note: this is called both during the WAL replay (zero or more times)
// and then after replay as well.
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
func (i *Ingester) InitFlushWorkers() {
i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueues)
go i.flushLoop(j)
go i.flushWorker(j)
}
}

// Flush implements ring.FlushTransferer
// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
i.flush()
i.wal.Close()
i.flushWorkersDone.Wait()
}

// TransferOut implements ring.FlushTransferer
Expand All @@ -60,76 +59,57 @@ func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
}

func (i *Ingester) flush() {
// TODO: Flush the last chunks
// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}

i.flushQueuesDone.Wait()
level.Debug(i.logger).Log("msg", "flush queues have drained")
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
it *wal.PendingItem
num int64
}

func (o *flushOp) Key() string {
return strconv.Itoa(int(o.num))
}

func (o *flushOp) Priority() int64 {
return -o.num
}

func (i *Ingester) flushLoop(j int) {
l := log.With(i.logger, "loop", j)
func (i *Ingester) flushWorker(j int) {
l := log.With(i.logger, "worker", j)
defer func() {
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
level.Debug(l).Log("msg", "Ingester.flushWorker() exited")
i.flushWorkersDone.Done()
}()

for {
o := i.flushQueues[j].Dequeue()
if o == nil {
it, err := i.wal.NextPending()
if errors.Is(err, wal.ErrClosed) {
return
}
op := o.(*flushOp)

if it == nil {
// TODO: Do something more clever here instead.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was concerned this would increase CPU usage, but having tested it in dev it doesn't have much effect at all.

Copy link
Contributor

@cyriltovena cyriltovena Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think NextPending should be blocking instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it too! I'll take a look at how to use sync.Cond to achieve this.

time.Sleep(100 * time.Millisecond)
continue
}

start := time.Now()

// We'll use this to log the size of the segment that was flushed.
n := op.it.Writer.InputSize()
n := it.Writer.InputSize()
humanized := humanize.Bytes(uint64(n))

err := i.flushOp(l, op)
err = i.flushItem(l, it)
d := time.Since(start)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
} else {
level.Debug(l).Log("msg", "flushed", "size", humanized, "duration", d)
}

op.it.Result.SetDone(err)
i.wal.Put(op.it)
it.Result.SetDone(err)
i.wal.Put(it)
}
}

func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushSegment(ctx, op.it.Writer)
err := i.flushSegment(ctx, it.Writer)
if err == nil {
break
}
Expand Down
94 changes: 13 additions & 81 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -202,15 +201,11 @@ type Ingester struct {
store Storage
periodicConfigs []config.PeriodConfig

loopDone sync.WaitGroup
loopQuit chan struct{}
tailersQuit chan struct{}

// One queue per flush thread. Fingerprint is used to
// pick a queue.
numOps int64
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup
flushWorkersDone sync.WaitGroup

wal *wal.Manager

Expand Down Expand Up @@ -270,17 +265,16 @@ func New(cfg Config, clientConfig client.Config,
}

i := &Ingester{
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
flushWorkersDone: sync.WaitGroup{},
tailersQuit: make(chan struct{}),
metrics: metrics,
// flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
Expand Down Expand Up @@ -401,7 +395,7 @@ func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (i *Ingester) starting(ctx context.Context) error {
i.InitFlushQueues()
i.InitFlushWorkers()

// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err := i.lifecycler.StartAsync(context.Background())
Expand Down Expand Up @@ -435,9 +429,6 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
}

// start our loop
i.loopDone.Add(1)
go i.loop()
return nil
}

Expand All @@ -457,8 +448,6 @@ func (i *Ingester) running(ctx context.Context) error {
// instance.closeTailers()
//}

close(i.loopQuit)
i.loopDone.Wait()
return serviceError
}

Expand All @@ -474,10 +463,7 @@ func (i *Ingester) stopping(_ error) error {
//}
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}
i.flushQueuesDone.Wait()
i.flushWorkersDone.Wait()

// i.streamRateCalculator.Stop()

Expand Down Expand Up @@ -518,60 +504,6 @@ func (i *Ingester) removeShutdownMarkerFile() {
}
}

func (i *Ingester) loop() {
defer i.loopDone.Done()

// Delay the first flush operation by up to 0.8x the flush time period.
// This will ensure that multiple ingesters started at the same time do not
// flush at the same time. Flushing at the same time can cause concurrently
// writing the same chunk to object storage, which in AWS S3 leads to being
// rate limited.
jitter := time.Duration(rand.Int63n(int64(float64(i.cfg.FlushCheckPeriod.Nanoseconds()) * 0.8)))
initialDelay := time.NewTimer(jitter)
defer initialDelay.Stop()

level.Info(i.logger).Log("msg", "sleeping for initial delay before starting periodic flushing", "delay", jitter)

select {
case <-initialDelay.C:
// do nothing and continue with flush loop
case <-i.loopQuit:
// ingester stopped while waiting for initial delay
return
}

// Add +/- 20% of flush interval as jitter.
// The default flush check period is 30s so max jitter will be 6s.
j := i.cfg.FlushCheckPeriod / 5
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.doFlushTick()
case <-i.loopQuit:
return
}
}
}

func (i *Ingester) doFlushTick() {
for {
// Keep adding ops to the queue until there are no more.
it := i.wal.NextPending()
if it == nil {
break
}
i.numOps++
flushQueueIndex := i.numOps % int64(i.cfg.ConcurrentFlushes)
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
num: i.numOps,
it: it,
})
}
}

// PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
//
// Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
Expand Down
41 changes: 33 additions & 8 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const (
)

var (
// ErrClosed is returned when the WAL is closed.
ErrClosed = errors.New("WAL is closed")

// ErrFull is returned when an append fails because the WAL is full. This
// happens when all segments are either in the pending list waiting to be
// flushed, or in the process of being flushed.
Expand Down Expand Up @@ -111,9 +114,10 @@ type Manager struct {
// pending is a list of segments that are waiting to be flushed. Once
// flushed, the segment is reset and moved to the back of the available
// list to accept writes again.
pending *list.List
shutdown chan struct{}
mu sync.Mutex
pending *list.List

closed bool
mu sync.Mutex
}

// item is similar to PendingItem, but it is an internal struct used in the
Expand Down Expand Up @@ -141,7 +145,6 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
metrics: metrics.ManagerMetrics,
available: list.New(),
pending: list.New(),
shutdown: make(chan struct{}),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed as it wasn't used.

}
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
Expand All @@ -162,6 +165,9 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return nil, ErrClosed
}
if m.available.Len() == 0 {
return nil, ErrFull
}
Expand All @@ -185,9 +191,25 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
return it.r, nil
}

func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()
m.closed = true
if m.available.Len() > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicate code. I will refactor it in a follow-up PR. But I wanted to show how it works here.

el := m.available.Front()
it := el.Value.(*item)
if it.w.InputSize() > 0 {
m.pending.PushBack(it)
m.metrics.NumPending.Inc()
m.available.Remove(el)
m.metrics.NumAvailable.Dec()
}
}
}

// NextPending returns the next segment to be flushed. It returns nil if the
// pending list is empty.
func (m *Manager) NextPending() *PendingItem {
// pending list is empty, and ErrClosed if the WAL is closed.
func (m *Manager) NextPending() (*PendingItem, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 {
Expand All @@ -205,15 +227,18 @@ func (m *Manager) NextPending() *PendingItem {
}
// If the pending list is still empty return nil.
if m.pending.Len() == 0 {
return nil
if m.closed {
return nil, ErrClosed
}
return nil, nil
}
}
el := m.pending.Front()
it := el.Value.(*item)
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingItem{Result: it.r, Writer: it.w}
return &PendingItem{Result: it.r, Writer: it.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
Expand Down
Loading
Loading