diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 9c8462ecf135f..01643d98d3a61 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -236,6 +236,17 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { } } + // If every chunk overlaps and we exhaust fetching chunks before ever finding a non overlapping chunk + // in this case it will be possible to have a through value which is older or equal to our from value + // If that happens we reset the bounds according to the iteration direction + if through.Sub(from) <= 0 { + if it.direction == logproto.BACKWARD { + from = it.start + } else { + through = it.end + } + } + if it.chunks.Len() > 0 { it.lastOverlapping = it.lastOverlapping[:0] for _, c := range batch { diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index f7ae77417f72b..3acd430314629 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -183,6 +183,135 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.FORWARD, 2, }, + "forward all overlap and all chunks have a from time less than query from time": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }, + }, + fooLabelsWithName, + from.Add(1 * time.Millisecond), from.Add(5 * time.Millisecond), + logproto.FORWARD, + 2, + }, "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ @@ -375,6 +504,135 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.BACKWARD, 2, }, + "backward all overlap and all chunks have a through time greater than query through time": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from, + Line: "1", + }, + }, + }, + }, + fooLabelsWithName, + from, from.Add(4 * time.Millisecond), + logproto.BACKWARD, + 2, + }, "backward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 8c27f092decd0..110c1dfc11cef 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -38,7 +38,7 @@ func assertStream(t *testing.T, expected, actual []logproto.Stream) { for i := range expected { assert.Equal(t, expected[i].Labels, actual[i].Labels) if len(expected[i].Entries) != len(actual[i].Entries) { - t.Fatalf("error entries length are different expected %d actual%d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries)) + t.Fatalf("error entries length are different expected %d actual %d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries)) return }