Skip to content

Commit

Permalink
Correctly sets hash value for headblock iterator (#5423)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 17, 2022
1 parent 137a28d commit 5d8bc61
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
6 changes: 4 additions & 2 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
if stream, ok = streams[lhash]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
Hash: lhash,
}
streams[lhash] = stream
}
Expand Down Expand Up @@ -1062,8 +1063,9 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
StreamHash: lhash,
}
series[lhash] = s
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func (hb *unorderedHeadBlock) Iterator(
maxt int64,
pipeline log.StreamPipeline,
) iter.EntryIterator {

// We are doing a copy everytime, this is because b.entries could change completely,
// the alternate would be that we allocate a new b.entries everytime we cut a block,
// but the tradeoff is that queries to near-realtime data would be much lower than
Expand All @@ -238,6 +237,7 @@ func (hb *unorderedHeadBlock) Iterator(
if stream, ok = streams[lhash]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
Hash: lhash,
}
streams[lhash] = stream
}
Expand Down Expand Up @@ -267,7 +267,6 @@ func (hb *unorderedHeadBlock) SampleIterator(
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator {

series := map[uint64]*logproto.Series{}

_ = hb.forEntries(
Expand All @@ -285,8 +284,9 @@ func (hb *unorderedHeadBlock) SampleIterator(
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
StreamHash: parsedLabels.Hash(),
}
series[lhash] = s
}
Expand Down
32 changes: 29 additions & 3 deletions pkg/chunkenc/unordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -268,7 +269,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
// unordered, unordered

// current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block
var nWrites = (256 << 10) / 50
nWrites := (256 << 10) / 50

headBlockFn := func() func(int64, string) {
hb := &headBlock{}
Expand Down Expand Up @@ -449,7 +450,6 @@ func BenchmarkUnorderedRead(b *testing.B) {
})
}
})

}

func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
Expand Down Expand Up @@ -563,7 +563,6 @@ func TestReorder(t *testing.T) {

require.Equal(t, exp, b)
})

}
}

Expand Down Expand Up @@ -610,3 +609,30 @@ func TestReorderAcrossBlocks(t *testing.T) {
}
iterEq(t, exp, itr)
}

// Test_HeadIteratorHash verifies that both head-block implementations
// (ordered and unordered) propagate the stream's label hash to the
// entry and sample iterators they produce, so downstream consumers can
// rely on StreamHash() matching labels.Hash().
func Test_HeadIteratorHash(t *testing.T) {
	lbs := labels.Labels{labels.Label{Name: "foo", Value: "bar"}}
	ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
	// Use the test's failure reporting rather than panicking, consistent
	// with the require-based assertions below.
	require.NoError(t, err)

	// Compute the expected hash once; both sub-tests assert against it.
	expectedHash := lbs.Hash()

	for name, b := range map[string]HeadBlock{
		"unordered": newUnorderedHeadBlock(),
		"ordered":   &headBlock{},
	} {
		t.Run(name, func(t *testing.T) {
			require.NoError(t, b.Append(1, "foo"))

			// Entry iterator must expose the stream hash on every entry.
			eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs))
			for eit.Next() {
				require.Equal(t, expectedHash, eit.StreamHash())
			}

			// Sample iterator must expose the same stream hash.
			sit := b.SampleIterator(context.TODO(), 0, 2, ex.ForStream(lbs))
			for sit.Next() {
				require.Equal(t, expectedHash, sit.StreamHash())
			}
		})
	}
}
4 changes: 3 additions & 1 deletion pkg/ingester/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,17 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
}, &result)
require.NoError(t, err)
require.Len(t, result.resps, 1)
lbls := labels.Labels{{Name: "bar", Value: "baz1"}, {Name: "foo", Value: "bar"}}
expected := []logproto.Stream{
{
Labels: `{bar="baz1", foo="bar"}`,
Labels: lbls.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "line 1",
},
},
Hash: lbls.Hash(),
},
}
require.Equal(t, expected, result.resps[0].Streams)
Expand Down

0 comments on commit 5d8bc61

Please sign in to comment.