diff --git a/.chloggen/add_aggregate_on_attributes.yaml b/.chloggen/add_aggregate_on_attributes.yaml new file mode 100644 index 000000000000..5b2ac0083d36 --- /dev/null +++ b/.chloggen/add_aggregate_on_attributes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support aggregating metrics based on their attributes." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [16224] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/internal/coreinternal/aggregateutil/type.go b/internal/coreinternal/aggregateutil/type.go index 1d6a34835874..b2fb01271974 100644 --- a/internal/coreinternal/aggregateutil/type.go +++ b/internal/coreinternal/aggregateutil/type.go @@ -5,6 +5,7 @@ package aggregateutil // import "github.com/open-telemetry/opentelemetry-collect import ( "fmt" + "strings" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -32,11 +33,11 @@ const ( Count AggregationType = "count" ) -var AggregationTypes = []AggregationType{Sum, Mean, Min, Max, Count} +var AggregationTypes = []AggregationType{Sum, Mean, Min, Max, Median, Count} func (at AggregationType) IsValid() bool { - for _, AggregationType := range AggregationTypes { - if at == AggregationType { + for _, aggregationType := range AggregationTypes { + if at == aggregationType { return true } } @@ -44,6 +45,14 @@ func (at AggregationType) IsValid() bool { return false } +func GetSupportedAggregationFunctionsList() string { + slice := make([]string, 0, len(AggregationTypes)) + for _, a := range AggregationTypes { + slice = append(slice, string(a)) + } + return strings.Join(slice, ", ") +} + type AggGroups struct { gauge map[string]pmetric.NumberDataPointSlice sum map[string]pmetric.NumberDataPointSlice @@ -51,7 +60,7 @@ type AggGroups struct { expHistogram map[string]pmetric.ExponentialHistogramDataPointSlice } -func ConvertToAggregationType(str string) (AggregationType, error) { +func ConvertToAggregationFunction(str string) (AggregationType, error) { a := AggregationType(str) if a.IsValid() { return a, nil diff --git a/internal/coreinternal/aggregateutil/type_test.go b/internal/coreinternal/aggregateutil/type_test.go index 9ff63c4e7404..7306a7a905fb 100644 --- a/internal/coreinternal/aggregateutil/type_test.go +++ b/internal/coreinternal/aggregateutil/type_test.go @@ -58,9 +58,13 @@ func Test_AggregationType_Convert(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t 
*testing.T) { - got, err := ConvertToAggregationType(tt.in) + got, err := ConvertToAggregationFunction(tt.in) require.Equal(t, tt.want, got) require.Equal(t, tt.wantErr, err) }) } } + +func Test_GetSupportedAggregationFunctionsList(t *testing.T) { + require.Equal(t, "sum, mean, min, max, median, count", GetSupportedAggregationFunctionsList()) +} diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 93ffe9d778db..8189ef104896 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -219,6 +219,7 @@ In addition to OTTL functions, the processor defines its own functions to help w - [convert_summary_sum_val_to_sum](#convert_summary_sum_val_to_sum) - [copy_metric](#copy_metric) - [scale_metric](#scale_metric) +- [aggregate_on_attributes](#aggregate_on_attributes) ### convert_sum_to_gauge @@ -369,6 +370,51 @@ Examples: - `scale_metric(0.1)`: Scale the metric by a factor of `0.1`. The unit of the metric will not be modified. - `scale_metric(10.0, "kWh")`: Scale the metric by a factor of `10.0` and sets the unit to `kWh`. +### aggregate_on_attributes + +`aggregate_on_attributes(function, Optional[attributes])` + +The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon. + +The `aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If the `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list). 
+ +The following metric types can be aggregated: + +- sum +- gauge +- histogram +- exponential histogram + +Supported aggregation functions are: + +- sum +- max +- min +- mean +- median +- count + +**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes. + +Examples: + +- `aggregate_on_attributes("sum", ["attr1", "attr2"]) where name == "system.memory.usage"` +- `aggregate_on_attributes("max") where name == "system.memory.usage"` + +The `aggregate_on_attributes` function can also be used in conjunction with +[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or +[delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys). + +For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence: + +```yaml +statements: + - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage" + - aggregate_on_attributes("sum") where name == "system.memory.usage" +``` + +To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. 
+ ## Examples ### Perform transformation if field does not exist diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index 81b4516db794..60973d24845e 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.105.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.105.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.105.0 @@ -49,7 +50,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.105.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.105.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.1 // indirect diff --git a/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics.go b/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics.go new file mode 100644 index 000000000000..634706c6b002 --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/pdata/pmetric" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +type aggregateOnAttributesArguments struct { + AggregationFunction string + Attributes ottl.Optional[[]string] +} + +func newAggregateOnAttributesFactory() ottl.Factory[ottlmetric.TransformContext] { + return ottl.NewFactory("aggregate_on_attributes", &aggregateOnAttributesArguments{}, createAggregateOnAttributesFunction) +} + +func createAggregateOnAttributesFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + args, ok := oArgs.(*aggregateOnAttributesArguments) + + if !ok { + return nil, fmt.Errorf("AggregateOnAttributesFactory args must be of type *AggregateOnAttributesArguments") + } + + t, err := aggregateutil.ConvertToAggregationFunction(args.AggregationFunction) + if err != nil { + return nil, fmt.Errorf("invalid aggregation function: '%s', valid options: %s", err.Error(), aggregateutil.GetSupportedAggregationFunctionsList()) + } + + return AggregateOnAttributes(t, args.Attributes) +} + +func AggregateOnAttributes(aggregationFunction aggregateutil.AggregationType, attributes ottl.Optional[[]string]) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) { + metric := tCtx.GetMetric() + + if metric.Type() == pmetric.MetricTypeSummary { + return nil, fmt.Errorf("aggregate_on_attributes does not support aggregating Summary metrics") + } + + ag := aggregateutil.AggGroups{} + aggregateutil.FilterAttrs(metric, attributes.Get()) + newMetric := pmetric.NewMetric() + aggregateutil.CopyMetricDetails(metric, newMetric) + aggregateutil.GroupDataPoints(metric, &ag) + aggregateutil.MergeDataPoints(newMetric, aggregationFunction, ag) + newMetric.MoveTo(metric) + + return nil, nil 
+ }, nil +} diff --git a/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics_test.go b/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics_test.go new file mode 100644 index 000000000000..eeb553229465 --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_aggregate_on_attributes_metrics_test.go @@ -0,0 +1,456 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +func Test_aggregateOnAttributes(t *testing.T) { + attr := ottl.Optional[[]string]{} + tests := []struct { + name string + input pmetric.Metric + t aggregateutil.AggregationType + attributes ottl.Optional[[]string] + want func(pmetric.MetricSlice) + wantErr error + }{ + { + name: "summary sum - error", + input: getTestSummaryMetric(), + t: aggregateutil.Sum, + attributes: attr, + want: nil, + wantErr: fmt.Errorf("aggregate_on_attributes does not support aggregating Summary metrics"), + }, + { + name: "non-matching attribute", + input: getTestSumMetricMultipleAttributes(), + t: aggregateutil.Sum, + attributes: ottl.NewTestingOptional[[]string]( + []string{"non-existing"}, + ), + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(170) + }, + }, + { + name: "matching attribute", + input: getTestSumMetricMultipleAttributes(), + t: aggregateutil.Sum, + 
attributes: ottl.NewTestingOptional[[]string]( + []string{"key1"}, + ), + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("key1", "val1") + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(20) + }, + }, + { + name: "duplicate attributes", + input: getTestSumMetricMultipleAttributes(), + t: aggregateutil.Sum, + attributes: ottl.NewTestingOptional[[]string]( + []string{"key1", "key1"}, + ), + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("key1", "val1") + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(20) + }, + }, + { + name: "sum sum", + input: getTestSumMetricMultiple(), + t: aggregateutil.Sum, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + }, + }, + { + name: "sum max", + input: getTestSumMetricMultiple(), + t: aggregateutil.Max, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + }, + }, + { + name: "sum min", + input: getTestSumMetricMultiple(), + t: aggregateutil.Min, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + }, + }, + { + 
name: "sum mean", + input: getTestSumMetricMultiple(), + t: aggregateutil.Mean, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + }, + }, + { + name: "sum count", + input: getTestSumMetricMultiple(), + t: aggregateutil.Count, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(2) + }, + }, + { + name: "sum median even", + input: getTestSumMetricMultiple(), + t: aggregateutil.Median, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + }, + }, + { + name: "sum median odd", + input: getTestSumMetricMultipleOdd(), + t: aggregateutil.Median, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + }, + }, + { + name: "gauge sum", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Sum, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(17) + }, + }, + { + name: "gauge min", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Min, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := 
metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + }, + }, + { + name: "gauge max", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Max, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + }, + }, + { + name: "gauge mean", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Mean, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + }, + }, + { + name: "gauge count", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Count, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(2) + }, + }, + { + name: "gauge median even", + input: getTestGaugeMetricMultiple(), + t: aggregateutil.Median, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + }, + }, + { + name: "gauge median odd", + input: getTestGaugeMetricMultipleOdd(), + t: aggregateutil.Median, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + }, + }, + { + name: "histogram", + input: getTestHistogramMetricMultiple(), + t: aggregateutil.Sum, + attributes: attr, + want: 
func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(10) + input.SetSum(25) + + input.BucketCounts().Append(4, 6) + input.ExplicitBounds().Append(1) + }, + }, + { + name: "exponential histogram", + input: getTestExponentialHistogramMetricMultiple(), + t: aggregateutil.Sum, + attributes: attr, + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(10) + input.SetSum(25) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + evaluate, err := AggregateOnAttributes(tt.t, tt.attributes) + require.Nil(t, err) + + _, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())) + assert.Equal(t, tt.wantErr, err) + + actualMetrics := pmetric.NewMetricSlice() + tt.input.CopyTo(actualMetrics.AppendEmpty()) + + if tt.want != nil { + expected := pmetric.NewMetricSlice() + tt.want(expected) + assert.Equal(t, expected, actualMetrics) + } + }) + } +} + +func getTestSumMetricMultiple() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + + return metricInput +} + +func 
getTestSumMetricMultipleAttributes() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("key1", "val1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("key1", "val1") + + input3 := metricInput.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(20) + input3.Attributes().PutStr("key2", "val1") + + return metricInput +} + +func getTestSumMetricMultipleOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + + input3 := metricInput.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(30) + + return metricInput +} + +func getTestGaugeMetricMultiple() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + + return metricInput +} + +func getTestGaugeMetricMultipleOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + + input3 := metricInput.Gauge().DataPoints().AppendEmpty() + input3.SetIntValue(3) + + return metricInput +} + +func getTestHistogramMetricMultiple() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + 
metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(5) + input.SetSum(12.34) + + input.BucketCounts().Append(2, 3) + input.ExplicitBounds().Append(1) + + input2 := metricInput.Histogram().DataPoints().AppendEmpty() + input2.SetCount(5) + input2.SetSum(12.66) + + input2.BucketCounts().Append(2, 3) + input2.ExplicitBounds().Append(1) + return metricInput +} + +func getTestExponentialHistogramMetricMultiple() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(5) + input.SetSum(12.34) + + input2 := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input2.SetScale(1) + input2.SetCount(5) + input2.SetSum(12.66) + + return metricInput +} diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index 18c7b460f198..beee15b3f7d7 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -50,6 +50,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] { newExtractCountMetricFactory(), newCopyMetricFactory(), newScaleMetricFactory(), + newAggregateOnAttributesFactory(), ) if useConvertBetweenSumAndGaugeMetricContext.IsEnabled() { diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index 60ef54812c91..2ea2da3eea89 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -34,6 +34,7 
@@ func Test_MetricFunctions(t *testing.T) { expected := ottlfuncs.StandardFuncs[ottlmetric.TransformContext]() expected["convert_sum_to_gauge"] = newConvertSumToGaugeFactory() expected["convert_gauge_to_sum"] = newConvertGaugeToSumFactory() + expected["aggregate_on_attributes"] = newAggregateOnAttributesFactory() expected["extract_sum_metric"] = newExtractSumMetricFactory() expected["extract_count_metric"] = newExtractCountMetricFactory() expected["copy_metric"] = newCopyMetricFactory() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 69c250aea76a..81140cd6babf 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -215,6 +215,39 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).SetDoubleValue(37.0) }, }, + { + statements: []string{`aggregate_on_attributes("sum", ["attr1", "attr2"]) where name == "operationA"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(4.7) + dataPoint1.Attributes().PutStr("attr1", "test1") + dataPoint1.Attributes().PutStr("attr2", "test2") + + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + { + statements: []string{`aggregate_on_attributes("min") where name == "operationA"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(1.0) + dataPoint1.Attributes().PutStr("attr1", "test1") + 
dataPoint1.Attributes().PutStr("attr2", "test2") + dataPoint1.Attributes().PutStr("attr3", "test3") + dataPoint1.Attributes().PutStr("flags", "A|B|C") + dataPoint1.Attributes().PutStr("total.string", "123456789") + + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, } for _, tt := range tests {