Skip to content

Commit

Permalink
Add a otelcol.processor.attributes component
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Mar 31, 2023
1 parent c5dd273 commit 08f66a8
Show file tree
Hide file tree
Showing 11 changed files with 2,127 additions and 7 deletions.
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/exporter/otlphttp" // Import otelcol.exporter.otlphttp
_ "github.com/grafana/agent/component/otelcol/exporter/prometheus" // Import otelcol.exporter.prometheus
_ "github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling" // Import otelcol.extension.jaeger_remote_sampling
_ "github.com/grafana/agent/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling
Expand Down
103 changes: 103 additions & 0 deletions component/otelcol/config_attraction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package otelcol

type AttrActionKeyValueSlice []AttrActionKeyValue

func (actions AttrActionKeyValueSlice) Convert() []interface{} {
res := make([]interface{}, 0, len(actions))

if len(actions) == 0 {
return res
}

for _, action := range actions {
res = append(res, action.Convert())
}
return res
}

type AttrActionKeyValue struct {
// Key specifies the attribute to act upon.
// This is a required field.
Key string `river:"key,attr"`

// Value specifies the value to populate for the key.
// The type of the value is inferred from the configuration.
Value interface{} `river:"value,attr,optional"`

// A regex pattern must be specified for the action EXTRACT.
// It uses the attribute specified by `key' to extract values from
// The target keys are inferred based on the names of the matcher groups
// provided and the names will be inferred based on the values of the
// matcher group.
// Note: All subexpressions must have a name.
// Note: The value type of the source key must be a string. If it isn't,
// no extraction will occur.
RegexPattern string `river:"pattern,attr,optional"`

// FromAttribute specifies the attribute to use to populate
// the value. If the attribute doesn't exist, no action is performed.
FromAttribute string `river:"from_attribute,attr,optional"`

// FromContext specifies the context value to use to populate
// the value. The values would be searched in client.Info.Metadata.
// If the key doesn't exist, no action is performed.
// If the key has multiple values the values will be joined with `;` separator.
FromContext string `river:"from_context,attr,optional"`

// ConvertedType specifies the target type of an attribute to be converted
// If the key doesn't exist, no action is performed.
// If the value cannot be converted, the original value will be left as-is
ConvertedType string `river:"converted_type,attr,optional"`

// Action specifies the type of action to perform.
// The set of values are {INSERT, UPDATE, UPSERT, DELETE, HASH}.
// Both lower case and upper case are supported.
// INSERT - Inserts the key/value to attributes when the key does not exist.
// No action is applied to attributes where the key already exists.
// Either Value, FromAttribute or FromContext must be set.
// UPDATE - Updates an existing key with a value. No action is applied
// to attributes where the key does not exist.
// Either Value, FromAttribute or FromContext must be set.
// UPSERT - Performs insert or update action depending on the attributes
// containing the key. The key/value is inserted to attributes
// that did not originally have the key. The key/value is updated
// for attributes where the key already existed.
// Either Value, FromAttribute or FromContext must be set.
// DELETE - Deletes the attribute. If the key doesn't exist,
// no action is performed.
// HASH - Calculates the SHA-1 hash of an existing value and overwrites the
// value with it's SHA-1 hash result.
// EXTRACT - Extracts values using a regular expression rule from the input
// 'key' to target keys specified in the 'rule'. If a target key
// already exists, it will be overridden.
// CONVERT - converts the type of an existing attribute, if convertable
// This is a required field.
Action string `river:"action,attr"`
}

// Convert converts args into the upstream type.
func (args *AttrActionKeyValue) Convert() map[string]interface{} {
res := make(map[string]interface{})

// Mandatory attributes - always set those
res["key"] = args.Key
res["action"] = args.Action

// Optional attributes
if args.Value != "" {
res["value"] = args.Value
}
if args.RegexPattern != "" {
res["pattern"] = args.RegexPattern
}
if args.FromAttribute != "" {
res["from_attribute"] = args.FromAttribute
}
if args.FromContext != "" {
res["from_context"] = args.FromContext
}
if args.ConvertedType != "" {
res["converted_type"] = args.ConvertedType
}
return res
}
50 changes: 50 additions & 0 deletions component/otelcol/config_attraction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package otelcol_test

