Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes aggregations #2150

Merged
merged 11 commits into from
Jun 2, 2020
4 changes: 3 additions & 1 deletion docs/logql.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ transform it into an instance vector.

The currently supported functions for operating over are:

- `rate`: calculate the number of entries per second
- `rate`: calculates the number of entries per second
- `count_over_time`: counts the entries for each log stream within the given
range.
- `bytes_rate`: calculates the number of bytes per second for each stream.
- `bytes_over_time`: counts the amount of bytes used by each log stream for a given range.

> `count_over_time({job="mysql"}[5m])`

Expand Down
6 changes: 4 additions & 2 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ const (
OpTypeTopK = "topk"

// range vector ops
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"
OpRangeTypeCount = "count_over_time"
OpRangeTypeRate = "rate"
OpRangeTypeBytes = "bytes_over_time"
OpRangeTypeBytesRate = "bytes_rate"

// binops - logical/set
OpTypeOr = "or"
Expand Down
12 changes: 0 additions & 12 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,3 @@ type groupedAggregation struct {
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
}

// rate calculate the per-second rate of log lines.
func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 {
return func(ts int64, samples []promql.Point) float64 {
return float64(len(samples)) / selRange.Seconds()
}
}

// count counts the amount of log lines.
func count(ts int64, samples []promql.Point) float64 {
return float64(len(samples))
}
48 changes: 48 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,54 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`bytes_rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "0123456789"}, // 10 bytes / 30s for the first point.
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: ""},
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 10. / 30.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 0}, {T: 120 * 1000, V: 0}},
},
},
},
{
`bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "01234"}, // 5 bytes
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: "0123"}, // 4 bytes
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 5.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 4.}, {T: 120 * 1000, V: 4.}},
},
},
},
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
Expand Down
48 changes: 31 additions & 17 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,28 +378,42 @@ func rangeAggEvaluator(
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {
vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano())

var fn RangeVectorAggregator
switch expr.operation {
case OpTypeRate:
fn = rate(expr.left.interval)
case OpTypeCountOverTime:
fn = count

agg, err := expr.aggregator()
if err != nil {
return nil, err
}
extractor, err := expr.extractor()
if err != nil {
return nil, err
}
return rangeVectorEvaluator{
iter: newRangeVectorIterator(
newSeriesIterator(entryIter, extractor),
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
),
agg: agg,
}, nil
}

return newStepEvaluator(func() (bool, int64, promql.Vector) {
next := vecIter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := vecIter.At(fn)
return true, ts, vec
type rangeVectorEvaluator struct {
agg RangeVectorAggregator
iter RangeVectorIterator
}

}, vecIter.Close)
func (r rangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
next := r.iter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := r.iter.At(r.agg)
return true, ts, vec
}

func (r rangeVectorEvaluator) Close() error { return r.iter.Close() }

// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
%token <duration> DURATION
%token <val> MATCHERS LABELS EQ NEQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -166,8 +167,10 @@ vectorOp:
;

rangeOp:
COUNT_OVER_TIME { $$ = OpTypeCountOverTime }
| RATE { $$ = OpTypeRate }
COUNT_OVER_TIME { $$ = OpRangeTypeCount }
| RATE { $$ = OpRangeTypeRate }
| BYTES_OVER_TIME { $$ = OpRangeTypeBytes }
| BYTES_RATE { $$ = OpRangeTypeBytesRate }
;


Expand Down
Loading