From 974030260d5e4580260175453350b11ba2638cb9 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 10 Aug 2020 10:56:36 -0400 Subject: [PATCH 1/2] Fixes a bug where if all chunks are fetched before finding a non-overlapping chunk, and the chunk from/through times are outside of the query request times, we would incorrectly set the iterator from/to bounds leading to missing logs in the query. --- pkg/storage/batch.go | 13 +- pkg/storage/batch_test.go | 258 ++++++++++++++++++++++++++++++++++++++ pkg/storage/util_test.go | 2 +- 3 files changed, 271 insertions(+), 2 deletions(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 9c8462ecf135f..e1ef6d8f7ae69 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -166,7 +166,7 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { batch = append(batch, it.lastOverlapping...) } - includesOverlap = true + //includesOverlap = true if it.chunks.Len() > 0 { nextChunk = it.chunks.Peek() @@ -236,6 +236,17 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { } } + // If every chunk overlaps and we exhaust fetching chunks before ever finding a non overlapping chunk + // in this case it will be possible to have a through value which is older or equal to our from value + // If that happens we reset the bounds according to the iteration direction + if through.Sub(from) <= 0 { + if it.direction == logproto.BACKWARD { + from = it.start + } else { + through = it.end + } + } + if it.chunks.Len() > 0 { it.lastOverlapping = it.lastOverlapping[:0] for _, c := range batch { diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index f7ae77417f72b..3acd430314629 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -183,6 +183,135 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.FORWARD, 2, }, + "forward all overlap and all chunks have a from time less than query from time": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }, + }, + fooLabelsWithName, + from.Add(1 * time.Millisecond), from.Add(5 * time.Millisecond), + logproto.FORWARD, + 2, + }, "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ @@ -375,6 +504,135 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.BACKWARD, 2, }, + "backward all overlap and all chunks have a through time greater than query through time": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from, + Line: "1", + }, + }, + }, + }, + fooLabelsWithName, + from, from.Add(4 * time.Millisecond), + logproto.BACKWARD, + 2, + }, "backward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 8c27f092decd0..110c1dfc11cef 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -38,7 +38,7 @@ func assertStream(t *testing.T, expected, actual []logproto.Stream) { for i := range expected { assert.Equal(t, expected[i].Labels, actual[i].Labels) if len(expected[i].Entries) != len(actual[i].Entries) { - t.Fatalf("error entries length are different expected %d actual%d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries)) + t.Fatalf("error entries length are different expected %d actual %d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries)) return } From cc813802aa30de1ab13633fe33f658a251dc5f4d Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 10 Aug 2020 11:32:17 -0400 Subject: [PATCH 2/2] uncomment `includesOverlap` which was commented during testing and not fixed by mistake. --- pkg/storage/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index e1ef6d8f7ae69..01643d98d3a61 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -166,7 +166,7 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { batch = append(batch, it.lastOverlapping...) } - //includesOverlap = true + includesOverlap = true if it.chunks.Len() > 0 { nextChunk = it.chunks.Peek()