From 182ed095fe84b966c4f5a759da1e25f37411badb Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Jan 2021 11:32:00 -0500 Subject: [PATCH 1/5] avoid sharding label replace --- pkg/logql/ast.go | 2 +- pkg/logql/sharding_test.go | 12 ++++++++++++ pkg/logql/shardmapper.go | 12 ++++++++++++ pkg/logql/shardmapper_test.go | 13 +++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index c58aa135a8913..55a481bdfad36 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -933,7 +933,7 @@ func (e *labelReplaceExpr) Extractor() (SampleExtractor, error) { } func (e *labelReplaceExpr) Operations() []string { - return e.left.Operations() + return append([]string{OpLabelReplace}, e.left.Operations()...) } func (e *labelReplaceExpr) String() string { diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index 7c04f4f2113fd..fb256c8a28b30 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -48,6 +48,18 @@ func TestMappingEquivalence(t *testing.T) { {`sum(max(rate({a=~".*"}[1s])))`, false}, {`max(count(rate({a=~".*"}[1s])))`, false}, {`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false}, + { + ` + sum without (a) ( + label_replace( + sum without(z) ( + rate({foo="bar"}[5m]) + ), + "baz", "buz", "b", "(.*)" + ) + ) + `, false, + }, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index eef0b9e4aa2e7..1b79dfaad1f1c 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -133,6 +133,8 @@ func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) { return m.mapLogSelectorExpr(e.(LogSelectorExpr), r), nil case *vectorAggregationExpr: return m.mapVectorAggregationExpr(e, r) + case *labelReplaceExpr: + return m.mapLabelReplaceExpr(e, r) case *rangeAggregationExpr: return m.mapRangeAggregationExpr(e, r), nil case *binOpExpr: @@ -277,6 +279,16 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh } } +func (m ShardMapper) mapLabelReplaceExpr(expr *labelReplaceExpr, r *shardRecorder) (SampleExpr, error) { + subMapped, err := m.Map(expr.left, r) + if err != nil { + return nil, err + } + cpy := *expr + cpy.left = subMapped.(SampleExpr) + return &cpy, nil +} + func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shardRecorder) SampleExpr { if hasLabelModifier(expr) { // if an expr can modify labels this means multiple shards can returns the same labelset. diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 454fd71cc67cd..cdd5e4806b8f9 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -175,6 +175,19 @@ func TestMappingStrings(t *testing.T) { in: `sum by (cluster) (stddev_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`, out: `sum by (cluster) (stddev_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`, }, + { + in: ` + sum without (a) ( + label_replace( + sum without (b) ( + rate({foo="bar"}[5m]) + ), + "baz", "buz", "foo", "(.*)" + ) + ) + `, + out: `sum without(a)(label_replace(sum without(b)(downstream++downstream),"baz","buz","foo","(.*)"))`, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := ParseExpr(tc.in) From e210ed28f0af6c7163cdb86be03aaddb21cb0e49 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Jan 2021 13:44:12 -0500 Subject: [PATCH 2/5] updates sharding tests --- pkg/logql/sharding_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index fb256c8a28b30..86a9189fe6186 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -39,7 +39,6 @@ func TestMappingEquivalence(t *testing.T) { {`rate({a=~".*"}[1s])`, false}, {`sum by (a) (rate({a=~".*"}[1s]))`, false}, {`sum(rate({a=~".*"}[1s]))`, false}, - {`max without (a) (rate({a=~".*"}[1s]))`, false}, {`count(rate({a=~".*"}[1s]))`, false}, {`avg(rate({a=~".*"}[1s]))`, true}, @@ -48,12 +47,21 @@ func TestMappingEquivalence(t *testing.T) { {`sum(max(rate({a=~".*"}[1s])))`, false}, {`max(count(rate({a=~".*"}[1s])))`, false}, {`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false}, + { + ` + sum without (a) ( + sum without(z) ( + rate({a=~".*"}[5m]) + ) + ) + `, false, + }, { ` sum without (a) ( label_replace( sum without(z) ( - rate({foo="bar"}[5m]) + rate({a=~".*"}[5m]) ), "baz", "buz", "b", "(.*)" ) From 2f34deab98810df52ed1b1d9270b3a0f43d82181 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Jan 2021 13:48:24 -0500 Subject: [PATCH 3/5] matrix stepper end-inclusive for rangeVectorIterator parity --- pkg/logql/matrix.go | 2 +- pkg/logql/matrix_test.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/logql/matrix.go b/pkg/logql/matrix.go index 4a1e406781481..e2feaeb6bb46a 100644 --- a/pkg/logql/matrix.go +++ b/pkg/logql/matrix.go @@ -33,7 +33,7 @@ func NewMatrixStepper(start, end time.Time, step time.Duration, m promql.Matrix) func (m *MatrixStepper) Next() (bool, int64, promql.Vector) { m.ts = m.ts.Add(m.step) - if !m.ts.Before(m.end) { + if m.ts.After(m.end) { return false, 0, nil } diff --git a/pkg/logql/matrix_test.go b/pkg/logql/matrix_test.go index ddd0389c5c35a..3cb20efe6dd1b 100644 --- a/pkg/logql/matrix_test.go +++ b/pkg/logql/matrix_test.go @@ -26,6 +26,7 @@ func TestMatrixStepper(t *testing.T) { {T: start.Add(3*step).UnixNano() / int64(time.Millisecond), V: 3}, {T: start.Add(4*step).UnixNano() / int64(time.Millisecond), V: 4}, {T: start.Add(5*step).UnixNano() / int64(time.Millisecond), V: 5}, + {T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 6}, }, }, promql.Series{ @@ -100,9 +101,19 @@ func TestMatrixStepper(t *testing.T) { Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, }, }, + { + promql.Sample{ + Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 6}, + Metric: labels.Labels{{Name: "foo", Value: "bar"}}, + }, + promql.Sample{ + Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0}, + Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, + }, + }, } - for i := 0; i < int(end.Sub(start)/step); i++ { + for i := 0; i <= int(end.Sub(start)/step); i++ { ok, ts, vec := s.Next() require.Equal(t, ok, true) require.Equal(t, start.Add(step*time.Duration(i)).UnixNano()/int64(time.Millisecond), ts) From 4398d2a6c43f57c0b1a0f045763f9f250211701e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Jan 2021 15:21:01 -0500 Subject: [PATCH 4/5] Revert "matrix stepper end-inclusive for rangeVectorIterator parity" This reverts commit 2f34deab98810df52ed1b1d9270b3a0f43d82181. --- pkg/logql/matrix.go | 2 +- pkg/logql/matrix_test.go | 13 +------------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/pkg/logql/matrix.go b/pkg/logql/matrix.go index e2feaeb6bb46a..4a1e406781481 100644 --- a/pkg/logql/matrix.go +++ b/pkg/logql/matrix.go @@ -33,7 +33,7 @@ func NewMatrixStepper(start, end time.Time, step time.Duration, m promql.Matrix) func (m *MatrixStepper) Next() (bool, int64, promql.Vector) { m.ts = m.ts.Add(m.step) - if m.ts.After(m.end) { + if !m.ts.Before(m.end) { return false, 0, nil } diff --git a/pkg/logql/matrix_test.go b/pkg/logql/matrix_test.go index 3cb20efe6dd1b..ddd0389c5c35a 100644 --- a/pkg/logql/matrix_test.go +++ b/pkg/logql/matrix_test.go @@ -26,7 +26,6 @@ func TestMatrixStepper(t *testing.T) { {T: start.Add(3*step).UnixNano() / int64(time.Millisecond), V: 3}, {T: start.Add(4*step).UnixNano() / int64(time.Millisecond), V: 4}, {T: start.Add(5*step).UnixNano() / int64(time.Millisecond), V: 5}, - {T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 6}, }, }, promql.Series{ @@ -101,19 +100,9 @@ func TestMatrixStepper(t *testing.T) { Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, }, }, - { - promql.Sample{ - Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 6}, - Metric: labels.Labels{{Name: "foo", Value: "bar"}}, - }, - promql.Sample{ - Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0}, - Metric: labels.Labels{{Name: "bazz", Value: "buzz"}}, - }, - }, } - for i := 0; i <= int(end.Sub(start)/step); i++ { + for i := 0; i < int(end.Sub(start)/step); i++ { ok, ts, vec := s.Next() require.Equal(t, ok, true) require.Equal(t, start.Add(step*time.Duration(i)).UnixNano()/int64(time.Millisecond), ts) From f6d94753289020bd91f6bc45eec7c0537e821b99 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Jan 2021 15:48:08 -0500 Subject: [PATCH 5/5] removes sharding equivalence tests due to interference by non-sharding bug in the control case --- pkg/logql/sharding_test.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index 86a9189fe6186..38c6f27037b41 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -47,27 +47,6 @@ func TestMappingEquivalence(t *testing.T) { {`sum(max(rate({a=~".*"}[1s])))`, false}, {`max(count(rate({a=~".*"}[1s])))`, false}, {`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false}, - { - ` - sum without (a) ( - sum without(z) ( - rate({a=~".*"}[5m]) - ) - ) - `, false, - }, - { - ` - sum without (a) ( - label_replace( - sum without(z) ( - rate({a=~".*"}[5m]) - ), - "baz", "buz", "b", "(.*)" - ) - ) - `, false, - }, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass.