feat: WAL Manager #13428

Merged
merged 3 commits into from
Jul 11, 2024
Changes from 2 commits
213 changes: 213 additions & 0 deletions pkg/storage/wal/manager.go
@@ -0,0 +1,213 @@
package wal

import (
"container/list"
"errors"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
)

const (
// DefaultMaxAge is the default value for the maximum amount of time a
// segment can be buffered in memory before it should be flushed.
DefaultMaxAge = 500 * time.Millisecond
// DefaultMaxSegments is the default value for the maximum number of
// segments that can be buffered in memory, including segments waiting to
// be flushed.
DefaultMaxSegments = 10
// DefaultMaxSegmentSize is the default value for the maximum segment size
// (uncompressed).
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrFull is returned when an append fails because the WAL is full. This
// happens when all segments are either in the pending list waiting to be
// flushed, or in the process of being flushed.
ErrFull = errors.New("The WAL is full")
)

type AppendRequest struct {
TenantID string
Labels labels.Labels
LabelsStr string
Entries []*logproto.Entry
}

type AppendResult struct {
done chan struct{}
err error
}

// Done returns a channel that is closed when the result of an append is
// available. Err() should be called to check if the operation was successful.
func (p *AppendResult) Done() <-chan struct{} {
return p.done
}

// Err returns a non-nil error if the operation failed, and nil if it was
// successful. It should not be called until Done() is closed to avoid data
// races.
func (p *AppendResult) Err() error {
return p.err
}

// SetDone closes the channel and sets the (optional) error.
func (p *AppendResult) SetDone(err error) {
p.err = err
close(p.done)
}

type Config struct {
// MaxAge is the maximum amount of time a segment can be buffered in memory
// before it is moved to the pending list to be flushed. Increasing MaxAge
// allows more time for a segment to grow to MaxSegmentSize, but may increase
// latency if the write volume is too small.
MaxAge time.Duration

// MaxSegments is the maximum number of segments that can be buffered in
// memory. Increasing MaxSegments allows for large bursts of writes to be
// buffered in memory, but may increase latency if the write volume exceeds
// the rate at which segments can be flushed.
MaxSegments int64

// MaxSegmentSize is the maximum size (uncompressed) of a segment. It is
// not a strict limit, and segments can exceed the maximum size when
// individual appends are larger than the remaining capacity.
MaxSegmentSize int64
}

// Manager buffers segments in memory, and keeps track of which segments are
// available and which are waiting to be flushed. The maximum number of
// segments that can be buffered in memory, and their maximum age and maximum
// size before being flushed are configured when creating the Manager.
//
// By buffering segments in memory, the WAL can tolerate bursts of append
// requests that arrive faster than they can be flushed. The amount of data that can
// be buffered is configured using MaxSegments and MaxSegmentSize. You must use
// caution when configuring these to avoid excessive latency.
//
// The WAL is full when all segments are waiting to be flushed or in the process
// of being flushed. When the WAL is full, subsequent appends fail with ErrFull.
// It is not permitted to append more data until another segment has been flushed
// and returned to the available list. This allows the manager to apply back-pressure
// and avoid congestion collapse due to excessive timeouts and retries.
type Manager struct {
cfg Config

// available is a list of segments that are available and accepting data.
// All segments other than the segment at the front of the list are empty,
// and only the segment at the front of the list is written to. When this
// segment has exceeded its maximum age or maximum size it is moved to the
// pending list to be flushed, and the next segment in the available list
// takes its place.
available *list.List

// pending is a list of segments that are waiting to be flushed. Once
// flushed, the segment is reset and moved to the back of the available
// list to accept writes again.
pending *list.List
shutdown chan struct{}
mu sync.Mutex
}

// item is similar to PendingItem, but it is an internal struct used in the
// available and pending lists. It contains a single-use result that is returned
// to callers of Manager.Append() and a re-usable segment that is reset after
// each flush.
type item struct {
r *AppendResult
w *SegmentWriter
// firstAppendedAt is the time of the first append to the segment, and is
// used to know when the segment has exceeded the maximum age and should
// be moved to the pending list. It is reset after each flush.
firstAppendedAt time.Time
}

// PendingItem contains a result and the segment to be flushed. PendingItems
// are to be returned with Put following a flush so the segment can be re-used.
type PendingItem struct {
Result *AppendResult
Writer *SegmentWriter
}

func NewManager(cfg Config) (*Manager, error) {
m := Manager{
cfg: cfg,
available: list.New(),
pending: list.New(),
shutdown: make(chan struct{}),
}
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter()
if err != nil {
return nil, err
}
m.available.PushBack(&item{
r: &AppendResult{done: make(chan struct{})},
w: w,
})
}
return &m, nil
}

func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.available.Len() == 0 {
return nil, ErrFull
}
el := m.available.Front()
it := el.Value.(*item)
if it.firstAppendedAt.IsZero() {
it.firstAppendedAt = time.Now()
}
it.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries)
// If the segment exceeded the maximum age or the maximum size, move it to
// the pending list to be flushed.
if time.Since(it.firstAppendedAt) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
m.pending.PushBack(it)
m.available.Remove(el)
}
return it.r, nil
}

func (m *Manager) Get() (*PendingItem, error) {
Contributor

mega nit: Maybe this should be called GetPending or something else?
API Clients only call Append but my mental model is that Get should return a valid WAL, not a closed one.
I don't feel strongly about it though.

Contributor Author

Yeah, I don't have a strong opinion on this either. I took the names Get and Put from sync.Pool.

Contributor Author

Let me think on this, because PutPending sounds weird and incorrect.

m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 {
if m.available.Len() > 0 {
// Check if the current segment has exceeded its maximum age and
// should be moved to the pending list.
el := m.available.Front()
it := el.Value.(*item)
if !it.firstAppendedAt.IsZero() && time.Since(it.firstAppendedAt) >= m.cfg.MaxAge {
m.pending.PushBack(it)
m.available.Remove(el)
}
}
// If there are still no pending items, return nil.
if m.pending.Len() == 0 {
return nil, nil
}
}
el := m.pending.Front()
it := el.Value.(*item)
m.pending.Remove(el)
return &PendingItem{Result: it.r, Writer: it.w}, nil
}

func (m *Manager) Put(it *PendingItem) error {
m.mu.Lock()
defer m.mu.Unlock()
it.Writer.Reset()
m.available.PushBack(&item{
r: &AppendResult{done: make(chan struct{})},
w: it.Writer,
})
return nil
}