From 9c6bc8b458b44b3ca5f00c3382aade9f5d74a776 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 17 Feb 2022 20:52:43 +0100
Subject: [PATCH 1/2] Correctly sets hash value for headblock iterator (#5423)

---
 pkg/chunkenc/memchunk.go       |  6 ++++--
 pkg/chunkenc/unordered.go      |  8 ++++----
 pkg/chunkenc/unordered_test.go | 32 +++++++++++++++++++++++++++++---
 pkg/ingester/recovery_test.go  |  4 +++-
 4 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index eccda20ab3c9c..7a9fdebeb099e 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -1015,6 +1015,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
 		if stream, ok = streams[lhash]; !ok {
 			stream = &logproto.Stream{
 				Labels: parsedLbs.String(),
+				Hash:   lhash,
 			}
 			streams[lhash] = stream
 		}
@@ -1062,8 +1063,9 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
 		lhash := parsedLabels.Hash()
 		if s, found = series[lhash]; !found {
 			s = &logproto.Series{
-				Labels:  parsedLabels.String(),
-				Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
+				Labels:     parsedLabels.String(),
+				Samples:    SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
+				StreamHash: lhash,
 			}
 			series[lhash] = s
 		}
diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go
index f7bf4ae6a0883..346f85dda56b4 100644
--- a/pkg/chunkenc/unordered.go
+++ b/pkg/chunkenc/unordered.go
@@ -215,7 +215,6 @@ func (hb *unorderedHeadBlock) Iterator(
 	maxt int64,
 	pipeline log.StreamPipeline,
 ) iter.EntryIterator {
-
 	// We are doing a copy everytime, this is because b.entries could change completely,
 	// the alternate would be that we allocate a new b.entries everytime we cut a block,
 	// but the tradeoff is that queries to near-realtime data would be much lower than
@@ -238,6 +237,7 @@ func (hb *unorderedHeadBlock) Iterator(
 			if stream, ok = streams[lhash]; !ok {
 				stream = &logproto.Stream{
 					Labels: parsedLbs.String(),
+					Hash:   lhash,
 				}
 				streams[lhash] = stream
 			}
@@ -267,7 +267,6 @@ func (hb *unorderedHeadBlock) SampleIterator(
 	maxt int64,
 	extractor log.StreamSampleExtractor,
 ) iter.SampleIterator {
-
 	series := map[uint64]*logproto.Series{}
 
 	_ = hb.forEntries(
@@ -285,8 +284,9 @@ func (hb *unorderedHeadBlock) SampleIterator(
 			lhash := parsedLabels.Hash()
 			if s, found = series[lhash]; !found {
 				s = &logproto.Series{
-					Labels:  parsedLabels.String(),
-					Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
+					Labels:     parsedLabels.String(),
+					Samples:    SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
+					StreamHash: parsedLabels.Hash(),
 				}
 				series[lhash] = s
 			}
diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go
index 447df07e917f1..34b866ea13313 100644
--- a/pkg/chunkenc/unordered_test.go
+++ b/pkg/chunkenc/unordered_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/pkg/iter"
@@ -268,7 +269,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
 	// unordered, unordered
 
 	// current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block
-	var nWrites = (256 << 10) / 50
+	nWrites := (256 << 10) / 50
 
 	headBlockFn := func() func(int64, string) {
 		hb := &headBlock{}
@@ -449,7 +450,6 @@ func BenchmarkUnorderedRead(b *testing.B) {
 			})
 		}
 	})
-
 }
 
 func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
@@ -563,7 +563,6 @@ func TestReorder(t *testing.T) {
 
 			require.Equal(t, exp, b)
 		})
-
 	}
 }
 
@@ -610,3 +609,30 @@ func TestReorderAcrossBlocks(t *testing.T) {
 	}
 	iterEq(t, exp, itr)
 }
+
+func Test_HeadIteratorHash(t *testing.T) {
+	lbs := labels.Labels{labels.Label{Name: "foo", Value: "bar"}}
+	ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
+	if err != nil {
+		panic(err)
+	}
+
+	for name, b := range map[string]HeadBlock{
+		"unordered": newUnorderedHeadBlock(),
+		"ordered":   &headBlock{},
+	} {
+		t.Run(name, func(t *testing.T) {
+			require.NoError(t, b.Append(1, "foo"))
+			eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs))
+
+			for eit.Next() {
+				require.Equal(t, lbs.Hash(), eit.StreamHash())
+			}
+
+			sit := b.SampleIterator(context.TODO(), 0, 2, ex.ForStream(lbs))
+			for sit.Next() {
+				require.Equal(t, lbs.Hash(), sit.StreamHash())
+			}
+		})
+	}
+}
diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go
index cb2ce94d135fd..1f643b108b0bf 100644
--- a/pkg/ingester/recovery_test.go
+++ b/pkg/ingester/recovery_test.go
@@ -262,15 +262,17 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
 	}, &result)
 	require.NoError(t, err)
 	require.Len(t, result.resps, 1)
+	lbls := labels.Labels{{Name: "bar", Value: "baz1"}, {Name: "foo", Value: "bar"}}
 	expected := []logproto.Stream{
 		{
-			Labels: `{bar="baz1", foo="bar"}`,
+			Labels: lbls.String(),
 			Entries: []logproto.Entry{
 				{
 					Timestamp: time.Unix(1, 0),
 					Line:      "line 1",
 				},
 			},
+			Hash: lbls.Hash(),
 		},
 	}
 	require.Equal(t, expected, result.resps[0].Streams)

From 2e7329c09726e9e750c02111e4880fb52481aefa Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Fri, 18 Feb 2022 16:12:13 +0100
Subject: [PATCH 2/2] Do not use WaitGroup context for StepEvaluator (#5425)

---
 pkg/logql/evaluator.go | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go
index bf0a5fa8f9c82..0d0bb2f199700 100644
--- a/pkg/logql/evaluator.go
+++ b/pkg/logql/evaluator.go
@@ -549,18 +549,26 @@ func binOpStepEvaluator(
 	}
 
 	var lse, rse StepEvaluator
-	g, ctx := errgroup.WithContext(ctx)
+
+	ctx, cancel := context.WithCancel(ctx)
+	g := errgroup.Group{}
 
 	// We have two non literal legs,
 	// load them in parallel
 	g.Go(func() error {
 		var err error
 		lse, err = ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
+		if err != nil {
+			cancel()
+		}
 		return err
 	})
 	g.Go(func() error {
 		var err error
 		rse, err = ev.StepEvaluator(ctx, ev, expr.RHS, q)
+		if err != nil {
+			cancel()
+		}
 		return err
 	})
 
@@ -652,7 +660,7 @@ func matchingSignature(sample promql.Sample, opts *BinOpOptions) uint64 {
 
 func vectorBinop(op string, opts *BinOpOptions, lhs, rhs promql.Vector, lsigs, rsigs []uint64) (promql.Vector, error) {
 	// handle one-to-one or many-to-one matching
-	//for one-to-many, swap
+	// for one-to-many, swap
 	if opts != nil && opts.VectorMatching.Card == CardOneToMany {
 		lhs, rhs = rhs, lhs
 		lsigs, rsigs = rsigs, lsigs
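Note on the second patch: the context returned by errgroup.WithContext is cancelled the first time a goroutine passed to Go returns an error or the first time Wait returns, whichever comes first. Since binOpStepEvaluator only uses the group to load the two legs and then keeps evaluating with the same context, deriving that context from the group would leave the returned StepEvaluators holding an already-cancelled context once Wait returns; the likely motivation for the change is to switch to a plain context.WithCancel that is cancelled only on error. The following standalone sketch (not part of the patch, assuming golang.org/x/sync/errgroup) illustrates the behaviour:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Context derived from an errgroup: cancelled as soon as Wait returns,
	// even though every goroutine succeeded.
	g, gctx := errgroup.WithContext(context.Background())
	g.Go(func() error { return nil })
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println(gctx.Err()) // prints "context canceled"

	// Plain cancellable context, as in the patch: it stays alive after the
	// goroutines finish and is cancelled explicitly only on error.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fmt.Println(ctx.Err()) // prints "<nil>"
}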