Skip to content

Commit

Permalink
BatchIter edge cases (#2466)
Browse files Browse the repository at this point in the history
* sampleIter uses correct cache

* minimal lazy chunk iterator test

* memchunk Blocks() inclusivity

* fixes a few edge cases in the batchiterator batching

* fixes bad metric name in test

* due to later chunks len check, resetting nextChunk is unnecessary

* lazychunks pop test

* safe starting of batchChunkIterator

* batchiter rudimentary safe start test
  • Loading branch information
owen-d authored Aug 5, 2020
1 parent 4a358bc commit cb20afa
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
blocks := make([]Block, 0, len(c.blocks))

for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
if maxt >= b.mint && b.maxt >= mint {
blocks = append(blocks, b)
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ var (
testTargetSize = 1500 * 1024
)

// TestBlocksInclusive verifies that Blocks treats its time bounds as
// inclusive: a query whose start and end both equal the timestamp of the only
// entry must still return the block that holds that entry.
func TestBlocksInclusive(t *testing.T) {
	chk := NewMemChunk(EncNone, testBlockSize, testTargetSize)

	// Write a single entry at t=1ns and cut it into its own block.
	require.Nil(t, chk.Append(logprotoEntry(1, "1")))
	require.Nil(t, chk.cut())

	// Query with mint == maxt == the entry's timestamp.
	bound := time.Unix(0, 1)
	blocks := chk.Blocks(bound, bound)

	require.Equal(t, 1, len(blocks))
	require.Equal(t, 1, blocks[0].Entries())
}

func TestBlock(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
Expand Down
32 changes: 29 additions & 3 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type batchChunkIterator struct {
lastOverlapping []*LazyChunk
iterFactory chunksIteratorFactory

begun bool
ctx context.Context
cancel context.CancelFunc
start, end time.Time
direction logproto.Direction
Expand All @@ -69,6 +71,7 @@ func newBatchChunkIterator(
start: start,
end: end,
direction: direction,
ctx: ctx,
cancel: cancel,
iterFactory: iterFactory,
chunks: lazyChunks{direction: direction, chunks: chunks},
Expand All @@ -78,10 +81,17 @@ func newBatchChunkIterator(
}),
}
sort.Sort(res.chunks)
go res.loop(ctx)
return res
}

// Start launches the background goroutine running it.loop with the iterator's
// stored context. Only the first call has any effect; later calls see the
// begun flag already set and return immediately.
//
// NOTE(review): begun is a plain bool read and written here without any
// locking — confirm Start is never invoked from multiple goroutines at once.
func (it *batchChunkIterator) Start() {
	if it.begun {
		return
	}
	it.begun = true
	go it.loop(it.ctx)
}

func (it *batchChunkIterator) loop(ctx context.Context) {
for {
if it.chunks.Len() == 0 {
Expand Down Expand Up @@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) {
}

func (it *batchChunkIterator) Next() bool {
it.Start() // Ensure the iterator has started.

var err error
// for loop to avoid recursion
for {
Expand Down Expand Up @@ -140,18 +152,22 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) {
batch := make([]*LazyChunk, 0, it.batchSize+len(it.lastOverlapping))
var nextChunk *LazyChunk

var includesOverlap bool

for it.chunks.Len() > 0 {

// pop the next batch of chunks and append/prepend previous overlapping chunks
// so we can merge/de-dupe overlapping entries.
if it.direction == logproto.FORWARD {
if !includesOverlap && it.direction == logproto.FORWARD {
batch = append(batch, it.lastOverlapping...)
}
batch = append(batch, it.chunks.pop(it.batchSize)...)
if it.direction == logproto.BACKWARD {
if !includesOverlap && it.direction == logproto.BACKWARD {
batch = append(batch, it.lastOverlapping...)
}

includesOverlap = true

if it.chunks.Len() > 0 {
nextChunk = it.chunks.Peek()
// we max out our iterator boundaries to the next chunks in the queue
Expand Down Expand Up @@ -294,8 +310,12 @@ func newLogBatchIterator(
filter: filter,
ctx: ctx,
}

batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator)
// Important: since the batchChunkIterator is bound to the LogBatchIterator,
// ensure embedded fields are present before it's started.
logbatch.batchChunkIterator = batch
batch.Start()
return logbatch, nil
}

Expand All @@ -321,10 +341,12 @@ func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through
func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
for _, chunks := range chks {

iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk)
if err != nil {
return nil, err
}

result = append(result, iterator)
}

Expand Down Expand Up @@ -391,7 +413,11 @@ func newSampleBatchIterator(
ctx: ctx,
}
batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator)

// Important: since the batchChunkIterator is bound to the SampleBatchIterator,
// ensure embedded fields are present before it's started.
samplebatch.batchChunkIterator = batch
batch.Start()
return samplebatch, nil
}

Expand Down
144 changes: 144 additions & 0 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,48 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

// Test_batchIterSafeStart checks that constructing a batchChunkIterator does
// not invoke the iterator factory, and that calling Start more than once
// before iterating is harmless.
func Test_batchIterSafeStart(t *testing.T) {
	stream := logproto.Stream{
		Labels: fooLabelsWithName,
		Entries: []logproto.Entry{
			{
				Timestamp: from,
				Line:      "1",
			},
			{
				Timestamp: from.Add(time.Millisecond),
				Line:      "2",
			},
		},
	}
	chks := []*LazyChunk{
		newLazyChunk(stream),
	}

	// factoryAllowed gates the iterator factory below. It is kept separate from
	// the result of Next() so that the flag read by the factory closure is only
	// ever written once, before the iterator is started.
	var factoryAllowed bool

	batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), func(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
		if !factoryAllowed {
			panic("unexpected")
		}

		// we don't care about the actual data for this test, just give it an iterator.
		return iter.NewStreamIterator(stream), nil
	})

	// if it was started already, we should see a panic before this
	time.Sleep(time.Millisecond)
	factoryAllowed = true

	// ensure idempotency
	batch.Start()
	batch.Start()

	require.True(t, batch.Next())
}

func Test_newLogBatchChunkIterator(t *testing.T) {

tests := map[string]struct {
Expand Down Expand Up @@ -548,6 +590,108 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
logproto.BACKWARD,
2,
},
// This test is rather complex under the hood.
// It should cause three sub batches in the iterator.
// The first batch has no overlap -- it cannot, as it is the first batch. It has bounds [1,2)
// The second batch has one chunk overlap, but it includes no entries in the overlap.
// It has bounds [2,4).
// The third batch finally consumes the overlap, with bounds [4,max).
// Notably it also ends up testing the code paths for increasing batch sizes past
// the default due to nextChunks with the same start timestamp.
"forward identicals": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
},
[]logproto.Stream{
{
Labels: fooLabels,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
},
},
fooLabelsWithName,
from, from.Add(4 * time.Millisecond),
logproto.FORWARD,
1,
},
}

for name, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/lazy_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *LazyChunk) SampleIterator(
continue
}
if nextChunk != nil {
delete(c.overlappingBlocks, b.Offset())
delete(c.overlappingSampleBlocks, b.Offset())
}
// non-overlapping block with the next chunk are not cached.
its = append(its, b.SampleIterator(ctx, filter, extractor))
Expand Down
69 changes: 69 additions & 0 deletions pkg/storage/lazy_chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -15,6 +16,74 @@ import (
"github.com/grafana/loki/pkg/util"
)

