Skip to content

Commit

Permalink
Bytes aggregations (#2150)
Browse files Browse the repository at this point in the history
* Add bytes_rate and bytes_over_time.

Still need to works on test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More lint fixes.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More comments.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedbacks.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix bad conflic resolution.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jun 2, 2020
1 parent 635dd0a commit 18f1b06
Show file tree
Hide file tree
Showing 20 changed files with 763 additions and 282 deletions.
4 changes: 3 additions & 1 deletion docs/logql.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ transform it into an instance vector.

The currently supported functions for operating over are:

- `rate`: calculate the number of entries per second
- `rate`: calculates the number of entries per second
- `count_over_time`: counts the entries for each log stream within the given
range.
- `bytes_rate`: calculates the number of bytes per second for each stream.
- `bytes_over_time`: counts the amount of bytes used by each log stream for a given range.

> `count_over_time({job="mysql"}[5m])`
Expand Down
6 changes: 4 additions & 2 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ const (
OpTypeTopK = "topk"

// range vector ops
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"
OpRangeTypeCount = "count_over_time"
OpRangeTypeRate = "rate"
OpRangeTypeBytes = "bytes_over_time"
OpRangeTypeBytesRate = "bytes_rate"

// binops - logical/set
OpTypeOr = "or"
Expand Down
12 changes: 0 additions & 12 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,3 @@ type groupedAggregation struct {
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
}

// rate calculate the per-second rate of log lines.
func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 {
return func(ts int64, samples []promql.Point) float64 {
return float64(len(samples)) / selRange.Seconds()
}
}

// count counts the amount of log lines.
func count(ts int64, samples []promql.Point) float64 {
return float64(len(samples))
}
48 changes: 48 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,54 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`bytes_rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "0123456789"}, // 10 bytes / 30s for the first point.
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: ""},
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 10. / 30.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 0}, {T: 120 * 1000, V: 0}},
},
},
},
{
`bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "01234"}, // 5 bytes
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: "0123"}, // 4 bytes
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 5.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 4.}, {T: 120 * 1000, V: 4.}},
},
},
},
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
Expand Down
48 changes: 31 additions & 17 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,28 +378,42 @@ func rangeAggEvaluator(
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {
vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano())

var fn RangeVectorAggregator
switch expr.operation {
case OpTypeRate:
fn = rate(expr.left.interval)
case OpTypeCountOverTime:
fn = count

agg, err := expr.aggregator()
if err != nil {
return nil, err
}
extractor, err := expr.extractor()
if err != nil {
return nil, err
}
return rangeVectorEvaluator{
iter: newRangeVectorIterator(
newSeriesIterator(entryIter, extractor),
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
),
agg: agg,
}, nil
}

return newStepEvaluator(func() (bool, int64, promql.Vector) {
next := vecIter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := vecIter.At(fn)
return true, ts, vec
type rangeVectorEvaluator struct {
agg RangeVectorAggregator
iter RangeVectorIterator
}

}, vecIter.Close)
func (r rangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
next := r.iter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := r.iter.At(r.agg)
return true, ts, vec
}

func (r rangeVectorEvaluator) Close() error { return r.iter.Close() }

// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
%token <duration> DURATION
%token <val> MATCHERS LABELS EQ NEQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -166,8 +167,10 @@ vectorOp:
;

rangeOp:
COUNT_OVER_TIME { $$ = OpTypeCountOverTime }
| RATE { $$ = OpTypeRate }
COUNT_OVER_TIME { $$ = OpRangeTypeCount }
| RATE { $$ = OpRangeTypeRate }
| BYTES_OVER_TIME { $$ = OpRangeTypeBytes }
| BYTES_RATE { $$ = OpRangeTypeBytesRate }
;


Expand Down
Loading

0 comments on commit 18f1b06

Please sign in to comment.