From e9632c257e5058615613db5e5ac21557f7884d75 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Fri, 29 Sep 2023 16:25:29 +0100 Subject: [PATCH] Add a otelcol.processor.transform component --- CHANGELOG.md | 3 + .../otelcol/processor/transform/transform.go | 173 ++++++ .../processor/transform/transform_test.go | 549 +++++++++++++++++ .../writing-flow-component-documentation.md | 11 + .../components/otelcol.processor.transform.md | 574 ++++++++++++++++++ go.mod | 1 + go.sum | 2 + 7 files changed, 1313 insertions(+) create mode 100644 component/otelcol/processor/transform/transform.go create mode 100644 component/otelcol/processor/transform/transform_test.go create mode 100644 docs/sources/flow/reference/components/otelcol.processor.transform.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 3170e2486876..ca4ece48b23d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,9 @@ Main (unreleased) - `remote.kubernetes.secret` loads a secret's data for use in other components (@captncraig) - `prometheus.exporter.agent` - scrape agent's metrics. (@hainenber) - `prometheus.exporter.vsphere` - scrape vmware vsphere metrics. (@marctc) + - `otelcol.processor.transform` transforms OTLP telemetry data using the + OpenTelemetry Transformation Language (OTTL). It is most commonly used + for transformations on attributes. - Flow: allow the HTTP server to be configured with TLS in the config file using the new `http` config block. (@rfratto) diff --git a/component/otelcol/processor/transform/transform.go b/component/otelcol/processor/transform/transform.go new file mode 100644 index 000000000000..85f86e5ac1ca --- /dev/null +++ b/component/otelcol/processor/transform/transform.go @@ -0,0 +1,173 @@ +// Package transform provides an otelcol.processor.transform component. 
+package transform + +import ( + "fmt" + "strings" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/processor" + otel_service "github.com/grafana/agent/service/otel" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.transform", + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + NeedsServices: []string{otel_service.ServiceName}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := transformprocessor.NewFactory() + return processor.New(opts, fact, args.(Arguments)) + }, + }) +} + +type ContextID string + +const ( + Resource ContextID = "resource" + Scope ContextID = "scope" + Span ContextID = "span" + SpanEvent ContextID = "spanevent" + Metric ContextID = "metric" + DataPoint ContextID = "datapoint" + Log ContextID = "log" +) + +func (c *ContextID) UnmarshalText(text []byte) error { + str := ContextID(strings.ToLower(string(text))) + switch str { + case Resource, Scope, Span, SpanEvent, Metric, DataPoint, Log: + *c = str + return nil + default: + return fmt.Errorf("unknown context %v", str) + } +} + +type contextStatementsSlice []contextStatements + +type contextStatements struct { + Context ContextID `river:"context,attr"` + Statements []string `river:"statements,attr"` +} + +// Arguments configures the otelcol.processor.transform component. +type Arguments struct { + // ErrorMode determines how the processor reacts to errors that occur while processing a statement. 
+ ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"` + TraceStatements contextStatementsSlice `river:"trace_statements,block,optional"` + MetricStatements contextStatementsSlice `river:"metric_statements,block,optional"` + LogStatements contextStatementsSlice `river:"log_statements,block,optional"` + + // Output configures where to send processed data. Required. + Output *otelcol.ConsumerArguments `river:"output,block"` +} + +var ( + _ processor.Arguments = Arguments{} +) + +// DefaultArguments holds default settings for Arguments. +var DefaultArguments = Arguments{ + ErrorMode: ottl.PropagateError, +} + +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +// Validate implements river.Validator. +func (args *Arguments) Validate() error { + otelArgs, err := args.convertImpl() + if err != nil { + return err + } + return otelArgs.Validate() +} + +func (stmts *contextStatementsSlice) convert() []interface{} { + if stmts == nil { + return nil + } + + res := make([]interface{}, 0, len(*stmts)) + + if len(*stmts) == 0 { + return res + } + + for _, stmt := range *stmts { + res = append(res, stmt.convert()) + } + return res +} + +func (args *contextStatements) convert() map[string]interface{} { + if args == nil { + return nil + } + + return map[string]interface{}{ + "context": args.Context, + "statements": args.Statements, + } +} + +// Convert implements processor.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return args.convertImpl() +} + +// convertImpl is a helper function which returns the real type of the config, +// instead of the otelcomponent.Config interface. 
+func (args Arguments) convertImpl() (*transformprocessor.Config, error) { + input := make(map[string]interface{}) + + input["error_mode"] = args.ErrorMode + + if len(args.TraceStatements) > 0 { + input["trace_statements"] = args.TraceStatements.convert() + } + + if len(args.MetricStatements) > 0 { + input["metric_statements"] = args.MetricStatements.convert() + } + + if len(args.LogStatements) > 0 { + input["log_statements"] = args.LogStatements.convert() + } + + var result transformprocessor.Config + err := mapstructure.Decode(input, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +// Extensions implements processor.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements processor.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements processor.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} diff --git a/component/otelcol/processor/transform/transform_test.go b/component/otelcol/processor/transform/transform_test.go new file mode 100644 index 000000000000..00a876d6f433 --- /dev/null +++ b/component/otelcol/processor/transform/transform_test.go @@ -0,0 +1,549 @@ +package transform_test + +import ( + "testing" + + "github.com/grafana/agent/component/otelcol/processor/transform" + "github.com/grafana/river" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" + "github.com/stretchr/testify/require" +) + +func TestArguments_UnmarshalRiver(t *testing.T) { + tests := []struct { + testName string + cfg string + expected map[string]interface{} + errorMsg string + }{ + { + testName: "Defaults", + cfg: ` + output {} + `, + expected: map[string]interface{}{ + "error_mode": "propagate", + }, + }, + { + testName: 
"IgnoreErrors", + cfg: ` + error_mode = "ignore" + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + }, + }, + { + testName: "TransformIfFieldDoesNotExist", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "span" + statements = [ + // Accessing a map with a key that does not exist will return nil. + "set(attributes[\"test\"], \"pass\") where attributes[\"test\"] == nil", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(attributes["test"], "pass") where attributes["test"] == nil`, + }, + }, + }, + }, + }, + { + testName: "RenameAttribute1", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "set(attributes[\"namespace\"], attributes[\"k8s.namespace.name\"])", + "delete_key(attributes, \"k8s.namespace.name\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["namespace"], attributes["k8s.namespace.name"])`, + `delete_key(attributes, "k8s.namespace.name")`, + }, + }, + }, + }, + }, + { + testName: "RenameAttribute2", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "replace_all_patterns(attributes, \"key\", \"k8s\\\\.namespace\\\\.name\", \"namespace\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `replace_all_patterns(attributes, "key", "k8s\\.namespace\\.name", "namespace")`, + }, + }, + }, + }, + }, + { + testName: "CreateAttributeFromContentOfLogBody", + cfg: ` + error_mode = "ignore" + log_statements { + context = "log" + statements = [ 
+ "set(attributes[\"body\"], body)", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(attributes["body"], body)`, + }, + }, + }, + }, + }, + { + testName: "CombineTwoAttributes", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + // The Concat function combines any number of strings, separated by a delimiter. + "set(attributes[\"test\"], Concat([attributes[\"foo\"], attributes[\"bar\"]], \" \"))", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["test"], Concat([attributes["foo"], attributes["bar"]], " "))`, + }, + }, + }, + }, + }, + { + testName: "ParseJsonLogs", + cfg: ` + error_mode = "ignore" + log_statements { + context = "log" + statements = [ + "merge_maps(cache, ParseJSON(body), \"upsert\") where IsMatch(body, \"^\\\\{\") ", + "set(attributes[\"attr1\"], cache[\"attr1\"])", + "set(attributes[\"attr2\"], cache[\"attr2\"])", + "set(attributes[\"nested.attr3\"], cache[\"nested\"][\"attr3\"])", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `merge_maps(cache, ParseJSON(body), "upsert") where IsMatch(body, "^\\{") `, + `set(attributes["attr1"], cache["attr1"])`, + `set(attributes["attr2"], cache["attr2"])`, + `set(attributes["nested.attr3"], cache["nested"]["attr3"])`, + }, + }, + }, + }, + }, + { + testName: "ManyStatements1", + cfg: ` + error_mode = "ignore" + trace_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])", + 
"replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + trace_statements { + context = "span" + statements = [ + "set(status.code, 1) where attributes[\"http.path\"] == \"/health\"", + "set(name, attributes[\"http.route\"])", + "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "limit(attributes, 100, [])", + "truncate_all(attributes, 4096)", + ] + } + metric_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"host.name\"])", + "truncate_all(attributes, 4096)", + ] + } + metric_statements { + context = "metric" + statements = [ + "set(description, \"Sum\") where type == \"Sum\"", + ] + } + metric_statements { + context = "datapoint" + statements = [ + "limit(attributes, 100, [\"host.name\"])", + "truncate_all(attributes, 4096)", + "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", + "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\"", + ] + } + log_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])", + ] + } + log_statements { + context = "log" + statements = [ + "set(severity_text, \"FAIL\") where body == \"request failed\"", + "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", + "set(body, attributes[\"http.route\"])", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "ignore", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])`, + 
`replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")`, + `limit(attributes, 100, [])`, + `truncate_all(attributes, 4096)`, + }, + }, + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(status.code, 1) where attributes["http.path"] == "/health"`, + `set(name, attributes["http.route"])`, + `replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")`, + `limit(attributes, 100, [])`, + `truncate_all(attributes, 4096)`, + }, + }, + }, + "metric_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["host.name"])`, + `truncate_all(attributes, 4096)`, + }, + }, + map[string]interface{}{ + "context": "metric", + "statements": []interface{}{ + `set(description, "Sum") where type == "Sum"`, + }, + }, + map[string]interface{}{ + "context": "datapoint", + "statements": []interface{}{ + `limit(attributes, 100, ["host.name"])`, + `truncate_all(attributes, 4096)`, + `convert_sum_to_gauge() where metric.name == "system.processes.count"`, + `convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"`, + }, + }, + }, + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"])`, + }, + }, + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(severity_text, "FAIL") where body == "request failed"`, + `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")`, + `replace_all_patterns(attributes, "value", "/account/\\d{4}", "/account/{accountId}")`, + `set(body, attributes["http.route"])`, + }, + }, + }, + }, + }, + { + testName: "ManyStatements2", + cfg: ` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + 
"keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + trace_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + metric_statements { + context = "datapoint" + statements = [ + "set(metric.name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + metric_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + log_statements { + context = "log" + statements = [ + "set(body, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + log_statements { + context = "resource" + statements = [ + "set(attributes[\"name\"], \"bear\")", + ] + } + output {} + `, + expected: map[string]interface{}{ + "error_mode": "propagate", + "trace_statements": []interface{}{ + map[string]interface{}{ + "context": "span", + "statements": []interface{}{ + `set(name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["name"], "bear")`, + }, + }, + }, + "metric_statements": []interface{}{ + map[string]interface{}{ + "context": "datapoint", + "statements": []interface{}{ + `set(metric.name, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + `set(attributes["name"], "bear")`, + }, + }, + }, + "log_statements": []interface{}{ + map[string]interface{}{ + "context": "log", + "statements": []interface{}{ + `set(body, "bear") where attributes["http.path"] == "/animal"`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + map[string]interface{}{ + "context": "resource", + "statements": []interface{}{ + 
`set(attributes["name"], "bear")`, + }, + }, + }, + }, + }, + { + testName: "unknown_error_mode", + cfg: ` + error_mode = "test" + output {} + `, + errorMsg: `2:17: "test" unknown error mode test`, + }, + { + testName: "bad_syntax_log", + cfg: ` + log_statements { + context = "log" + statements = [ + "set(body, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "bad_syntax_metric", + cfg: ` + metric_statements { + context = "datapoint" + statements = [ + "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "bad_syntax_trace", + cfg: ` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\" where attributes[\"http.path\"] == \"/animal\"", + "keep_keys(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `unable to parse OTTL statement: 1:18: unexpected token "where" (expected ")" Key*)`, + }, + { + testName: "unknown_function_log", + cfg: ` + log_statements { + context = "log" + statements = [ + "set(body, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `undefined function not_a_function`, + }, + { + testName: "unknown_function_metric", + cfg: ` + metric_statements { + context = "datapoint" + statements = [ + "set(metric.name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `undefined function not_a_function`, + }, + { + testName: "unknown_function_trace", + cfg: 
` + trace_statements { + context = "span" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + "not_a_function(attributes, [\"http.method\", \"http.path\"])", + ] + } + output {} + `, + errorMsg: `undefined function not_a_function`, + }, + { + testName: "unknown_context", + cfg: ` + trace_statements { + context = "test" + statements = [ + "set(name, \"bear\") where attributes[\"http.path\"] == \"/animal\"", + ] + } + output {} + `, + errorMsg: `3:15: "test" unknown context test`, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args transform.Arguments + err := river.Unmarshal([]byte(tc.cfg), &args) + if tc.errorMsg != "" { + require.ErrorContains(t, err, tc.errorMsg) + return + } + + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*transformprocessor.Config) + + var expectedCfg transformprocessor.Config + err = mapstructure.Decode(tc.expected, &expectedCfg) + require.NoError(t, err) + + // Validate the two configs + require.NoError(t, actual.Validate()) + require.NoError(t, expectedCfg.Validate()) + + // Compare the two configs + require.Equal(t, expectedCfg, *actual) + }) + } +} diff --git a/docs/developer/writing-flow-component-documentation.md b/docs/developer/writing-flow-component-documentation.md index 7a5b8914f6f6..ddaf6466e021 100644 --- a/docs/developer/writing-flow-component-documentation.md +++ b/docs/developer/writing-flow-component-documentation.md @@ -560,4 +560,15 @@ The [loki.source.podlogs][] component documentation needed to add an extra section to document the PodLogs CRD, since we do not yet have a way of documenting auxiliary artifacts which are related to a component. +### otelcol.processor.transform + +The [otelcol.processor.transform][] component documentation needed to add +an extra section about OTTL Contexts because there is no appropriate OTEL docs page +that we could link to. 
Currently this information is housed on the [transformprocessor][]
+doc page, but because it contains yaml config for the Collector, users might get confused
+about how this maps to River and it is better not to link to it. In the future we could try to
+move this information from [transformprocessor][] to the [OTTL Context][ottl context] doc.
+
 [loki.source.podlogs]: ../sources/flow/reference/components/loki.source.podlogs.md
+[otelcol.processor.transform]: ../sources/flow/reference/components/otelcol.processor.transform.md
+[ottl context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/README.md
\ No newline at end of file
diff --git a/docs/sources/flow/reference/components/otelcol.processor.transform.md b/docs/sources/flow/reference/components/otelcol.processor.transform.md
new file mode 100644
index 000000000000..88b03e515e24
--- /dev/null
+++ b/docs/sources/flow/reference/components/otelcol.processor.transform.md
@@ -0,0 +1,574 @@
+---
+aliases:
+- /docs/grafana-cloud/agent/flow/reference/components/otelcol.processor.transform/
+- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/otelcol.processor.transform/
+- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/otelcol.processor.transform/
+canonical: https://grafana.com/docs/agent/latest/flow/reference/components/otelcol.processor.transform/
+labels:
+  stage: experimental
+title: otelcol.processor.transform
+description: Learn about otelcol.processor.transform
+---
+
+# otelcol.processor.transform
+
+{{< docs/shared lookup="flow/stability/experimental.md" source="agent" version="" >}}
+
+`otelcol.processor.transform` accepts telemetry data from other `otelcol`
+components and modifies it using the [OpenTelemetry Transformation Language (OTTL)][OTTL].
+OTTL statements consist of [OTTL functions][] which act on "paths".
+A "path" is a reference to telemetry data such as:
+* Resource attributes.
+* Instrumentation scope name.
+* Span attributes.
+
+In addition to the [standard OTTL functions][OTTL functions],
+there is also a set of metrics-only functions:
+* [convert_sum_to_gauge][]
+* [convert_gauge_to_sum][]
+* [convert_summary_count_val_to_sum][]
+* [convert_summary_sum_val_to_sum][]
+
+[OTTL][] statements can also contain constructs such as:
+* [Booleans][OTTL booleans] such as:
+  * `not true`
+  * `not IsMatch(name, "http_.*")`
+* [Boolean Expressions][OTTL boolean expressions] consisting of a `where` followed by one or more booleans:
+  * `set(attributes["whose_fault"], "ours") where attributes["http.status"] == 500`
+  * `set(attributes["whose_fault"], "theirs") where attributes["http.status"] == 400 or attributes["http.status"] == 404`
+* [Math expressions][OTTL math expressions] such as:
+  * `1 + 1`
+  * `end_time_unix_nano - start_time_unix_nano`
+  * `sum([1, 2, 3, 4]) + (10 / 1) - 1`
+
+{{% admonition type="note" %}}
+Some characters inside River strings [need to be escaped][river-strings] via `\`.
+For example, the OTTL statement `set(description, "Sum") where type == "Sum"`
+is written in River as `"set(description, \"Sum\") where type == \"Sum\""`.
+
+[river-strings]: {{< relref "../../config-language/expressions/types_and_values.md/#strings" >}}
+{{% /admonition %}}
+
+{{% admonition type="note" %}}
+`otelcol.processor.transform` is a wrapper over the upstream
+OpenTelemetry Collector `transform` processor. Bug reports or feature requests
+will be redirected to the upstream repository, if necessary.
+{{% /admonition %}}
+
+Multiple `otelcol.processor.transform` components can be specified by giving them
+different labels.
+
+{{% admonition type="warning" %}}
+`otelcol.processor.transform` allows users to modify all aspects of their telemetry. Some specific risks are listed below, but this is not an exhaustive list. It is important to understand your data before using this processor.
+ +- [Unsound Transformations][]: Transformations between metric data types are not defined in the [metrics data model][]. +These functions have the expectation that you understand the incoming data and know that it can be meaningfully converted +to a new metric data type or can meaningfully be used to create new metrics. + - Although OTTL allows the `set` function to be used with `metric.data_type`, its implementation in the transform processor is a [no-op][]. + To modify a data type you must use a function specific to that purpose. +- [Identity Conflict][]: Transformation of metrics have the potential to affect the identity of a metric, + leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing + existing attributes. Adding new attributes is safe. +- [Orphaned Telemetry][]: The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces + and `span_id`, and `trace_id` logs. Modifying these fields could lead to orphaned spans or logs. + +[Unsound Transformations]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#unsound-transformations +[Identity Conflict]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#identity-conflict +[Orphaned Telemetry]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.85.0/docs/standard-warnings.md#orphaned-telemetry +[no-op]: https://en.wikipedia.org/wiki/NOP_(code) +[metrics data model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main//specification/metrics/data-model.md +{{% /admonition %}} + +## Usage + +```river +otelcol.processor.transform "LABEL" { + output { + metrics = [...] + logs = [...] + traces = [...] 
+ } +} +``` + +## Arguments + +`otelcol.processor.transform` supports the following arguments: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`error_mode` | `string` | How to react to errors if they occur while processing a statement. | `"propagate"` | no + +The supported values for `error_mode` are: +* `ignore`: Ignore errors returned by statements and continue on to the next statement. This is the recommended mode. +* `propagate`: Return the error up the pipeline. This will result in the payload being dropped from the Agent. + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.processor.transform`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +output | [output][] | Configures where to send received telemetry data. | yes +trace_statements | [trace_statements][] | Statements which transform traces. | no +metric_statements | [metric_statements][] | Statements which transform metrics. | no +log_statements | [log_statements][] | Statements which transform logs. | no + +[trace_statements]: #trace_statements-block +[metric_statements]: #metric_statements-block +[log_statements]: #log_statements-block +[output]: #output-block + +[OTTL Context]: #ottl-context + +### trace_statements block + +The `trace_statements` block specifies statements which transform trace telemetry signals. +Multiple `trace_statements` blocks can be specified. + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes +`statements` | `list(string)` | A list of OTTL statements. | | yes + +The supported values for `context` are: +* `resource`: Use when interacting only with OTLP resources (e.g. resource attributes). +* `scope`: Use when interacting only with OTLP instrumentation scope (e.g. name of the instrumentation scope). 
+* `span`: Use when interacting only with OTLP spans.
+* `spanevent`: Use when interacting only with OTLP span events.
+
+See [OTTL Context][] for more information about how to use contexts.
+
+### metric_statements block
+
+The `metric_statements` block specifies statements which transform metric telemetry signals.
+Multiple `metric_statements` blocks can be specified.
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes
+`statements` | `list(string)` | A list of OTTL statements. | | yes
+
+The supported values for `context` are:
+* `resource`: Use when interacting only with OTLP resources (e.g. resource attributes).
+* `scope`: Use when interacting only with OTLP instrumentation scope (e.g. name of the instrumentation scope).
+* `metric`: Use when interacting only with individual OTLP metrics.
+* `datapoint`: Use when interacting only with individual OTLP data points.
+
+See [OTTL Context][] for more information about how to use contexts.
+
+### log_statements block
+
+The `log_statements` block specifies statements which transform log telemetry signals.
+Multiple `log_statements` blocks can be specified.
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`context` | `string` | OTTL Context to use when interpreting the associated statements. | | yes
+`statements` | `list(string)` | A list of OTTL statements. | | yes
+
+The supported values for `context` are:
+* `resource`: Use when interacting only with OTLP resources (e.g. resource attributes).
+* `scope`: Use when interacting only with OTLP instrumentation scope (e.g. name of the instrumentation scope).
+* `log`: Use when interacting only with OTLP logs.
+
+See [OTTL Context][] for more information about how to use contexts.
+
+### OTTL Context
+
+Each context allows transformation of its type of telemetry.
+For example, statements associated with a `resource` context will be able to transform the resource's +`attributes` and `dropped_attributes_count`. + +Each type of `context` defines its own paths and enums specific to that context. +Refer to the OpenTelemetry documentation for a list of paths and enums for each context: +* [resource][OTTL resource context] +* [scope][OTTL scope context] +* [span][OTTL span context] +* [spanevent][OTTL spanevent context] +* [log][OTTL log context] +* [metric][OTTL metric context] +* [datapoint][OTTL datapoint context] + + +Contexts __NEVER__ supply access to individual items "lower" in the protobuf definition. +- This means statements associated to a `resource` __WILL NOT__ be able to access the underlying instrumentation scopes. +- This means statements associated to a `scope` __WILL NOT__ be able to access the underlying telemetry slices (spans, metrics, or logs). +- Similarly, statements associated to a `metric` __WILL NOT__ be able to access individual datapoints, but can access the entire datapoints slice. +- Similarly, statements associated to a `span` __WILL NOT__ be able to access individual SpanEvents, but can access the entire SpanEvents slice. + +For practical purposes, this means that a context cannot make decisions on its telemetry based on telemetry "lower" in the structure. +For example, __the following context statement is not possible__ because it attempts to use individual datapoint +attributes in the condition of a statement associated to a `metric`: + +```river +metric_statements { + context = "metric" + statements = [ + "set(description, \"test passed\") where datapoints.attributes[\"test\"] == \"pass\"", + ] +} +``` + +Context __ALWAYS__ supply access to the items "higher" in the protobuf definition that are associated to the telemetry being transformed. +- This means that statements associated to a `datapoint` have access to a datapoint's metric, instrumentation scope, and resource. 
+- This means that statements associated to a `spanevent` have access to a spanevent's span, instrumentation scope, and resource.
+- This means that statements associated to a `span`/`metric`/`log` have access to the telemetry's instrumentation scope, and resource.
+- This means that statements associated to a `scope` have access to the scope's resource.
+
+For example, __the following context statement is possible__ because `datapoint` statements can access the datapoint's metric.
+
+```river
+metric_statements {
+  context = "datapoint"
+  statements = [
+    "set(metric.description, \"test passed\") where attributes[\"test\"] == \"pass\"",
+  ]
+}
+```
+
+The protobuf definitions for OTLP signals are maintained on GitHub:
+* [traces][traces protobuf]
+* [metrics][metrics protobuf]
+* [logs][logs protobuf]
+
+Whenever possible, associate your statements to the context that the statements intend to transform.
+Although you can modify resource attributes associated to a span using the `span` context,
+it is more efficient to use the `resource` context. This is because contexts are nested:
+the efficiency comes because higher-level contexts can avoid iterating through any of the contexts at a lower level.
+
+### output block
+
+{{< docs/shared lookup="flow/reference/components/output-block.md" source="agent" version="" >}}
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+Name | Type | Description
+---- | ---- | -----------
+`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.
+
+`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
+logs, or traces).
+
+## Component health
+
+`otelcol.processor.transform` is only reported as unhealthy if given an invalid
+configuration.
+
+## Debug information
+
+`otelcol.processor.transform` does not expose any component-specific debug
+information. 
+
+## Debug metrics
+
+`otelcol.processor.transform` does not expose any component-specific debug metrics.
+
+## Examples
+
+### Perform a transformation if an attribute does not exist
+
+This example will set attribute "test" to "pass" if the attribute "test" does not exist.
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "span"
+    statements = [
+      // Accessing a map with a key that does not exist will return nil.
+      "set(attributes[\"test\"], \"pass\") where attributes[\"test\"] == nil",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs = [otelcol.exporter.otlp.default.input]
+    traces = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Note that each `"` was [escaped][river-strings] with `\"` inside the River string.
+
+### Rename a resource attribute
+
+There are two ways to rename an attribute key.
+One way is to set a new attribute and delete the old one:
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "resource"
+    statements = [
+      "set(attributes[\"namespace\"], attributes[\"k8s.namespace.name\"])",
+      "delete_key(attributes, \"k8s.namespace.name\")",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs = [otelcol.exporter.otlp.default.input]
+    traces = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Another way is to update the key using regular expressions:
+
+```river
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "resource"
+    statements = [
+      "replace_all_patterns(attributes, \"key\", \"k8s\\\\.namespace\\\\.name\", \"namespace\")",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs = [otelcol.exporter.otlp.default.input]
+    traces = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Note that some values in the River string were [escaped][river-strings]:
+* `\` was escaped with `\\`
+* `"` was 
escaped with `\"` + +### Create an attribute from the contents of a log body + +This example will set the attribute "body" to the value of the log body: + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + log_statements { + context = "log" + statements = [ + "set(attributes[\"body\"], body)", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Note that each `"` was [escaped][river-strings] with `\"` inside the River string. + +### Combine two attributes + +This example will set attribute "test" to the value of attributes "foo" and "bar" combined. + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + trace_statements { + context = "resource" + statements = [ + // The Concat function combines any number of strings, separated by a delimiter. + "set(attributes[\"test\"], Concat([attributes[\"foo\"], attributes[\"bar\"]], \" \"))", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} +``` + +Note that each `"` was [escaped][river-strings] with `\"` inside the River string. + +### Parsing JSON logs + +Given the following json body... + +```json +{ + "name": "log", + "attr1": "foo", + "attr2": "bar", + "nested": { + "attr3": "example" + } +} +``` + +... add specific fields as attributes on the log: + +```river +otelcol.processor.transform "default" { + error_mode = "ignore" + + log_statements { + context = "log" + + statements = [ + // Parse body as JSON and merge the resulting map with the cache map, ignoring non-json bodies. + // cache is a field exposed by OTTL that is a temporary storage place for complex operations. + "merge_maps(cache, ParseJSON(body), \"upsert\") where IsMatch(body, \"^\\\\{\") ", + + // Set attributes using the values merged into cache. 
+      // If the attribute doesn't exist in cache then nothing happens.
+      "set(attributes[\"attr1\"], cache[\"attr1\"])",
+      "set(attributes[\"attr2\"], cache[\"attr2\"])",
+
+      // To access nested maps you can chain index ([]) operations.
+      // If nested or attr3 do not exist in cache then nothing happens.
+      "set(attributes[\"nested.attr3\"], cache[\"nested\"][\"attr3\"])",
+    ]
+  }
+
+  output {
+    metrics = [otelcol.exporter.otlp.default.input]
+    logs = [otelcol.exporter.otlp.default.input]
+    traces = [otelcol.exporter.otlp.default.input]
+  }
+}
+```
+
+Note that some values in the River strings were [escaped][river-strings]:
+* `\` was escaped with `\\`
+* `"` was escaped with `\"`
+
+### Various transformations of attributes and status codes
+
+The example takes advantage of context efficiency by grouping transformations
+with the context they intend to transform.
+
+```river
+otelcol.receiver.otlp "default" {
+  http {}
+  grpc {}
+
+  output {
+    metrics = [otelcol.processor.transform.default.input]
+    logs = [otelcol.processor.transform.default.input]
+    traces = [otelcol.processor.transform.default.input]
+  }
+}
+
+otelcol.processor.transform "default" {
+  error_mode = "ignore"
+
+  trace_statements {
+    context = "resource"
+    statements = [
+      "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])",
+      "replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")",
+      "limit(attributes, 100, [])",
+      "truncate_all(attributes, 4096)",
+    ]
+  }
+
+  trace_statements {
+    context = "span"
+    statements = [
+      "set(status.code, 1) where attributes[\"http.path\"] == \"/health\"",
+      "set(name, attributes[\"http.route\"])",
+      "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")",
+      "limit(attributes, 100, [])",
+      "truncate_all(attributes, 4096)",
+    ]
+  }
+
+  metric_statements {
+    context = "resource"
+    statements = [
+      "keep_keys(attributes, 
[\"host.name\"])", + "truncate_all(attributes, 4096)", + ] + } + + metric_statements { + context = "metric" + statements = [ + "set(description, \"Sum\") where type == \"Sum\"", + ] + } + + metric_statements { + context = "datapoint" + statements = [ + "limit(attributes, 100, [\"host.name\"])", + "truncate_all(attributes, 4096)", + "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", + "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\"", + ] + } + + log_statements { + context = "resource" + statements = [ + "keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])", + ] + } + + log_statements { + context = "log" + statements = [ + "set(severity_text, \"FAIL\") where body == \"request failed\"", + "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", + "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", + "set(body, attributes[\"http.route\"])", + ] + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = env("OTLP_ENDPOINT") + } +} +``` + +Note that some values in the River strings were [escaped][river-strings]: +* `\` was escaped with `\\` +* `"` was escaped with `\"` + +[river-strings]: {{< relref "../../config-language/expressions/types_and_values.md/#strings" >}} + +[traces protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/trace/v1/trace.proto +[metrics protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/metrics/v1/metrics.proto +[logs protobuf]: https://github.com/open-telemetry/opentelemetry-proto/blob/v0.17.0/opentelemetry/proto/logs/v1/logs.proto + + +[OTTL]: 
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/README.md +[OTTL functions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/ottlfuncs/README.md +[convert_sum_to_gauge]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_sum_to_gauge +[convert_gauge_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_gauge_to_sum +[convert_summary_count_val_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_summary_count_val_to_sum +[convert_summary_sum_val_to_sum]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/processor/transformprocessor#convert_summary_sum_val_to_sum +[OTTL booleans]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#booleans +[OTTL math expressions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#math-expressions +[OTTL boolean expressions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.85.0/pkg/ottl#boolean-expressions +[OTTL resource context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlresource/README.md +[OTTL scope context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlscope/README.md +[OTTL span context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlspan/README.md +[OTTL spanevent context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlspanevent/README.md +[OTTL metric context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottlmetric/README.md +[OTTL datapoint 
context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottldatapoint/README.md +[OTTL log context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/pkg/ottl/contexts/ottllog/README.md diff --git a/go.mod b/go.mod index c74961f327b0..576f597b2225 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.85.0 diff --git a/go.sum b/go.sum index aa0eeb44dc72..b34b038b7daa 100644 --- a/go.sum +++ b/go.sum @@ -1833,6 +1833,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocesso github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.85.0/go.mod h1:Rp+79qY7tJEgja2kHdWkWS0WDHP4+KRPfHmttCjbwDo= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0 h1:R4oQpH2bRx6UEGDdv15S2GkkkDFRbjXLO1b8fcjS8Lg= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.85.0/go.mod h1:WuIjXo1AF9FWkG5HVSnm5koj27SEoVmENH3YVDNR1x4= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0 h1:8bhHzQpYe0zFANlQU+yRvhqsVFMRpaqtBVlHzyPqHkc= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.85.0/go.mod h1:D1VNHbfUdVgtWiXUwr6OBetOl+TVDsZ8PkC5Fji1AAc= 
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0 h1:1oQGK9OibOxvtJcnCdB6+r6jGpzSp3DrcoeaVgCwP28= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.85.0/go.mod h1:GNnx5ftrhf2bBSVIUzvfAkXBBjHfUYvmUOuR7mJWUOE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.85.0 h1:yDfzy7NPgjWptm/wmbLunsbV4L6YzY1fwXY5i++afFU=