import (
"testing"

"github.com/grafana/agent/component/otelcol"
"github.com/stretchr/testify/require"
)

func TestConvertAttrAction(t *testing.T) {
inputActions := otelcol.AttrActionKeyValueSlice{
{
Action: "insert",
Value: 123,
Key: "attribute1",
},
{
Action: "delete",
Key: "attribute2",
},
{
Action: "upsert",
Value: true,
Key: "attribute3",
},
}

expectedActions := []interface{}(
[]interface{}{
map[string]interface{}{
"action": "insert",
"key": "attribute1",
"value": 123,
},
map[string]interface{}{
"action": "delete",
"key": "attribute2",
"value": interface{}(nil),
},
map[string]interface{}{
"action": "upsert",
"key": "attribute3",
"value": true,
},
},
)

result := inputActions.Convert()
require.Equal(t, expectedActions, result)
}
215 changes: 215 additions & 0 deletions component/otelcol/config_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package otelcol

import "strings"

// MatchConfig has two optional MatchProperties:
// 1. 'include': to define what is processed by the processor.
// 2. 'exclude': to define what is excluded from the processor.
//
// If both 'include' and 'exclude' are specified, the 'include' properties are checked
// before the 'exclude' properties.
type MatchConfig struct {
Include *MatchProperties `river:"include,block,optional"`
Exclude *MatchProperties `river:"exclude,block,optional"`
}

// MatchProperties specifies the set of properties in a spans/log/metric to match
// against and if the input data should be included or excluded from the processor.
type MatchProperties struct {
MatchType string `river:"match_type,attr"`
RegexpConfig *RegexpConfig `river:"regexp,block,optional"`

// Note: For spans, one of Services, SpanNames, Attributes, Resources or Libraries must be specified with a
// non-empty value for a valid configuration.

// For logs, one of LogNames, Attributes, Resources or Libraries must be specified with a
// non-empty value for a valid configuration.

// For metrics, one of MetricNames, Expressions, or ResourceAttributes must be specified with a
// non-empty value for a valid configuration.

// Services specify the list of items to match service name against.
// A match occurs if the span's service name matches at least one item in this list.
Services []string `river:"services,attr,optional"`

// SpanNames specify the list of items to match span name against.
// A match occurs if the span name matches at least one item in this list.
SpanNames []string `river:"span_names,attr,optional"`

// LogBodies is a list of strings that the LogRecord's body field must match against.
LogBodies []string `river:"log_bodies,attr,optional"`

// LogSeverityTexts is a list of strings that the LogRecord's severity text field must match against.
LogSeverityTexts []string `river:"log_severity_texts,attr,optional"`

// LogSeverityNumber defines how to match against a log record's SeverityNumber, if defined.
LogSeverityNumber *LogSeverityNumberMatchProperties `river:"log_severity_number,block,optional"`

// MetricNames is a list of strings to match metric name against.
// A match occurs if metric name matches at least one item in the list.
MetricNames []string `river:"metric_names,attr,optional"`

// Attributes specifies the list of attributes to match against.
// All of these attributes must match exactly for a match to occur.
// Only match_type=strict is allowed if "attributes" are specified.
Attributes []Attribute `river:"attribute,block,optional"`

// Resources specify the list of items to match the resources against.
// A match occurs if the data's resources match at least one item in this list.
Resources []Attribute `river:"resource,block,optional"`

// Libraries specify the list of items to match the implementation library against.
// A match occurs if the span's implementation library matches at least one item in this list.
Libraries []InstrumentationLibrary `river:"library,block,optional"`

// SpanKinds specify the list of items to match the span kind against.
// A match occurs if the span's span kind matches at least one item in this list.
SpanKinds []string `river:"span_kinds,attr,optional"`
}

