From 9bd45adced6eea8ed69fece751950675ff83375c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 22 May 2020 17:20:04 -0400 Subject: [PATCH 1/8] Add bytes_rate and bytes_over_time. Still need to works on test. Signed-off-by: Cyril Tovena --- pkg/logql/ast.go | 6 +- pkg/logql/engine.go | 12 -- pkg/logql/evaluator.go | 48 ++++-- pkg/logql/expr.y | 7 +- pkg/logql/expr.y.go | 297 +++++++++++++++++---------------- pkg/logql/functions.go | 63 +++++++ pkg/logql/lex.go | 56 ++++--- pkg/logql/parser_test.go | 118 ++++++++++--- pkg/logql/range_vector.go | 28 ++-- pkg/logql/range_vector_test.go | 11 +- pkg/logql/series_extractor.go | 95 +++++++++++ pkg/logql/shardmapper.go | 6 +- pkg/logql/shardmapper_test.go | 58 +++---- 13 files changed, 534 insertions(+), 271 deletions(-) create mode 100644 pkg/logql/functions.go create mode 100644 pkg/logql/series_extractor.go diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 4ad4444ac8c6..4babe9052da4 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -198,8 +198,10 @@ const ( OpTypeTopK = "topk" // range vector ops - OpTypeCountOverTime = "count_over_time" - OpTypeRate = "rate" + OpRangeTypeCount = "count_over_time" + OpRangeTypeRate = "rate" + OpRangeTypeBytes = "bytes_over_time" + OpRangeTypeBytesRate = "bytes_rate" // binops - logical/set OpTypeOr = "or" diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 9366e1575218..6d396e8e7c31 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -344,15 +344,3 @@ type groupedAggregation struct { heap vectorByValueHeap reverseHeap vectorByReverseValueHeap } - -// rate calculate the per-second rate of log lines. -func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { - return func(ts int64, samples []promql.Point) float64 { - return float64(len(samples)) / selRange.Seconds() - } -} - -// count counts the amount of log lines. -func count(ts int64, samples []promql.Point) float64 { - return float64(len(samples)) -} diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index e0f609efae93..b2a5268984bd 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -336,28 +336,42 @@ func rangeAggEvaluator( expr *rangeAggregationExpr, q Params, ) (StepEvaluator, error) { - vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(), - q.Start().UnixNano(), q.End().UnixNano()) - - var fn RangeVectorAggregator - switch expr.operation { - case OpTypeRate: - fn = rate(expr.left.interval) - case OpTypeCountOverTime: - fn = count + + agg, err := expr.aggregator() + if err != nil { + return nil, err } + extractor, err := expr.extractor() + if err != nil { + return nil, err + } + return rangeVectorEvaluator{ + iter: newRangeVectorIterator( + newSeriesIterator(entryIter, extractor), + expr.left.interval.Nanoseconds(), + q.Step().Nanoseconds(), + q.Start().UnixNano(), q.End().UnixNano(), + ), + agg: agg, + }, nil +} - return newStepEvaluator(func() (bool, int64, promql.Vector) { - next := vecIter.Next() - if !next { - return false, 0, promql.Vector{} - } - ts, vec := vecIter.At(fn) - return true, ts, vec +type rangeVectorEvaluator struct { + agg RangeVectorAggregator + iter RangeVectorIterator +} - }, vecIter.Close) +func (r rangeVectorEvaluator) Next() (bool, int64, promql.Vector) { + next := r.iter.Next() + if !next { + return false, 0, promql.Vector{} + } + ts, vec := r.iter.At(r.agg) + return true, ts, vec } +func (r rangeVectorEvaluator) Close() error { return r.iter.Close() } + // binOpExpr explicitly does not handle when both legs are literals as // it makes the type system simpler and these are reduced in mustNewBinOpExpr func binOpStepEvaluator( diff --git a/pkg/logql/expr.y b/pkg/logql/expr.y index 2b844851fa4b..11eb62ac07fa 100644 --- a/pkg/logql/expr.y +++ b/pkg/logql/expr.y @@ -52,6 +52,7 @@ import ( %token DURATION %token MATCHERS LABELS EQ NEQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK + BYTES_OVER_TIME BYTES_RATE // Operators are listed with increasing precedence. %left OR @@ -166,8 +167,10 @@ vectorOp: ; rangeOp: - COUNT_OVER_TIME { $$ = OpTypeCountOverTime } - | RATE { $$ = OpTypeRate } + COUNT_OVER_TIME { $$ = OpRangeTypeCount } + | RATE { $$ = OpRangeTypeRate } + | BYTES_OVER_TIME { $$ = OpRangeTypeBytes } + | BYTES_RATE { $$ = OpRangeTypeBytesRate } ; diff --git a/pkg/logql/expr.y.go b/pkg/logql/expr.y.go index 1ca0540835de..cb854c0b03b6 100644 --- a/pkg/logql/expr.y.go +++ b/pkg/logql/expr.y.go @@ -69,15 +69,17 @@ const STDDEV = 57375 const STDVAR = 57376 const BOTTOMK = 57377 const TOPK = 57378 -const OR = 57379 -const AND = 57380 -const UNLESS = 57381 -const ADD = 57382 -const SUB = 57383 -const MUL = 57384 -const DIV = 57385 -const MOD = 57386 -const POW = 57387 +const BYTES_OVER_TIME = 57379 +const BYTES_RATE = 57380 +const OR = 57381 +const AND = 57382 +const UNLESS = 57383 +const ADD = 57384 +const SUB = 57385 +const MUL = 57386 +const DIV = 57387 +const MOD = 57388 +const POW = 57389 var exprToknames = [...]string{ "$end", @@ -116,6 +118,8 @@ var exprToknames = [...]string{ "STDVAR", "BOTTOMK", "TOPK", + "BYTES_OVER_TIME", + "BYTES_RATE", "OR", "AND", "UNLESS", @@ -132,7 +136,7 @@ const exprEofCode = 1 const exprErrCode = 2 const exprInitialStackSize = 16 -//line pkg/logql/expr.y:183 +//line pkg/logql/expr.y:186 //line yacctab:1 var exprExca = [...]int{ @@ -142,8 +146,6 @@ var exprExca = [...]int{ -1, 3, 1, 2, 23, 2, - 37, 2, - 38, 2, 39, 2, 40, 2, 41, 2, @@ -151,10 +153,10 @@ var exprExca = [...]int{ 43, 2, 44, 2, 45, 2, + 46, 2, + 47, 2, -2, 0, - -1, 44, - 37, 2, - 38, 2, + -1, 46, 39, 2, 40, 2, 41, 2, @@ -162,55 +164,58 @@ var exprExca = [...]int{ 43, 2, 44, 2, 45, 2, + 46, 2, + 47, 2, -2, 0, } const exprPrivate = 57344 -const exprLast = 199 +const exprLast = 202 var exprAct = [...]int{ - 52, 4, 37, 100, 48, 3, 78, 14, 43, 36, - 112, 51, 44, 53, 54, 11, 31, 32, 33, 34, - 35, 36, 108, 6, 53, 54, 97, 17, 18, 19, - 20, 22, 23, 21, 24, 25, 26, 27, 82, 109, - 109, 15, 16, 11, 111, 110, 11, 33, 34, 35, - 36, 80, 85, 81, 6, 84, 79, 68, 17, 18, - 19, 20, 22, 23, 21, 24, 25, 26, 27, 83, - 50, 98, 15, 16, 73, 104, 67, 86, 103, 66, - 90, 91, 56, 55, 89, 99, 95, 96, 88, 102, - 29, 30, 31, 32, 33, 34, 35, 36, 106, 91, - 107, 28, 29, 30, 31, 32, 33, 34, 35, 36, - 45, 2, 38, 114, 87, 113, 92, 94, 47, 101, - 49, 42, 49, 41, 10, 42, 9, 41, 13, 8, - 39, 40, 5, 69, 39, 40, 12, 105, 71, 57, - 58, 59, 60, 61, 62, 63, 64, 65, 92, 7, - 46, 70, 38, 1, 72, 0, 0, 42, 0, 41, - 0, 42, 0, 41, 0, 0, 39, 40, 0, 93, - 39, 40, 38, 69, 0, 38, 0, 94, 0, 0, - 0, 42, 0, 41, 42, 0, 41, 0, 0, 0, - 39, 40, 0, 39, 40, 74, 75, 76, 77, + 54, 4, 39, 102, 50, 3, 80, 14, 45, 38, + 114, 53, 46, 55, 56, 11, 33, 34, 35, 36, + 37, 38, 110, 6, 55, 56, 99, 17, 18, 21, + 22, 24, 25, 23, 26, 27, 28, 29, 19, 20, + 84, 111, 111, 15, 16, 11, 113, 112, 11, 35, + 36, 37, 38, 82, 87, 83, 6, 86, 81, 70, + 17, 18, 21, 22, 24, 25, 23, 26, 27, 28, + 29, 19, 20, 85, 100, 52, 15, 16, 75, 88, + 58, 106, 57, 93, 105, 47, 2, 101, 97, 98, + 92, 104, 31, 32, 33, 34, 35, 36, 37, 38, + 108, 93, 109, 30, 31, 32, 33, 34, 35, 36, + 37, 38, 91, 90, 89, 116, 59, 60, 61, 62, + 63, 64, 65, 66, 67, 40, 115, 10, 69, 94, + 96, 68, 103, 49, 44, 51, 43, 51, 44, 9, + 43, 73, 94, 41, 42, 13, 71, 41, 42, 8, + 107, 44, 5, 43, 72, 40, 12, 74, 7, 48, + 41, 42, 1, 95, 44, 0, 43, 0, 0, 0, + 40, 0, 0, 41, 42, 96, 71, 0, 0, 44, + 0, 43, 40, 76, 77, 78, 79, 0, 41, 42, + 0, 44, 0, 43, 0, 0, 0, 0, 0, 0, + 41, 42, } var exprPact = [...]int{ - 1, -1000, 64, 173, -1000, -1000, 1, -1000, -1000, -1000, - -1000, 116, 48, -11, -1000, 77, 76, -1000, -1000, -1000, - -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, 1, 1, - 1, 1, 1, 1, 1, 1, 1, 74, -1000, -1000, - -1000, -1000, -1000, 34, 150, 64, 136, 59, -1000, 185, - 29, 32, 47, 33, 30, -1000, -1000, 52, -24, -24, - 5, 5, -36, -36, -36, -36, -1000, -1000, -1000, -1000, - -1000, -1000, 118, -1000, 109, 83, 79, 75, 146, 170, - 29, 3, 53, 1, 115, 115, -1000, -1000, -1000, -1000, - -1000, 73, -1000, -1000, -1000, 110, 114, 0, 1, -1, - 22, -1000, 21, -1000, -1000, -1000, -1000, -13, -1000, 111, - -1000, -1000, 0, -1000, -1000, + 1, -1000, 64, 180, -1000, -1000, 1, -1000, -1000, -1000, + -1000, 131, 53, -11, -1000, 76, 74, -1000, -1000, -1000, + -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 126, + -1000, -1000, -1000, -1000, -1000, 36, 153, 64, 139, 63, + -1000, 173, 31, 34, 51, 35, 32, -1000, -1000, 52, + -26, -26, 5, 5, -38, -38, -38, -38, -1000, -1000, + -1000, -1000, -1000, -1000, 133, -1000, 109, 108, 107, 85, + 140, 168, 31, 3, 56, 1, 128, 128, -1000, -1000, + -1000, -1000, -1000, 79, -1000, -1000, -1000, 123, 127, 0, + 1, -1, 24, -1000, 23, -1000, -1000, -1000, -1000, -13, + -1000, 122, -1000, -1000, 0, -1000, -1000, } var exprPgo = [...]int{ - 0, 153, 110, 2, 0, 3, 5, 1, 6, 4, - 150, 149, 136, 132, 129, 128, 126, 124, + 0, 162, 85, 2, 0, 3, 5, 1, 6, 4, + 159, 158, 156, 152, 149, 145, 139, 127, } var exprR1 = [...]int{ @@ -220,7 +225,7 @@ var exprR1 = [...]int{ 13, 13, 10, 10, 9, 9, 9, 9, 16, 16, 16, 16, 16, 16, 16, 16, 16, 17, 17, 17, 15, 15, 15, 15, 15, 15, 15, 15, 15, 12, - 12, 5, 5, 4, 4, + 12, 12, 12, 5, 5, 4, 4, } var exprR2 = [...]int{ @@ -230,37 +235,37 @@ var exprR2 = [...]int{ 3, 3, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, - 1, 1, 3, 4, 4, + 1, 1, 1, 1, 3, 4, 4, } var exprChk = [...]int{ -1000, -1, -2, -6, -7, -13, 22, -11, -14, -16, - -17, 14, -12, -15, 6, 40, 41, 26, 27, 28, - 29, 32, 30, 31, 33, 34, 35, 36, 37, 38, - 39, 40, 41, 42, 43, 44, 45, -3, 2, 20, - 21, 13, 11, -7, -6, -2, -10, 2, -9, 4, - 22, 22, -4, 24, 25, 6, 6, -2, -2, -2, - -2, -2, -2, -2, -2, -2, 5, 2, 23, 23, - 15, 2, 18, 15, 10, 11, 12, 13, -8, -6, - 22, -7, 6, 22, 22, 22, -9, 5, 5, 5, - 5, -3, 2, 23, 7, -6, -8, 23, 18, -7, - -5, 4, -5, 5, 2, 23, -4, -7, 23, 18, - 23, 23, 23, 4, -4, + -17, 14, -12, -15, 6, 42, 43, 26, 27, 37, + 38, 28, 29, 32, 30, 31, 33, 34, 35, 36, + 39, 40, 41, 42, 43, 44, 45, 46, 47, -3, + 2, 20, 21, 13, 11, -7, -6, -2, -10, 2, + -9, 4, 22, 22, -4, 24, 25, 6, 6, -2, + -2, -2, -2, -2, -2, -2, -2, -2, 5, 2, + 23, 23, 15, 2, 18, 15, 10, 11, 12, 13, + -8, -6, 22, -7, 6, 22, 22, 22, -9, 5, + 5, 5, 5, -3, 2, 23, 7, -6, -8, 23, + 18, -7, -5, 4, -5, 5, 2, 23, -4, -7, + 23, 18, 23, 23, 23, 4, -4, } var exprDef = [...]int{ 0, -2, 1, -2, 3, 9, 0, 4, 5, 6, - 7, 0, 0, 0, 47, 0, 0, 59, 60, 50, - 51, 52, 53, 54, 55, 56, 57, 58, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 13, 25, - 26, 27, 28, 3, -2, 0, 0, 0, 32, 0, - 0, 0, 0, 0, 0, 48, 49, 38, 39, 40, - 41, 42, 43, 44, 45, 46, 10, 12, 8, 11, - 29, 30, 0, 31, 0, 0, 0, 0, 0, 0, - 0, 3, 47, 0, 0, 0, 33, 34, 35, 36, - 37, 0, 18, 19, 14, 0, 0, 20, 0, 3, - 0, 61, 0, 15, 17, 16, 22, 3, 21, 0, - 63, 64, 23, 62, 24, + 7, 0, 0, 0, 47, 0, 0, 59, 60, 61, + 62, 50, 51, 52, 53, 54, 55, 56, 57, 58, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 13, 25, 26, 27, 28, 3, -2, 0, 0, 0, + 32, 0, 0, 0, 0, 0, 0, 48, 49, 38, + 39, 40, 41, 42, 43, 44, 45, 46, 10, 12, + 8, 11, 29, 30, 0, 31, 0, 0, 0, 0, + 0, 0, 0, 3, 47, 0, 0, 0, 33, 34, + 35, 36, 37, 0, 18, 19, 14, 0, 0, 20, + 0, 3, 0, 63, 0, 15, 17, 16, 22, 3, + 21, 0, 65, 66, 23, 64, 24, } var exprTok1 = [...]int{ @@ -272,7 +277,7 @@ var exprTok2 = [...]int{ 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, - 42, 43, 44, 45, + 42, 43, 44, 45, 46, 47, } var exprTok3 = [...]int{ 0, @@ -617,360 +622,372 @@ exprdefault: case 1: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:65 +//line pkg/logql/expr.y:66 { exprlex.(*lexer).expr = exprDollar[1].Expr } case 2: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:68 +//line pkg/logql/expr.y:69 { exprVAL.Expr = exprDollar[1].LogExpr } case 3: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:69 +//line pkg/logql/expr.y:70 { exprVAL.Expr = exprDollar[1].MetricExpr } case 4: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:73 +//line pkg/logql/expr.y:74 { exprVAL.MetricExpr = exprDollar[1].RangeAggregationExpr } case 5: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:74 +//line pkg/logql/expr.y:75 { exprVAL.MetricExpr = exprDollar[1].VectorAggregationExpr } case 6: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:75 +//line pkg/logql/expr.y:76 { exprVAL.MetricExpr = exprDollar[1].BinOpExpr } case 7: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:76 +//line pkg/logql/expr.y:77 { exprVAL.MetricExpr = exprDollar[1].LiteralExpr } case 8: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:77 +//line pkg/logql/expr.y:78 { exprVAL.MetricExpr = exprDollar[2].MetricExpr } case 9: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:81 +//line pkg/logql/expr.y:82 { exprVAL.LogExpr = newMatcherExpr(exprDollar[1].Selector) } case 10: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:82 +//line pkg/logql/expr.y:83 { exprVAL.LogExpr = NewFilterExpr(exprDollar[1].LogExpr, exprDollar[2].Filter, exprDollar[3].str) } case 11: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:83 +//line pkg/logql/expr.y:84 { exprVAL.LogExpr = exprDollar[2].LogExpr } case 14: exprDollar = exprS[exprpt-2 : exprpt+1] -//line pkg/logql/expr.y:89 +//line pkg/logql/expr.y:90 { exprVAL.LogRangeExpr = newLogRange(exprDollar[1].LogExpr, exprDollar[2].duration) } case 15: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:90 +//line pkg/logql/expr.y:91 { exprVAL.LogRangeExpr = addFilterToLogRangeExpr(exprDollar[1].LogRangeExpr, exprDollar[2].Filter, exprDollar[3].str) } case 16: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:91 +//line pkg/logql/expr.y:92 { exprVAL.LogRangeExpr = exprDollar[2].LogRangeExpr } case 19: exprDollar = exprS[exprpt-4 : exprpt+1] -//line pkg/logql/expr.y:96 +//line pkg/logql/expr.y:97 { exprVAL.RangeAggregationExpr = newRangeAggregationExpr(exprDollar[3].LogRangeExpr, exprDollar[1].RangeOp) } case 20: exprDollar = exprS[exprpt-4 : exprpt+1] -//line pkg/logql/expr.y:100 +//line pkg/logql/expr.y:101 { exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[3].MetricExpr, exprDollar[1].VectorOp, nil, nil) } case 21: exprDollar = exprS[exprpt-5 : exprpt+1] -//line pkg/logql/expr.y:101 +//line pkg/logql/expr.y:102 { exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[4].MetricExpr, exprDollar[1].VectorOp, exprDollar[2].Grouping, nil) } case 22: exprDollar = exprS[exprpt-5 : exprpt+1] -//line pkg/logql/expr.y:102 +//line pkg/logql/expr.y:103 { exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[3].MetricExpr, exprDollar[1].VectorOp, exprDollar[5].Grouping, nil) } case 23: exprDollar = exprS[exprpt-6 : exprpt+1] -//line pkg/logql/expr.y:104 +//line pkg/logql/expr.y:105 { exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[5].MetricExpr, exprDollar[1].VectorOp, nil, &exprDollar[3].str) } case 24: exprDollar = exprS[exprpt-7 : exprpt+1] -//line pkg/logql/expr.y:105 +//line pkg/logql/expr.y:106 { exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[5].MetricExpr, exprDollar[1].VectorOp, exprDollar[7].Grouping, &exprDollar[3].str) } case 25: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:109 +//line pkg/logql/expr.y:110 { exprVAL.Filter = labels.MatchRegexp } case 26: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:110 +//line pkg/logql/expr.y:111 { exprVAL.Filter = labels.MatchEqual } case 27: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:111 +//line pkg/logql/expr.y:112 { exprVAL.Filter = labels.MatchNotRegexp } case 28: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:112 +//line pkg/logql/expr.y:113 { exprVAL.Filter = labels.MatchNotEqual } case 29: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:116 +//line pkg/logql/expr.y:117 { exprVAL.Selector = exprDollar[2].Matchers } case 30: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:117 +//line pkg/logql/expr.y:118 { exprVAL.Selector = exprDollar[2].Matchers } case 31: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:118 +//line pkg/logql/expr.y:119 { } case 32: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:122 +//line pkg/logql/expr.y:123 { exprVAL.Matchers = []*labels.Matcher{exprDollar[1].Matcher} } case 33: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:123 +//line pkg/logql/expr.y:124 { exprVAL.Matchers = append(exprDollar[1].Matchers, exprDollar[3].Matcher) } case 34: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:127 +//line pkg/logql/expr.y:128 { exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].str, exprDollar[3].str) } case 35: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:128 +//line pkg/logql/expr.y:129 { exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].str, exprDollar[3].str) } case 36: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:129 +//line pkg/logql/expr.y:130 { exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].str, exprDollar[3].str) } case 37: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:130 +//line pkg/logql/expr.y:131 { exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].str, exprDollar[3].str) } case 38: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:139 +//line pkg/logql/expr.y:140 { exprVAL.BinOpExpr = mustNewBinOpExpr("or", exprDollar[1].Expr, exprDollar[3].Expr) } case 39: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:140 +//line pkg/logql/expr.y:141 { exprVAL.BinOpExpr = mustNewBinOpExpr("and", exprDollar[1].Expr, exprDollar[3].Expr) } case 40: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:141 +//line pkg/logql/expr.y:142 { exprVAL.BinOpExpr = mustNewBinOpExpr("unless", exprDollar[1].Expr, exprDollar[3].Expr) } case 41: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:142 +//line pkg/logql/expr.y:143 { exprVAL.BinOpExpr = mustNewBinOpExpr("+", exprDollar[1].Expr, exprDollar[3].Expr) } case 42: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:143 +//line pkg/logql/expr.y:144 { exprVAL.BinOpExpr = mustNewBinOpExpr("-", exprDollar[1].Expr, exprDollar[3].Expr) } case 43: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:144 +//line pkg/logql/expr.y:145 { exprVAL.BinOpExpr = mustNewBinOpExpr("*", exprDollar[1].Expr, exprDollar[3].Expr) } case 44: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:145 +//line pkg/logql/expr.y:146 { exprVAL.BinOpExpr = mustNewBinOpExpr("/", exprDollar[1].Expr, exprDollar[3].Expr) } case 45: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:146 +//line pkg/logql/expr.y:147 { exprVAL.BinOpExpr = mustNewBinOpExpr("%", exprDollar[1].Expr, exprDollar[3].Expr) } case 46: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:147 +//line pkg/logql/expr.y:148 { exprVAL.BinOpExpr = mustNewBinOpExpr("^", exprDollar[1].Expr, exprDollar[3].Expr) } case 47: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:151 +//line pkg/logql/expr.y:152 { exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[1].str, false) } case 48: exprDollar = exprS[exprpt-2 : exprpt+1] -//line pkg/logql/expr.y:152 +//line pkg/logql/expr.y:153 { exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[2].str, false) } case 49: exprDollar = exprS[exprpt-2 : exprpt+1] -//line pkg/logql/expr.y:153 +//line pkg/logql/expr.y:154 { exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[2].str, true) } case 50: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:157 +//line pkg/logql/expr.y:158 { exprVAL.VectorOp = OpTypeSum } case 51: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:158 +//line pkg/logql/expr.y:159 { exprVAL.VectorOp = OpTypeAvg } case 52: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:159 +//line pkg/logql/expr.y:160 { exprVAL.VectorOp = OpTypeCount } case 53: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:160 +//line pkg/logql/expr.y:161 { exprVAL.VectorOp = OpTypeMax } case 54: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:161 +//line pkg/logql/expr.y:162 { exprVAL.VectorOp = OpTypeMin } case 55: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:162 +//line pkg/logql/expr.y:163 { exprVAL.VectorOp = OpTypeStddev } case 56: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:163 +//line pkg/logql/expr.y:164 { exprVAL.VectorOp = OpTypeStdvar } case 57: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:164 +//line pkg/logql/expr.y:165 { exprVAL.VectorOp = OpTypeBottomK } case 58: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:165 +//line pkg/logql/expr.y:166 { exprVAL.VectorOp = OpTypeTopK } case 59: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:169 +//line pkg/logql/expr.y:170 { - exprVAL.RangeOp = OpTypeCountOverTime + exprVAL.RangeOp = OpRangeTypeCount } case 60: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:170 +//line pkg/logql/expr.y:171 { - exprVAL.RangeOp = OpTypeRate + exprVAL.RangeOp = OpRangeTypeRate } case 61: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:175 +//line pkg/logql/expr.y:172 { - exprVAL.Labels = []string{exprDollar[1].str} + exprVAL.RangeOp = OpRangeTypeBytes } case 62: + exprDollar = exprS[exprpt-1 : exprpt+1] +//line pkg/logql/expr.y:173 + { + exprVAL.RangeOp = OpRangeTypeBytesRate + } + case 63: + exprDollar = exprS[exprpt-1 : exprpt+1] +//line pkg/logql/expr.y:178 + { + exprVAL.Labels = []string{exprDollar[1].str} + } + case 64: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:176 +//line pkg/logql/expr.y:179 { exprVAL.Labels = append(exprDollar[1].Labels, exprDollar[3].str) } - case 63: + case 65: exprDollar = exprS[exprpt-4 : exprpt+1] -//line pkg/logql/expr.y:180 +//line pkg/logql/expr.y:183 { exprVAL.Grouping = &grouping{without: false, groups: exprDollar[3].Labels} } - case 64: + case 66: exprDollar = exprS[exprpt-4 : exprpt+1] -//line pkg/logql/expr.y:181 +//line pkg/logql/expr.y:184 { exprVAL.Grouping = &grouping{without: true, groups: exprDollar[3].Labels} } diff --git a/pkg/logql/functions.go b/pkg/logql/functions.go new file mode 100644 index 000000000000..2ab98d768e64 --- /dev/null +++ b/pkg/logql/functions.go @@ -0,0 +1,63 @@ +package logql + +import ( + "fmt" + "time" + + "github.com/prometheus/prometheus/promql" +) + +const unsupportedErr = "unsupported range vector aggregation operation: %s" + +func (r rangeAggregationExpr) extractor() (SampleExtractor, error) { + switch r.operation { + case OpRangeTypeRate, OpRangeTypeCount: + return extractCount, nil + case OpRangeTypeBytes, OpRangeTypeBytesRate: + return extractBytes, nil + default: + return nil, fmt.Errorf(unsupportedErr, r.operation) + } +} + +func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) { + switch r.operation { + case OpRangeTypeRate: + return rateLogs(r.left.interval), nil + case OpRangeTypeCount: + return countOverTime, nil + case OpRangeTypeBytesRate: + return rateLogBytes(r.left.interval), nil + case OpRangeTypeBytes: + return sumOverTime, nil + default: + return nil, fmt.Errorf(unsupportedErr, r.operation) + } +} + +// rateLogs calculates the per-second rate of log lines. +func rateLogs(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { + return func(ts int64, samples []promql.Point) float64 { + return float64(len(samples)) / selRange.Seconds() + } +} + +// rateLogBytes calculates the per-second rate of log bytes. +func rateLogBytes(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { + return func(ts int64, samples []promql.Point) float64 { + return sumOverTime(ts, samples) / selRange.Seconds() + } +} + +// countOverTime counts the amount of log lines. +func countOverTime(ts int64, samples []promql.Point) float64 { + return float64(len(samples)) +} + +func sumOverTime(ts int64, samples []promql.Point) float64 { + var sum float64 + for _, v := range samples { + sum += v.V + } + return sum +} diff --git a/pkg/logql/lex.go b/pkg/logql/lex.go index 7bb17008c5f5..9dc6be50ed20 100644 --- a/pkg/logql/lex.go +++ b/pkg/logql/lex.go @@ -9,33 +9,35 @@ import ( ) var tokens = map[string]int{ - ",": COMMA, - ".": DOT, - "{": OPEN_BRACE, - "}": CLOSE_BRACE, - "=": EQ, - "!=": NEQ, - "=~": RE, - "!~": NRE, - "|=": PIPE_EXACT, - "|~": PIPE_MATCH, - "(": OPEN_PARENTHESIS, - ")": CLOSE_PARENTHESIS, - "by": BY, - "without": WITHOUT, - OpTypeCountOverTime: COUNT_OVER_TIME, - "[": OPEN_BRACKET, - "]": CLOSE_BRACKET, - OpTypeRate: RATE, - OpTypeSum: SUM, - OpTypeAvg: AVG, - OpTypeMax: MAX, - OpTypeMin: MIN, - OpTypeCount: COUNT, - OpTypeStddev: STDDEV, - OpTypeStdvar: STDVAR, - OpTypeBottomK: BOTTOMK, - OpTypeTopK: TOPK, + ",": COMMA, + ".": DOT, + "{": OPEN_BRACE, + "}": CLOSE_BRACE, + "=": EQ, + "!=": NEQ, + "=~": RE, + "!~": NRE, + "|=": PIPE_EXACT, + "|~": PIPE_MATCH, + "(": OPEN_PARENTHESIS, + ")": CLOSE_PARENTHESIS, + "by": BY, + "without": WITHOUT, + "[": OPEN_BRACKET, + "]": CLOSE_BRACKET, + OpRangeTypeRate: RATE, + OpRangeTypeCount: COUNT_OVER_TIME, + OpRangeTypeBytesRate: BYTES_RATE, + OpRangeTypeBytes: BYTES_OVER_TIME, + OpTypeSum: SUM, + OpTypeAvg: AVG, + OpTypeMax: MAX, + OpTypeMin: MIN, + OpTypeCount: COUNT, + OpTypeStddev: STDDEV, + OpTypeStdvar: STDVAR, + OpTypeBottomK: BOTTOMK, + OpTypeTopK: TOPK, // binops OpTypeOr: OR, diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index ff4366a65b44..78542d718d2a 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -107,6 +107,26 @@ func TestParse(t *testing.T) { operation: "count_over_time", }, }, + { + in: `bytes_over_time({ foo !~ "bar" }[12m])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 12 * time.Minute, + }, + operation: OpRangeTypeBytes, + }, + }, + { + in: `bytes_rate({ foo !~ "bar" }[12m])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 12 * time.Minute, + }, + operation: OpRangeTypeBytesRate, + }, + }, { in: `rate({ foo !~ "bar" }[5h])`, exp: &rangeAggregationExpr{ @@ -355,7 +375,31 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), + }, + { + in: `bytes_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`, + exp: newRangeAggregationExpr( + &logRange{ + left: &filterExpr{ + left: &filterExpr{ + left: &filterExpr{ + left: &filterExpr{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + ty: labels.MatchEqual, + match: "baz", + }, + ty: labels.MatchRegexp, + match: "blip", + }, + ty: labels.MatchNotEqual, + match: "flip", + }, + ty: labels.MatchNotRegexp, + match: "flap", + }, + interval: 5 * time.Minute, + }, OpRangeTypeBytes), }, { in: `sum(count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, @@ -379,7 +423,37 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil), + }, + { + in: `sum(bytes_rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &filterExpr{ + left: &filterExpr{ + left: &filterExpr{ + left: &filterExpr{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + ty: labels.MatchEqual, + match: "baz", + }, + ty: labels.MatchRegexp, + match: "blip", + }, + ty: labels.MatchNotEqual, + match: "flip", + }, + ty: labels.MatchNotRegexp, + match: "flap", + }, + interval: 5 * time.Minute, + }, OpRangeTypeBytesRate), "sum", &grouping{ without: false, @@ -409,7 +483,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "topk", &grouping{ without: true, @@ -441,7 +515,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeRate), + }, OpRangeTypeRate), "sum", &grouping{ without: false, @@ -474,7 +548,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), }, { in: `sum(count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (foo)`, @@ -498,7 +572,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -528,7 +602,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "topk", &grouping{ without: true, @@ -560,7 +634,7 @@ func TestParse(t *testing.T) { match: "flap", }, interval: 5 * time.Minute, - }, OpTypeRate), + }, OpRangeTypeRate), "sum", &grouping{ without: false, @@ -632,7 +706,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -648,7 +722,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -665,7 +739,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -693,7 +767,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -709,7 +783,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -726,7 +800,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -753,7 +827,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -771,7 +845,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -787,7 +861,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, @@ -818,7 +892,7 @@ func TestParse(t *testing.T) { ty: labels.MatchEqual, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), newRangeAggregationExpr( &logRange{ left: &matchersExpr{ @@ -827,7 +901,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), + }, OpRangeTypeCount)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), }, { in: `sum by (job) ( @@ -849,7 +923,7 @@ func TestParse(t *testing.T) { ty: labels.MatchEqual, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), newRangeAggregationExpr( &logRange{ left: &matchersExpr{ @@ -858,7 +932,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), + }, OpRangeTypeCount)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), mustNewLiteralExpr("100", false), ), }, @@ -876,7 +950,7 @@ func TestParse(t *testing.T) { }, }, interval: 5 * time.Minute, - }, OpTypeCountOverTime), + }, OpRangeTypeCount), "sum", &grouping{ without: false, diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index 3446c0ba1bca..2931febf2aca 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -5,8 +5,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" - - "github.com/grafana/loki/pkg/iter" ) // RangeVectorAggregator aggregates samples for a given range of samples. @@ -23,21 +21,21 @@ type RangeVectorIterator interface { } type rangeVectorIterator struct { - iter iter.PeekingEntryIterator + iter SeriesIterator selRange, step, end, current int64 window map[string]*promql.Series metrics map[string]labels.Labels } func newRangeVectorIterator( - it iter.EntryIterator, + it SeriesIterator, selRange, step, start, end int64) *rangeVectorIterator { // forces at least one step. if step == 0 { step = 1 } return &rangeVectorIterator{ - iter: iter.NewPeekingIterator(it), + iter: it, step: step, end: end, selRange: selRange, @@ -92,38 +90,38 @@ func (r *rangeVectorIterator) popBack(newStart int64) { // load the next sample range window. func (r *rangeVectorIterator) load(start, end int64) { - for lbs, entry, hasNext := r.iter.Peek(); hasNext; lbs, entry, hasNext = r.iter.Peek() { - if entry.Timestamp.UnixNano() > end { + for sample, hasNext := r.iter.Peek(); hasNext; sample, hasNext = r.iter.Peek() { + if sample.TimestampNano > end { // not consuming the iterator as this belong to another range. return } // the lower bound of the range is not inclusive - if entry.Timestamp.UnixNano() <= start { + if sample.TimestampNano <= start { _ = r.iter.Next() continue } // adds the sample. var series *promql.Series var ok bool - series, ok = r.window[lbs] + series, ok = r.window[sample.Labels] if !ok { var metric labels.Labels - if metric, ok = r.metrics[lbs]; !ok { + if metric, ok = r.metrics[sample.Labels]; !ok { var err error - metric, err = promql.ParseMetric(lbs) + metric, err = promql.ParseMetric(sample.Labels) if err != nil { continue } - r.metrics[lbs] = metric + r.metrics[sample.Labels] = metric } series = getSeries() series.Metric = metric - r.window[lbs] = series + r.window[sample.Labels] = series } p := promql.Point{ - T: entry.Timestamp.UnixNano(), - V: 1, + T: sample.TimestampNano, + V: sample.Value, } series.Points = append(series.Points, p) _ = r.iter.Next() diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index af2446c61c00..731030ed8750 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -43,6 +43,13 @@ func newEntryIterator() iter.EntryIterator { }, logproto.FORWARD) } +func newfakeSeriesIterator() SeriesIterator { + return &seriesIterator{ + iter: iter.NewPeekingIterator(newEntryIterator()), + sampler: extractCount, + } +} + func newPoint(t time.Time, v float64) promql.Point { return promql.Point{T: t.UnixNano() / 1e+6, V: v} } @@ -143,12 +150,12 @@ func Test_RangeVectorIterator(t *testing.T) { t.Run( fmt.Sprintf("logs[%s] - step: %s", time.Duration(tt.selRange), time.Duration(tt.step)), func(t *testing.T) { - it := newRangeVectorIterator(newEntryIterator(), tt.selRange, + it := newRangeVectorIterator(newfakeSeriesIterator(), tt.selRange, tt.step, tt.start.UnixNano(), tt.end.UnixNano()) i := 0 for it.Next() { - ts, v := it.At(count) + ts, v := it.At(countOverTime) require.ElementsMatch(t, tt.expectedVectors[i], v) require.Equal(t, tt.expectedTs[i].UnixNano()/1e+6, ts) i++ diff --git a/pkg/logql/series_extractor.go b/pkg/logql/series_extractor.go new file mode 100644 index 000000000000..8c8c626a896f --- /dev/null +++ b/pkg/logql/series_extractor.go @@ -0,0 +1,95 @@ +package logql + +import ( + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" +) + +var ( + extractBytes = bytesSampleExtractor{} + extractCount = countSampleExtractor{} +) + +type SeriesIterator interface { + Close() error + Next() bool + Peek() (Sample, bool) +} + +type Sample struct { + Labels string + Value float64 + TimestampNano int64 +} + +type seriesIterator struct { + iter iter.PeekingEntryIterator + sampler SampleExtractor + + updated bool + cur Sample +} + +func newSeriesIterator(it iter.EntryIterator, sampler SampleExtractor) SeriesIterator { + return &seriesIterator{ + iter: iter.NewPeekingIterator(it), + sampler: sampler, + } +} + +func (e *seriesIterator) Close() error { + return e.iter.Close() +} + +func (e *seriesIterator) Next() bool { + e.updated = false + return e.iter.Next() +} + +func (e *seriesIterator) Peek() (Sample, bool) { + if e.updated { + return e.cur, true + } + + for { + lbs, entry, ok := e.iter.Peek() + if !ok { + return Sample{}, false + } + + // transform + e.cur, ok = e.sampler.From(lbs, entry) + if ok { + break + } + if !e.iter.Next() { + return Sample{}, false + } + } + e.updated = true + return e.cur, true +} + +type SampleExtractor interface { + From(string, logproto.Entry) (Sample, bool) +} + +type countSampleExtractor struct{} + +func (countSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) { + return Sample{ + Labels: lbs, + TimestampNano: entry.Timestamp.UnixNano(), + Value: 1., + }, true +} + +type bytesSampleExtractor struct{} + +func (bytesSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) { + return Sample{ + Labels: lbs, + TimestampNano: entry.Timestamp.UnixNano(), + Value: float64(len(entry.Line)), + }, true +} diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 79303365a96d..1f93534a21d2 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -171,7 +171,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (Samp func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr { switch expr.operation { - case OpTypeCountOverTime, OpTypeRate: + case OpRangeTypeCount, OpRangeTypeRate: // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... return m.mapSampleExpr(expr) @@ -215,8 +215,8 @@ var shardableOps = map[string]bool{ OpTypeCount: true, // range vector ops - OpTypeCountOverTime: true, - OpTypeRate: true, + OpRangeTypeCount: true, + OpRangeTypeRate: true, // binops - arith OpTypeAdd: true, diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index f02107b7adfe..b7c6f0a66330 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -61,7 +61,7 @@ func TestMapSampleExpr(t *testing.T) { }{ { in: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -78,7 +78,7 @@ func TestMapSampleExpr(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -96,7 +96,7 @@ func TestMapSampleExpr(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -250,7 +250,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -268,7 +268,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -292,7 +292,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeCountOverTime, + operation: OpRangeTypeCount, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -310,7 +310,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeCountOverTime, + operation: OpRangeTypeCount, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -340,7 +340,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -362,7 +362,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -392,7 +392,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -410,7 +410,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -441,7 +441,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -459,7 +459,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -490,7 +490,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -512,7 +512,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -546,7 +546,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -568,7 +568,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -597,7 +597,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -619,7 +619,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -659,7 +659,7 @@ func TestMapping(t *testing.T) { }, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -683,7 +683,7 @@ func TestMapping(t *testing.T) { }, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -717,7 +717,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -735,7 +735,7 @@ func TestMapping(t *testing.T) { Of: 2, }, SampleExpr: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -771,7 +771,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -793,7 +793,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -835,7 +835,7 @@ func TestMapping(t *testing.T) { }, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -859,7 +859,7 @@ func TestMapping(t *testing.T) { }, operation: OpTypeSum, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -889,7 +889,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ @@ -911,7 +911,7 @@ func TestMapping(t *testing.T) { grouping: &grouping{}, operation: OpTypeCount, left: &rangeAggregationExpr{ - operation: OpTypeRate, + operation: OpRangeTypeRate, left: &logRange{ left: &matchersExpr{ matchers: []*labels.Matcher{ From 15c3eee0ea1cdcf60d746e3fdac8c1fc626c921b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 29 May 2020 16:14:30 -0400 Subject: [PATCH 2/8] Add more tests. Signed-off-by: Cyril Tovena --- pkg/logql/engine_test.go | 48 ++++++++++ pkg/logql/series_extractor_test.go | 145 +++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 pkg/logql/series_extractor_test.go diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index da990a0e916c..ac528d66fbce 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -1158,6 +1158,54 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, }, + { + `bytes_rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, + [][]logproto.Stream{ + {logproto.Stream{ + Labels: `{app="foo"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(45, 0), Line: "0123456789"}, // 10 bytes / 30s for the first point. + {Timestamp: time.Unix(60, 0), Line: ""}, + {Timestamp: time.Unix(75, 0), Line: ""}, + {Timestamp: time.Unix(90, 0), Line: ""}, + {Timestamp: time.Unix(105, 0), Line: ""}, + }, + }}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60 * 1000, V: 10. / 30.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 0}, {T: 120 * 1000, V: 0}}, + }, + }, + }, + { + `bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, + [][]logproto.Stream{ + {logproto.Stream{ + Labels: `{app="foo"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(45, 0), Line: "01234"}, // 5 bytes + {Timestamp: time.Unix(60, 0), Line: ""}, + {Timestamp: time.Unix(75, 0), Line: ""}, + {Timestamp: time.Unix(90, 0), Line: ""}, + {Timestamp: time.Unix(105, 0), Line: "0123"}, // 4 bytes + }, + }}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60 * 1000, V: 5.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 4.}, {T: 120 * 1000, V: 4.}}, + }, + }, + }, } { test := test t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { diff --git a/pkg/logql/series_extractor_test.go b/pkg/logql/series_extractor_test.go new file mode 100644 index 000000000000..3155f06bb1c2 --- /dev/null +++ b/pkg/logql/series_extractor_test.go @@ -0,0 +1,145 @@ +package logql + +import ( + "context" + "testing" + "time" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +func Test_seriesIterator_Peek(t *testing.T) { + type expectation struct { + ok bool + sample Sample + } + for _, test := range []struct { + name string + it SeriesIterator + expectations []expectation + }{ + { + "count", + newSeriesIterator(iter.NewStreamIterator(newStream(5, identity, `{app="foo"}`)), extractCount), + []expectation{ + {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 1}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 1}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 1}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(3, 0).UnixNano(), Value: 1}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(4, 0).UnixNano(), Value: 1}}, + {false, Sample{}}, + }, + }, + { + "bytes empty", + newSeriesIterator( + iter.NewStreamIterator(newStream(3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + } + }, + `{app="foo"}`)), + extractBytes, + ), + []expectation{ + {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 0}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 0}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 0}}, + {false, Sample{}}, + }, + }, + { + "bytes", + newSeriesIterator( + iter.NewStreamIterator(newStream(3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + Line: "foo", + } + }, + `{app="foo"}`)), + extractBytes, + ), + []expectation{ + {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, + {false, Sample{}}, + }, + }, + { + "bytes backward", + newSeriesIterator( + iter.NewStreamsIterator(context.Background(), + []logproto.Stream{ + newStream(3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + Line: "foo", + } + }, + `{app="foo"}`), newStream(3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + Line: "barr", + } + }, + `{app="barr"}`), + }, + logproto.BACKWARD, + ), + extractBytes, + ), + []expectation{ + {true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}}, + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}}, + {true, Sample{Labels: `{app="barr"}`, TimestampNano: 0, Value: 4}}, + {true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 4}}, + {true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 4}}, + {false, Sample{}}, + }, + }, + { + "skip first", + newSeriesIterator(iter.NewStreamIterator(newStream(2, identity, `{app="foo"}`)), fakeSampler{}), + []expectation{ + {true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 10}}, + {false, Sample{}}, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + for _, e := range test.expectations { + sample, ok := test.it.Peek() + require.Equal(t, e.ok, ok) + if !e.ok { + continue + } + require.Equal(t, e.sample, sample) + test.it.Next() + } + require.NoError(t, test.it.Close()) + }) + } +} + +// fakeSampler is a Sampler that returns no value for 0 timestamp otherwise always 10 +type fakeSampler struct{} + +func (fakeSampler) From(lbs string, entry logproto.Entry) (Sample, bool) { + if entry.Timestamp.UnixNano() == 0 { + return Sample{}, false + } + return Sample{ + Labels: lbs, + TimestampNano: entry.Timestamp.UnixNano(), + Value: 10, + }, true +} From b352a620d3754ff0e0d4ea86bfe2d8b94bae6ae4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 29 May 2020 16:17:07 -0400 Subject: [PATCH 3/8] lint. Signed-off-by: Cyril Tovena --- pkg/logql/series_extractor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/logql/series_extractor_test.go b/pkg/logql/series_extractor_test.go index 3155f06bb1c2..d1d41f10b622 100644 --- a/pkg/logql/series_extractor_test.go +++ b/pkg/logql/series_extractor_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/stretchr/testify/require" ) func Test_seriesIterator_Peek(t *testing.T) { From e1153d9c61b7074a5b68b38cf482a9e65bccaa33 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 29 May 2020 16:24:05 -0400 Subject: [PATCH 4/8] More lint fixes. Signed-off-by: Cyril Tovena --- pkg/logql/range_vector_test.go | 2 +- pkg/logql/test_utils.go | 2 +- pkg/promtail/client/config_test.go | 4 ++-- pkg/querier/queryrange/downstreamer_test.go | 5 +++-- pkg/querier/queryrange/querysharding_test.go | 3 ++- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index 731030ed8750..5979cca8d7a7 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -129,7 +129,7 @@ func Test_RangeVectorIterator(t *testing.T) { time.Unix(10, 0), time.Unix(100, 0), }, { - (50 * time.Second).Nanoseconds(), // all step are overlaping + (50 * time.Second).Nanoseconds(), // all step are overlapping (10 * time.Second).Nanoseconds(), []promql.Vector{ []promql.Sample{ diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index f4e49e1c4c2c..910368ab5bdc 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -103,7 +103,7 @@ type MockDownstreamer struct { func (m MockDownstreamer) Downstreamer() Downstreamer { return m } -func (d MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]Result, error) { +func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]Result, error) { results := make([]Result, 0, len(queries)) for _, query := range queries { params := NewLiteralParams( diff --git a/pkg/promtail/client/config_test.go b/pkg/promtail/client/config_test.go index 74d8c04c2814..fdce18a4c744 100644 --- a/pkg/promtail/client/config_test.go +++ b/pkg/promtail/client/config_test.go @@ -41,7 +41,7 @@ func Test_Config(t *testing.T) { clientDefaultConfig, Config{ URL: flagext.URLValue{ - u, + URL: u, }, BackoffConfig: util.BackoffConfig{ MaxBackoff: 5 * time.Minute, @@ -75,7 +75,7 @@ func Test_Config(t *testing.T) { require.NoError(t, err) if !reflect.DeepEqual(tc.expectedConfig, clientConfig) { - t.Errorf("Configs does not match, expected: %v, recieved: %v", tc.expectedConfig, clientConfig) + t.Errorf("Configs does not match, expected: %v, received: %v", tc.expectedConfig, clientConfig) } } } diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index dbdbe47a1e82..4cd5173cbd88 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/grafana/loki/pkg/logproto" @@ -14,7 +16,6 @@ import ( "github.com/grafana/loki/pkg/logql/stats" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" - "github.com/stretchr/testify/require" ) func testSampleStreams() []queryrange.SampleStream { @@ -193,7 +194,7 @@ func TestDownstreamHandler(t *testing.T) { ensureParallelism(t, in, in.parallelism) } -// Consumes the locks in an instance, making sure they're all available. Does not replace them and thus instance is unusuable after. This is a cleanup test to ensure internal state +// Consumes the locks in an instance, making sure they're all available. Does not replace them and thus instance is unusable after. This is a cleanup test to ensure internal state func ensureParallelism(t *testing.T, in *instance, n int) { for i := 0; i < n; i++ { select { diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 670169f3948a..79aff05b2599 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -10,10 +10,11 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/stretchr/testify/require" ) var ( From f1d9f38a10ae3427b042ec90ffa80c13efd724a6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 29 May 2020 16:26:51 -0400 Subject: [PATCH 5/8] More comments. Signed-off-by: Cyril Tovena --- pkg/logql/series_extractor.go | 6 +++++- pkg/logql/test_utils.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/logql/series_extractor.go b/pkg/logql/series_extractor.go index 8c8c626a896f..b1629515e793 100644 --- a/pkg/logql/series_extractor.go +++ b/pkg/logql/series_extractor.go @@ -10,12 +10,14 @@ var ( extractCount = countSampleExtractor{} ) +// SeriesIterator is an iterator that iterate over a stream of logs and returns sample. type SeriesIterator interface { Close() error Next() bool Peek() (Sample, bool) } +// Sample is a series sample type Sample struct { Labels string Value float64 @@ -70,8 +72,10 @@ func (e *seriesIterator) Peek() (Sample, bool) { return e.cur, true } +// SampleExtractor transforms a log entry into a sample. +// In case of failure the second return value will be false. type SampleExtractor interface { - From(string, logproto.Entry) (Sample, bool) + From(labels string, e logproto.Entry) (Sample, bool) } type countSampleExtractor struct{} diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 910368ab5bdc..0fd62b7992b9 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -116,7 +116,7 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu query.Params.Limit(), query.Shards.Encode(), ) - res, err := d.Query(params).Exec(ctx) + res, err := m.Query(params).Exec(ctx) if err != nil { return nil, err } From db251f3f7ef9537aa1a2c82b0a70e07100043687 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 29 May 2020 16:29:46 -0400 Subject: [PATCH 6/8] Add documentation. Signed-off-by: Cyril Tovena --- docs/logql.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/logql.md b/docs/logql.md index 81796ac41e26..a39ea59dc028 100644 --- a/docs/logql.md +++ b/docs/logql.md @@ -107,9 +107,11 @@ transform it into an instance vector. The currently supported functions for operating over are: -- `rate`: calculate the number of entries per second +- `rate`: calculates the number of entries per second - `count_over_time`: counts the entries for each log stream within the given range. +- `bytes_rate`: calculates the number of bytes per second for each stream. +- `bytes_over_time`: counts the amount of bytes used by each log stream for a given range. > `count_over_time({job="mysql"}[5m])` From fd599cfca68e08325e2854992d3bd8512f3f636c Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 2 Jun 2020 12:37:20 -0400 Subject: [PATCH 7/8] Review feedbacks. Signed-off-by: Cyril Tovena --- pkg/logql/functions.go | 14 ++++----- pkg/logql/range_vector.go | 4 +-- pkg/logql/series_extractor_test.go | 49 +++++++++++++++++++----------- pkg/logql/shardmapper.go | 9 ++++-- 4 files changed, 46 insertions(+), 30 deletions(-) diff --git a/pkg/logql/functions.go b/pkg/logql/functions.go index 2ab98d768e64..e2c3e2bcd131 100644 --- a/pkg/logql/functions.go +++ b/pkg/logql/functions.go @@ -36,25 +36,25 @@ func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) { } // rateLogs calculates the per-second rate of log lines. -func rateLogs(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { - return func(ts int64, samples []promql.Point) float64 { +func rateLogs(selRange time.Duration) func(samples []promql.Point) float64 { + return func(samples []promql.Point) float64 { return float64(len(samples)) / selRange.Seconds() } } // rateLogBytes calculates the per-second rate of log bytes. -func rateLogBytes(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { - return func(ts int64, samples []promql.Point) float64 { - return sumOverTime(ts, samples) / selRange.Seconds() +func rateLogBytes(selRange time.Duration) func(samples []promql.Point) float64 { + return func(samples []promql.Point) float64 { + return sumOverTime(samples) / selRange.Seconds() } } // countOverTime counts the amount of log lines. -func countOverTime(ts int64, samples []promql.Point) float64 { +func countOverTime(samples []promql.Point) float64 { return float64(len(samples)) } -func sumOverTime(ts int64, samples []promql.Point) float64 { +func sumOverTime(samples []promql.Point) float64 { var sum float64 for _, v := range samples { sum += v.V diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index 2931febf2aca..6f8c31ff9fc1 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -10,7 +10,7 @@ import ( // RangeVectorAggregator aggregates samples for a given range of samples. // It receives the current milliseconds timestamp and the list of point within // the range. -type RangeVectorAggregator func(int64, []promql.Point) float64 +type RangeVectorAggregator func([]promql.Point) float64 // RangeVectorIterator iterates through a range of samples. // To fetch the current vector use `At` with a `RangeVectorAggregator`. @@ -135,7 +135,7 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq for _, series := range r.window { result = append(result, promql.Sample{ Point: promql.Point{ - V: aggregator(ts, series.Points), + V: aggregator(series.Points), T: ts, }, Metric: series.Metric, diff --git a/pkg/logql/series_extractor_test.go b/pkg/logql/series_extractor_test.go index d1d41f10b622..ad6a77bcd822 100644 --- a/pkg/logql/series_extractor_test.go +++ b/pkg/logql/series_extractor_test.go @@ -36,13 +36,17 @@ func Test_seriesIterator_Peek(t *testing.T) { { "bytes empty", newSeriesIterator( - iter.NewStreamIterator(newStream(3, - func(i int64) logproto.Entry { - return logproto.Entry{ - Timestamp: time.Unix(i, 0), - } - }, - `{app="foo"}`)), + iter.NewStreamIterator( + newStream( + 3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + } + }, + `{app="foo"}`, + ), + ), extractBytes, ), []expectation{ @@ -55,14 +59,18 @@ func Test_seriesIterator_Peek(t *testing.T) { { "bytes", newSeriesIterator( - iter.NewStreamIterator(newStream(3, - func(i int64) logproto.Entry { - return logproto.Entry{ - Timestamp: time.Unix(i, 0), - Line: "foo", - } - }, - `{app="foo"}`)), + iter.NewStreamIterator( + newStream( + 3, + func(i int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(i, 0), + Line: "foo", + } + }, + `{app="foo"}`, + ), + ), extractBytes, ), []expectation{ @@ -77,21 +85,26 @@ func Test_seriesIterator_Peek(t *testing.T) { newSeriesIterator( iter.NewStreamsIterator(context.Background(), []logproto.Stream{ - newStream(3, + newStream( + 3, func(i int64) logproto.Entry { return logproto.Entry{ Timestamp: time.Unix(i, 0), Line: "foo", } }, - `{app="foo"}`), newStream(3, + `{app="foo"}`, + ), + newStream( + 3, func(i int64) logproto.Entry { return logproto.Entry{ Timestamp: time.Unix(i, 0), Line: "barr", } }, - `{app="barr"}`), + `{app="barr"}`, + ), }, logproto.BACKWARD, ), diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index caceee86ec7e..e88ad2f8361a 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -279,9 +279,10 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shardRecorder) SampleExpr { switch expr.operation { - case OpRangeTypeCount, OpRangeTypeRate: + case OpRangeTypeCount, OpRangeTypeRate, OpRangeTypeBytesRate, OpRangeTypeBytes: // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... + // same goes for bytes_rate and bytes_over_time return m.mapSampleExpr(expr, r) default: return expr @@ -323,8 +324,10 @@ var shardableOps = map[string]bool{ OpTypeCount: true, // range vector ops - OpRangeTypeCount: true, - OpRangeTypeRate: true, + OpRangeTypeCount: true, + OpRangeTypeRate: true, + OpRangeTypeBytes: true, + OpRangeTypeBytesRate: true, // binops - arith OpTypeAdd: true, From 36636239e495ba0a7ee7f7cc18da8d450b5a7eb2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 2 Jun 2020 13:31:33 -0400 Subject: [PATCH 8/8] Fix bad conflic resolution. Signed-off-by: Cyril Tovena --- pkg/logql/range_vector.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index cf7cf418bf9a..73d66aa2bc25 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -6,8 +6,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - - "github.com/grafana/loki/pkg/iter" ) // RangeVectorAggregator aggregates samples for a given range of samples.