diff --git a/CHANGELOG.md b/CHANGELOG.md index 250055d83e16..e47633947581 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,8 @@ Main (unreleased) components. (@akselleirv) - `discovery.gce` discovers resources on Google Compute Engine (GCE). (@marctc) - `discovery.digitalocean` provides service discovery for DigitalOcean. (@spartan0x117) + - `otelcol.processor.attributes` accepts telemetry data from other `otelcol` + components and modifies attributes of a span, log, or metric. (@ptodev) - Add support for Flow-specific system packages: diff --git a/component/all/all.go b/component/all/all.go index 58e617f6ec4e..ba82c2ba073b 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -44,6 +44,7 @@ import ( _ "github.com/grafana/agent/component/otelcol/exporter/otlphttp" // Import otelcol.exporter.otlphttp _ "github.com/grafana/agent/component/otelcol/exporter/prometheus" // Import otelcol.exporter.prometheus _ "github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling" // Import otelcol.extension.jaeger_remote_sampling + _ "github.com/grafana/agent/component/otelcol/processor/attributes" // Import otelcol.processor.attributes _ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch _ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter _ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling diff --git a/component/otelcol/config_attraction.go b/component/otelcol/config_attraction.go new file mode 100644 index 000000000000..908acb523ca2 --- /dev/null +++ b/component/otelcol/config_attraction.go @@ -0,0 +1,93 @@ +package otelcol + +type AttrActionKeyValueSlice []AttrActionKeyValue + +func (actions AttrActionKeyValueSlice) Convert() []interface{} { + res := make([]interface{}, 0, len(actions)) + + if len(actions) == 0 { + return res + } + + for _, action := range actions { + res = append(res, action.convert()) + } + return res +} + +type AttrActionKeyValue struct { + // Key specifies the attribute to act upon. + // This is a required field. + Key string `river:"key,attr"` + + // Value specifies the value to populate for the key. + // The type of the value is inferred from the configuration. + Value interface{} `river:"value,attr,optional"` + + // A regex pattern must be specified for the action EXTRACT. + // It uses the attribute specified by `key' to extract values from + // The target keys are inferred based on the names of the matcher groups + // provided and the names will be inferred based on the values of the + // matcher group. + // Note: All subexpressions must have a name. + // Note: The value type of the source key must be a string. If it isn't, + // no extraction will occur. + RegexPattern string `river:"pattern,attr,optional"` + + // FromAttribute specifies the attribute to use to populate + // the value. If the attribute doesn't exist, no action is performed. + FromAttribute string `river:"from_attribute,attr,optional"` + + // FromContext specifies the context value to use to populate + // the value. The values would be searched in client.Info.Metadata. + // If the key doesn't exist, no action is performed. + // If the key has multiple values the values will be joined with `;` separator. + FromContext string `river:"from_context,attr,optional"` + + // ConvertedType specifies the target type of an attribute to be converted + // If the key doesn't exist, no action is performed. + // If the value cannot be converted, the original value will be left as-is + ConvertedType string `river:"converted_type,attr,optional"` + + // Action specifies the type of action to perform. + // The set of values are {INSERT, UPDATE, UPSERT, DELETE, HASH}. + // Both lower case and upper case are supported. + // INSERT - Inserts the key/value to attributes when the key does not exist. + // No action is applied to attributes where the key already exists. + // Either Value, FromAttribute or FromContext must be set. + // UPDATE - Updates an existing key with a value. No action is applied + // to attributes where the key does not exist. + // Either Value, FromAttribute or FromContext must be set. + // UPSERT - Performs insert or update action depending on the attributes + // containing the key. The key/value is inserted to attributes + // that did not originally have the key. The key/value is updated + // for attributes where the key already existed. + // Either Value, FromAttribute or FromContext must be set. + // DELETE - Deletes the attribute. If the key doesn't exist, + // no action is performed. + // HASH - Calculates the SHA-1 hash of an existing value and overwrites the + // value with it's SHA-1 hash result. + // EXTRACT - Extracts values using a regular expression rule from the input + // 'key' to target keys specified in the 'rule'. If a target key + // already exists, it will be overridden. + // CONVERT - converts the type of an existing attribute, if convertable + // This is a required field. + Action string `river:"action,attr"` +} + +// Convert converts args into the upstream type. +func (args *AttrActionKeyValue) convert() map[string]interface{} { + if args == nil { + return nil + } + + return map[string]interface{}{ + "key": args.Key, + "action": args.Action, + "value": args.Value, + "pattern": args.RegexPattern, + "from_attribute": args.FromAttribute, + "from_context": args.FromContext, + "converted_type": args.ConvertedType, + } +} diff --git a/component/otelcol/config_attraction_test.go b/component/otelcol/config_attraction_test.go new file mode 100644 index 000000000000..4c1d61742032 --- /dev/null +++ b/component/otelcol/config_attraction_test.go @@ -0,0 +1,60 @@ +package otelcol_test + +import ( + "testing" + + "github.com/grafana/agent/component/otelcol" + "github.com/stretchr/testify/require" +) + +func TestConvertAttrAction(t *testing.T) { + inputActions := otelcol.AttrActionKeyValueSlice{ + { + Action: "insert", + Value: 123, + Key: "attribute1", + }, + { + Action: "delete", + Key: "attribute2", + }, + { + Action: "upsert", + Value: true, + Key: "attribute3", + }, + } + + expectedActions := []interface{}{ + map[string]interface{}{ + "action": "insert", + "converted_type": "", + "from_attribute": "", + "from_context": "", + "key": "attribute1", + "pattern": "", + "value": 123, + }, + map[string]interface{}{ + "action": "delete", + "converted_type": "", + "from_attribute": "", + "from_context": "", + "key": "attribute2", + "pattern": "", + "value": interface{}(nil), + }, + map[string]interface{}{ + "action": "upsert", + "converted_type": "", + "from_attribute": "", + "from_context": "", + "key": "attribute3", + "pattern": "", + "value": true, + }, + } + + result := inputActions.Convert() + require.Equal(t, expectedActions, result) +} diff --git a/component/otelcol/config_filter.go b/component/otelcol/config_filter.go new file mode 100644 index 000000000000..40f76e77a342 --- /dev/null +++ b/component/otelcol/config_filter.go @@ -0,0 +1,281 @@ +package otelcol + +import ( + "encoding" + "fmt" + "strings" + + "go.opentelemetry.io/collector/pdata/plog" +) + +// MatchConfig has two optional MatchProperties: +// 1. 'include': to define what is processed by the processor. +// 2. 'exclude': to define what is excluded from the processor. +// +// If both 'include' and 'exclude' are specified, the 'include' properties are checked +// before the 'exclude' properties. +type MatchConfig struct { + Include *MatchProperties `river:"include,block,optional"` + Exclude *MatchProperties `river:"exclude,block,optional"` +} + +// MatchProperties specifies the set of properties in a spans/log/metric to match +// against and if the input data should be included or excluded from the processor. +type MatchProperties struct { + MatchType string `river:"match_type,attr"` + RegexpConfig *RegexpConfig `river:"regexp,block,optional"` + + // Note: For spans, one of Services, SpanNames, Attributes, Resources or Libraries must be specified with a + // non-empty value for a valid configuration. + + // For logs, one of LogNames, Attributes, Resources or Libraries must be specified with a + // non-empty value for a valid configuration. + + // For metrics, one of MetricNames, Expressions, or ResourceAttributes must be specified with a + // non-empty value for a valid configuration. + + // Services specify the list of items to match service name against. + // A match occurs if the span's service name matches at least one item in this list. + Services []string `river:"services,attr,optional"` + + // SpanNames specify the list of items to match span name against. + // A match occurs if the span name matches at least one item in this list. + SpanNames []string `river:"span_names,attr,optional"` + + // LogBodies is a list of strings that the LogRecord's body field must match against. + LogBodies []string `river:"log_bodies,attr,optional"` + + // LogSeverityTexts is a list of strings that the LogRecord's severity text field must match against. + LogSeverityTexts []string `river:"log_severity_texts,attr,optional"` + + // LogSeverity defines how to match against a log record's SeverityNumber, if defined. + LogSeverity *LogSeverityNumberMatchProperties `river:"log_severity,block,optional"` + + // MetricNames is a list of strings to match metric name against. + // A match occurs if metric name matches at least one item in the list. + MetricNames []string `river:"metric_names,attr,optional"` + + // Attributes specifies the list of attributes to match against. + // All of these attributes must match exactly for a match to occur. + // Only match_type=strict is allowed if "attributes" are specified. + Attributes []Attribute `river:"attribute,block,optional"` + + // Resources specify the list of items to match the resources against. + // A match occurs if the data's resources match at least one item in this list. + Resources []Attribute `river:"resource,block,optional"` + + // Libraries specify the list of items to match the implementation library against. + // A match occurs if the span's implementation library matches at least one item in this list. + Libraries []InstrumentationLibrary `river:"library,block,optional"` + + // SpanKinds specify the list of items to match the span kind against. + // A match occurs if the span's span kind matches at least one item in this list. + SpanKinds []string `river:"span_kinds,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args *MatchProperties) Convert() map[string]interface{} { + if args == nil { + return nil + } + + res := make(map[string]interface{}) + + res["match_type"] = args.MatchType + + if args.RegexpConfig != nil { + res["regexp"] = args.RegexpConfig.convert() + } + + if len(args.Services) > 0 { + res["services"] = args.Services + } + + if len(args.SpanNames) > 0 { + res["span_names"] = args.SpanNames + } + + if len(args.LogBodies) > 0 { + res["log_bodies"] = args.LogBodies + } + + if len(args.LogSeverityTexts) > 0 { + res["log_severity_texts"] = args.LogSeverityTexts + } + + if args.LogSeverity != nil { + // The Otel config's field is called "log_severity_number" because it uses a number. + // The River config's field is called just "log_severity" because it uses a a textual + // representation of the log severity instead of a number. + res["log_severity_number"] = args.LogSeverity.convert() + } + + if len(args.MetricNames) > 0 { + res["metric_names"] = args.MetricNames + } + + if subRes := convertAttributeSlice(args.Attributes); len(subRes) > 0 { + res["attributes"] = subRes + } + + if subRes := convertAttributeSlice(args.Resources); len(subRes) > 0 { + res["resources"] = subRes + } + + if subRes := convertInstrumentationLibrariesSlice(args.Libraries); len(subRes) > 0 { + res["libraries"] = subRes + } + + if len(args.SpanKinds) > 0 { + res["span_kinds"] = args.SpanKinds + } + + return res +} + +// Return an empty slice if the input slice is empty. +func convertAttributeSlice(attrs []Attribute) []interface{} { + attrArr := make([]interface{}, 0, len(attrs)) + for _, attr := range attrs { + attrArr = append(attrArr, attr.convert()) + } + return attrArr +} + +// Return an empty slice if the input slice is empty. +func convertInstrumentationLibrariesSlice(libs []InstrumentationLibrary) []interface{} { + libsArr := make([]interface{}, 0, len(libs)) + for _, lib := range libs { + libsArr = append(libsArr, lib.convert()) + } + return libsArr +} + +type RegexpConfig struct { + // CacheEnabled determines whether match results are LRU cached to make subsequent matches faster. + // Cache size is unlimited unless CacheMaxNumEntries is also specified. + CacheEnabled bool `river:"cache_enabled,attr,optional"` + // CacheMaxNumEntries is the max number of entries of the LRU cache that stores match results. + // CacheMaxNumEntries is ignored if CacheEnabled is false. + CacheMaxNumEntries int `river:"cache_max_num_entries,attr,optional"` +} + +func (args RegexpConfig) convert() map[string]interface{} { + return map[string]interface{}{ + "cacheenabled": args.CacheEnabled, + "cachemaxnumentries": args.CacheMaxNumEntries, + } +} + +// Attribute specifies the attribute key and optional value to match against. +type Attribute struct { + // Key specifies the attribute key. + Key string `river:"key,attr"` + + // Values specifies the value to match against. + // If it is not set, any value will match. + Value interface{} `river:"value,attr,optional"` +} + +func (args Attribute) convert() map[string]interface{} { + return map[string]interface{}{ + "key": args.Key, + "value": args.Value, + } +} + +// InstrumentationLibrary specifies the instrumentation library and optional version to match against. +type InstrumentationLibrary struct { + Name string `river:"name,attr"` + // version match + // expected actual match + // nil yes + // nil 1 yes + // yes + // 1 no + // 1 no + // 1 1 yes + Version *string `river:"version,attr,optional"` +} + +func (args InstrumentationLibrary) convert() map[string]interface{} { + res := map[string]interface{}{ + "name": args.Name, + } + + if args.Version != nil { + res["version"] = strings.Clone(*args.Version) + } + return res +} + +// LogSeverityNumberMatchProperties defines how to match based on a log record's SeverityNumber field. +type LogSeverityNumberMatchProperties struct { + // Min is the lowest severity that may be matched. + // e.g. if this is plog.SeverityNumberInfo, INFO, WARN, ERROR, and FATAL logs will match. + Min SeverityLevel `river:"min,attr"` + + // MatchUndefined controls whether logs with "undefined" severity matches. + // If this is true, entries with undefined severity will match. + MatchUndefined bool `river:"match_undefined,attr"` +} + +func (args LogSeverityNumberMatchProperties) convert() map[string]interface{} { + numVal, exists := severityLevels[args.Min] + if !exists { + panic(fmt.Sprintf("No severity value for %q", args.Min)) + } + + return map[string]interface{}{ + "min": numVal, + "match_undefined": args.MatchUndefined, + } +} + +type SeverityLevel string + +var ( + _ encoding.TextUnmarshaler = (*SeverityLevel)(nil) +) + +// The severity levels should be in sync with "opentelemetry-collector/pdata/plog/logs.go" +var severityLevels = map[SeverityLevel]int32{ + "TRACE": 1, + "TRACE2": 2, + "TRACE3": 3, + "TRACE4": 4, + "DEBUG": 5, + "DEBUG2": 6, + "DEBUG3": 7, + "DEBUG4": 8, + "INFO": 9, + "INFO2": 10, + "INFO3": 11, + "INFO4": 12, + "WARN": 13, + "WARN2": 14, + "WARN3": 15, + "WARN4": 16, + "ERROR": 17, + "ERROR2": 18, + "ERROR3": 19, + "ERROR4": 20, + "FATAL": 21, + "FATAL2": 22, + "FATAL3": 23, + "FATAL4": 24, +} + +// UnmarshalText implements encoding.TextUnmarshaler for SeverityLevel. +func (sl *SeverityLevel) UnmarshalText(text []byte) error { + if numVal, exists := severityLevels[SeverityLevel(text)]; exists { + // Check if this is a valid plog severity number + plogInt := plog.SeverityNumber(numVal) + plogStr := plogInt.String() + if plogStr == "SEVERITY_NUMBER_"+string(text) { + *sl = SeverityLevel(text) + return nil + } + } + return fmt.Errorf("unrecognized severity level %q", string(text)) +} diff --git a/component/otelcol/config_filter_test.go b/component/otelcol/config_filter_test.go new file mode 100644 index 000000000000..306c2337474a --- /dev/null +++ b/component/otelcol/config_filter_test.go @@ -0,0 +1,258 @@ +package otelcol_test + +import ( + "testing" + + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/pkg/river" + "github.com/stretchr/testify/require" + "k8s.io/utils/pointer" +) + +func TestConvertMatchProperties(t *testing.T) { + inputMatchProps := otelcol.MatchProperties{ + MatchType: "strict", + RegexpConfig: &otelcol.RegexpConfig{ + CacheEnabled: false, + CacheMaxNumEntries: 2, + }, + Services: []string{"svcA", "svcB"}, + SpanNames: []string{"login.*"}, + LogBodies: []string{"AUTH.*"}, + LogSeverityTexts: []string{"debug.*"}, + LogSeverity: &otelcol.LogSeverityNumberMatchProperties{ + Min: "TRACE2", + MatchUndefined: true, + }, + MetricNames: []string{"metric1"}, + Attributes: []otelcol.Attribute{ + { + Key: "attr1", + Value: 5, + }, + { + Key: "attr2", + Value: "asdf", + }, + { + Key: "attr2", + Value: false, + }, + }, + Resources: []otelcol.Attribute{ + { + Key: "attr1", + Value: 5, + }, + }, + Libraries: []otelcol.InstrumentationLibrary{ + { + Name: "mongo-java-driver", + Version: pointer.String("3.8.0"), + }, + }, + SpanKinds: []string{"span1"}, + } + + expectedMatchProps := map[string]interface{}{ + "attributes": []interface{}{ + map[string]interface{}{ + "key": "attr1", + "value": 5, + }, + map[string]interface{}{ + "key": "attr2", + "value": "asdf", + }, + map[string]interface{}{ + "key": "attr2", + "value": false, + }, + }, + "libraries": []interface{}{ + map[string]interface{}{ + "name": "mongo-java-driver", + "version": "3.8.0", + }, + }, + "log_bodies": []string{"AUTH.*"}, + "log_severity_number": map[string]interface{}{ + "min": int32(2), + "match_undefined": true, + }, + "log_severity_texts": []string{ + "debug.*", + }, + "match_type": "strict", + "metric_names": []string{"metric1"}, + "regexp": map[string]interface{}{ + "cacheenabled": false, + "cachemaxnumentries": 2, + }, + "resources": []interface{}{ + map[string]interface{}{ + "key": "attr1", + "value": 5, + }, + }, + "services": []string{"svcA", "svcB"}, + "span_kinds": []string{"span1"}, + "span_names": []string{"login.*"}, + } + + tests := []struct { + testName string + inputMatchConfig otelcol.MatchConfig + expectedMatchConfig map[string]interface{} + }{ + { + testName: "TestConvertEmpty", + inputMatchConfig: otelcol.MatchConfig{}, + expectedMatchConfig: make(map[string]interface{}), + }, + { + testName: "TestConvertMandatory", + inputMatchConfig: otelcol.MatchConfig{ + Include: &otelcol.MatchProperties{ + MatchType: "strict", + }, + }, + expectedMatchConfig: map[string]interface{}{ + "include": map[string]interface{}{ + "match_type": "strict", + }, + }, + }, + { + testName: "TestAllOptsInclExcl", + inputMatchConfig: otelcol.MatchConfig{ + Include: &inputMatchProps, + Exclude: &inputMatchProps, + }, + expectedMatchConfig: map[string]interface{}{ + "include": expectedMatchProps, + "exclude": expectedMatchProps, + }, + }, + { + testName: "TestAllOptsIncl", + inputMatchConfig: otelcol.MatchConfig{ + Include: &inputMatchProps, + }, + expectedMatchConfig: map[string]interface{}{ + "include": expectedMatchProps, + }, + }, + { + testName: "TestAllOptsExcl", + inputMatchConfig: otelcol.MatchConfig{ + Exclude: &inputMatchProps, + }, + expectedMatchConfig: map[string]interface{}{ + "exclude": expectedMatchProps, + }, + }, + } + + for _, tt := range tests { + if matchConf := tt.inputMatchConfig.Exclude; matchConf != nil { + result := matchConf.Convert() + require.Equal(t, tt.expectedMatchConfig["exclude"], result) + } else { + require.Empty(t, tt.expectedMatchConfig["exclude"]) + } + + if matchConf := tt.inputMatchConfig.Include; matchConf != nil { + result := matchConf.Convert() + require.Equal(t, tt.expectedMatchConfig["include"], result) + } else { + require.Empty(t, tt.expectedMatchConfig["include"]) + } + } +} + +func TestUnmarshalSeverityLevel(t *testing.T) { + for _, tt := range []struct { + name string + cfg string + expectErr bool + }{ + { + name: "valid TRACE config", + cfg: ` + min = "TRACE" + match_undefined = true + `, + }, + { + name: "valid DEBUG config", + cfg: ` + min = "DEBUG" + match_undefined = true + `, + }, + { + name: "valid INFO config", + cfg: ` + min = "INFO" + match_undefined = true + `, + }, + { + name: "valid WARN config", + cfg: ` + min = "WARN" + match_undefined = true + `, + }, + { + name: "valid ERROR config", + cfg: ` + min = "ERROR" + match_undefined = true + `, + }, + { + name: "valid FATAL config", + cfg: ` + min = "FATAL" + match_undefined = true + `, + }, + { + name: "valid FATAL4 config", + cfg: ` + min = "FATAL4" + match_undefined = true + `, + }, + { + name: "invalid lowercase sev level", + cfg: ` + min = "trace" + match_undefined = true + `, + expectErr: true, + }, + { + name: "non-existent sev level", + cfg: ` + min = "foo" + match_undefined = true + `, + expectErr: true, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + var sl otelcol.LogSeverityNumberMatchProperties + err := river.Unmarshal([]byte(tt.cfg), &sl) + if tt.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/component/otelcol/processor/attributes/attributes.go b/component/otelcol/processor/attributes/attributes.go new file mode 100644 index 000000000000..6b5607c3fc0f --- /dev/null +++ b/component/otelcol/processor/attributes/attributes.go @@ -0,0 +1,89 @@ +// Package attributes provides an otelcol.processor.attributes component. +package attributes + +import ( + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/processor" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.attributes", + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := attributesprocessor.NewFactory() + return processor.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.processor.attributes component. +type Arguments struct { + // Pre-processing filtering to include/exclude data from the processor. + Match otelcol.MatchConfig `river:",squash"` + + // Actions performed on the input data in the order specified in the config. + // Example actions are "insert", "update", "upsert", "delete", "hash". + Actions otelcol.AttrActionKeyValueSlice `river:"action,block,optional"` + + // Output configures where to send processed data. Required. + Output *otelcol.ConsumerArguments `river:"output,block"` +} + +var ( + _ processor.Arguments = Arguments{} +) + +// Convert implements processor.Arguments. +func (args Arguments) Convert() (otelconfig.Processor, error) { + input := make(map[string]interface{}) + + if actions := args.Actions.Convert(); len(actions) > 0 { + input["actions"] = actions + } + + if args.Match.Include != nil { + if matchConfig := args.Match.Include.Convert(); len(matchConfig) > 0 { + input["include"] = matchConfig + } + } + + if args.Match.Exclude != nil { + if matchConfig := args.Match.Exclude.Convert(); len(matchConfig) > 0 { + input["exclude"] = matchConfig + } + } + + var result attributesprocessor.Config + err := mapstructure.Decode(input, &result) + + if err != nil { + return nil, err + } + + result.ProcessorSettings = otelconfig.NewProcessorSettings(otelconfig.NewComponentID("attributes")) + + return &result, nil +} + +// Extensions implements processor.Arguments. +func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +// Exporters implements processor.Arguments. +func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +// NextConsumers implements processor.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} diff --git a/component/otelcol/processor/attributes/attributes_test.go b/component/otelcol/processor/attributes/attributes_test.go new file mode 100644 index 000000000000..63746d38299c --- /dev/null +++ b/component/otelcol/processor/attributes/attributes_test.go @@ -0,0 +1,998 @@ +package attributes_test + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fakeconsumer" + "github.com/grafana/agent/component/otelcol/processor/attributes" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/river" + "github.com/grafana/agent/pkg/util" + "github.com/grafana/dskit/backoff" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// A lot of the TestDecode tests were inspired by tests in the Otel repo: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.63.0/processor/attributesprocessor/testdata/config.yaml + +func TestDecode_Insert(t *testing.T) { + cfg := ` + action { + key = "attribute1" + value = 123 + action = "insert" + } + action { + key = "string key" + value = "anotherkey" + action = "insert" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "attribute1", action.Key) + require.Equal(t, 123, action.Value) + require.Equal(t, "insert", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "string key", action.Key) + require.Equal(t, "anotherkey", action.Value) + require.Equal(t, "insert", string(action.Action)) +} + +func TestDecode_RegexExtract(t *testing.T) { + cfg := ` + action { + key = "http.url" + pattern = "^(?P.*):\\/\\/(?P.*)\\/(?P.*)(\\?|\\&)(?P.*)" + action = "extract" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "http.url", action.Key) + require.Equal(t, "^(?P.*):\\/\\/(?P.*)\\/(?P.*)(\\?|\\&)(?P.*)", action.RegexPattern) + require.Equal(t, "extract", string(action.Action)) +} + +func TestDecode_Update(t *testing.T) { + cfg := ` + action { + key = "boo" + from_attribute = "foo" + action = "update" + } + action { + key = "db.secret" + value = "redacted" + action = "update" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "boo", action.Key) + require.Equal(t, "foo", action.FromAttribute) + require.Equal(t, "update", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "db.secret", action.Key) + require.Equal(t, "redacted", action.Value) + require.Equal(t, "update", string(action.Action)) +} + +func TestDecode_Upsert(t *testing.T) { + cfg := ` + action { + key = "region" + value = "planet-earth" + action = "upsert" + } + action { + key = "new_user_key" + from_attribute = "user_key" + action = "upsert" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "region", action.Key) + require.Equal(t, "planet-earth", action.Value) + require.Equal(t, "upsert", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "new_user_key", action.Key) + require.Equal(t, "user_key", action.FromAttribute) + require.Equal(t, "upsert", string(action.Action)) +} + +func TestDecode_Delete(t *testing.T) { + cfg := ` + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_Hash(t *testing.T) { + cfg := ` + action { + key = "user.email" + action = "hash" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "user.email", action.Key) + require.Equal(t, "hash", string(action.Action)) +} + +func TestDecode_Convert(t *testing.T) { + cfg := ` + action { + key = "http.status_code" + converted_type = "int" + action = "convert" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "http.status_code", action.Key) + require.Equal(t, "int", action.ConvertedType) + require.Equal(t, "convert", string(action.Action)) +} + +func TestDecode_ExcludeMulti(t *testing.T) { + cfg := ` + exclude { + match_type = "strict" + services = ["svcA", "svcB"] + attribute { + key = "env" + value = "dev" + } + attribute { + key = "test_request" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + svc := &otelObj.Exclude.Services[0] + require.Equal(t, "svcA", *svc) + svc = &otelObj.Exclude.Services[1] + require.Equal(t, "svcB", *svc) + + attr := &otelObj.Exclude.Attributes[0] + require.Equal(t, "env", attr.Key) + require.Equal(t, "dev", attr.Value) + + attr = &otelObj.Exclude.Attributes[1] + require.Equal(t, "test_request", attr.Key) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExcludeResources(t *testing.T) { + cfg := ` + exclude { + match_type = "strict" + resource { + key = "host.type" + value = "n1-standard-1" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + + res := &otelObj.Exclude.Resources[0] + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + require.Equal(t, "host.type", res.Key) + require.Equal(t, "n1-standard-1", res.Value) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExcludeLibrary(t *testing.T) { + cfg := ` + exclude { + match_type = "strict" + library { + name = "mongo-java-driver" + version = "3.8.0" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + lib := &otelObj.Exclude.Libraries[0] + require.Equal(t, "mongo-java-driver", lib.Name) + require.Equal(t, "3.8.0", *lib.Version) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExcludeLibraryAnyVersion(t *testing.T) { + cfg := ` + exclude { + match_type = "strict" + library { + name = "mongo-java-driver" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + lib := &otelObj.Exclude.Libraries[0] + require.Equal(t, "mongo-java-driver", lib.Name) + require.Nil(t, lib.Version) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExcludeLibraryBlankVersion(t *testing.T) { + cfg := ` + exclude { + match_type = "strict" + library { + name = "mongo-java-driver" + version = "" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + lib := &otelObj.Exclude.Libraries[0] + require.Equal(t, "mongo-java-driver", lib.Name) + require.Equal(t, "", *lib.Version) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExcludeServices(t *testing.T) { + cfg := ` + exclude { + match_type = "regexp" + services = ["auth.*", "login.*"] + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "regexp", string(otelObj.Exclude.MatchType)) + + svc := &otelObj.Exclude.Services + require.Equal(t, "auth.*", (*svc)[0]) + require.Equal(t, "login.*", (*svc)[1]) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_SelectiveProcessing(t *testing.T) { + cfg := ` + include { + match_type = "strict" + services = ["svcA", "svcB"] + } + exclude { + match_type = "strict" + attribute { + key = "redact_trace" + value = false + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Include) + require.Equal(t, "strict", string(otelObj.Include.MatchType)) + + svc := &otelObj.Include.Services + require.Equal(t, "svcA", (*svc)[0]) + require.Equal(t, "svcB", (*svc)[1]) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "strict", string(otelObj.Exclude.MatchType)) + + attr := &otelObj.Exclude.Attributes[0] + require.Equal(t, "redact_trace", attr.Key) + require.Equal(t, false, attr.Value) + + action := &otelObj.Actions[0] + require.Equal(t, "credit_card", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "duplicate_key", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_Complex(t *testing.T) { + cfg := ` + action { + key = "operation" + value = "default" + action = "insert" + } + action { + key = "svc.operation" + value = "operation" + action = "upsert" + } + action { + key = "operation" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "operation", action.Key) + require.Equal(t, "default", action.Value) + require.Equal(t, "insert", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "svc.operation", action.Key) + require.Equal(t, "operation", action.Value) + require.Equal(t, "upsert", string(action.Action)) + + action = &otelObj.Actions[2] + require.Equal(t, "operation", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_ExampleActions(t *testing.T) { + cfg := ` + action { + key = "db.table" + action = "delete" + } + action { + key = "redacted_span" + value = true + action = "upsert" + } + action { + key = "copy_key" + from_attribute = "key_original" + action = "update" + } + action { + key = "account_id" + value = 2245 + action = "insert" + } + action { + key = "account_password" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "db.table", action.Key) + require.Equal(t, "delete", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "redacted_span", action.Key) + require.Equal(t, true, action.Value) + require.Equal(t, "upsert", string(action.Action)) + + action = &otelObj.Actions[2] + require.Equal(t, "copy_key", action.Key) + require.Equal(t, "key_original", action.FromAttribute) + require.Equal(t, "update", string(action.Action)) + + action = &otelObj.Actions[3] + require.Equal(t, "account_id", action.Key) + require.Equal(t, 2245, otelObj.Actions[3].Value) + require.Equal(t, "insert", string(action.Action)) + + action = &otelObj.Actions[4] + require.Equal(t, "account_password", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_Regexp(t *testing.T) { + cfg := ` + include { + match_type = "regexp" + services = ["auth.*"] + } + exclude { + match_type = "regexp" + span_names = ["login.*"] + } + action { + key = "password" + action = "update" + value = "obfuscated" + } + action { + key = "token" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Include) + require.Equal(t, "regexp", string(otelObj.Include.MatchType)) + require.Equal(t, "auth.*", otelObj.Include.Services[0]) + + require.NotNil(t, otelObj.Exclude) + require.Equal(t, "regexp", string(otelObj.Exclude.MatchType)) + require.Equal(t, "login.*", otelObj.Exclude.SpanNames[0]) + + action := &otelObj.Actions[0] + require.Equal(t, "password", action.Key) + require.Equal(t, "obfuscated", action.Value) + require.Equal(t, "update", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "token", action.Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_Regexp2(t *testing.T) { + cfg := ` + include { + match_type = "regexp" + attribute { + key = "db.statement" + value = "SELECT \\* FROM USERS.*" + } + } + action { + key = "db.statement" + action = "update" + value = "SELECT * FROM USERS [obfuscated]" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Include) + require.Equal(t, "regexp", string(otelObj.Include.MatchType)) + + attr := &otelObj.Include.Attributes[0] + require.Equal(t, "db.statement", attr.Key) + require.Equal(t, "SELECT \\* FROM USERS.*", attr.Value) + + action := &otelObj.Actions[0] + require.Equal(t, "db.statement", action.Key) + require.Equal(t, "SELECT * FROM USERS [obfuscated]", action.Value) + require.Equal(t, "update", string(action.Action)) +} + +func TestDecode_LogBodyRegexp(t *testing.T) { + cfg := ` + include { + match_type = "regexp" + log_bodies = ["AUTH.*"] + } + action { + key = "password" + action = "update" + value = "obfuscated" + } + action { + key = "token" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Include) + require.Equal(t, "regexp", string(otelObj.Include.MatchType)) + + require.Equal(t, "AUTH.*", otelObj.Include.LogBodies[0]) + + action := &otelObj.Actions[0] + require.Equal(t, "password", action.Key) + require.Equal(t, "obfuscated", action.Value) + require.Equal(t, "update", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "token", otelObj.Actions[1].Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_LogSeverityRegexp(t *testing.T) { + cfg := ` + include { + match_type = "regexp" + log_severity_texts = ["debug.*"] + log_severity { + min = "DEBUG" + match_undefined = true + } + } + action { + key = "password" + action = "update" + value = "obfuscated" + } + action { + key = "token" + action = "delete" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + otelObj := (convertedArgs).(*attributesprocessor.Config) + + require.NotNil(t, otelObj.Include) + require.Equal(t, "regexp", string(otelObj.Include.MatchType)) + + require.Equal(t, "debug.*", otelObj.Include.LogSeverityTexts[0]) + + require.Equal(t, int32(5), int32(otelObj.Include.LogSeverityNumber.Min)) + require.Equal(t, true, otelObj.Include.LogSeverityNumber.MatchUndefined) + + action := &otelObj.Actions[0] + require.Equal(t, "password", action.Key) + require.Equal(t, "obfuscated", action.Value) + require.Equal(t, "update", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "token", otelObj.Actions[1].Key) + require.Equal(t, "delete", string(action.Action)) +} + +func TestDecode_FromContext(t *testing.T) { + cfg := ` + action { + key = "origin" + from_context = "metadata.origin" + action = "insert" + } + action { + key = "enduser.id" + from_context = "auth.subject" + action = "insert" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + convertedArgs, err := args.Convert() + require.NoError(t, err) + + otelObj := (convertedArgs).(*attributesprocessor.Config) + + action := &otelObj.Actions[0] + require.Equal(t, "origin", action.Key) + require.Equal(t, "metadata.origin", action.FromContext) + require.Equal(t, "insert", string(action.Action)) + + action = &otelObj.Actions[1] + require.Equal(t, "enduser.id", action.Key) + require.Equal(t, "auth.subject", action.FromContext) + require.Equal(t, "insert", string(action.Action)) +} + +func TestRun(t *testing.T) { + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.attributes") + require.NoError(t, err) + + cfg := ` + action { + key = "attribute1" + value = 123 + action = "insert" + } + action { + key = "string key" + value = "anotherkey" + action = "insert" + } + + output { + // no-op: will be overridden by test code. + } + ` + var args attributes.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + // Override our arguments so traces get forwarded to traceCh. + traceCh := make(chan ptrace.Traces) + args.Output = makeTracesOutput(traceCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Send traces in the background to our processor. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + //TODO: Try changing actual attributes, just to sanity checks that everything works? + + // Wait for our processor to finish and forward data to traceCh. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +//TODO: makeTracesOutput and createTestTraces were copied from other tests. +// Import them from a common go file? + +// makeTracesOutput returns ConsumerArguments which will forward traces to the +// provided channel. +func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments { + traceConsumer := fakeconsumer.Consumer{ + ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- t: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Traces: []otelcol.Consumer{&traceConsumer}, + } +} + +func createTestTraces() ptrace.Traces { + // Matches format from the protobuf definition: + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto + var bb = `{ + "resource_spans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }` + + decoder := &ptrace.JSONUnmarshaler{} + data, err := decoder.UnmarshalTraces([]byte(bb)) + if err != nil { + panic(err) + } + return data +} diff --git a/docs/sources/flow/reference/components/discovery.kubernetes.md b/docs/sources/flow/reference/components/discovery.kubernetes.md index 1ab2a38bb19f..5f26641de2f9 100644 --- a/docs/sources/flow/reference/components/discovery.kubernetes.md +++ b/docs/sources/flow/reference/components/discovery.kubernetes.md @@ -269,7 +269,7 @@ omitted, all namespaces are searched. Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `own_namespace` | `bool` | Include the namespace the agent is running in. | | no -`names` | `[]string` | List of namespaces to search. | | no +`names` | `list(string)` | List of namespaces to search. | | no ### selectors block diff --git a/docs/sources/flow/reference/components/mimir.rules.kubernetes.md b/docs/sources/flow/reference/components/mimir.rules.kubernetes.md index 7f275118926e..8913b234b35d 100644 --- a/docs/sources/flow/reference/components/mimir.rules.kubernetes.md +++ b/docs/sources/flow/reference/components/mimir.rules.kubernetes.md @@ -120,11 +120,11 @@ The `match_expression` block describes a Kubernetes label match expression for r The following arguments are supported: -Name | Type | Description | Default | Required ------------|------------|----------------------------------------------------|---------|--------- -`key` | `string` | The label name to match against. | | yes -`operator` | `string` | The operator to use when matching. | | yes -`values` | `[]string` | The values used when matching. | | no +Name | Type | Description | Default | Required +-----------|----------------|----------------------------------------------------|---------|--------- +`key` | `string` | The label name to match against. | | yes +`operator` | `string` | The operator to use when matching. | | yes +`values` | `list(string)` | The values used when matching. | | no The `operator` argument should be one of the following strings: diff --git a/docs/sources/flow/reference/components/otelcol.processor.attributes.md b/docs/sources/flow/reference/components/otelcol.processor.attributes.md new file mode 100644 index 000000000000..92ae6c9c7a72 --- /dev/null +++ b/docs/sources/flow/reference/components/otelcol.processor.attributes.md @@ -0,0 +1,638 @@ +--- +title: otelcol.processor.attributes +--- + +# otelcol.processor.attributes + +`otelcol.processor.attributes` accepts telemetry data from other `otelcol` +components and modifies attributes of a span, log, or metric. +It also supports the ability to filter and match input data to determine if +it should be included or excluded for attribute modifications. + +> **NOTE**: `otelcol.processor.attributes` is a wrapper over the upstream +> OpenTelemetry Collector `attributes` processor. Bug reports or feature requests +> will be redirected to the upstream repository, if necessary. + +Multiple `otelcol.processor.attributes` components can be specified by giving them +different labels. + +## Usage + +```river +otelcol.processor.attributes "LABEL" { + output { + metrics = [...] + logs = [...] + traces = [...] + } +} +``` + +## Arguments + +`otelcol.processor.attributes` doesn't support any arguments and is configured fully +through inner blocks. + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.processor.attributes`: +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +output | [output][] | Configures where to send received telemetry data. | yes +action | [action][] | Actions to take on the attributes of incoming metrics/logs/traces. | no +include | [include][] | Filter for data to be included to this processor's actions. | no +include > regexp | [regexp][] | Regex cache settings. | no +include > attribute | [attribute][] | A list of attributes to match against. | no +include > resource | [resource][] | A list of items to match the resources against. | no +include > library | [library][] | A list of items to match the implementation library against. | no +include > log_severity | [library][] | How to match against a log record's SeverityNumber, if defined. | no +exclude | [exclude][] | Filter for data to be excluded from this processor's actions | no +exclude > regexp | [regexp][] | Regex cache settings. | no +exclude > attribute | [attribute][] | A list of attributes to match against. | no +exclude > resource | [resource][] | A list of items to match the resources against. | no +exclude > library | [library][] | A list of items to match the implementation library against. | no +exclude > log_severity | [log_severity][] | How to match against a log record's SeverityNumber, if defined. | no + +The `>` symbol indicates deeper levels of nesting. For example, `include > attribute` +refers to an `attribute` block defined inside an `include` block. + +If both and `include` block and an `exclude`block are specified, the `include` properties are checked before the `exclude` properties. + +[output]: #output-block +[action]: #action-block +[include]: #include-block +[exclude]: #exclude-block +[regexp]: #regexp-block +[attribute]: #attribute-block +[resource]: #resource-block +[library]: #library-block +[log_severity]: #log_severity-block + +### action block + +The `action` block configures how to modify the span, log or metric. + +The following attributes are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`key` | `string` | The attribute that the action relates to. | | yes +`action` | `string` | The type of action to be performed. | | yes +`value` | `any` | The value to populate for the key. | | no +`pattern` | `string` | A regex pattern. | `""` | no +`from_attribute` | `string` | The attribute from the input data to use to populate the attribute value. | `""` | no +`from_context` | `string` | The context value to use to populate the attribute value. | `""` | no +`converted_type` | `string` | The type to convert the attribute value to. | `""` | no + +The type of `value` must be either a number, string, or boolean. + +The supported values for `action` are: + +* `insert`: Inserts a new attribute in input data where the key does not already exist. + + * The `key` attribute is required. It specifies the attribute to act upon. + * One of the `value`, `from_attribute` or `from_context` attributes is required. + +* `update`: Updates an attribute in input data where the key does exist. + + * The `key`attribute is required. It specifies the attribute to act upon. + * One of the `value`, `from_attribute` or `from_context` attributes is required. + +* `upsert`: Either inserts a new attribute in input data where the key does not already exist, or updates an attribute in input data where the key does exist. + + * The `key`attribute is required. It specifies the attribute to act upon. + * One of the `value`, `from_attribute` or `from_context`attributes is required: + * `value` specifies the value to populate for the key. + * `from_attribute` specifies the attribute from the input data to use to populate + the value. If the attribute doesn't exist, no action is performed. + * `from_context` specifies the context value to use to populate the attribute value. + If the key is prefixed with `metadata.`, the values are searched + in the receiver's transport protocol additional information like gRPC Metadata or HTTP Headers. + If the key is prefixed with `auth.`, the values are searched + in the authentication information set by the server authenticator. + Refer to the server authenticator's documentation part of your pipeline + for more information about which attributes are available. + If the key doesn't exist, no action is performed. + If the key has multiple values the values will be joined with `;` separator. + +* `hash`: Hashes (SHA1) an existing attribute value. + + * The `key` attribute and/or the `pattern` attributes is required. + +* `extract`: Extracts values using a regular expression rule from the input key to target keys specified in the rule. If a target key already exists, it will be overridden. Note: It behaves similar to the Span Processor `to_attributes` setting with the existing attribute as the source. + + * The `key` attribute is required. It specifies the attribute to extract values from. The value of `key` is NOT altered. + * The `pattern` attribute is required. It is the regex pattern used to extract attributes from the value of `key`. The submatchers must be named. If attributes already exist, they will be overwritten. + +* `convert`: Converts an existing attribute to a specified type. + + * The `key` attribute is required. It specifies the attribute to act upon. + * The `converted_type` attribute is required and must be one of int, double or string. + +* `delete`: Deletes an attribute from the input data. + + * The `key` attribute and/or the `pattern` attribute is required. It specifies the attribute to act upon. + +### include block + +The `include` block provides an option to include data being fed into the [action] blocks, based on the properties of a span, log, or metric records. + +{{< docs/shared lookup="flow/reference/components/match-properties-block.md" source="agent" >}} + +One of the following is also required: +* For spans, one of `services`, `span_names`, `span_kinds`, [attribute][], [resource][] or [library][] must be specified with a non-empty value for a valid configuration. The `log_bodies`, `log_severity_texts`, `log_severity` and `metric_names` attributes are invalid. +* For logs, one of `log_bodies`, `log_severity_texts`, `log_severity`, [attribute][], [resource][] or [library][] must be specified with a non-empty value for a valid configuration. The `span_names`, `span_kinds`, `metric_names`, and `services` attributes are invalid. +* For metrics, one of `metric_names` or [resource][] must be specified with a valid non-empty value for a valid configuration. The `span_names`, `span_kinds`, `log_bodies`, `log_severity_texts`, `log_severity`, `services`, [attribute][] and [library][] attributes are invalid. + +For `metric_names`, a match occurs if the metric name matches at least one item in the list. +For `span_kinds`, a match occurs if the span's span kind matches at least one item in the list. + +### exclude block + +The `exclude` blocks provides an option to exclude data from being fed into the [action] blocks, based on the properties of a span, log, or metric records. + +{{< docs/shared lookup="flow/reference/components/match-properties-block.md" source="agent" >}} + +One of the following is also required: +* For spans, one of `services`, `span_names`, `span_kinds`, [attribute][], [resource][] or [library][] must be specified with a non-empty value for a valid configuration. The `log_bodies`, `log_severity_texts`, `log_severity` and `metric_names` attributes are invalid. +* For logs, one of `log_bodies`, `log_severity_texts`, `log_severity`, [attribute][], [resource][] or [library][] must be specified with a non-empty value for a valid configuration. The `span_names`, `span_kinds`, `metric_names`, and `services` attributes are invalid. +* For metrics, one of `metric_names` or [resource][] must be specified with a valid non-empty value for a valid configuration. The `span_names`, `span_kinds`, `log_bodies`, `log_severity_texts`, `log_severity`, `services`, [attribute][] and [library][] attributes are invalid. + +For `metric_names`, a match occurs if the metric name matches at least one item in the list. +For `span_kinds`, a match occurs if the span's span kind matches at least one item in the list. + +### regexp block + +This block is optional configuration for the `match_type` of `"regexp"`. +It configures a Least Recently Used (LRU) cache. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`cache_enabled` | `bool` | Determines whether match results are LRU cached. | `false` | no +`cache_max_num_entries` | `int` | The max number of entries of the LRU cache that stores match results. | `0` | no + +Enabling `cache_enabled` could make subsequent matches faster. +Cache size is unlimited unless `cache_max_num_entries` is also specified. + +`cache_max_num_entries` is ignored if `cache_enabled` is false. + +### attribute block + +This block specifies an attribute to match against: + +* More than one `attribute` block can be defined. +* Only `match_type = "strict"` is allowed if `attribute` is specified. +* All `attribute` blocks must match exactly for a match to occur. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`key` | `string` | The attribute key. | | yes +`value` | `any` | The attribute value to match against. | | no + +If `value` is not set, any value will match. +The type of `value` could be a number, a string or a boolean. + +### resource block + +This block specifies items to match the resources against: + +* More than one `resource` block can be defined. +* A match occurs if the input data resources matches at least one `resource` block. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`key` | `string` | The resource key. | | yes +`value` | `any` | The resource value to match against. | | no + +If `value` is not set, any value will match. +The type of `value` could be a number, a string or a boolean. + +### library block + +This block specifies properties to match the implementation library against: + +* More than one `library` block can be defined. +* A match occurs if the span's implementation library matches at least one `library` block. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`name` | `string` | The attribute key. | | yes +`version` | `string` | The version to match against. | null | no + +If `version` is unset, any version will match. If `version` is set to an empty string, it will only match +a library version which is also an empty string. + +### log_severity block + +This block defines how to match based on a log record's SeverityNumber field. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`min` | `string` | The lowest severity that may be matched. | | yes +`match_undefined` | `bool` | Whether logs with "undefined" severity match. | | yes + +If `match_undefined` is true, entries with undefined severity will match. + +The severities supported by Otel are listed in the table below. +The value for `min` should be one of the values in the "Log Severity" column. + +Log Severity | Severity number +------------ | --------------- +TRACE | 1 +TRACE2 | 2 +TRACE3 | 3 +TRACE4 | 4 +DEBUG | 5 +DEBUG2 | 6 +DEBUG3 | 7 +DEBUG4 | 8 +INFO | 9 +INFO2 | 10 +INFO3 | 11 +INFO4 | 12 +WARN | 13 +WARN2 | 14 +WARN3 | 15 +WARN4 | 16 +ERROR | 17 +ERROR2 | 18 +ERROR3 | 19 +ERROR4 | 20 +FATAL | 21 +FATAL2 | 22 +FATAL3 | 23 +FATAL4 | 24 + +For example, if the `min` attribute in the `log_severity` block is "INFO", then +INFO, WARN, ERROR, and FATAL logs will match. + +### output block + +{{< docs/shared lookup="flow/reference/components/output-block.md" source="agent" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +---- | ---- | ----------- +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, logs, or traces). + +## Component health + +`otelcol.processor.attributes` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.processor.attributes` does not expose any component-specific debug +information. + +## Examples + +### Various uses of the "action" block + +```river +otelcol.receiver.otlp "default" { + http {} + grpc {} + + output { + metrics = [otelcol.processor.attributes.default.input] + logs = [otelcol.processor.attributes.default.input] + traces = [otelcol.processor.attributes.default.input] + } +} + +otelcol.processor.attributes "default" { + // Inserts a new attribute "attribute1" to spans where + // the key "attribute1" doesn't exist. + // The type of `attribute1` is inferred by the configuration. + // `123` is an integer and is stored as an integer in the attributes. + action { + key = "attribute1" + value = 123 + action = "insert" + } + + // Inserts a new attribute with a key of "string key" and + // a string value of "anotherkey". + action { + key = "string key" + value = "anotherkey" + action = "insert" + } + + // Setting an attribute on all spans. + // Any spans that already had `region` now have value `planet-earth`. + // This can be done to set properties for all traces without + // requiring an instrumentation change. + action { + key = "region" + value = "planet-earth" + action = "upsert" + } + + // The following demonstrates copying a value to a new key. + // If a span doesn't contain `user_key`, no new attribute `new_user_key` is created. + action { + key = "new_user_key" + from_attribute = "user_key" + action = "upsert" + } + + // Hashing existing attribute values. + action { + key = "user.email" + action = "hash" + } + + // Uses the value from `key:http.url` to upsert attributes + // to the target keys specified in the `pattern`. + // (Insert attributes for target keys that do not exist and update keys that exist.) + // Given http.url = http://example.com/path?queryParam1=value1,queryParam2=value2 + // then the following attributes will be inserted: + // http_protocol: http + // http_domain: example.com + // http_path: path + // http_query_params=queryParam1=value1,queryParam2=value2 + // http.url value does NOT change. + // Note: Similar to the Span Processor, if a target key already exists, + // it will be updated. + action { + key = "http.url" + pattern = "^(?P.*):\\/\\/(?P.*)\\/(?P.*)(\\?|\\&)(?P.*)" + action = "extract" + } + + // Converting the type of an existing attribute value. + action { + key = "http.status_code" + converted_type = "int" + action = "convert" + } + + // Deleting keys from an attribute. + action { + key = "credit_card" + action = "delete" + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = env("OTLP_ENDPOINT") + } +} +``` + +### Excluding spans based on attributes + +For example, the following spans match the properties and won't be processed by the processor: +* Span1 Name: "svcB", Attributes: {env: "dev", test_request: 123, credit_card: 1234} +* Span2 Name: "svcA", Attributes: {env: "dev", test_request: false} + +The following spans do not match the properties and the processor actions are applied to it: +* Span3 Name: "svcB", Attributes: {env: 1, test_request: "dev", credit_card: 1234} +* Span4 Name: "svcC", Attributes: {env: "dev", test_request: false} + +```river +otelcol.processor.attributes "default" { + exclude { + match_type = "strict" + services = ["svcA", "svcB"] + attribute { + key = "env" + value = "dev" + } + attribute { + key = "test_request" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Excluding spans based on resources + +A "strict" `match_type` means that we must match the `resource` key/value pairs strictly. + +```river +otelcol.processor.attributes "default" { + exclude { + match_type = "strict" + resource { + key = "host.type" + value = "n1-standard-1" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Excluding spans based on resources + +A "strict" `match_type` means that we must match the `library` key/value pairs strictly. + +```river +otelcol.processor.attributes "default" { + exclude { + match_type = "strict" + library { + name = "mongo-java-driver" + version = "3.8.0" + } + } + action { + key = "credit_card" + action = "delete" + } + action { + key = "duplicate_key" + action = "delete" + } + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Including and excluding spans based on regex and services + +This processor will remove the "token" attribute and will obfuscate the "password" attribute +in spans where the service name matches `"auth.*"` and where the span name does not match `"login.*"`. + +```river +otelcol.processor.attributes "default" { + // Specifies the span properties that must exist for the processor to be applied. + include { + // "match_type" defines that "services" is an array of regexp-es. + match_type = "regexp" + // The span service name must match "auth.*" pattern. + services = ["auth.*"] + } + + exclude { + // "match_type" defines that "span_names" is an array of regexp-es. + match_type = "regexp" + // The span name must not match "login.*" pattern. + span_names = ["login.*"] + } + + action { + key = "password" + action = "update" + value = "obfuscated" + } + + action { + key = "token" + action = "delete" + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Including spans based on regex and attributes + +The following demonstrates how to process spans that have an attribute that matches a regexp patterns. +This processor will obfuscate the "db.statement" attribute in spans where the "db.statement" attribute +matches a regex pattern. + +```river +otelcol.processor.attributes "default" { + include { + // "match_type" of "regexp" defines that the "value" attributes + // in the "attribute" blocks are regexp-es. + match_type = "regexp" + + // This attribute ('db.statement') must exist in the span and match + // the regex ('SELECT \* FROM USERS.*') for a match. + attribute { + key = "db.statement" + value = "SELECT \* FROM USERS.*" + } + } + + action { + key = "db.statement" + action = "update" + value = "SELECT * FROM USERS [obfuscated]" + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Including spans based on regex of log body + +This processor will remove "token" attribute and will obfuscate "password" +attribute in spans where the log body matches "AUTH.*". + +```river +otelcol.processor.attributes "default" { + include { + match_type = "regexp" + log_bodies = ["AUTH.*"] + } + action { + key = "password" + action = "update" + value = "obfuscated" + } + action { + key = "token" + action = "delete" + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +### Including spans based on regex of log severity + +The following demonstrates how to process logs that have a severity text that match regexp +patterns. This processor will remove "token" attribute and will obfuscate "password" +attribute in spans where severity matches "debug". + +```river +otelcol.processor.attributes "default" { + include { + match_type = "regexp" + log_severity_texts = ["debug.*"] + } + action { + key = "password" + action = "update" + value = "obfuscated" + } + action { + key = "token" + action = "delete" + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` \ No newline at end of file diff --git a/docs/sources/flow/reference/components/otelcol.processor.batch.md b/docs/sources/flow/reference/components/otelcol.processor.batch.md index 7e1a326e5902..ee5e22b56e1f 100644 --- a/docs/sources/flow/reference/components/otelcol.processor.batch.md +++ b/docs/sources/flow/reference/components/otelcol.processor.batch.md @@ -112,7 +112,6 @@ otelcol.exporter.otlp "production" { endpoint = env("OTLP_SERVER_ENDPOINT") } } - ``` [otelcol.exporter.otlp]: {{< relref "./otelcol.exporter.otlp.md" >}} diff --git a/docs/sources/shared/flow/reference/components/match-properties-block.md b/docs/sources/shared/flow/reference/components/match-properties-block.md new file mode 100644 index 000000000000..1f76c67ff75f --- /dev/null +++ b/docs/sources/shared/flow/reference/components/match-properties-block.md @@ -0,0 +1,19 @@ +--- +aliases: +- /docs/agent/shared/flow/reference/components/match-properties-block/ +headless: true +--- + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`match_type` | `string` | Controls how items in "services" and "span_names" arrays are interpreted. | | yes +`services` | `list(string)` | A list of items to match the service name against. | `[]` | no +`span_names` | `list(string)` | A list of items to match the span name against. | `[]` | no +`log_bodies` | `list(string)` | A list of strings that the LogRecord's body field must match against. | `[]` | no +`log_severity_texts` | `list(string)` | A list of strings that the LogRecord's severity text field must match against. | `[]` | no +`metric_names` | `list(string)` | A list of strings to match the metric name against. | `[]` | no +`span_kinds` | `list(string)` | A list of items to match the span kind against. | `[]` | no + +`match_type` is required and must be set to either `"regexp"` or `"strict"`.