// Convert converts args into the upstream type.
func (args *MatchProperties) Convert() map[string]interface{} {
if args == nil {
return nil
}

res := make(map[string]interface{})

res["match_type"] = args.MatchType

if args.RegexpConfig != nil {
res["regexp"] = args.RegexpConfig.Convert()
}

if len(args.Services) > 0 {
res["services"] = args.Services
}

if len(args.SpanNames) > 0 {
res["span_names"] = args.SpanNames
}

if len(args.LogBodies) > 0 {
res["log_bodies"] = args.LogBodies
}

if len(args.LogSeverityTexts) > 0 {
res["log_severity_texts"] = args.LogSeverityTexts
}

if args.LogSeverityNumber != nil {
res["log_severity_number"] = args.LogSeverityNumber.Convert()
}

if len(args.MetricNames) > 0 {
res["metric_names"] = args.MetricNames
}

if subRes := convertAttributeSlice(args.Attributes); len(subRes) > 0 {
res["attributes"] = subRes
}

if subRes := convertAttributeSlice(args.Resources); len(subRes) > 0 {
res["resources"] = subRes
}

if subRes := convertInstrumentationLibrariesSlice(args.Libraries); len(subRes) > 0 {
res["libraries"] = subRes
}

if len(args.SpanKinds) > 0 {
res["span_kinds"] = args.SpanKinds
}

return res
}

// Return an empty slice if the input slice is empty.
func convertAttributeSlice(attrs []Attribute) []interface{} {
attrArr := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
attrArr = append(attrArr, attr.Convert())
}
return attrArr
}

// Return an empty slice if the input slice is empty.
func convertInstrumentationLibrariesSlice(libs []InstrumentationLibrary) []interface{} {
libsArr := make([]interface{}, 0, len(libs))
for _, lib := range libs {
libsArr = append(libsArr, lib.Convert())
}
return libsArr
}

type RegexpConfig struct {
// CacheEnabled determines whether match results are LRU cached to make subsequent matches faster.
// Cache size is unlimited unless CacheMaxNumEntries is also specified.
CacheEnabled bool `river:"cacheenabled,attr,optional"`
// CacheMaxNumEntries is the max number of entries of the LRU cache that stores match results.
// CacheMaxNumEntries is ignored if CacheEnabled is false.
CacheMaxNumEntries int `river:"cachemaxnumentries,attr,optional"`
}

func (args RegexpConfig) Convert() map[string]interface{} {
return map[string]interface{}{
"cacheenabled": args.CacheEnabled,
"cachemaxnumentries": args.CacheMaxNumEntries,
}
}

// Attribute specifies the attribute key and optional value to match against.
type Attribute struct {
// Key specifies the attribute key.
Key string `river:"key,attr"`

// Values specifies the value to match against.
// If it is not set, any value will match.
Value interface{} `river:"value,attr,optional"`
}

func (args Attribute) Convert() map[string]interface{} {
return map[string]interface{}{
"key": args.Key,
"value": args.Value,
}
}

// InstrumentationLibrary specifies the instrumentation library and optional version to match against.
type InstrumentationLibrary struct {
Name string `river:"name,attr"`
// version match
// expected actual match
// nil <blank> yes
// nil 1 yes
// <blank> <blank> yes
// <blank> 1 no
// 1 <blank> no
// 1 1 yes
Version *string `river:"version,attr"`
}

func (args InstrumentationLibrary) Convert() map[string]interface{} {
return map[string]interface{}{
"name": args.Name,
"version": strings.Clone(*args.Version),
}
}

// LogSeverityNumberMatchProperties defines how to match based on a log record's SeverityNumber field.
type LogSeverityNumberMatchProperties struct {
// Min is the lowest severity that may be matched.
// e.g. if this is plog.SeverityNumberInfo, INFO, WARN, ERROR, and FATAL logs will match.
Min int32 `river:"min,attr"`

// MatchUndefined controls whether logs with "undefined" severity matches.
// If this is true, entries with undefined severity will match.
MatchUndefined bool `river:"match_undefined,attr"`
}

func (args LogSeverityNumberMatchProperties) Convert() map[string]interface{} {
return map[string]interface{}{
"min": args.Min,
"match_undefined": args.MatchUndefined,
}
}
Loading

0 comments on commit 08f66a8

Please sign in to comment.