Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for count aggregation #9342

Merged
merged 9 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
9 changes: 5 additions & 4 deletions pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type AggregationGroup interface {
type AggregationGroupFactory func() AggregationGroup

var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
parser.COUNT: func() AggregationGroup { return &CountAggregationGroup{} },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
}

// Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored.
Expand Down
82 changes: 82 additions & 0 deletions pkg/streamingpromql/aggregations/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package aggregations

import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type CountAggregationGroup struct {
floatValues []float64
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
floatPresent []bool
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
}

func (g *CountAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
var err error
// First series with values for this group, populate it.
g.floatValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatValues = g.floatValues[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be something for a follow up PR, but we have expressions like this in lots of places - what do you think about adding this calculation as a helper method on QueryTimeRange?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, will add it later

g.floatValues[idx] += 1
g.floatPresent[idx] = true
}

for _, p := range data.Histograms {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.floatValues[idx] += 1
g.floatPresent[idx] = true
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *CountAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, p := range g.floatPresent {
if p {
floatPointCount++
}
}
var floatPoints []promql.FPoint
var err error
if floatPointCount > 0 {
floatPoints, err = types.FPointSlicePool.Get(floatPointCount, memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, false, err
}

for i, havePoint := range g.floatPresent {
if havePoint {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar suggestion to above, similarly could be a separate PR: we could add a method to QueryTimeRange to do this calculation.

f := g.floatValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
}
}

types.Float64SlicePool.Put(g.floatValues, memoryConsumptionTracker)
types.BoolSlicePool.Put(g.floatPresent, memoryConsumptionTracker)

return types.InstantVectorSeriesData{Floats: floatPoints}, false, nil
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) {
for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// Aggregations
for _, aggFunc := range []string{"sum", "avg", "min", "max"} {
for _, aggFunc := range []string{"avg", "count", "min", "max", "sum"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
Expand Down
14 changes: 6 additions & 8 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ eval instant at 50m avg by (group) (http_requests{job="api-server"})
{group="production"} 150

# Simple count.
# Unsupported by streaming engine.
# eval instant at 50m count by (group) (http_requests{job="api-server"})
# {group="canary"} 2
# {group="production"} 2
eval instant at 50m count by (group) (http_requests{job="api-server"})
{group="canary"} 2
{group="production"} 2

# Simple without.
eval instant at 50m sum without (instance) (http_requests{job="api-server"})
Expand Down Expand Up @@ -100,10 +99,9 @@ eval instant at 50m SUM(http_requests) BY (job, nonexistent)
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job)
# {job="api-server"} 4
# {job="app-server"} 4
eval instant at 50m COUNT(http_requests) BY (job)
{job="api-server"} 4
{job="app-server"} 4

eval instant at 50m SUM(http_requests) BY (job, group)
{group="canary", job="api-server"} 700
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,9 +1125,8 @@ eval_warn instant at 10m sum({idx="0"})
eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="1"} + ignoring(idx) histogram_sum{idx="2"} + ignoring(idx) histogram_sum{idx="3"})
{} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}

# Unsupported by streaming engine.
# eval instant at 10m count(histogram_sum)
# {} 4
eval instant at 10m count(histogram_sum)
{} 4

eval instant at 10m avg(histogram_sum)
{} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}}
Expand Down
14 changes: 6 additions & 8 deletions pkg/streamingpromql/testdata/upstream/operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ load 5m
vector_matching_b{l="x"} 0+4x25


# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job)
# {job="api-server"} 996
# {job="app-server"} 2596
eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job)
{job="api-server"} 996
{job="app-server"} 2596

eval instant at 50m 2 - SUM(http_requests) BY (job)
{job="api-server"} -998
Expand Down Expand Up @@ -88,10 +87,9 @@ eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
# {job="api-server"} 256
# {job="app-server"} 256
eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
{job="api-server"} 256
{job="app-server"} 256

eval instant at 50m SUM(http_requests) BY (job) / 0
{job="api-server"} +Inf
Expand Down
Loading