Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Iterator boundaries #2136

Merged
merged 1 commit into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
its = append(its, b.iterator(ctx, c.readers, filter))
if maxt < b.mint || b.maxt < mint {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud, clarifying how this addresses #2124.

With the patch: ignore the current block when none of its entries is within the interval [mint, maxt]. Otherwise append it for further processing. Or, in other words: if the current block has any entry within the interval [mint, maxt] then append it for further processing.

The special case discussed in #2124 corresponds to b.maxt == mint in this code section here, and with the old behavior maxt > b.mint && b.maxt > mint a block containing an entry within the interval [mint, maxt] was erroneously ignored for further processing.

}
its = append(its, b.iterator(ctx, c.readers, filter))
}

if !c.head.isEmpty() {
Expand Down
73 changes: 71 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/dustin/go-humanize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/chunkenc/testdata"
Expand Down Expand Up @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
})
}
}

func TestMemChunk_IteratorBounds(t *testing.T) {

var createChunk = func() *MemChunk {
t.Helper()
c := NewMemChunk(EncNone, 1e6, 1e6)

if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
Line: "1",
}); err != nil {
t.Fatal(err)
}
if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 2),
Line: "2",
}); err != nil {
t.Fatal(err)
}
return c
}

for _, tt := range []struct {
mint, maxt time.Time
direction logproto.Direction
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}},

{time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}},
} {
t.Run(
fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction),
func(t *testing.T) {
tt := tt
c := createChunk()

// testing headchunk
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())

// testing chunk blocks
require.NoError(t, c.cut())
it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())
})

}

}
13 changes: 10 additions & 3 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool {
ts := i.EntryIterator.Entry().Timestamp
for ok && i.mint.After(ts) {
ok = i.EntryIterator.Next()
if !ok {
continue
}
ts = i.EntryIterator.Entry().Timestamp
}

if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive.
ok = false
if ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't this more simply be expressed as

	if i.maxt.Before(ts) || i.maxt.Equal(ts) {
		ok = false
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is a very specific path covering the case where both maxt == mint == ts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shortcut is required. I tried it but the test wasn't passing.

if ts.Equal(i.mint) { // The mint is inclusive
return true
}
if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive.
ok = false
}
}
if !ok {
i.EntryIterator.Close()
Expand Down
40 changes: 38 additions & 2 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
Expand Down Expand Up @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) {
})
}
}

func Test_timeRangedIterator_Next(t *testing.T) {

tests := []struct {
mint time.Time
maxt time.Time
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 0), []bool{false}},
{time.Unix(0, 0), time.Unix(0, 1), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}},
{time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}},
{time.Unix(0, 4), time.Unix(0, 10), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}},
{time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) {
i := NewTimeRangedIterator(
NewStreamIterator(
logproto.Stream{Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 3)},
}}),
tt.mint,
tt.maxt,
)
for _, b := range tt.expect {
require.Equal(t, b, i.Next())
}
require.NoError(t, i.Close())
})
}
}
2 changes: 1 addition & 1 deletion pkg/logql/range_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Test_RangeVectorIterator(t *testing.T) {
time.Unix(10, 0), time.Unix(100, 0),
},
{
(50 * time.Second).Nanoseconds(), // all step are overlaping
(50 * time.Second).Nanoseconds(), // all step are overlapping
(10 * time.Second).Nanoseconds(),
[]promql.Vector{
[]promql.Sample{
Expand Down
6 changes: 3 additions & 3 deletions pkg/promtail/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Test_Config(t *testing.T) {
clientDefaultConfig,
Config{
URL: flagext.URLValue{
u,
URL: u,
},
BackoffConfig: util.BackoffConfig{
MaxBackoff: 5 * time.Minute,
Expand All @@ -57,7 +57,7 @@ func Test_Config(t *testing.T) {
clientCustomConfig,
Config{
URL: flagext.URLValue{
u,
URL: u,
},
BackoffConfig: util.BackoffConfig{
MaxBackoff: 1 * time.Minute,
Expand All @@ -75,7 +75,7 @@ func Test_Config(t *testing.T) {
require.NoError(t, err)

if !reflect.DeepEqual(tc.expectedConfig, clientConfig) {
t.Errorf("Configs does not match, expected: %v, recieved: %v", tc.expectedConfig, clientConfig)
t.Errorf("Configs does not match, expected: %v, received: %v", tc.expectedConfig, clientConfig)
}
}
}
13 changes: 2 additions & 11 deletions pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if _, ok := h.receivedMap[file]; ok {
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
} else {
h.receivedMap[file] = s.Entries
}

if _, ok := h.receivedLabels[file]; ok {
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)
} else {
h.receivedLabels[file] = []labels.Labels{parsedLabels}
}
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)

}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
} else {
from = time.Unix(0, headChunk.Chunk.From.UnixNano())

if from.Before(it.req.Start) {
// when clipping the from it should never be before the start or equal to the end.
// Doing so would include entries not requested.
if from.Before(it.req.Start) || from.Equal(it.req.End) {
from = it.req.Start
}
}
Expand Down