// TestLazyChunkIterator is a minimal table-driven check that a LazyChunk's
// forward Iterator yields the entries the chunk was built from. The expected
// streams are compared without a Labels value set.
func TestLazyChunkIterator(t *testing.T) {
	type testCase struct {
		chunk    *LazyChunk
		expected []logproto.Stream
	}

	cases := []testCase{
		{
			chunk: newLazyChunk(logproto.Stream{
				Labels:  fooLabelsWithName,
				Entries: []logproto.Entry{{Timestamp: from, Line: "1"}},
			}),
			expected: []logproto.Stream{
				{Entries: []logproto.Entry{{Timestamp: from, Line: "1"}}},
			},
		},
	}

	for idx, tc := range cases {
		t.Run(fmt.Sprintf("%d", idx), func(t *testing.T) {
			// Iterate over a window wide enough to contain every test entry.
			windowStart, windowEnd := time.Unix(0, 0), time.Unix(1000, 0)

			it, err := tc.chunk.Iterator(context.Background(), windowStart, windowEnd, logproto.FORWARD, logql.TrueFilter, nil)
			require.Nil(t, err)

			got, _, err := iter.ReadBatch(it, 1000)
			require.Nil(t, err)

			// The close error is deliberately ignored, as in the rest of the test.
			_ = it.Close()

			require.Equal(t, tc.expected, got.Streams)
		})
	}
}

// TestLazyChunksPop exercises lazyChunks.pop for a few combinations of initial
// size and requested count. Each case lists:
//   - initial:    how many chunks the collection starts with
//   - n:          how many chunks are requested from pop
//   - expectedLn: how many chunks pop should return
//   - rem:        how many chunks should be left in the collection afterwards
func TestLazyChunksPop(t *testing.T) {
	for i, tc := range []struct {
		initial    int
		n          int
		expectedLn int
		rem        int
	}{
		{1, 1, 1, 0},
		{2, 1, 1, 1},
		{3, 4, 3, 0},
	} {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			lc := &lazyChunks{}
			for j := 0; j < tc.initial; j++ {
				lc.chunks = append(lc.chunks, &LazyChunk{})
			}

			out := lc.pop(tc.n)

			// Assert the exact lengths, not just that the first expectedLn/rem
			// slots are populated; otherwise an over-long result would pass.
			require.Len(t, out, tc.expectedLn)
			for _, c := range out {
				require.NotNil(t, c)
			}

			require.Len(t, lc.chunks, tc.rem)
			for _, c := range lc.chunks {
				require.NotNil(t, c)
			}
		})
	}
}

func TestIsOverlapping(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/grafana/loki/pkg/util"
)

var fooLabelsWithName = "{foo=\"bar\", __name__=\"log\"}"
var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}"
var fooLabels = "{foo=\"bar\"}"

var from = time.Unix(0, time.Millisecond.Nanoseconds())
Expand Down

0 comments on commit cb20afa

Please sign in to comment.