diff --git a/CHANGELOG.md b/CHANGELOG.md index c1c5ae9c6938..cc67b0ff56d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,9 @@ Main (unreleased) - `remote.kubernetes.secret` loads a secret's data for use in other components (@captncraig) - `prometheus.exporter.agent` - scrape agent's metrics. (@hainenber) - `prometheus.exporter.vsphere` - scrape vmware vsphere metrics. (@marctc) + - `otelcol.processor.transform` transforms OTLP telemetry data using the + OpenTelemetry Transformation Language (OTTL). It is most commonly used + for transformations on attributes. - Flow: allow the HTTP server to be configured with TLS in the config file using the new `http` config block. (@rfratto) diff --git a/component/all/all.go b/component/all/all.go index b201065f3063..9f9e20ffc106 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -81,6 +81,7 @@ import ( _ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler _ "github.com/grafana/agent/component/otelcol/processor/span" // Import otelcol.processor.span _ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling + _ "github.com/grafana/agent/component/otelcol/processor/transform" // Import otelcol.processor.transform _ "github.com/grafana/agent/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger _ "github.com/grafana/agent/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka _ "github.com/grafana/agent/component/otelcol/receiver/loki" // Import otelcol.receiver.loki diff --git a/component/otelcol/processor/transform/transform.go b/component/otelcol/processor/transform/transform.go new file mode 100644 index 000000000000..85f86e5ac1ca --- /dev/null +++ b/component/otelcol/processor/transform/transform.go @@ -0,0 +1,173 @@ +// Package transform provides an otelcol.processor.transform component. 
+package transform + +import ( + "fmt" + "strings" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/processor" + otel_service "github.com/grafana/agent/service/otel" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.transform", + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + NeedsServices: []string{otel_service.ServiceName}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := transformprocessor.NewFactory() + return processor.New(opts, fact, args.(Arguments)) + }, + }) +} + +type ContextID string + +const ( + Resource ContextID = "resource" + Scope ContextID = "scope" + Span ContextID = "span" + SpanEvent ContextID = "spanevent" + Metric ContextID = "metric" + DataPoint ContextID = "datapoint" + Log ContextID = "log" +) + +func (c *ContextID) UnmarshalText(text []byte) error { + str := ContextID(strings.ToLower(string(text))) + switch str { + case Resource, Scope, Span, SpanEvent, Metric, DataPoint, Log: + *c = str + return nil + default: + return fmt.Errorf("unknown context %v", str) + } +} + +type contextStatementsSlice []contextStatements + +type contextStatements struct { + Context ContextID `river:"context,attr"` + Statements []string `river:"statements,attr"` +} + +// Arguments configures the otelcol.processor.transform component. +type Arguments struct { + // ErrorMode determines how the processor reacts to errors that occur while processing a statement. 
+ ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"` + TraceStatements contextStatementsSlice `river:"trace_statements,block,optional"` + MetricStatements contextStatementsSlice `river:"metric_statements,block,optional"` + LogStatements contextStatementsSlice `river:"log_statements,block,optional"` + + // Output configures where to send processed data. Required. + Output *otelcol.ConsumerArguments `river:"output,block"` +} + +var ( + _ processor.Arguments = Arguments{} +) + +// DefaultArguments holds default settings for Arguments. +var DefaultArguments = Arguments{ + ErrorMode: ottl.PropagateError, +} + +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +// Validate implements river.Validator. +func (args *Arguments) Validate() error { + otelArgs, err := args.convertImpl() + if err != nil { + return err + } + return otelArgs.Validate() +} + +func (stmts *contextStatementsSlice) convert() []interface{} { + if stmts == nil { + return nil + } + + res := make([]interface{}, 0, len(*stmts)) + + if len(*stmts) == 0 { + return res + } + + for _, stmt := range *stmts { + res = append(res, stmt.convert()) + } + return res +} + +func (args *contextStatements) convert() map[string]interface{} { + if args == nil { + return nil + } + + return map[string]interface{}{ + "context": args.Context, + "statements": args.Statements, + } +} + +// Convert implements processor.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return args.convertImpl() +} + +// convertImpl is a helper function which returns the real type of the config, +// instead of the otelcomponent.Config interface. 
+func (args Arguments) convertImpl() (*transformprocessor.Config, error) { + input := make(map[string]interface{}) + + input["error_mode"] = args.ErrorMode + + if len(args.TraceStatements) > 0 { + input["trace_statements"] = args.TraceStatements.convert() + } + + if len(args.MetricStatements) > 0 { + input["metric_statements"] = args.MetricStatements.convert() + } + + if len(args.LogStatements) > 0 { + input["log_statements"] = args.LogStatements.convert() + } + + var result transformprocessor.Config + err := mapstructure.Decode(input, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +// Extensions implements processor.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements processor.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements processor.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} diff --git a/component/otelcol/processor/transform/transform_test.go b/component/otelcol/processor/transform/transform_test.go new file mode 100644 index 000000000000..9547d30b24ec --- /dev/null +++ b/component/otelcol/processor/transform/transform_test.go @@ -0,0 +1,549 @@ +package transform_test + +import ( + "testing" + + "github.com/grafana/agent/component/otelcol/processor/transform" + "github.com/grafana/river" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" + "github.com/stretchr/testify/require" +) + +func TestArguments_UnmarshalRiver(t *testing.T) { + tests := []struct { + testName string + cfg string + expected map[string]interface{} + errorMsg string + }{ + { + testName: "Defaults", + cfg: ` + output {} + `, + expected: map[string]interface{}{ + "error_mode": "propagate", + }, + }, + { + testName: 
"IgnoreErrors", + cfg: ` + error_mode = "ignore" + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + }, + }, + { + testName: "TransformIfFieldDoesNotExist", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "span" + statements = [ + // Accessing a map with a key that does not exist will return nil. + "set(attributes[\"test\"], \"pass\") where attributes[\"test\"] == nil", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(attributes["test"], "pass") where attributes["test"] == nil`, + }, + }, + }, + }, + }, + { + testName: "RenameAttribute1", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "set(attributes[\"namespace\"], attributes[\"k8s.namespace.name\"])", + "delete_key(attributes, \"k8s.namespace.name\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["namespace"], attributes["k8s.namespace.name"])`, + `delete_key(attributes, "k8s.namespace.name")`, + }, + }, + }, + }, + }, + { + testName: "RenameAttribute2", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "replace_all_patterns(attributes, \"key\", \"k8s\\\\.namespace\\\\.name\", \"namespace\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `replace_all_patterns(attributes, "key", "k8s\\.namespace\\.name", "namespace")`, + }, + }, + }, + }, + }, + { + testName: "CreateAttributeFromContentOfLogBody", + cfg: ` + error_mode = "ignore" + log_statements { + context = "log" + statements = [ 
+ "set(attributes[\"body\"], body)", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(attributes["body"], body)`, + }, + }, + }, + }, + }, + { + testName: "CombineTwoAttributes", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + // The Concat function combines any number of strings, separated by a delimiter. + "set(attributes[\"test\"], Concat([attributes[\"foo\"], attributes[\"bar\"]], \" \"))", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["test"], Concat([attributes["foo"], attributes["bar"]], " "))`, + }, + }, + }, + }, + }, + { + testName: "ParseJsonLogs", + cfg: ` + error_mode = "ignore" + log_statements { + context = "log" + statements = [ + "merge_maps(cache, ParseJSON(body), \"upsert\") where IsMatch(body, \"^\\\\{\") ", + "set(attributes[\"attr1\"], cache[\"attr1\"])", + "set(attributes[\"attr2\"], cache[\"attr2\"])", + "set(attributes[\"nested.attr3\"], cache[\"nested\"][\"attr3\"])", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `merge_maps(cache, ParseJSON(body), "upsert") where IsMatch(body, "^\\{") `, + `set(attributes["attr1"], cache["attr1"])`, + `set(attributes["attr2"], cache["attr2"])`, + `set(attributes["nested.attr3"], cache["nested"]["attr3"])`, + }, + }, + }, + }, + }, + { + testName: "ManyStatements1", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])", + 
"replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + trace_statements { + context = "span" + statements = [ + "set(status.code, 1) where attributes[\"http.path\"] == \"/health\"", + "set(name, attributes[\"http.route\"])", + "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + metric_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"host.name\"])", + "truncate_all(attributes, 4096)", + ] + } + metric_statements { + context = "metric" + statements = [ + "set(description, \"Sum\") where type == \"Sum\"", + ] + } + metric_statements { + context = "datapoint" + statements = [ + "limit(attributes, 100, [\"host.name\"])", + "truncate_all(attributes, 4096)", + "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", + "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\"", + ] + } + log_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])", + ] + } + log_statements { + context = "log" + statements = [ + "set(severity_text, \"FAIL\") where body == \"request failed\"", + "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", + "set(body, attributes[\"http.route\"])", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])`, + 
`replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")`, + `limit(attributes, 100, [])`, + `truncate_all(attributes, 4096)`, + }, + }, + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(status.code, 1) where attributes["http.path"] == "/health"`, + `set(name, attributes["http.route"])`, + `replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")`, + `limit(attributes, 100, [])`, + `truncate_all(attributes, 4096)`, + }, + }, + }, + "metric_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["host.name"])`, + `truncate_all(attributes, 4096)`, + }, + }, + map[string]interface{}{ + "context": "metric", + "statements": []interface{}{ + `set(description, "Sum") where type == "Sum"`, + }, + }, + map[string]interface{}{ + "context": "datapoint", + "statements": []interface{}{ + `limit(attributes, 100, ["host.name"])`, + `truncate_all(attributes, 4096)`, + `convert_sum_to_gauge() where metric.name == "system.processes.count"`, + `convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"`, + }, + }, + }, + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"])`, + }, + }, + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(severity_text, "FAIL") where body == "request failed"`, + `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")`, + `replace_all_patterns(attributes, "value", "/account/\\d{4}", "/account/{accountId}")`, + `set(body, attributes["http.route"])`, + }, + }, + }, + }, + }, + { + testName: "ManyStatements2", + cfg: ` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + 
"keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + trace_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + metric_statements { + context = "datapoint" + statements = [ + "set(metric.name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + metric_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + log_statements { + context = "log" + statements = [ + "set(body, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + log_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "propagate", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["name"], "bear")`, + }, + }, + }, + "metric_statements": []interface{}{ + map[string]interface{}{ + "context": "datapoint", + "statements": []interface{}{ + `set(metric.name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["name"], "bear")`, + }, + }, + }, + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(body, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + 
`set(attributes["name"], "bear")`, + }, + }, + }, + }, + }, + { + testName: "unknown_error_mode", + cfg: ` + error_mode = "test" + output {} + `, + errorMsg: `2:17: "test" unknown error mode test`, + }, + { + testName: "bad_syntax_log", + cfg: ` + log_statements { + context = "log" + statements = [ + "set(body, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "set(body, \"bear\" where attributes[\"http.path\"] == \"/animal\"": statement has invalid syntax: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "bad_syntax_metric", + cfg: ` + metric_statements { + context = "datapoint" + statements = [ + "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"": statement has invalid syntax: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "bad_syntax_trace", + cfg: ` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"": statement has invalid syntax: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "unknown_function_log", + cfg: ` + log_statements { + context = "log" + statements = [ + "set(body, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "not_a_function(attributes, [\"http.method\", \"http.path\"])": undefined function "not_a_function"`, + 
}, + { + testName: "unknown_function_metric", + cfg: ` + metric_statements { + context = "datapoint" + statements = [ + "set(metric.name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "not_a_function(attributes, [\"http.method\", \"http.path\"])": undefined function "not_a_function"`, + }, + { + testName: "unknown_function_trace", + cfg: ` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement "not_a_function(attributes, [\"http.method\", \"http.path\"])": undefined function "not_a_function"`, + }, + { + testName: "unknown_context", + cfg: ` + trace_statements { + context = "test" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + ] + } + output {} + `, + errorMsg: `3:15: "test" unknown context test`, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args transform.Arguments + err := river.Unmarshal([]byte(tc.cfg), &args) + if tc.errorMsg != "" { + require.ErrorContains(t, err, tc.errorMsg) + return + } + + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*transformprocessor.Config) + + var expectedCfg transformprocessor.Config + err = mapstructure.Decode(tc.expected, &expectedCfg) + require.NoError(t, err) + + // Validate the two configs + require.NoError(t, actual.Validate()) + require.NoError(t, expectedCfg.Validate()) + + // Compare the two configs + require.Equal(t, expectedCfg, *actual) + }) + } +} diff --git a/docs/developer/writing-flow-component-documentation.md b/docs/developer/writing-flow-component-documentation.md index 7a5b8914f6f6..ddaf6466e021 100644 --- 
a/docs/developer/writing-flow-component-documentation.md +++ b/docs/developer/writing-flow-component-documentation.md @@ -560,4 +560,15 @@ The [loki.source.podlogs][] component documentation needed to add an extra section to document the PodLogs CRD, since we do not yet have a way of documenting auxiliary artifacts which are related to a component. +### otelcol.processor.transform + +The [otelcol.processor.transform][] component documentation needed to add +an extra section about OTTL Contexts because there is no appropriate OTEL docs page +that we could link to. Currently this information is housed on the [transformprocessor][] +doc page, but because it contains yaml config for the Collector, users might get confused +how this maps to River and it is better not to link to it. In the future we could try to +move this information from [transformprocessor][] to the [OTTL Context][ottl context] doc. + [loki.source.podlogs]: ../sources/flow/reference/components/loki.source.podlogs.md +[otelcol.processor.transform]: ../sources/flow/reference/components/otelcol.processor.transform.md +[ottl context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/README.md \ No newline at end of file diff --git a/docs/sources/flow/reference/components/otelcol.processor.transform.md b/docs/sources/flow/reference/components/otelcol.processor.transform.md new file mode 100644 index 000000000000..60aaa3fee27b --- /dev/null +++ b/docs/sources/flow/reference/components/otelcol.processor.transform.md @@ -0,0 +1,575 @@ +--- +aliases: +- /docs/grafana-cloud/agent/flow/reference/components/otelcol.processor.transform/ +- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/otelcol.processor.transform/ +- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/otelcol.processor.transform/ +canonical: https://grafana.com/docs/agent/latest/flow/reference/components/otelcol.processor.transform/ 
+labels: + stage: experimental +title: otelcol.processor.transform +description: Learn about otelcol.processor.transform +--- + +# otelcol.processor.transform + +{{< docs/shared lookup="flow/stability/experimental.md" source="agent" version="" >}} + +`otelcol.processor.transform` accepts telemetry data from other `otelcol` +components and modifies it using the [OpenTelemetry Transformation Language (OTTL)][OTTL]. +OTTL statements consist of [OTTL functions][], which act on paths. +A path is a reference to telemetry data such as: +* Resource attributes. +* Instrumentation scope name. +* Span attributes. + +In addition to the [standard OTTL functions][OTTL functions], +there is also a set of metrics-only functions: +* [convert_sum_to_gauge][] +* [convert_gauge_to_sum][] +* [convert_summary_count_val_to_sum][] +* [convert_summary_sum_val_to_sum][] + +[OTTL][] statements can also contain constructs such as: +* [Booleans][OTTL booleans]: + * `not true` + * `not IsMatch(name, "http_.*")` +* [Boolean Expressions][OTTL boolean expressions] consisting of a `where` followed by one or more booleans: + * `set(attributes["whose_fault"], "ours") where attributes["http.status"] == 500` + * `set(attributes["whose_fault"], "theirs") where attributes["http.status"] == 400 or attributes["http.status"] == 404` +* [Math expressions][OTTL math expressions]: + * `1 + 1` + * `end_time_unix_nano - start_time_unix_nano` + * `sum([1, 2, 3, 4]) + (10 / 1) - 1` + +{{% admonition type="note" %}} +Some characters inside River strings [need to be escaped][river-strings] with a `\` character. +For example, the OTTL statement `set(description, "Sum") where type == "Sum"` +is written in River as `"set(description, \"Sum\") where type == \"Sum\""`. 
+ +[river-strings]: {{< relref "../../config-language/expressions/types_and_values.md/#strings" >}} +{{% /admonition %}} + +{{% admonition type="note" %}} +`otelcol.processor.transform` is a wrapper over the upstream +OpenTelemetry Collector `transform` processor. If necessary, bug reports or feature requests +will be redirected to the upstream repository. +{{% /admonition %}} + +You can specify multiple `otelcol.processor.transform` components by giving them different labels. + +{{% admonition type="warning" %}} +`otelcol.processor.transform` allows you to modify all aspects of your telemetry. Some specific risks are given below, +but this is not an exhaustive list. It is important to understand your data before using this processor. + +- [Unsound Transformations][]: Transformations between metric data types are not defined in the [metrics data model][]. +To use these functions, you must understand the incoming data and know that it can be meaningfully converted +to a new metric data type or can be used to create new metrics. + - Although OTTL allows you to use the `set` function with `metric.data_type`, + its implementation in the transform processor is a [no-op][]. + To modify a data type, you must use a specific function such as `convert_gauge_to_sum`. +- [Identity Conflict][]: Transformation of metrics can potentially affect a metric's identity, + leading to an Identity Crisis. Be especially cautious when transforming a metric name and when reducing or changing + existing attributes. Adding new attributes is safe. +- [Orphaned Telemetry][]: The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces + and `span_id` and `trace_id` for logs. Modifying these fields could lead to orphaned spans or logs. 
+ +[Unsound Transformations]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#unsound-transformations +[Identity Conflict]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#identity-conflict +[Orphaned Telemetry]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#orphaned-telemetry +[no-op]: https://en.wikipedia.org/wiki/NOP_(code) +[metrics data model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main//specification/metrics/data-model.md +{{% /admonition %}} + +## Usage + +```river +otelcol.processor.transform "LABEL" { + output { + metrics = [...] + logs = [...] + traces = [...] + } +} +``` + +## Arguments + +`otelcol.processor.transform` supports the following arguments: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`error_mode` | `string` | How to react to errors if they occur while processing a statement. | `"propagate"` | no + +The supported values for `error_mode` are: +* `ignore`: Ignore errors returned by statements and continue on to the next statement. This is the recommended mode. +* `propagate`: Return the error up the pipeline. This will result in the payload being dropped from the Agent. + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.processor.transform`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +trace_statements | [trace_statements][] | Statements which transform traces. | no +metric_statements | [metric_statements][] | Statements which transform metrics. | no +log_statements | [log_statements][] | Statements which transform logs. | no +output | [output][] | Configures where to send received telemetry data. 
| yes + +[trace_statements]: #trace_statements-block +[metric_statements]: #metric_statements-block +[log_statements]: #log_statements-block +[output]: #output-block + +[OTTL Context]: #ottl-context + +### trace_statements block + +The `trace_statements` block specifies statements which transform trace telemetry signals. +Multiple `trace_statements` blocks can be specified. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes +`statements` | `list(string)` | A list of OTTL statements. | | yes + +The supported values for `context` are: +* `resource`: Use when interacting only with OTLP resources (for example, resource attributes). +* `scope`: Use when interacting only with OTLP instrumentation scope (for example, the name of the instrumentation scope). +* `span`: Use when interacting only with OTLP spans. +* `spanevent`: Use when interacting only with OTLP span events. + +See [OTTL Context][] for more information about how to use contexts. + +### metric_statements block + +The `metric_statements` block specifies statements which transform metric telemetry signals. +Multiple `metric_statements` blocks can be specified. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes +`statements` | `list(string)` | A list of OTTL statements. | | yes + +The supported values for `context` are: +* `resource`: Use when interacting only with OTLP resources (for example, resource attributes). +* `scope`: Use when interacting only with OTLP instrumentation scope (for example, the name of the instrumentation scope). +* `metric`: Use when interacting only with individual OTLP metrics. +* `datapoint`: Use when interacting only with individual OTLP metric data points. 
+ +Refer to [OTTL Context][] for more information about how to use contexts. + +### log_statements block + +The `log_statements` block specifies statements which transform log telemetry signals. +Multiple `log_statements` blocks can be specified. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes +`statements` | `list(string)` | A list of OTTL statements. | | yes + +The supported values for `context` are: +* `resource`: Use when interacting only with OTLP resources (for example, resource attributes). +* `scope`: Use when interacting only with OTLP instrumentation scope (for example, the name of the instrumentation scope). +* `log`: Use when interacting only with OTLP logs. + +See [OTTL Context][] for more information about how to use contexts. + +### OTTL Context + +Each context allows the transformation of its type of telemetry. +For example, statements associated with a `resource` context will be able to transform the resource's +`attributes` and `dropped_attributes_count`. + +Each type of `context` defines its own paths and enums specific to that context. +Refer to the OpenTelemetry documentation for a list of paths and enums for each context: +* [resource][OTTL resource context] +* [scope][OTTL scope context] +* [span][OTTL span context] +* [spanevent][OTTL spanevent context] +* [log][OTTL log context] +* [metric][OTTL metric context] +* [datapoint][OTTL datapoint context] + + +Contexts __NEVER__ supply access to individual items "lower" in the protobuf definition. +- This means statements associated to a `resource` __WILL NOT__ be able to access the underlying instrumentation scopes. +- This means statements associated to a `scope` __WILL NOT__ be able to access the underlying telemetry slices (spans, metrics, or logs). 
+- Similarly, statements associated to a `metric` __WILL NOT__ be able to access individual datapoints, but can access the entire datapoints slice.
+- Similarly, statements associated to a `span` __WILL NOT__ be able to access individual SpanEvents, but can access the entire SpanEvents slice.
+
+For practical purposes, this means that a context cannot make decisions on its telemetry based on telemetry "lower" in the structure.
+For example, __the following context statement is not possible__ because it attempts to use individual datapoint
+attributes in the condition of a statement associated to a `metric`:
+
+```river
+metric_statements {
+  context = "metric"
+  statements = [
+    "set(description, \"test passed\") where datapoints.attributes[\"test\"] == \"pass\"",
+  ]
+}
+```
+
+Contexts __ALWAYS__ supply access to the items "higher" in the protobuf definition that are associated to the telemetry being transformed.
+- This means that statements associated to a `datapoint` have access to a datapoint's metric, instrumentation scope, and resource.
+- This means that statements associated to a `spanevent` have access to a spanevent's span, instrumentation scope, and resource.
+- This means that statements associated to a `span`/`metric`/`log` have access to the telemetry's instrumentation scope, and resource.
+- This means that statements associated to a `scope` have access to the scope's resource.
+
+For example, __the following context statement is possible__ because `datapoint` statements can access the datapoint's metric.
+
+```river
+metric_statements {
+  context = "datapoint"
+  statements = [
+    "set(metric.description, \"test passed\") where attributes[\"test\"] == \"pass\"",
+  ]
+}
+```
+
+The protobuf definitions for OTLP signals are maintained on GitHub:
+* [traces][traces protobuf]
+* [metrics][metrics protobuf]
+* [logs][logs protobuf]
+
+Whenever possible, associate your statements to the context which the statement intends to transform.
+The contexts are nested, and the higher-level contexts don't have to iterate through any of the
+contexts at a lower level. For example, although you can modify resource attributes associated to a
+span using the `span` context, it is more efficient to use the `resource` context.
+
+### output block
+
+{{< docs/shared lookup="flow/reference/components/output-block.md" source="agent" version="" >}}
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+Name | Type | Description
+---- | ---- | -----------
+`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.
+
+`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
+logs, or traces).
+
+## Component health
+
+`otelcol.processor.transform` is only reported as unhealthy if given an invalid
+configuration.
+
+## Debug information
+
+`otelcol.processor.transform` does not expose any component-specific debug
+information.
+
+## Debug metrics
+
+`otelcol.processor.transform` does not expose any component-specific debug metrics.
+
+## Examples
+
+### Perform a transformation if an attribute does not exist
+
+This example sets the attribute `test` to `pass` if the attribute `test` does not exist.
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "span"
+    statements = [
+      // Accessing a map with a key that does not exist will return nil.
+      "set(attributes[\"test\"], \"pass\") where attributes[\"test\"] == nil",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs    = [otelcol.exporter.otlp.default.input]
+    traces  = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Each `"` is [escaped][river-strings] with `\"` inside the River string.
+
+### Rename a resource attribute
+
+There are two ways to rename an attribute key.
+One way is to set a new attribute and delete the old one: + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + trace_statements { + context = "resource" + statements = [ + "set(attributes[\"namespace\"], attributes[\"k8s.namespace.name\"])", + "delete_key(attributes, \"k8s.namespace.name\")", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Another way is to update the key using regular expressions: + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + trace_statements { + context = "resource" + statements = [ + "replace_all_patterns(attributes, \"key\", \"k8s\\\\.namespace\\\\.name\", \"namespace\")", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Some values in the River string are [escaped][river-strings]: +* `\` is escaped with `\\` +* `"` is escaped with `\"` + +### Create an attribute from the contents of a log body + +This example sets the attribute `body` to the value of the log body: + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + log_statements { + context = "log" + statements = [ + "set(attributes[\"body\"], body)", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Each `"` is [escaped][river-strings] with `\"` inside the River string. + +### Combine two attributes + +This example sets the attribute `test` to the value of attributes `service.name` and `service.version` combined. 
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "resource"
+    statements = [
+      // The Concat function combines any number of strings, separated by a delimiter.
+      "set(attributes[\"test\"], Concat([attributes[\"service.name\"], attributes[\"service.version\"]], \" \"))",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs    = [otelcol.exporter.otlp.default.input]
+    traces  = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Each `"` is [escaped][river-strings] with `\"` inside the River string.
+
+### Parsing JSON logs
+
+Given the following JSON body:
+
+```json
+{
+  "name": "log",
+  "attr1": "example value 1",
+  "attr2": "example value 2",
+  "nested": {
+    "attr3": "example value 3"
+  }
+}
+```
+
+You can add specific fields as attributes on the log:
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  log_statements {
+    context = "log"
+
+    statements = [
+      // Parse body as JSON and merge the resulting map with the cache map, ignoring non-json bodies.
+      // cache is a field exposed by OTTL that is a temporary storage place for complex operations.
+      "merge_maps(cache, ParseJSON(body), \"upsert\") where IsMatch(body, \"^\\\\{\") ",
+
+      // Set attributes using the values merged into cache.
+      // If the attribute doesn't exist in cache then nothing happens.
+      "set(attributes[\"attr1\"], cache[\"attr1\"])",
+      "set(attributes[\"attr2\"], cache[\"attr2\"])",
+
+      // To access nested maps you can chain index ([]) operations.
+      // If nested or attr3 do not exist in cache then nothing happens.
+ "set(attributes[\"nested.attr3\"], cache[\"nested\"][\"attr3\"])", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Some values in the River strings are [escaped][river-strings]: +* `\` is escaped with `\\` +* `"` is escaped with `\"` + +### Various transformations of attributes and status codes + +The example takes advantage of context efficiency by grouping transformations +with the context which it intends to transform. + +```river +otelcol.receiver.otlp "default" { + http {} + grpc {} + + output { + metrics = [otelcol.processor.transform.default.input] + logs = [otelcol.processor.transform.default.input] + traces = [otelcol.processor.transform.default.input] + } +} + +otelcol.processor.transform "default" { + error_mode = "ignore" + + trace_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])", + "replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + + trace_statements { + context = "span" + statements = [ + "set(status.code, 1) where attributes[\"http.path\"] == \"/health\"", + "set(name, attributes[\"http.route\"])", + "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + + metric_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"host.name\"])", + "truncate_all(attributes, 4096)", + ] + } + + metric_statements { + context = "metric" + statements = [ + "set(description, \"Sum\") where type == \"Sum\"", + ] + } + + metric_statements { + context = "datapoint" + statements = [ + "limit(attributes, 100, [\"host.name\"])", + 
"truncate_all(attributes, 4096)", + "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", + "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\"", + ] + } + + log_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])", + ] + } + + log_statements { + context = "log" + statements = [ + "set(severity_text, \"FAIL\") where body == \"request failed\"", + "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", + "set(body, attributes[\"http.route\"])", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = env("OTLP_ENDPOINT") + } +} +``` + +Some values in the River strings are [escaped][river-strings]: +* `\` is escaped with `\\` +* `"` is escaped with `\"` + +[river-strings]: {{< relref "../../config-language/expressions/types_and_values.md/#strings" >}} + +[traces protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/trace/v1/trace.proto +[metrics protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/metrics/v1/metrics.proto +[logs protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/logs/v1/logs.proto + + +[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/README.md +[OTTL functions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/ottlfuncs/README.md +[convert_sum_to_gauge]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_sum_to_gauge 
+[convert_gauge_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_gauge_to_sum +[convert_summary_count_val_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_summary_count_val_to_sum +[convert_summary_sum_val_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_summary_sum_val_to_sum +[OTTL booleans]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#booleans +[OTTL math expressions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#math-expressions +[OTTL boolean expressions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#boolean-expressions +[OTTL resource context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlresource/README.md +[OTTL scope context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlscope/README.md +[OTTL span context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlspan/README.md +[OTTL spanevent context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlspanevent/README.md +[OTTL metric context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlmetric/README.md +[OTTL datapoint context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottldatapoint/README.md +[OTTL log context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottllog/README.md diff --git a/go.mod b/go.mod index cfa25b797dc9..4ad2d654f998 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 
@@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.85.0 diff --git a/go.sum b/go.sum index 76270822846a..65276509f825 100644 --- a/go.sum +++ b/go.sum @@ -1837,6 +1837,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocesso github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.85.0/go.mod h1:Rp+79qY7tJEgja2kHdWkWS0WDHP4+KRPfHmttCjbwDo= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0 h1:R4oQpH2bRx6UEGDdv15S2GkkkDFRbjXLO1b8fcjS8Lg= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0/go.mod h1:WuIjXo1AF9FWkG5HVSnm5koj27SEoVmENH3YVDNR1x4= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0 h1:8bhHzQpYe0zFANlQU+yRvhqsVFMRpaqtBVlHzyPqHkc= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0/go.mod h1:D1VNHbfUdVgtWiXUwr6OBetOl+TVDsZ8PkC5Fji1AAc= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0 h1:1oQGK9OibOxvtJcnCdB6+r6jGpzSp3DrcoeaVgCwP28= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0/go.mod h1:GNnx5ftrhf2bBSVIUzvfAkXBBjHfUYvmUOuR7mJWUOE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.85.0 
h1:yDfzy7NPgjWptm/wmbLunsbV4L6YzY1fwXY5i++afFU=