Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(comp/otelcol): component to filter OTEL telemetry #5562

Merged
merged 27 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e3aa250
feat(comp/otelcol): component to filter OTEL telemetry
hainenber Oct 22, 2023
3b5e418
doc(ref): add doc for otelcol.processor.filter
hainenber Oct 22, 2023
1f2325e
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
0640d33
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
6912f9e
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
3108a5a
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
f5978a1
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
f83dae0
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
6b65a76
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
50c261b
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
8bbb9fa
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 23, 2023
b6cdef8
refactor(otelcol/filter): move non-arg types to own file
hainenber Oct 23, 2023
acb3735
fix(otelcol/filter): deep copy cfgs during conversions
hainenber Oct 23, 2023
8c9eb08
doc(ref/comp): remove unused MD ref
hainenber Oct 23, 2023
52eef29
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 28, 2023
9f20f73
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 28, 2023
82e19b3
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 28, 2023
18b8d5a
Apply suggestions from code review
hainenber Oct 28, 2023
2cb8a05
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 28, 2023
3c22bba
doc(flow/ref): wordsmith warning section
hainenber Oct 28, 2023
313a278
chore(dep): bump filterprocessor mod
hainenber Oct 28, 2023
b615fe1
Update component/otelcol/processor/filter/filter_test.go
hainenber Oct 30, 2023
e4c0b37
Update docs/sources/flow/reference/components/otelcol.processor.filte…
hainenber Oct 30, 2023
eec95ce
refactor(comp/otelcol): move convert() to types.go
hainenber Oct 30, 2023
623177e
doc(src/flow/ref): add ref to use River raw string
hainenber Oct 30, 2023
8755163
Merge branch 'main' into add-otelcol-processor-filter
hainenber Oct 30, 2023
527bff5
Merge branch 'main' into add-otelcol-processor-filter
hainenber Oct 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ Main (unreleased)

- Add queueing logs remote write client for `loki.write` when WAL is enabled. (@thepalbi)

- New Grafana Agent Flow components:

- `otelcol.processor.filter` - filters OTLP telemetry data using OpenTelemetry
Transformation Language (OTTL). (@hainenber)


### Bugfixes

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/agent/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/agent/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
Expand Down
110 changes: 110 additions & 0 deletions component/otelcol/processor/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package filter

