diff --git a/CHANGELOG.md b/CHANGELOG.md index 730bc36191b..d67c9c74212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,7 @@ * [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317 * [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 * [FEATURE] Experimental Kafka-based ingest storage. 
#6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. 
diff --git a/pkg/streamingpromql/aggregations/common.go b/pkg/streamingpromql/aggregations/common.go index 5cf8cfb06ca..4f1b4f1a74b 100644 --- a/pkg/streamingpromql/aggregations/common.go +++ b/pkg/streamingpromql/aggregations/common.go @@ -22,10 +22,11 @@ type AggregationGroup interface { type AggregationGroupFactory func() AggregationGroup var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{ - parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} }, - parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) }, - parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) }, - parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} }, + parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} }, + parser.COUNT: func() AggregationGroup { return &CountAggregationGroup{} }, + parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) }, + parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) }, + parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} }, } // Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored. 
diff --git a/pkg/streamingpromql/aggregations/count.go b/pkg/streamingpromql/aggregations/count.go new file mode 100644 index 00000000000..a69d50cc386 --- /dev/null +++ b/pkg/streamingpromql/aggregations/count.go @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package aggregations + +import ( + "github.com/prometheus/prometheus/promql" + + "github.com/grafana/mimir/pkg/streamingpromql/functions" + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +type CountAggregationGroup struct { + values []float64 +} + +func (g *CountAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error { + if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil { + var err error + // First series with values for this group, populate it. 
+ g.values, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) + if err != nil { + return err + } + g.values = g.values[:timeRange.StepCount] + } + + for _, p := range data.Floats { + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs + g.values[idx]++ + } + + for _, p := range data.Histograms { + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs + g.values[idx]++ + } + + types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) + return nil +} + +func (g *CountAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { + floatPointCount := 0 + for _, fv := range g.values { + if fv > 0 { + floatPointCount++ + } + } + var floatPoints []promql.FPoint + var err error + if floatPointCount > 0 { + floatPoints, err = types.FPointSlicePool.Get(floatPointCount, memoryConsumptionTracker) + if err != nil { + return types.InstantVectorSeriesData{}, false, err + } + + for i, fv := range g.values { + if fv > 0 { + t := timeRange.StartT + int64(i)*timeRange.IntervalMs + floatPoints = append(floatPoints, promql.FPoint{T: t, F: fv}) + } + } + } + + types.Float64SlicePool.Put(g.values, memoryConsumptionTracker) + + return types.InstantVectorSeriesData{Floats: floatPoints}, false, nil +} diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 5b431a2792e..039f476e833 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -50,7 +50,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { `count_values("foo", metric{})`: "'count_values' aggregation with parameter", "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr for range vectors", "quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function", - "count(metric{})": "aggregation operation with 'count'", + "quantile(0.95, metric{})": 
"'quantile' aggregation with parameter", } for expression, expectedError := range unsupportedExpressions { @@ -1786,7 +1786,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) { for _, labels := range labelCombinations { labelRegex := strings.Join(labels, "|") // Aggregations - for _, aggFunc := range []string{"sum", "avg", "min", "max"} { + for _, aggFunc := range []string{"avg", "count", "min", "max", "sum"} { expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) diff --git a/pkg/streamingpromql/testdata/ours/aggregators.test b/pkg/streamingpromql/testdata/ours/aggregators.test index cb149d20c16..c94a0cf8ebb 100644 --- a/pkg/streamingpromql/testdata/ours/aggregators.test +++ b/pkg/streamingpromql/testdata/ours/aggregators.test @@ -314,3 +314,13 @@ load 1m eval range from 0m to 4m step 1m avg(series) {} {{schema:4 count:9 sum:13.5 buckets:[2.5 5 1.5]}} {{schema:4 count:12 sum:17 buckets:[3.5 7 1.5]}} {{schema:4 count:14.5 sum:19.5 buckets:[4 9 1.5]}} {{schema:4 count:17.5 sum:23 buckets:[4.5 11.5 1.5]}} {{schema:4 count:20.5 sum:26.5 buckets:[5.5 13.5 1.5]}} + +clear + +# Make sure count doesn't emit a 0 when there are no values +load 1m + series{label="a"} _ 9 + series{label="b"} _ 9 + +eval range from 0m to 1m step 1m count(series) + {} _ 2 diff --git a/pkg/streamingpromql/testdata/upstream/aggregators.test b/pkg/streamingpromql/testdata/upstream/aggregators.test index 701b9a90822..93a4437562a 100644 --- a/pkg/streamingpromql/testdata/upstream/aggregators.test +++ b/pkg/streamingpromql/testdata/upstream/aggregators.test @@ -37,10 +37,9 @@ eval instant at 50m avg by (group) (http_requests{job="api-server"}) {group="production"} 150 # Simple count. -# Unsupported by streaming engine. 
-# eval instant at 50m count by (group) (http_requests{job="api-server"}) -# {group="canary"} 2 -# {group="production"} 2 +eval instant at 50m count by (group) (http_requests{job="api-server"}) + {group="canary"} 2 + {group="production"} 2 # Simple without. eval instant at 50m sum without (instance) (http_requests{job="api-server"}) @@ -100,10 +99,9 @@ eval instant at 50m SUM(http_requests) BY (job, nonexistent) {job="api-server"} 1000 {job="app-server"} 2600 -# Unsupported by streaming engine. -# eval instant at 50m COUNT(http_requests) BY (job) -# {job="api-server"} 4 -# {job="app-server"} 4 +eval instant at 50m COUNT(http_requests) BY (job) + {job="api-server"} 4 + {job="app-server"} 4 eval instant at 50m SUM(http_requests) BY (job, group) {group="canary", job="api-server"} 700 diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 8fdb59b6587..9593e05aeed 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -1125,9 +1125,8 @@ eval_warn instant at 10m sum({idx="0"}) eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="1"} + ignoring(idx) histogram_sum{idx="2"} + ignoring(idx) histogram_sum{idx="3"}) {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}} -# Unsupported by streaming engine. 
-# eval instant at 10m count(histogram_sum) -# {} 4 +eval instant at 10m count(histogram_sum) + {} 4 eval instant at 10m avg(histogram_sum) {} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}} diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test index ba62c3492fd..4d7bc84fa31 100644 --- a/pkg/streamingpromql/testdata/upstream/operators.test +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -19,10 +19,9 @@ load 5m vector_matching_b{l="x"} 0+4x25 -# Unsupported by streaming engine. -# eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job) -# {job="api-server"} 996 -# {job="app-server"} 2596 +eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job) + {job="api-server"} 996 + {job="app-server"} 2596 eval instant at 50m 2 - SUM(http_requests) BY (job) {job="api-server"} -998 @@ -88,10 +87,9 @@ eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2 {job="api-server"} 1000 {job="app-server"} 2600 -# Unsupported by streaming engine. -# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job) -# {job="api-server"} 256 -# {job="app-server"} 256 +eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job) + {job="api-server"} 256 + {job="app-server"} 256 eval instant at 50m SUM(http_requests) BY (job) / 0 {job="api-server"} +Inf