diff --git a/.chloggen/ottl-emit-traces.yaml b/.chloggen/ottl-emit-traces.yaml new file mode 100644 index 000000000000..66644cc2ee88 --- /dev/null +++ b/.chloggen/ottl-emit-traces.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Emit traces for statement sequence executions to troubleshoot OTTL statements/conditions + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33433] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/ottl/README.md b/pkg/ottl/README.md index 57a53ac9e053..b4962f84a2d7 100644 --- a/pkg/ottl/README.md +++ b/pkg/ottl/README.md @@ -151,3 +151,21 @@ service: 2024-05-29T16:38:09.600-0600 debug ottl@v0.101.0/parser.go:268 TransformContext after statement execution {"kind": "processor", "name": "transform", "pipeline": "logs", "statement": "set(instrumentation_scope.attributes[\"test\"], [\"pass\"])", "condition matched": true, "TransformContext": {"resource": {"attributes": {"test": "pass"}, "dropped_attribute_count": 0}, "scope": {"attributes": {"test": ["pass"]}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log"}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}} 2024-05-29T16:38:09.601-0600 debug ottl@v0.101.0/parser.go:268 TransformContext after statement execution {"kind": "processor", "name": "transform", "pipeline": "logs", "statement": "set(attributes[\"test\"], true)", "condition matched": true, "TransformContext": {"resource": {"attributes": {"test": "pass"}, "dropped_attribute_count": 0}, "scope": {"attributes": {"test": ["pass"]}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log", "test": true}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}} ``` + +If configured to do so, the collector also emits traces for the execution of OTTL statement sequences. +These traces contain spans for the execution of each statement, including the statement itself and whether it has +been applied or not. To make use of this, enable the self monitoring of the collector by setting the +`--feature-gates=telemetry.useOtelWithSDKConfigurationForInternalTelemetry` flag, and using the following configuration +to export the traces to e.g. an OTLP API endpoint: + +```yaml +service: + telemetry: + traces: + processors: + - batch: + exporter: + otlp: + protocol: http/protobuf + endpoint: ${env:OTLP_ENDPOINT}/v1/traces +``` diff --git a/pkg/ottl/go.mod b/pkg/ottl/go.mod index 7fec131e523e..6eafe57857de 100644 --- a/pkg/ottl/go.mod +++ b/pkg/ottl/go.mod @@ -14,6 +14,8 @@ require ( go.opentelemetry.io/collector/component v0.104.0 go.opentelemetry.io/collector/pdata v1.11.0 go.opentelemetry.io/collector/semconv v0.104.0 + go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -37,10 +39,8 @@ require ( github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect - go.opentelemetry.io/otel/sdk v1.27.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.21.0 // indirect diff --git a/pkg/ottl/parser.go b/pkg/ottl/parser.go index 4a0b3cabe7be..f8d5ab7dabc8 100644 --- a/pkg/ottl/parser.go +++ b/pkg/ottl/parser.go @@ -10,9 +10,18 @@ import ( "github.com/alecthomas/participle/v2" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" ) +const ( + logAttributeTraceID = "trace_id" + logAttributeSpanID = "span_id" +) + // Statement holds a top level Statement for processing telemetry data. A Statement is a combination of a function // invocation and the boolean expression to match telemetry for invoking the function. type Statement[K any] struct { @@ -231,6 +240,7 @@ type StatementSequence[K any] struct { statements []*Statement[K] errorMode ErrorMode telemetrySettings component.TelemetrySettings + tracer trace.Tracer } type StatementSequenceOption[K any] func(*StatementSequence[K]) @@ -250,6 +260,10 @@ func NewStatementSequence[K any](statements []*Statement[K], telemetrySettings c statements: statements, errorMode: PropagateError, telemetrySettings: telemetrySettings, + tracer: &noop.Tracer{}, + } + if telemetrySettings.TracerProvider != nil { + s.tracer = telemetrySettings.TracerProvider.Tracer("ottl") } for _, op := range options { op(&s) @@ -262,20 +276,62 @@ func NewStatementSequence[K any](statements []*Statement[K], telemetrySettings c // When the ErrorMode of the StatementSequence is `ignore`, errors are logged and execution continues to the next statement. // When the ErrorMode of the StatementSequence is `silent`, errors are not logged and execution continues to the next statement. func (s *StatementSequence[K]) Execute(ctx context.Context, tCtx K) error { - s.telemetrySettings.Logger.Debug("initial TransformContext", zap.Any("TransformContext", tCtx)) + ctx, sequenceSpan := s.tracer.Start(ctx, "ottl/StatementSequenceExecution") + defer sequenceSpan.End() + s.telemetrySettings.Logger.Debug( + "initial TransformContext", + zap.Any("TransformContext", tCtx), + zap.String(logAttributeTraceID, sequenceSpan.SpanContext().TraceID().String()), + zap.String(logAttributeSpanID, sequenceSpan.SpanContext().SpanID().String()), + ) for _, statement := range s.statements { - _, condition, err := statement.Execute(ctx, tCtx) - s.telemetrySettings.Logger.Debug("TransformContext after statement execution", zap.String("statement", statement.origText), zap.Bool("condition matched", condition), zap.Any("TransformContext", tCtx)) + statementCtx, statementSpan := s.tracer.Start(ctx, "ottl/StatementExecution") + statementSpan.SetAttributes( + attribute.KeyValue{ + Key: "statement", + Value: attribute.StringValue(statement.origText), + }, + ) + _, condition, err := statement.Execute(statementCtx, tCtx) + statementSpan.SetAttributes( + attribute.KeyValue{ + Key: "condition.matched", + Value: attribute.BoolValue(condition), + }, + ) + s.telemetrySettings.Logger.Debug( + "TransformContext after statement execution", + zap.String("statement", statement.origText), + zap.Bool("condition matched", condition), + zap.Any("TransformContext", tCtx), + zap.String(logAttributeTraceID, statementSpan.SpanContext().TraceID().String()), + zap.String(logAttributeSpanID, statementSpan.SpanContext().SpanID().String()), + ) if err != nil { + statementSpan.RecordError(err) + errMsg := fmt.Sprintf("failed to execute statement '%s': %v", statement.origText, err) + statementSpan.SetStatus(codes.Error, errMsg) if s.errorMode == PropagateError { + sequenceSpan.SetStatus(codes.Error, errMsg) + statementSpan.End() err = fmt.Errorf("failed to execute statement: %v, %w", statement.origText, err) return err } if s.errorMode == IgnoreError { - s.telemetrySettings.Logger.Warn("failed to execute statement", zap.Error(err), zap.String("statement", statement.origText)) + s.telemetrySettings.Logger.Warn( + "failed to execute statement", + zap.Error(err), + zap.String("statement", statement.origText), + zap.String(logAttributeTraceID, statementSpan.SpanContext().TraceID().String()), + zap.String(logAttributeSpanID, statementSpan.SpanContext().SpanID().String()), + ) } + } else { + statementSpan.SetStatus(codes.Ok, "statement executed successfully") } + statementSpan.End() } + sequenceSpan.SetStatus(codes.Ok, "statement sequence executed successfully") return nil } diff --git a/pkg/ottl/parser_test.go b/pkg/ottl/parser_test.go index 10da41ead069..679ceddd1872 100644 --- a/pkg/ottl/parser_test.go +++ b/pkg/ottl/parser_test.go @@ -13,7 +13,12 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" ) @@ -2049,10 +2054,11 @@ func Test_Condition_Eval(t *testing.T) { func Test_Statements_Execute_Error(t *testing.T) { tests := []struct { - name string - condition boolExpressionEvaluator[any] - function ExprFunc[any] - errorMode ErrorMode + name string + condition boolExpressionEvaluator[any] + function ExprFunc[any] + errorMode ErrorMode + expectedSpans []expectedSpan }{ { name: "IgnoreError error from condition", @@ -2063,6 +2069,31 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, nil }, errorMode: IgnoreError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(false), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Ok, + }, + }, + }, }, { name: "PropagateError error from condition", @@ -2073,6 +2104,32 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, nil }, errorMode: PropagateError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(false), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + }, }, { name: "IgnoreError error from function", @@ -2083,6 +2140,31 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, fmt.Errorf("test") }, errorMode: IgnoreError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(true), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Ok, + }, + }, + }, }, { name: "PropagateError error from function", @@ -2093,6 +2175,32 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, fmt.Errorf("test") }, errorMode: PropagateError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(true), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + }, }, { name: "SilentError error from condition", @@ -2103,6 +2211,31 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, nil }, errorMode: SilentError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(false), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Ok, + }, + }, + }, }, { name: "SilentError error from function", @@ -2113,6 +2246,31 @@ func Test_Statements_Execute_Error(t *testing.T) { return 1, fmt.Errorf("test") }, errorMode: SilentError, + expectedSpans: []expectedSpan{ + { + name: "ottl/StatementExecution", + attributes: []attribute.KeyValue{ + { + Key: "statement", + Value: attribute.StringValue("test"), + }, + { + Key: "condition.matched", + Value: attribute.BoolValue(true), + }, + }, + status: trace.Status{ + Code: codes.Error, + Description: "failed to execute statement 'test': test", + }, + }, + { + name: "ottl/StatementSequenceExecution", + status: trace.Status{ + Code: codes.Ok, + }, + }, + }, }, } for _, tt := range tests { @@ -2122,11 +2280,15 @@ func Test_Statements_Execute_Error(t *testing.T) { { condition: BoolExpr[any]{tt.condition}, function: Expr[any]{exprFunc: tt.function}, + origText: "test", }, }, errorMode: tt.errorMode, telemetrySettings: componenttest.NewNopTelemetrySettings(), } + spanRecorder := tracetest.NewSpanRecorder() + statements.telemetrySettings.TracerProvider = trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder)) + statements.tracer = statements.telemetrySettings.TracerProvider.Tracer("ottl") err := statements.Execute(context.Background(), nil) if tt.errorMode == PropagateError { @@ -2134,6 +2296,14 @@ func Test_Statements_Execute_Error(t *testing.T) { } else { assert.NoError(t, err) } + + require.Len(t, spanRecorder.Ended(), len(tt.expectedSpans)) + + for i, es := range tt.expectedSpans { + require.Equal(t, es.name, spanRecorder.Ended()[i].Name()) + require.Equal(t, es.attributes, spanRecorder.Ended()[i].Attributes()) + require.Equal(t, es.status, spanRecorder.Ended()[i].Status()) + } }) } } @@ -2337,3 +2507,9 @@ func Test_ConditionSequence_Eval_Error(t *testing.T) { }) } } + +type expectedSpan struct { + name string + attributes []attribute.KeyValue + status trace.Status +}