Introduces per stream chunks mutex (#3000)
* marshalable chunks

* wal record types custom serialization

* proto types for wal checkpoints

* byteswith output unaffected by buffer

* wal & record pool ifcs

* wal record can hold entries from multiple series

* entry pool

* ingester uses noopWal

* removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding.

* segment writing

* [WIP] wal recovery from segments

* replay uses sync.Maps & preserves WAL fingerprints

* in memory wal recovery

* wal segment recovery

* ingester metrics struct

* wal replay locks streamsMtx in instances, adds checkpoint codec

* ingester metrics

* checkpointer

* WAL checkpoint writer

* checkpointwriter can write multiple checkpoints

* reorgs checkpointing

* wires up checkpointwriter to wal

* ingester SeriesIter impl

* wires up ingesterRecoverer to consume checkpoints

* generic recovery fn

* generic recovery fn

* recover from both wal types

* cleans up old tmp checkpoints & allows aborting in flight checkpoints

* wires up wal checkpointing

* more granular wal logging

* fixes off-by-1 wal truncation & removes double logging

* adds userID to wal records correctly

* wire chunk encoding tests

* more granular wal metrics

* checkpoint encoding test

* ignores debug bins

* segment replay ignores out of orders

* fixes bug between WAL reading []byte validity and proto unmarshalling refs

* conf validations, removes comments

* flush on shutdown config

* POST /ingester/shutdown

* renames flush on shutdown

* wal & checkpoint use same segment size

* writes entries to wal regardless of tailers

* makes wal checkpoint duration default to 5m

* recovery metrics

* encodes headchunks separately for wal purposes

* merge upstream

* linting

* adds chunkMtx to ingester streams

* all flush locking locks streams -> chunks in order to prevent deadlocks

* wal integration tests

* addresses pr feedback

uses entry pool in stream push/tailer

removes unnecessary pool interaction

checkpointbytes comment

fillchunk helper, record resetting in tests via pool

redundant comment

defers wg done in recovery

s/num/count/

checkpoint wal uses a logger

encodeWithTypeHeader now creates its own []byte

removes pool from decodeEntries

wal stop can error

* cleans up merge

* lint

* inline functions to simplify unlocking
owen-d authored Nov 27, 2020
1 parent 4d9865a commit 8c1fe88
Showing 9 changed files with 298 additions and 86 deletions.
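The heart of this change is giving each stream its own chunks mutex, so flushes and checkpoints can work on a stream's chunks without holding the instance-wide streamsMtx, together with a fixed lock order (streamsMtx first and only briefly, then chunkMtx) so concurrent pushers, flushers, and checkpointers cannot deadlock. Below is a minimal, self-contained Go sketch of that pattern; the names (instance, stream, chunkDesc, streamsMtx, chunkMtx, collectChunksToFlush) mirror the diff that follows, but the bodies are illustrative only and are not Loki's implementation.

package main

import (
	"fmt"
	"sync"
)

type chunkDesc struct {
	data    []byte
	flushed bool
}

// stream guards its chunk slice with its own lock so readers
// (flush, checkpoint) don't need the instance-wide lock.
type stream struct {
	chunkMtx sync.RWMutex
	chunks   []chunkDesc
}

type instance struct {
	streamsMtx sync.RWMutex
	streams    map[string]*stream
}

func (i *instance) collectChunksToFlush(key string) ([]*chunkDesc, *sync.RWMutex) {
	// Lock ordering rule: streamsMtx first (held only for the lookup), then chunkMtx.
	i.streamsMtx.RLock()
	s, ok := i.streams[key]
	i.streamsMtx.RUnlock()
	if !ok {
		return nil, nil
	}

	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()

	var result []*chunkDesc
	for j := range s.chunks {
		if !s.chunks[j].flushed {
			result = append(result, &s.chunks[j])
		}
	}
	// Return the stream's own mutex so the caller can later mark these
	// chunks flushed without touching the instance-wide lock.
	return result, &s.chunkMtx
}

func main() {
	inst := &instance{streams: map[string]*stream{
		"app": {chunks: []chunkDesc{{data: []byte("a")}, {data: []byte("b"), flushed: true}}},
	}}
	chunks, _ := inst.collectChunksToFlush("app")
	fmt.Println("chunks to flush:", len(chunks))
}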
11 changes: 11 additions & 0 deletions pkg/ingester/checkpoint.go
@@ -157,8 +157,19 @@ func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr {
})

for _, stream := range streams {
stream.chunkMtx.RLock()
if len(stream.chunks) < 1 {
stream.chunkMtx.RUnlock()
// it's possible the stream has been flushed to storage
// in between starting the checkpointing process and
// checkpointing this stream.
continue
}

// TODO(owen-d): use a pool
chunks, err := toWireChunks(stream.chunks, nil)
stream.chunkMtx.RUnlock()

var s *Series
if err == nil {
s = &Series{
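The hunk above takes the stream's read lock, skips streams whose chunks were already flushed away between the start of checkpointing and reaching this stream, and converts the surviving chunks to wire form before releasing the lock (the real code unlocks explicitly before continue because the work is not wrapped in a helper). Below is a hedged sketch of that snapshot-under-read-lock shape; the copy loop is a stand-in for toWireChunks, which is not shown in this diff.

package main

import (
	"fmt"
	"sync"
)

type chunkDesc struct{ bytes []byte }

type stream struct {
	chunkMtx sync.RWMutex
	chunks   []chunkDesc
}

// snapshotForCheckpoint copies a stream's chunk payloads under the read lock.
// If the stream was flushed (and its chunks removed) before we got here,
// there is nothing to checkpoint and the stream is skipped.
func snapshotForCheckpoint(s *stream) ([][]byte, bool) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()

	if len(s.chunks) < 1 {
		return nil, false
	}
	wire := make([][]byte, 0, len(s.chunks))
	for _, c := range s.chunks {
		wire = append(wire, append([]byte(nil), c.bytes...)) // copy, don't alias locked data
	}
	return wire, true
}

func main() {
	flushed := &stream{}                                       // already flushed: no chunks left
	live := &stream{chunks: []chunkDesc{{bytes: []byte("x")}}} // still has data

	for name, s := range map[string]*stream{"flushed": flushed, "live": live} {
		if wire, ok := snapshotForCheckpoint(s); ok {
			fmt.Printf("%s: checkpointing %d chunks\n", name, len(wire))
			continue
		}
		fmt.Printf("%s: skipped\n", name)
	}
}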
143 changes: 143 additions & 0 deletions pkg/ingester/checkpoint_test.go
@@ -0,0 +1,143 @@
package ingester

import (
"context"
fmt "fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)

// small util for ensuring data exists as we expect
func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time, i *Ingester) {
result := mockQuerierServer{
ctx: ctx,
}
err := i.Query(&logproto.QueryRequest{
Selector: `{foo="bar"}`,
Limit: 100,
Start: start,
End: end,
}, &result)

ln := int(end.Sub(start) / time.Second)
require.NoError(t, err)
require.Len(t, result.resps, 1)
require.Len(t, result.resps[0].Streams, 2)
require.Len(t, result.resps[0].Streams[0].Entries, ln)
require.Len(t, result.resps[0].Streams[1].Entries, ln)
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}

start := time.Now()
steps := 10
end := start.Add(time.Second * time.Duration(steps))

for i := 0; i < steps; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
}

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)

ensureIngesterData(ctx, t, start, end, i)

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// ensure we haven't checkpointed yet
expectCheckpoint(t, walDir, false)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)

time.Sleep(ingesterConfig.WAL.CheckpointDuration + time.Second) // give a bit of buffer
// ensure we have checkpointed now
expectCheckpoint(t, walDir, true)

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

// ensure we've recovered data from checkpoint+wal segments
ensureIngesterData(ctx, t, start, end, i)

}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
var found bool
for _, f := range fs {
if _, err := checkpointIndex(f.Name(), false); err == nil {
found = true
}
}

require.True(t, found == shouldExist)
}
88 changes: 54 additions & 34 deletions pkg/ingester/flush.go
@@ -160,7 +160,10 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}
}

// must hold streamsMtx
func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
stream.chunkMtx.RLock()
defer stream.chunkMtx.RUnlock()
if len(stream.chunks) == 0 {
return
}
@@ -214,37 +217,34 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
return nil
}

chunks, labels := i.collectChunksToFlush(instance, fp, immediate)
chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate)
if len(chunks) < 1 {
return nil
}

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
err := i.flushChunks(ctx, fp, labels, chunks, &instance.streamsMtx)
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return err
}

instance.streamsMtx.Lock()
for _, chunk := range chunks {
chunk.flushed = time.Now()
}
instance.streamsMtx.Unlock()
return nil
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) {
func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

stream, ok := instance.streamsByFP[fp]
instance.streamsMtx.Unlock()

if !ok {
return nil, nil
return nil, nil, nil
}

var result []*chunkDesc
stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
for j := range stream.chunks {
shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
if immediate || shouldFlush {
@@ -262,7 +262,7 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
}
}
}
return result, stream.labels
return result, stream.labels, &stream.chunkMtx
}

func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {
@@ -285,9 +285,12 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) {
return false, ""
}

// must hold streamsMtx
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
now := time.Now()

stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
prevNumChunks := len(stream.chunks)
for len(stream.chunks) > 0 {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
@@ -308,7 +311,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
}
}

func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, streamsMtx sync.Locker) error {
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
@@ -319,28 +322,37 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
metric := labelsBuilder.Labels()

wireChunks := make([]chunk.Chunk, 0, len(cs))
for _, c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err = c.chunk.Close(); err != nil {
return err
}
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
firstTime,
lastTime,
)

start := time.Now()
streamsMtx.Lock()
err := c.Encode()
streamsMtx.Unlock()
if err != nil {
return err

// use anonymous function to make lock releasing simpler.
err = func() error {
chunkMtx.Lock()
defer chunkMtx.Unlock()

for _, c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err := c.chunk.Close(); err != nil {
return err
}
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
firstTime,
lastTime,
)

start := time.Now()
if err := c.Encode(); err != nil {
return err
}
chunkEncodeTime.Observe(time.Since(start).Seconds())
wireChunks = append(wireChunks, c)
}
chunkEncodeTime.Observe(time.Since(start).Seconds())
wireChunks = append(wireChunks, c)
return nil
}()

if err != nil {
return err
}

if err := i.store.Put(ctx, wireChunks); err != nil {
@@ -350,7 +362,15 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
// Record statistics only when actual put request did not return error.
sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
countPerTenant := chunksPerTenant.WithLabelValues(userID)

chunkMtx.Lock()
defer chunkMtx.Unlock()

for i, wc := range wireChunks {

// flush successful, write while we have lock
cs[i].flushed = time.Now()

numEntries := cs[i].chunk.Size()
byt, err := wc.Encoded()
if err != nil {
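flushChunks above wraps its encode loop in an immediately invoked anonymous function so that defer releases chunkMtx as soon as encoding finishes, the lock is never held across the slow store write, and it is retaken afterwards only to mark chunks flushed. A minimal sketch of that lock-scoping idiom follows; the encode and store steps are placeholders, not the actual chunk or store types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// flush encodes items while holding mu, writes to storage without it,
// then retakes mu to record success -- mirroring the flushChunks shape above.
func flush(mu *sync.Mutex, items []string) error {
	var encoded []string

	// The anonymous function gives `defer mu.Unlock()` a tight scope:
	// the lock is released on every return path of the encode step and
	// never spans the slow storage call below.
	if err := func() error {
		mu.Lock()
		defer mu.Unlock()
		for _, it := range items {
			encoded = append(encoded, "enc("+it+")") // stand-in for chunk.Encode()
		}
		return nil
	}(); err != nil {
		return err
	}

	time.Sleep(10 * time.Millisecond) // stand-in for the remote store.Put call

	mu.Lock()
	defer mu.Unlock()
	fmt.Println("flushed:", encoded) // stand-in for marking chunks flushed under the lock
	return nil
}

func main() {
	var mu sync.Mutex
	if err := flush(&mu, []string{"a", "b"}); err != nil {
		fmt.Println("flush failed:", err)
	}
}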
1 change: 1 addition & 0 deletions pkg/ingester/flush_test.go
@@ -197,6 +197,7 @@ func defaultIngesterTestConfig(t *testing.T) Config {
cfg.LifecyclerConfig.Addr = "localhost"
cfg.LifecyclerConfig.ID = "localhost"
cfg.LifecyclerConfig.FinalSleep = 0
cfg.LifecyclerConfig.MinReadyDuration = 0
return cfg
}

14 changes: 4 additions & 10 deletions pkg/ingester/instance.go
@@ -153,13 +153,10 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}

prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries, record); err != nil {
appendErr = err
continue
}

memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
}

if !record.IsEmpty() {
@@ -259,8 +256,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter
err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
ingStats.TotalChunksMatched += int64(len(stream.chunks))
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels))
iter, err := stream.Iterator(ctx, ingStats, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels))
if err != nil {
return err
}
@@ -290,8 +286,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
err = i.forMatchingStreams(
expr.Selector().Matchers(),
func(stream *stream) error {
ingStats.TotalChunksMatched += int64(len(stream.chunks))
iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor.ForStream(stream.labels))
iter, err := stream.SampleIterator(ctx, ingStats, req.Start, req.End, extractor.ForStream(stream.labels))
if err != nil {
return err
}
@@ -577,10 +572,9 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
}

func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
firstchunkFrom, _ := stream.chunks[0].chunk.Bounds()
_, lastChunkTo := stream.chunks[len(stream.chunks)-1].chunk.Bounds()
from, to := stream.Bounds()

if req.End.UnixNano() > firstchunkFrom.UnixNano() && req.Start.UnixNano() <= lastChunkTo.UnixNano() {
if req.End.UnixNano() > from.UnixNano() && req.Start.UnixNano() <= to.UnixNano() {
return true
}
return false
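With chunks now guarded by chunkMtx, shouldConsiderStream no longer reads stream.chunks directly and instead asks the stream for its time bounds. The Bounds method is defined in stream.go and is not part of this commit page, so the sketch below is only an assumption about its likely shape: take the read lock and report the first chunk's start and the last chunk's end.

package main

import (
	"fmt"
	"sync"
	"time"
)

type chunkDesc struct{ from, to time.Time }

type stream struct {
	chunkMtx sync.RWMutex
	chunks   []chunkDesc
}

// Bounds returns the time range covered by the stream's chunks.
// Hypothetical sketch: the real method lives in stream.go, outside this diff.
func (s *stream) Bounds() (from, to time.Time) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	if len(s.chunks) > 0 {
		from = s.chunks[0].from
		to = s.chunks[len(s.chunks)-1].to
	}
	return from, to
}

// shouldConsiderStream mirrors the rewritten check in instance.go: a stream
// is relevant when the request window overlaps its bounds.
func shouldConsiderStream(s *stream, start, end time.Time) bool {
	from, to := s.Bounds()
	return end.UnixNano() > from.UnixNano() && start.UnixNano() <= to.UnixNano()
}

func main() {
	now := time.Now()
	s := &stream{chunks: []chunkDesc{{from: now.Add(-time.Hour), to: now}}}
	fmt.Println(shouldConsiderStream(s, now.Add(-30*time.Minute), now.Add(time.Minute)))
}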
