From d08ceef16b8c52b4fdc4ff90fb4a7785eb4e1d02 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 8 Jun 2020 13:40:31 -0400 Subject: [PATCH] Fixes iterators boundaries. (#2136) The start should be inclusive and the end is supposed to be exclusive. If start and end are equal, then only one entry can be returned which need to also be equal to start. Fixes #2124 Those issues were edge cases where the boundaries of a block or iterator would be equal to the start. Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 5 ++- pkg/chunkenc/memchunk_test.go | 73 ++++++++++++++++++++++++++++++++++- pkg/iter/iterator.go | 13 +++++-- pkg/iter/iterator_test.go | 40 ++++++++++++++++++- pkg/promtail/promtail_test.go | 13 +------ pkg/storage/iterator.go | 4 +- 6 files changed, 127 insertions(+), 21 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 8cfd0b44e810..309dedb0b99b 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { - its = append(its, b.iterator(ctx, c.readers, filter)) + if maxt < b.mint || b.maxt < mint { + continue } + its = append(its, b.iterator(ctx, c.readers, filter)) } if !c.head.isEmpty() { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 72b1961d41e6..9d6714507d14 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc/testdata" @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) { }) } } + +func TestMemChunk_IteratorBounds(t *testing.T) { + + var createChunk = func() *MemChunk { + t.Helper() + c := NewMemChunk(EncNone, 1e6, 1e6) + + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 1), + Line: "1", + }); err != nil { + t.Fatal(err) + } + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 2), + Line: "2", + }); err != nil { + t.Fatal(err) + } + return c + } + + for _, tt := range []struct { + mint, maxt time.Time + direction logproto.Direction + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}}, + + {time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}}, + } { + t.Run( + fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction), + func(t *testing.T) { + tt := tt + c := createChunk() + + // testing headchunk + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + + // testing chunk blocks + require.NoError(t, c.cut()) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + }) + + } + +} diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 45ebff74d46e..4c5669b20f77 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool { ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() + if !ok { + continue + } ts = i.EntryIterator.Entry().Timestamp } - - if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive. - ok = false + if ok { + if ts.Equal(i.mint) { // The mint is inclusive + return true + } + if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive. + ok = false + } } if !ok { i.EntryIterator.Close() diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index c02246fc0e26..7a7f3cbaddfb 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -7,9 +7,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) { }) } } + +func Test_timeRangedIterator_Next(t *testing.T) { + + tests := []struct { + mint time.Time + maxt time.Time + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 0), []bool{false}}, + {time.Unix(0, 0), time.Unix(0, 1), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}}, + {time.Unix(0, 4), time.Unix(0, 10), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}}, + {time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) { + i := NewTimeRangedIterator( + NewStreamIterator( + logproto.Stream{Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 3)}, + }}), + tt.mint, + tt.maxt, + ) + for _, b := range tt.expect { + require.Equal(t, b, i.Next()) + } + require.NoError(t, i.Close()) + }) + } +} diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index ecee6c6b942a..d0b4a76b19d2 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if _, ok := h.receivedMap[file]; ok { - h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) - } else { - h.receivedMap[file] = s.Entries - } - - if _, ok := h.receivedLabels[file]; ok { - h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) - } else { - h.receivedLabels[file] = []labels.Labels{parsedLabels} - } + h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) + h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) } diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 1381550f1c45..b3ce7e24592e 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -248,7 +248,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { } else { from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - if from.Before(it.req.Start) { + // when clipping the from it should never be before the start or equal to the end. + // Doing so would include entries not requested. + if from.Before(it.req.Start) || from.Equal(it.req.End) { from = it.req.Start } }