Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add absent_over_time #3053

Merged
merged 3 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,9 @@ Supported function for operating over unwrapped ranges are:
- `stdvar_over_time(unwrapped-range)`: the population standard variance of the values in the specified interval.
- `stddev_over_time(unwrapped-range)`: the population standard deviation of the values in the specified interval.
- `quantile_over_time(scalar,unwrapped-range)`: the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
- `absent_over_time(unwrapped-range)`: returns an empty vector if the range vector passed to it has any elements and a 1-element vector with the value 1 if the range vector passed to it has no elements. (`absent_over_time` is useful for alerting on when no time series and logs stream exist for label combination for a certain amount of time.)

Except for `sum_over_time` and `rate` unwrapped range aggregations support grouping.
Except for `sum_over_time`,`absent_over_time` and `rate`, unwrapped range aggregations support grouping.

```logql
<aggr-op>([parameter,] <unwrapped-range>) [without|by (<label list>)]
Expand Down
5 changes: 3 additions & 2 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ const (
OpRangeTypeStdvar = "stdvar_over_time"
OpRangeTypeStddev = "stddev_over_time"
OpRangeTypeQuantile = "quantile_over_time"
OpRangeTypeAbsent = "absent_over_time"

// binops - logical/set
OpTypeOr = "or"
Expand Down Expand Up @@ -611,14 +612,14 @@ func (e rangeAggregationExpr) validate() error {
}
if e.left.unwrap != nil {
switch e.operation {
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeAbsent:
return nil
default:
return fmt.Errorf("invalid aggregation %s with unwrap", e.operation)
}
}
switch e.operation {
case OpRangeTypeBytes, OpRangeTypeBytesRate, OpRangeTypeCount, OpRangeTypeRate:
case OpRangeTypeBytes, OpRangeTypeBytesRate, OpRangeTypeCount, OpRangeTypeRate, OpRangeTypeAbsent:
return nil
default:
return fmt.Errorf("invalid aggregation %s without unwrap", e.operation)
Expand Down
2 changes: 2 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_SampleExpr_String(t *testing.T) {
t.Parallel()
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
Expand Down Expand Up @@ -101,6 +102,7 @@ func Test_SampleExpr_String(t *testing.T) {
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
`absent_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
`sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m]
Expand Down
35 changes: 35 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 5 * 60 * 1000, V: 30}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`absent_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
{newSeries(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 300 = 30 total
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(5*60, 0), Selector: `absent_over_time({app="foo"}|~".+bar"[5m])`}},
},
promql.Vector{},
},
{
`absent_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{},
[]SelectSampleParams{},
promql.Vector{promql.Sample{Point: promql.Point{T: 5 * 60 * 1000, V: 1}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -914,6 +930,22 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`absent_over_time(({app="foo"} |~".+bar")[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
{newSeries(1, constant(50), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `absent_over_time({app="foo"}|~".+bar"[1m])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{
{T: 120000, V: 1}, {T: 150000, V: 1}, {T: 180000, V: 1}},
},
},
},
{
`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -1935,6 +1967,9 @@ func (q *querierRecorder) SelectSamples(ctx context.Context, p SelectSampleParam
}
}
recordID := paramsID(p)
if len(q.series) == 0 {
return iter.NoopIterator, nil
}
series, ok := q.series[recordID]
if !ok {
return nil, fmt.Errorf("no series found for id: %s has: %+v", recordID, q.series)
Expand Down
92 changes: 85 additions & 7 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,21 @@ func rangeAggEvaluator(
if err != nil {
return nil, err
}
iter := newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{
iter: iter,
lbs: absentLabels(expr),
}, nil
}
return &rangeVectorEvaluator{
iter: newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
),
agg: agg,
iter: iter,
agg: agg,
}, nil
}

Expand Down Expand Up @@ -462,6 +469,50 @@ func (r rangeVectorEvaluator) Error() error {
return r.iter.Error()
}

type absentRangeVectorEvaluator struct {
iter RangeVectorIterator
lbs labels.Labels

err error
}

func (r *absentRangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
next := r.iter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := r.iter.At(one)
for _, s := range vec {
// Errors are not allowed in metrics.
if s.Metric.Has(log.ErrorLabel) {
r.err = newPipelineErr(s.Metric)
return false, 0, promql.Vector{}
}
}
if len(vec) > 0 {
return next, ts, promql.Vector{}
}
// values are missing.
return next, ts, promql.Vector{
promql.Sample{
Point: promql.Point{
T: ts,
V: 1.,
},
Metric: r.lbs,
},
}
}

func (r absentRangeVectorEvaluator) Close() error { return r.iter.Close() }

func (r absentRangeVectorEvaluator) Error() error {
if r.err != nil {
return r.err
}
return r.iter.Error()
}

// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
Expand Down Expand Up @@ -948,3 +999,30 @@ func labelReplaceEvaluator(
return next, ts, vec
}, nextEvaluator.Close, nextEvaluator.Error)
}

// This is to replace missing timeseries during absent_over_time aggregation.
func absentLabels(expr SampleExpr) labels.Labels {
m := labels.Labels{}

lm := expr.Selector().Matchers()
if len(lm) == 0 {
return m
}

empty := []string{}
for _, ma := range lm {
if ma.Name == labels.MetricName {
continue
}
if ma.Type == labels.MatchEqual && !m.Has(ma.Name) {
m = labels.NewBuilder(m).Set(ma.Name, ma.Value).Labels()
} else {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
empty = append(empty, ma.Name)
}
}

for _, v := range empty {
m = labels.NewBuilder(m).Del(v).Labels()
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
}
return m
}
3 changes: 2 additions & 1 deletion pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
LABEL_REPLACE
ABSENT_OVER_TIME LABEL_REPLACE

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -340,6 +340,7 @@ rangeOp:
| STDVAR_OVER_TIME { $$ = OpRangeTypeStdvar }
| STDDEV_OVER_TIME { $$ = OpRangeTypeStddev }
| QUANTILE_OVER_TIME { $$ = OpRangeTypeQuantile }
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;


Expand Down
Loading