diff --git a/processor/labelsprocessor/README.md b/processor/labelsprocessor/README.md new file mode 100644 index 00000000000..022d39311f6 --- /dev/null +++ b/processor/labelsprocessor/README.md @@ -0,0 +1,23 @@ +# Labels Processor + +Supported pipeline types: metrics + +The labels processor can be used to add data point labels to all metrics that pass through it. +If any specified labels already exist in the metric, the value will be updated. + +Please refer to [config.go](./config.go) for the config spec. + +Example: + +```yaml +processors: + labels_processor: + labels: + - key: label1 + value: value1 + - key: label2 + value: value2 +``` + +Refer to [config.yaml](./testdata/config.yaml) for detailed +examples on using the processor. diff --git a/processor/labelsprocessor/config.go b/processor/labelsprocessor/config.go new file mode 100644 index 00000000000..68ed04ad741 --- /dev/null +++ b/processor/labelsprocessor/config.go @@ -0,0 +1,15 @@ +package labelsprocessor + +import "go.opentelemetry.io/collector/config/configmodels" + +// Config defines configuration for Labels processor. +type Config struct { + configmodels.ProcessorSettings `mapstructure:",squash"` + Labels []LabelConfig `mapstructure:"labels"` +} + +// LabelConfig defines configuration for provided labels +type LabelConfig struct { + Key string `mapstructure:"key"` + Value string `mapstructure:"value"` +} diff --git a/processor/labelsprocessor/config_test.go b/processor/labelsprocessor/config_test.go new file mode 100644 index 00000000000..16db26d5e76 --- /dev/null +++ b/processor/labelsprocessor/config_test.go @@ -0,0 +1,50 @@ +package labelsprocessor + +import ( + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtest" +) + +func TestLoadConfig(t *testing.T) { + + factories, err := componenttest.ExampleComponents() + assert.NoError(t, err) + + factories.Processors[typeStr] = NewFactory() + + os.Setenv("VALUE_1", "first_val") + os.Setenv("VALUE_2", "second_val") + + cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + assert.NoError(t, err) + assert.NotNil(t, cfg) + + assert.Equal(t, cfg.Processors["labels_processor"], &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "label1", Value: "value1"}, + {Key: "label2", Value: "value2"}, + }, + }) + + assert.Equal(t, cfg.Processors["labels_processor/env_vars"], &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor/env_vars", + }, + Labels: []LabelConfig{ + {Key: "label1", Value: "first_val"}, + {Key: "label2", Value: "second_val"}, + }, + }) + +} diff --git a/processor/labelsprocessor/factory.go b/processor/labelsprocessor/factory.go new file mode 100644 index 00000000000..dc44f2f6387 --- /dev/null +++ b/processor/labelsprocessor/factory.go @@ -0,0 +1,49 @@ +package labelsprocessor + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +const ( + typeStr = "labels_processor" +) + +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true} + +// NewFactory returns a new factory for the Labels processor. +func NewFactory() component.ProcessorFactory { + return processorhelper.NewFactory( + typeStr, + createDefaultConfig, + processorhelper.WithMetrics(createMetricsProcessor)) +} + +func createDefaultConfig() configmodels.Processor { + return &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + } +} + +func createMetricsProcessor( + _ context.Context, + _ component.ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) { + lp, err := newLabelMetricProcessor(cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewMetricsProcessor( + cfg, + nextConsumer, + lp, + processorhelper.WithCapabilities(processorCapabilities)) +} diff --git a/processor/labelsprocessor/factory_test.go b/processor/labelsprocessor/factory_test.go new file mode 100644 index 00000000000..43b20c74053 --- /dev/null +++ b/processor/labelsprocessor/factory_test.go @@ -0,0 +1,51 @@ +package labelsprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exportertest" +) + +func TestType(t *testing.T) { + factory := NewFactory() + pType := factory.Type() + assert.Equal(t, pType, configmodels.Type("labels_processor")) +} + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.Equal(t, cfg, &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + NameVal: typeStr, + TypeVal: typeStr, + }, + }) + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestCreateProcessor(t *testing.T) { + + factory := NewFactory() + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "label1", Value: "value1"}, + }, + } + + mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopMetricsExporter()) + assert.NoError(t, mErr) + assert.NotNil(t, mp) + +} diff --git a/processor/labelsprocessor/labels_processor.go b/processor/labelsprocessor/labels_processor.go new file mode 100644 index 00000000000..aafda4bef58 --- /dev/null +++ b/processor/labelsprocessor/labels_processor.go @@ -0,0 +1,118 @@ +package labelsprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/consumer/pdata" + v11 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" +) + +type labelMetricProcessor struct { + cfg *Config +} + +func newLabelMetricProcessor(cfg *Config) (*labelMetricProcessor, error) { + err := validateConfig(cfg) + if err != nil { + return nil, err + } + return &labelMetricProcessor{cfg: cfg}, nil +} + +func validateConfig(cfg *Config) error { + for _, elem := range cfg.Labels { + if elem.Key == "" || elem.Value == "" { + return fmt.Errorf("Labels Processor configuration contains an empty key or value") + } + } + + keys := make(map[string]bool) + for _, elem := range cfg.Labels { + _, value := keys[elem.Key] + if value { + return fmt.Errorf("Labels Processor configuration contains duplicate keys") + } + keys[elem.Key] = true + } + + return nil +} + +func (lp *labelMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { + + otlpMetrics := pdata.MetricsToOtlp(md) + + for _, otlpMetric := range otlpMetrics { + for _, instrMetric := range otlpMetric.GetInstrumentationLibraryMetrics() { + for _, metric := range instrMetric.GetMetrics() { + + // Multiple types of Data Points exists, and each of them must be handled differently + if metric.GetIntSum() != nil { + intDataPoints := metric.GetIntSum().GetDataPoints() + handleIntDataPoints(intDataPoints, lp) + } else if metric.GetIntGauge() != nil { + intDataPoints := metric.GetIntGauge().GetDataPoints() + handleIntDataPoints(intDataPoints, lp) + } else if metric.GetDoubleGauge() != nil { + doubleDataPoints := metric.GetDoubleGauge().GetDataPoints() + handleDoubleDataPoints(doubleDataPoints, lp) + } else if metric.GetDoubleSum() != nil { + doubleDataPoints := metric.GetDoubleSum().GetDataPoints() + handleDoubleDataPoints(doubleDataPoints, lp) + } else if metric.GetIntHistogram() != nil { + intHistogramDataPoints := metric.GetIntHistogram().GetDataPoints() + handleIntHistogramDataPoints(intHistogramDataPoints, lp) + } else if metric.GetDoubleHistogram() != nil { + doubleHistogramDataPoints := metric.GetDoubleHistogram().GetDataPoints() + handleDoubleHistogramDataPoints(doubleHistogramDataPoints, lp) + } + + } + } + } + + return md, nil +} + +func handleIntDataPoints(intDataPoints []*v1.IntDataPoint, lp *labelMetricProcessor) { + for _, dataPoint := range intDataPoints { + upsertLabels(&dataPoint.Labels, lp) + } +} + +func handleDoubleDataPoints(doubleDataPoints []*v1.DoubleDataPoint, lp *labelMetricProcessor) { + for _, dataPoint := range doubleDataPoints { + upsertLabels(&dataPoint.Labels, lp) + } +} + +func handleIntHistogramDataPoints(intHistogramDataPoints []*v1.IntHistogramDataPoint, lp *labelMetricProcessor) { + for _, dataPoint := range intHistogramDataPoints { + upsertLabels(&dataPoint.Labels, lp) + } +} + +func handleDoubleHistogramDataPoints(doubleHistogramDataPoints []*v1.DoubleHistogramDataPoint, lp *labelMetricProcessor) { + for _, dataPoint := range doubleHistogramDataPoints { + upsertLabels(&dataPoint.Labels, lp) + } + +} + +func upsertLabels(labels *[]*v11.StringKeyValue, lp *labelMetricProcessor) { + for _, label := range lp.cfg.Labels { + var updated bool = false + for _, elem := range *labels { + if elem.Key == label.Key { + elem.Value = label.Value + updated = true + break + } + } + if !updated { + *labels = append(*labels, &v11.StringKeyValue{Key: label.Key, Value: label.Value}) + } + } +} diff --git a/processor/labelsprocessor/labels_processor_test.go b/processor/labelsprocessor/labels_processor_test.go new file mode 100644 index 00000000000..6a46f00b675 --- /dev/null +++ b/processor/labelsprocessor/labels_processor_test.go @@ -0,0 +1,128 @@ +package labelsprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/pdata" + + otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" +) + +func TestValidateConfig(t *testing.T) { + missingValConfig := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "cluster"}, + {Key: "__replica__", Value: "r1"}, + }, + } + + emptyValConfig := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "cluster", Value: ""}, + {Key: "__replica__", Value: "r1"}, + }, + } + + duplicateKeyConfig := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + {Key: "cluster", Value: "c2"}, + }, + } + + validCfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + } + + assert.Error(t, validateConfig(missingValConfig)) + assert.Error(t, validateConfig(emptyValConfig)) + assert.Error(t, validateConfig(duplicateKeyConfig)) + assert.Nil(t, validateConfig(validCfg)) +} + +func TestAttachLabels(t *testing.T) { + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "labels_processor", + NameVal: "labels_processor", + }, + Labels: []LabelConfig{ + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + } + + // Set up tests + tests := []struct { + name string + config *Config + incomingMetric []*otlpmetrics.ResourceMetrics + expectedOutgoingMetric []*otlpmetrics.ResourceMetrics + }{ + { + name: "test_add_labels_on_all_metrics_types", + config: cfg, + incomingMetric: metricAllDataTypes, + expectedOutgoingMetric: expectedMetricAllDataTypes, + }, + { + name: "test_adding_duplicate_labels", + config: cfg, + incomingMetric: metricDuplicateLabels, + expectedOutgoingMetric: expectedMetricDuplicateLabels, + }, + } + + // Execute tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + tmn := &testMetricsConsumer{} + rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, tmn) + require.NoError(t, err) + assert.Equal(t, true, rmp.GetCapabilities().MutatesConsumedData) + + sourceMetricData := pdata.MetricsFromOtlp(tt.incomingMetric) + wantMetricData := pdata.MetricsFromOtlp(tt.expectedOutgoingMetric) + err = rmp.ConsumeMetrics(context.Background(), sourceMetricData) + require.NoError(t, err) + assert.EqualValues(t, wantMetricData, tmn.md) + }) + } + +} + +type testMetricsConsumer struct { + md pdata.Metrics +} + +func (tmn *testMetricsConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { + // simply store md in struct so that we can compare the result to the input + tmn.md = md + return nil +} diff --git a/processor/labelsprocessor/test_data_all_metrics_data_types.go b/processor/labelsprocessor/test_data_all_metrics_data_types.go new file mode 100644 index 00000000000..a5a40aea897 --- /dev/null +++ b/processor/labelsprocessor/test_data_all_metrics_data_types.go @@ -0,0 +1,463 @@ +package labelsprocessor + +import ( + v11 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" +) + +var metricAllDataTypes []*otlpmetrics.ResourceMetrics = []*otlpmetrics.ResourceMetrics{ + { + Resource: &v1.Resource{ + Attributes: []*v11.KeyValue{}, + DroppedAttributesCount: 0, + }, + InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ + { + InstrumentationLibrary: &v11.InstrumentationLibrary{}, + Metrics: []*otlpmetrics.Metric{ + { + Name: "counter-int", + Description: "counter-int", + Unit: "1", + Data: &otlpmetrics.Metric_IntSum{ + IntSum: &otlpmetrics.IntSum{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "counter-double", + Description: "counter-double", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleSum{ + DoubleSum: &otlpmetrics.DoubleSum{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "double-histogram", + Description: "double-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleHistogram{ + DoubleHistogram: &otlpmetrics.DoubleHistogram{ + DataPoints: []*otlpmetrics.DoubleHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{1}, + Exemplars: []*otlpmetrics.DoubleExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-histogram", + Description: "int-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_IntHistogram{ + IntHistogram: &otlpmetrics.IntHistogram{ + DataPoints: []*otlpmetrics.IntHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{0}, + Exemplars: []*otlpmetrics.IntExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-gauge", + Description: "int-gauge", + Unit: "1", + Data: &otlpmetrics.Metric_IntGauge{ + IntGauge: &otlpmetrics.IntGauge{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + }, + }, + }, + { + Name: "double-gauge", + Description: "double-gauge", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleGauge{ + DoubleGauge: &otlpmetrics.DoubleGauge{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + }, + }, + }, + }, + }, + }, + }, +} + +var expectedMetricAllDataTypes []*otlpmetrics.ResourceMetrics = []*otlpmetrics.ResourceMetrics{ + { + Resource: &v1.Resource{ + Attributes: []*v11.KeyValue{}, + DroppedAttributesCount: 0, + }, + InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ + { + InstrumentationLibrary: &v11.InstrumentationLibrary{}, + Metrics: []*otlpmetrics.Metric{ + { + Name: "counter-int", + Description: "counter-int", + Unit: "1", + Data: &otlpmetrics.Metric_IntSum{ + IntSum: &otlpmetrics.IntSum{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "counter-double", + Description: "counter-double", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleSum{ + DoubleSum: &otlpmetrics.DoubleSum{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "double-histogram", + Description: "double-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleHistogram{ + DoubleHistogram: &otlpmetrics.DoubleHistogram{ + DataPoints: []*otlpmetrics.DoubleHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{1}, + Exemplars: []*otlpmetrics.DoubleExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-histogram", + Description: "int-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_IntHistogram{ + IntHistogram: &otlpmetrics.IntHistogram{ + DataPoints: []*otlpmetrics.IntHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{0}, + Exemplars: []*otlpmetrics.IntExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-gauge", + Description: "int-gauge", + Unit: "1", + Data: &otlpmetrics.Metric_IntGauge{ + IntGauge: &otlpmetrics.IntGauge{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + }, + }, + }, + { + Name: "double-gauge", + Description: "double-gauge", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleGauge{ + DoubleGauge: &otlpmetrics.DoubleGauge{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + }, + }, + }, + }, + }, + }, + }, +} diff --git a/processor/labelsprocessor/test_data_metrics_duplicate_labels.go b/processor/labelsprocessor/test_data_metrics_duplicate_labels.go new file mode 100644 index 00000000000..e89324db605 --- /dev/null +++ b/processor/labelsprocessor/test_data_metrics_duplicate_labels.go @@ -0,0 +1,355 @@ +package labelsprocessor + +import ( + v11 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" +) + +var metricDuplicateLabels []*otlpmetrics.ResourceMetrics = []*otlpmetrics.ResourceMetrics{ + { + Resource: &v1.Resource{ + Attributes: []*v11.KeyValue{}, + DroppedAttributesCount: 0, + }, + InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ + { + InstrumentationLibrary: &v11.InstrumentationLibrary{}, + Metrics: []*otlpmetrics.Metric{ + { + Name: "counter-int", + Description: "counter-int", + Unit: "1", + Data: &otlpmetrics.Metric_IntSum{ + IntSum: &otlpmetrics.IntSum{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "counter-double", + Description: "counter-double", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleSum{ + DoubleSum: &otlpmetrics.DoubleSum{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "double-histogram", + Description: "double-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleHistogram{ + DoubleHistogram: &otlpmetrics.DoubleHistogram{ + DataPoints: []*otlpmetrics.DoubleHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{1}, + Exemplars: []*otlpmetrics.DoubleExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-histogram", + Description: "int-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_IntHistogram{ + IntHistogram: &otlpmetrics.IntHistogram{ + DataPoints: []*otlpmetrics.IntHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "label-value-1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{0}, + Exemplars: []*otlpmetrics.IntExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + }, + }, + }, + }, +} + +var expectedMetricDuplicateLabels []*otlpmetrics.ResourceMetrics = []*otlpmetrics.ResourceMetrics{ + { + Resource: &v1.Resource{ + Attributes: []*v11.KeyValue{}, + DroppedAttributesCount: 0, + }, + InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ + { + InstrumentationLibrary: &v11.InstrumentationLibrary{}, + Metrics: []*otlpmetrics.Metric{ + { + Name: "counter-int", + Description: "counter-int", + Unit: "1", + Data: &otlpmetrics.Metric_IntSum{ + IntSum: &otlpmetrics.IntSum{ + DataPoints: []*otlpmetrics.IntDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 123, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 456, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "counter-double", + Description: "counter-double", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleSum{ + DoubleSum: &otlpmetrics.DoubleSum{ + DataPoints: []*otlpmetrics.DoubleDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 1.23, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Value: 4.56, + }, + }, + AggregationTemporality: 2, + IsMonotonic: true, + }, + }, + }, + { + Name: "double-histogram", + Description: "double-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_DoubleHistogram{ + DoubleHistogram: &otlpmetrics.DoubleHistogram{ + DataPoints: []*otlpmetrics.DoubleHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{1}, + Exemplars: []*otlpmetrics.DoubleExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + { + Name: "int-histogram", + Description: "int-histogram", + Unit: "1", + Data: &otlpmetrics.Metric_IntHistogram{ + IntHistogram: &otlpmetrics.IntHistogram{ + DataPoints: []*otlpmetrics.IntHistogramDataPoint{ + { + Labels: []*v11.StringKeyValue{ + {Key: "label-1", Value: "label-value-1"}, + {Key: "label-3", Value: "label-value-3"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + }, + { + Labels: []*v11.StringKeyValue{ + {Key: "label-2", Value: "label-value-2"}, + {Key: "cluster", Value: "c1"}, + {Key: "__replica__", Value: "r1"}, + }, + StartTimeUnixNano: 1581452772000000321, + TimeUnixNano: 1581452773000000789, + Count: 1, + Sum: 15, + BucketCounts: []uint64{0, 1}, + ExplicitBounds: []float64{0}, + Exemplars: []*otlpmetrics.IntExemplar{ + { + FilteredLabels: []*v11.StringKeyValue{ + {Key: "exemplar-attachment", Value: "exemplar-attachment-value"}, + }, + TimeUnixNano: 1581452773000000123, + Value: 15, + SpanId: v11.SpanID{}, + TraceId: v11.TraceID{}, + }, + }, + }, + }, + AggregationTemporality: 2, + }, + }, + }, + }, + }, + }, + }, +} diff --git a/processor/labelsprocessor/testdata/config.yaml b/processor/labelsprocessor/testdata/config.yaml new file mode 100644 index 00000000000..5e565c2570a --- /dev/null +++ b/processor/labelsprocessor/testdata/config.yaml @@ -0,0 +1,29 @@ +receivers: + examplereceiver: + +processors: + labels_processor: + labels: + - key: label1 + value: value1 + - key: label2 + value: value2 + labels_processor/env_vars: + labels: + - key: label1 + value: $VALUE_1 + - key: label2 + value: $VALUE_2 + + +exporters: + exampleexporter: + +service: + pipelines: + # currently only metrics are supported + metrics: + receivers: [examplereceiver] + processors: [labels_processor] + exporters: [exampleexporter] + diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index 93c509c1ecb..a9a6a1c69fc 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -35,6 +35,7 @@ import ( "go.opentelemetry.io/collector/processor/batchprocessor" "go.opentelemetry.io/collector/processor/filterprocessor" "go.opentelemetry.io/collector/processor/groupbytraceprocessor" + "go.opentelemetry.io/collector/processor/labelsprocessor" "go.opentelemetry.io/collector/processor/memorylimiter" "go.opentelemetry.io/collector/processor/queuedprocessor" "go.opentelemetry.io/collector/processor/resourceprocessor" @@ -101,6 +102,7 @@ func Components() ( processors, err := component.MakeProcessorFactoryMap( attributesprocessor.NewFactory(), resourceprocessor.NewFactory(), + labelsprocessor.NewFactory(), queuedprocessor.NewFactory(), batchprocessor.NewFactory(), memorylimiter.NewFactory(), diff --git a/service/defaultcomponents/defaults_test.go b/service/defaultcomponents/defaults_test.go index eae40c28e4a..c080daf437b 100644 --- a/service/defaultcomponents/defaults_test.go +++ b/service/defaultcomponents/defaults_test.go @@ -45,6 +45,7 @@ func TestDefaultComponents(t *testing.T) { expectedProcessors := []configmodels.Type{ "attributes", "resource", + "labels_processor", "queued_retry", "batch", "memory_limiter",