Skip to content

Commit

Permalink
Fixes iterators boundaries. (#2136)
Browse files Browse the repository at this point in the history
The start should be inclusive and the end is supposed to be exclusive.
If start and end are equal, then only one entry can be returned which need to also be equal to start.

Fixes #2124

Those issues were edge cases where the boundaries of a block or iterator would be equal to the start.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jun 8, 2020
1 parent 6e55277 commit d08ceef
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 21 deletions.
5 changes: 3 additions & 2 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
its = append(its, b.iterator(ctx, c.readers, filter))
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, b.iterator(ctx, c.readers, filter))
}

if !c.head.isEmpty() {
Expand Down
73 changes: 71 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/dustin/go-humanize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/chunkenc/testdata"
Expand Down Expand Up @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
})
}
}

func TestMemChunk_IteratorBounds(t *testing.T) {

var createChunk = func() *MemChunk {
t.Helper()
c := NewMemChunk(EncNone, 1e6, 1e6)

if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
Line: "1",
}); err != nil {
t.Fatal(err)
}
if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 2),
Line: "2",
}); err != nil {
t.Fatal(err)
}
return c
}

for _, tt := range []struct {
mint, maxt time.Time
direction logproto.Direction
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}},

{time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}},
} {
t.Run(
fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction),
func(t *testing.T) {
tt := tt
c := createChunk()

// testing headchunk
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())

// testing chunk blocks
require.NoError(t, c.cut())
it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())
})

}

}
13 changes: 10 additions & 3 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool {
ts := i.EntryIterator.Entry().Timestamp
for ok && i.mint.After(ts) {
ok = i.EntryIterator.Next()
if !ok {
continue
}
ts = i.EntryIterator.Entry().Timestamp
}

if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive.
ok = false
if ok {
if ts.Equal(i.mint) { // The mint is inclusive
return true
}
if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive.
ok = false
}
}
if !ok {
i.EntryIterator.Close()
Expand Down
40 changes: 38 additions & 2 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
Expand Down Expand Up @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) {
})
}
}

func Test_timeRangedIterator_Next(t *testing.T) {

tests := []struct {
mint time.Time
maxt time.Time
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 0), []bool{false}},
{time.Unix(0, 0), time.Unix(0, 1), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}},
{time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}},
{time.Unix(0, 4), time.Unix(0, 10), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}},
{time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) {
i := NewTimeRangedIterator(
NewStreamIterator(
logproto.Stream{Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 3)},
}}),
tt.mint,
tt.maxt,
)
for _, b := range tt.expect {
require.Equal(t, b, i.Next())
}
require.NoError(t, i.Close())
})
}
}
13 changes: 2 additions & 11 deletions pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if _, ok := h.receivedMap[file]; ok {
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
} else {
h.receivedMap[file] = s.Entries
}

if _, ok := h.receivedLabels[file]; ok {
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)
} else {
h.receivedLabels[file] = []labels.Labels{parsedLabels}
}
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)

}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
} else {
from = time.Unix(0, headChunk.Chunk.From.UnixNano())

if from.Before(it.req.Start) {
// when clipping the from it should never be before the start or equal to the end.
// Doing so would include entries not requested.
if from.Before(it.req.Start) || from.Equal(it.req.End) {
from = it.req.Start
}
}
Expand Down

0 comments on commit d08ceef

Please sign in to comment.