Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a otelcol.processor.transform component #5337

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ Main (unreleased)
- `remote.kubernetes.secret` loads a secret's data for use in other components (@captncraig)
- `prometheus.exporter.agent` - scrape agent's metrics. (@hainenber)
- `prometheus.exporter.vsphere` - scrape vmware vsphere metrics. (@marctc)
- `otelcol.processor.transform` transforms OTLP telemetry data using the
OpenTelemetry Transformation Language (OTTL). It is most commonly used
for transformations on attributes.

- Flow: allow the HTTP server to be configured with TLS in the config file
using the new `http` config block. (@rfratto)
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
_ "github.com/grafana/agent/component/otelcol/processor/span" // Import otelcol.processor.span
_ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling
_ "github.com/grafana/agent/component/otelcol/processor/transform" // Import otelcol.processor.transform
_ "github.com/grafana/agent/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger
_ "github.com/grafana/agent/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka
_ "github.com/grafana/agent/component/otelcol/receiver/loki" // Import otelcol.receiver.loki
Expand Down
173 changes: 173 additions & 0 deletions component/otelcol/processor/transform/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Package transform provides an otelcol.processor.transform component.
package transform

import (
"fmt"
"strings"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
otel_service "github.com/grafana/agent/service/otel"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.transform",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
NeedsServices: []string{otel_service.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := transformprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

type ContextID string

const (
Resource ContextID = "resource"
Scope ContextID = "scope"
Span ContextID = "span"
SpanEvent ContextID = "spanevent"
Metric ContextID = "metric"
DataPoint ContextID = "datapoint"
Log ContextID = "log"
)

func (c *ContextID) UnmarshalText(text []byte) error {
str := ContextID(strings.ToLower(string(text)))
switch str {
case Resource, Scope, Span, SpanEvent, Metric, DataPoint, Log:
*c = str
return nil
default:
return fmt.Errorf("unknown context %v", str)
}
}

type contextStatementsSlice []contextStatements

type contextStatements struct {
ptodev marked this conversation as resolved.
Show resolved Hide resolved
Context ContextID `river:"context,attr"`
Statements []string `river:"statements,attr"`
}

// Arguments configures the otelcol.processor.transform component.
type Arguments struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"`
TraceStatements contextStatementsSlice `river:"trace_statements,block,optional"`
MetricStatements contextStatementsSlice `river:"metric_statements,block,optional"`
LogStatements contextStatementsSlice `river:"log_statements,block,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: If this is required, should we make it a struct instead of a pointer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep it consistent between all otelcol components. Not sure why it's a pointer tbh. Maybe to avoid extra copies? @rfratto, do you happen to know why it was originally made this way?

}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
ErrorMode: ottl.PropagateError,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
otelArgs, err := args.convertImpl()
if err != nil {
return err
}
return otelArgs.Validate()
}

func (stmts *contextStatementsSlice) convert() []interface{} {
if stmts == nil {
return nil
}

res := make([]interface{}, 0, len(*stmts))

if len(*stmts) == 0 {
return res
}

for _, stmt := range *stmts {
res = append(res, stmt.convert())
}
return res
}

func (args *contextStatements) convert() map[string]interface{} {
if args == nil {
return nil
}

return map[string]interface{}{
"context": args.Context,
"statements": args.Statements,
}
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return args.convertImpl()
}

// convertImpl is a helper function which returns the real type of the config,
// instead of the otelcomponent.Config interface.
func (args Arguments) convertImpl() (*transformprocessor.Config, error) {
input := make(map[string]interface{})

input["error_mode"] = args.ErrorMode

if len(args.TraceStatements) > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be simplified by removing the len check? Your checking for nil in the convert itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes in the Collector config the behaviour is different even if none of the underlying config is set. E.g. I think this will enable both grpc and http receivers:

receivers:
  otlp:
    protocols:
      grpc:
      http:

But I think this will enable only http:

receivers:
  otlp:
    protocols:
      http:

In the case of this component, I think the behaviour would be the same, but I'd still want to refrain from setting this unless we want this field to be used with its default values.

input["trace_statements"] = args.TraceStatements.convert()
}

if len(args.MetricStatements) > 0 {
input["metric_statements"] = args.MetricStatements.convert()
}

if len(args.LogStatements) > 0 {
input["log_statements"] = args.LogStatements.convert()
}

var result transformprocessor.Config
err := mapstructure.Decode(input, &result)

if err != nil {
return nil, err
}

return &result, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
Loading