Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rate for unwrapped expressions. #3048

Merged
merged 2 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,13 @@ The unwrap expression is noted `| unwrap label_identifier` where the label ident
Since label values are string, by default a conversion into a float (64bits) will be attempted, in case of failure the `__error__` label is added to the sample.
Optionally the label identifier can be wrapped by a conversion function `| unwrap <function>(label_identifier)`, which will attempt to convert the label value from a specific format.

We currently support the functions:
We currently support the functions:
- `duration_seconds(label_identifier)` (or its short equivalent `duration`) which will convert the label value in seconds from the [go duration format](https://golang.org/pkg/time/#ParseDuration) (e.g `5m`, `24s30ms`).
- `bytes(label_identifier)` which will convert the label value to raw bytes applying the bytes unit (e.g. `5 MiB`, `3k`, `1G`).

Supported function for operating over unwrapped ranges are:

- `rate(log-range)`: calculates per second rate of all values in the specified interval.
- `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
- `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
- `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.
Expand All @@ -431,7 +432,7 @@ Supported function for operating over unwrapped ranges are:
- `stddev_over_time(unwrapped-range)`: the population standard deviation of the values in the specified interval.
- `quantile_over_time(scalar,unwrapped-range)`: the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.

Except for `sum_over_time`, `min_over_time` and `max_over_time` unwrapped range aggregations support grouping.
Except for `sum_over_time` and `rate` unwrapped range aggregations support grouping.

```logql
<aggr-op>([parameter,] <unwrapped-range>) [without|by (<label list>)]
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (e rangeAggregationExpr) validate() error {
}
if e.left.unwrap != nil {
switch e.operation {
case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
case OpRangeTypeRate, OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile:
return nil
default:
return fmt.Errorf("invalid aggregation %s with unwrap", e.operation)
Expand Down
49 changes: 48 additions & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
[][]logproto.Series{
// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
{newSeries(testSize, offset(46, constantValue(2)), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
Expand Down Expand Up @@ -868,6 +879,25 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
{newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`), newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"|unwrap bar[1m])`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "bar"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
},
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
},
},
},
{
`topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
[][]logproto.Series{
Expand Down Expand Up @@ -1965,7 +1995,24 @@ func constant(t int64) generator {
Sample: logproto.Sample{
Timestamp: time.Unix(t, 0).UnixNano(),
Hash: uint64(i),
Value: 1.,
Value: 1.0,
},
}
}
}

// nolint
func constantValue(t int64) generator {
return func(i int64) logData {
return logData{
Entry: logproto.Entry{
Timestamp: time.Unix(i, 0),
Line: fmt.Sprintf("%d", i),
},
Sample: logproto.Sample{
Timestamp: time.Unix(i, 0).UnixNano(),
Hash: uint64(i),
Value: float64(t),
},
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/logql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtra
func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) {
switch r.operation {
case OpRangeTypeRate:
return rateLogs(r.left.interval), nil
return rateLogs(r.left.interval, r.left.unwrap != nil), nil
case OpRangeTypeCount:
return countOverTime, nil
case OpRangeTypeBytesRate:
Expand All @@ -104,9 +104,16 @@ func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) {
}

// rateLogs calculates the per-second rate of log lines.
func rateLogs(selRange time.Duration) func(samples []promql.Point) float64 {
func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
return func(samples []promql.Point) float64 {
return float64(len(samples)) / selRange.Seconds()
if !computeValues {
return float64(len(samples)) / selRange.Seconds()
}
var total float64
for _, p := range samples {
total += p.V
}
return total / selRange.Seconds()
}
}

Expand Down