import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
otel_service "github.com/grafana/agent/service/otel"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.filter",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
NeedsServices: []string{otel_service.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := filterprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

type Arguments struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"`
Traces traceConfig `river:"traces,block,optional"`
Metrics metricConfig `river:"metrics,block,optional"`
Logs logConfig `river:"logs,block,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
ErrorMode: ottl.PropagateError,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
otelArgs, err := args.convertImpl()
if err != nil {
return err
}
return otelArgs.Validate()
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return args.convertImpl()
}

// convertImpl is a helper function which returns the real type of the config,
// instead of the otelcomponent.Config interface.
func (args Arguments) convertImpl() (*filterprocessor.Config, error) {
input := make(map[string]interface{})

input["error_mode"] = args.ErrorMode

if len(args.Traces.Span) > 0 || len(args.Traces.SpanEvent) > 0 {
input["traces"] = args.Traces.convert()
}

if len(args.Metrics.Metric) > 0 || len(args.Metrics.Datapoint) > 0 {
input["metrics"] = args.Metrics.convert()
}

if len(args.Logs.LogRecord) > 0 {
input["logs"] = args.Logs.convert()
}

var result filterprocessor.Config
err := mapstructure.Decode(input, &result)

if err != nil {
return nil, err
}

return &result, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
188 changes: 188 additions & 0 deletions component/otelcol/processor/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package filter_test

import (
"testing"

"github.com/grafana/agent/component/otelcol/processor/filter"
"github.com/grafana/river"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"github.com/stretchr/testify/require"
)

// Source: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/filterprocessor/README.md#filter-spans-from-traces
func TestArguments_UnmarshalRiver(t *testing.T) {
tests := []struct {
testName string
cfg string
expected map[string]interface{}
errMsg string
}{
{
testName: "Defaults",
cfg: `
output {}
`,
expected: map[string]interface{}{
"error_mode": "propagate",
},
},
{
testName: "IgnoreErrors",
cfg: `
error_mode = "ignore"
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
},
},
{
testName: "DropNonHttpSpans",
cfg: `
error_mode = "ignore"
traces {
span = [
"attributes[\"http.request.method\"] == nil",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"traces": map[string]interface{}{
"span": []interface{}{
`attributes["http.request.method"] == nil`,
},
},
},
},
{
testName: "FilterForMultipleObs",
cfg: `
error_mode = "ignore"
traces {
span = [
"attributes[\"container.name\"] == \"app_container_1\"",
"resource.attributes[\"host.name\"] == \"localhost\"",
"name == \"app_1\"",
]
spanevent = [
"attributes[\"grpc\"] == true",
"IsMatch(name, \".*grpc.*\")",
]
}
metrics {
metric = [
"name == \"my.metric\" and resource.attributes[\"my_label\"] == \"abc123\"",
"type == METRIC_DATA_TYPE_HISTOGRAM",
]
datapoint = [
"metric.type == METRIC_DATA_TYPE_SUMMARY",
"resource.attributes[\"service.name\"] == \"my_service_name\"",
]
}
logs {
log_record = [
"IsMatch(body, \".*password.*\")",
"severity_number < SEVERITY_NUMBER_WARN",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"traces": map[string]interface{}{
"span": []interface{}{
`attributes["container.name"] == "app_container_1"`,
`resource.attributes["host.name"] == "localhost"`,
`name == "app_1"`,
},
"spanevent": []interface{}{
`attributes["grpc"] == true`,
`IsMatch(name, ".*grpc.*")`,
},
},
"metrics": map[string]interface{}{
"metric": []interface{}{
`name == "my.metric" and resource.attributes["my_label"] == "abc123"`,
`type == METRIC_DATA_TYPE_HISTOGRAM`,
},
"datapoint": []interface{}{
`metric.type == METRIC_DATA_TYPE_SUMMARY`,
`resource.attributes["service.name"] == "my_service_name"`,
},
},
"logs": map[string]interface{}{
"log_record": []interface{}{
`IsMatch(body, ".*password.*")`,
`severity_number < SEVERITY_NUMBER_WARN`,
},
},
},
},
{
testName: "ValidOtelFilterFunctionUsage",
cfg: `
error_mode = "ignore"
metrics {
metric = [
"HasAttrKeyOnDatapoint(\"http.method\")",
"HasAttrOnDatapoint(\"http.method\", \"GET\")",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"metrics": map[string]interface{}{
"metric": []interface{}{
`HasAttrKeyOnDatapoint("http.method")`,
`HasAttrOnDatapoint("http.method", "GET")`,
},
},
},
},
{
testName: "invalidOtelFilterFunctionUsage",
cfg: `
error_mode = "ignore"
metrics {
metric = [
"UnknowFunction(\"http.method\")",
]
}
output {}
`,
errMsg: `unable to parse OTTL statement "match() where UnknowFunction(\"http.method\")": undefined function "UnknowFunction"`,
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args filter.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)
if tc.errMsg != "" {
require.ErrorContains(t, err, tc.errMsg)
return
}
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*filterprocessor.Config)

var expectedCfg filterprocessor.Config
err = mapstructure.Decode(tc.expected, &expectedCfg)
require.NoError(t, err)

// Validate
require.NoError(t, actual.Validate())
require.NoError(t, expectedCfg.Validate())

// Compare
require.Equal(t, expectedCfg, *actual)
})
}
}
56 changes: 56 additions & 0 deletions component/otelcol/processor/filter/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package filter

type traceConfig struct {
Span []string `river:"span,attr,optional"`
SpanEvent []string `river:"spanevent,attr,optional"`
}

type metricConfig struct {
Metric []string `river:"metric,attr,optional"`
Datapoint []string `river:"datapoint,attr,optional"`
}
type logConfig struct {
LogRecord []string `river:"log_record,attr,optional"`
}

func (args *traceConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

result := make(map[string]interface{})
if len(args.Span) > 0 {
result["span"] = append([]string{}, args.Span...)
}
if len(args.SpanEvent) > 0 {
result["spanevent"] = append([]string{}, args.SpanEvent...)
}

return result
}

func (args *metricConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

result := make(map[string]interface{})
if len(args.Metric) > 0 {
result["metric"] = append([]string{}, args.Metric...)
}
if len(args.Datapoint) > 0 {
result["datapoint"] = append([]string{}, args.Datapoint...)
}

return result
}

func (args *logConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

return map[string]interface{}{
"log_record": append([]string{}, args.LogRecord...),
}
}
Loading