From 19adea51195c993006d4fcf133aca49d6a9004ba Mon Sep 17 00:00:00 2001 From: Emil Shakirov Date: Tue, 9 Apr 2024 21:52:43 +0200 Subject: [PATCH 1/3] Add tracer option to check error eligibility in the span --- contrib/datadog/tracing/interceptor.go | 24 +++++++++++++++--- contrib/datadog/tracing/interceptor_test.go | 27 +++++++++++++++++++++ internal/interceptortest/tracing.go | 26 ++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/contrib/datadog/tracing/interceptor.go b/contrib/datadog/tracing/interceptor.go index 70a672a41..7915715dd 100644 --- a/contrib/datadog/tracing/interceptor.go +++ b/contrib/datadog/tracing/interceptor.go @@ -46,6 +46,9 @@ type TracerOptions struct { // DisableQueryTracing can be set to disable query tracing. DisableQueryTracing bool + + // ErrCheckFn can be set to a custom function to determine if an error should be traced. + ErrCheckFn func(err error) bool } // NewTracingInterceptor convenience method that wraps a NeTracer() with a tracing interceptor @@ -61,6 +64,7 @@ func NewTracer(opts TracerOptions) interceptor.Tracer { opts: TracerOptions{ DisableSignalTracing: opts.DisableSignalTracing, DisableQueryTracing: opts.DisableQueryTracing, + ErrCheckFn: opts.ErrCheckFn, }, } } @@ -114,7 +118,7 @@ func (t *tracerImpl) SpanFromContext(ctx context.Context) interceptor.TracerSpan if !ok { return nil } - return &tracerSpan{Span: span} + return &tracerSpan{ErrCheckFn: t.opts.ErrCheckFn, Span: span} } func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { @@ -174,7 +178,7 @@ func (t *tracerImpl) StartSpan(options *interceptor.TracerStartSpanOptions) (int // Start and return span s := tracer.StartSpan(t.SpanName(options), startOpts...) - return &tracerSpan{Span: s}, nil + return &tracerSpan{ErrCheckFn: t.opts.ErrCheckFn, Span: s}, nil } func (t *tracerImpl) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { @@ -223,6 +227,7 @@ func (r spanContextReader) ForeachKey(handler func(key string, value string) err type tracerSpan struct { ddtrace.Span + ErrCheckFn func(err error) bool } type tracerSpanCtx struct { ddtrace.SpanContext @@ -242,8 +247,21 @@ func (t *tracerSpan) ForeachBaggageItem(handler func(k string, v string) bool) { func (t *tracerSpan) Finish(options *interceptor.TracerFinishSpanOptions) { var opts []tracer.FinishOption - if err := options.Error; err != nil && !workflow.IsContinueAsNewError(err) { + + err := options.Error + isErrEligible := err != nil && !workflow.IsContinueAsNewError(err) && t.shouldErrBeTraced(err) + + if isErrEligible { opts = append(opts, tracer.WithError(err)) } + t.Span.Finish(opts...) } + +func (t *tracerSpan) shouldErrBeTraced(err error) bool { + if t.ErrCheckFn == nil { + return true + } + + return t.ErrCheckFn(err) +} diff --git a/contrib/datadog/tracing/interceptor_test.go b/contrib/datadog/tracing/interceptor_test.go index 9e5ff4c6d..1881bc1d6 100644 --- a/contrib/datadog/tracing/interceptor_test.go +++ b/contrib/datadog/tracing/interceptor_test.go @@ -22,6 +22,7 @@ package tracing import ( + "strings" "testing" "github.com/stretchr/testify/require" @@ -110,3 +111,29 @@ func Test_tracerImpl_genSpanID(t1 *testing.T) { }) } } +func Test_ErrCheckFn(t *testing.T) { + // Start the mock tracer. + mt := mocktracer.Start() + defer mt.Stop() + + errCheckFn := func(err error) bool { + if strings.Contains(err.Error(), "ignore me") { + return false + } + + return true + } + + impl := NewTracer(TracerOptions{ErrCheckFn: errCheckFn}) + trc := testTracer{ + Tracer: impl, + mt: mt, + } + + interceptortest.RunTestWorkflowWithError(t, trc) + + spans := trc.FinishedSpans() + + require.Len(t, spans, 1) + require.Equal(t, "temporal.RunWorkflow", spans[0].Name) +} diff --git a/internal/interceptortest/tracing.go b/internal/interceptortest/tracing.go index b40ab6ecf..15ab8cc72 100644 --- a/internal/interceptortest/tracing.go +++ b/internal/interceptortest/tracing.go @@ -24,6 +24,7 @@ package interceptortest import ( "context" + "errors" "fmt" "testing" "time" @@ -90,6 +91,27 @@ func RunTestWorkflow(t *testing.T, tracer interceptor.Tracer) { require.Equal(t, "query-response", queryResp) } +func RunTestWorkflowWithError(t *testing.T, tracer interceptor.Tracer) { + var suite testsuite.WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + env.RegisterWorkflow(testWorkflowWithError) + + // Set tracer interceptor + env.SetWorkerOptions(worker.Options{ + Interceptors: []interceptor.WorkerInterceptor{interceptor.NewTracingInterceptor(tracer)}, + }) + + env.SetStartTime(testWorkflowStartTime) + + // Exec + env.ExecuteWorkflow(testWorkflowWithError) + + // Confirm result + require.True(t, env.IsWorkflowCompleted()) + require.Error(t, env.GetWorkflowError()) +} + func AssertSpanPropagation(t *testing.T, tracer TestTracer) { require.Equal(t, []*SpanInfo{ @@ -110,6 +132,10 @@ func AssertSpanPropagation(t *testing.T, tracer TestTracer) { }, tracer.FinishedSpans()) } +func testWorkflowWithError(_ workflow.Context) error { + return errors.New("ignore me") +} + func testWorkflow(ctx workflow.Context) ([]string, error) { // Run code ret, err := workflowInternal(ctx, false) From 0a54b5b00f860881cb1f1c839886558a2c8379b0 Mon Sep 17 00:00:00 2001 From: Emil Shakirov Date: Wed, 10 Apr 2024 09:23:16 +0200 Subject: [PATCH 2/3] Rename option to CheckError --- contrib/datadog/tracing/interceptor.go | 24 ++++++++------------- contrib/datadog/tracing/interceptor_test.go | 2 +- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/contrib/datadog/tracing/interceptor.go b/contrib/datadog/tracing/interceptor.go index 7915715dd..de0bd9001 100644 --- a/contrib/datadog/tracing/interceptor.go +++ b/contrib/datadog/tracing/interceptor.go @@ -47,8 +47,8 @@ type TracerOptions struct { // DisableQueryTracing can be set to disable query tracing. DisableQueryTracing bool - // ErrCheckFn can be set to a custom function to determine if an error should be traced. - ErrCheckFn func(err error) bool + // CheckError can be set to a custom function to determine if an error should be traced. + CheckError func(err error) bool } // NewTracingInterceptor convenience method that wraps a NeTracer() with a tracing interceptor @@ -64,7 +64,7 @@ func NewTracer(opts TracerOptions) interceptor.Tracer { opts: TracerOptions{ DisableSignalTracing: opts.DisableSignalTracing, DisableQueryTracing: opts.DisableQueryTracing, - ErrCheckFn: opts.ErrCheckFn, + CheckError: opts.CheckError, }, } } @@ -118,7 +118,7 @@ func (t *tracerImpl) SpanFromContext(ctx context.Context) interceptor.TracerSpan if !ok { return nil } - return &tracerSpan{ErrCheckFn: t.opts.ErrCheckFn, Span: span} + return &tracerSpan{CheckError: t.opts.CheckError, Span: span} } func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { @@ -178,7 +178,7 @@ func (t *tracerImpl) StartSpan(options *interceptor.TracerStartSpanOptions) (int // Start and return span s := tracer.StartSpan(t.SpanName(options), startOpts...) - return &tracerSpan{ErrCheckFn: t.opts.ErrCheckFn, Span: s}, nil + return &tracerSpan{CheckError: t.opts.CheckError, Span: s}, nil } func (t *tracerImpl) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { @@ -227,7 +227,7 @@ func (r spanContextReader) ForeachKey(handler func(key string, value string) err type tracerSpan struct { ddtrace.Span - ErrCheckFn func(err error) bool + CheckError func(err error) bool } type tracerSpanCtx struct { ddtrace.SpanContext @@ -249,7 +249,9 @@ func (t *tracerSpan) Finish(options *interceptor.TracerFinishSpanOptions) { var opts []tracer.FinishOption err := options.Error - isErrEligible := err != nil && !workflow.IsContinueAsNewError(err) && t.shouldErrBeTraced(err) + isErrEligible := err != nil && + !workflow.IsContinueAsNewError(err) && + t.CheckError != nil && t.CheckError(err) if isErrEligible { opts = append(opts, tracer.WithError(err)) @@ -257,11 +259,3 @@ func (t *tracerSpan) Finish(options *interceptor.TracerFinishSpanOptions) { t.Span.Finish(opts...) } - -func (t *tracerSpan) shouldErrBeTraced(err error) bool { - if t.ErrCheckFn == nil { - return true - } - - return t.ErrCheckFn(err) -} diff --git a/contrib/datadog/tracing/interceptor_test.go b/contrib/datadog/tracing/interceptor_test.go index 1881bc1d6..168a519b0 100644 --- a/contrib/datadog/tracing/interceptor_test.go +++ b/contrib/datadog/tracing/interceptor_test.go @@ -124,7 +124,7 @@ func Test_ErrCheckFn(t *testing.T) { return true } - impl := NewTracer(TracerOptions{ErrCheckFn: errCheckFn}) + impl := NewTracer(TracerOptions{CheckError: errCheckFn}) trc := testTracer{ Tracer: impl, mt: mt, From e62646d2aaed00222c82df1da70008f6c7297353 Mon Sep 17 00:00:00 2001 From: Emil Shakirov Date: Thu, 11 Apr 2024 16:26:37 +0200 Subject: [PATCH 3/3] Implement OnFinish function option --- contrib/datadog/tracing/interceptor.go | 37 ++++++++++++--------- contrib/datadog/tracing/interceptor_test.go | 16 +++++---- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/contrib/datadog/tracing/interceptor.go b/contrib/datadog/tracing/interceptor.go index de0bd9001..6ad5ead74 100644 --- a/contrib/datadog/tracing/interceptor.go +++ b/contrib/datadog/tracing/interceptor.go @@ -47,8 +47,10 @@ type TracerOptions struct { // DisableQueryTracing can be set to disable query tracing. DisableQueryTracing bool - // CheckError can be set to a custom function to determine if an error should be traced. - CheckError func(err error) bool + // OnFinish sets finish options. + // If unset, this will use [tracer.WithError] + // in case [interceptor.TracerFinishSpanOptions.Error] is non-nil and not [workflow.IsContinueAsNewError]. + OnFinish func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption } // NewTracingInterceptor convenience method that wraps a NeTracer() with a tracing interceptor @@ -60,11 +62,23 @@ func NewTracingInterceptor(opts TracerOptions) interceptor.Interceptor { // NewTracer creates an interceptor for setting on client options // that implements Datadog tracing for workflows. func NewTracer(opts TracerOptions) interceptor.Tracer { + if opts.OnFinish == nil { + opts.OnFinish = func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption { + var finishOpts []tracer.FinishOption + + if err := options.Error; err != nil && !workflow.IsContinueAsNewError(err) { + finishOpts = append(finishOpts, tracer.WithError(err)) + } + + return finishOpts + } + } + return &tracerImpl{ opts: TracerOptions{ DisableSignalTracing: opts.DisableSignalTracing, DisableQueryTracing: opts.DisableQueryTracing, - CheckError: opts.CheckError, + OnFinish: opts.OnFinish, }, } } @@ -118,7 +132,7 @@ func (t *tracerImpl) SpanFromContext(ctx context.Context) interceptor.TracerSpan if !ok { return nil } - return &tracerSpan{CheckError: t.opts.CheckError, Span: span} + return &tracerSpan{OnFinish: t.opts.OnFinish, Span: span} } func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { @@ -178,7 +192,7 @@ func (t *tracerImpl) StartSpan(options *interceptor.TracerStartSpanOptions) (int // Start and return span s := tracer.StartSpan(t.SpanName(options), startOpts...) - return &tracerSpan{CheckError: t.opts.CheckError, Span: s}, nil + return &tracerSpan{OnFinish: t.opts.OnFinish, Span: s}, nil } func (t *tracerImpl) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { @@ -227,7 +241,7 @@ func (r spanContextReader) ForeachKey(handler func(key string, value string) err type tracerSpan struct { ddtrace.Span - CheckError func(err error) bool + OnFinish func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption } type tracerSpanCtx struct { ddtrace.SpanContext @@ -246,16 +260,7 @@ func (t *tracerSpan) ForeachBaggageItem(handler func(k string, v string) bool) { } func (t *tracerSpan) Finish(options *interceptor.TracerFinishSpanOptions) { - var opts []tracer.FinishOption - - err := options.Error - isErrEligible := err != nil && - !workflow.IsContinueAsNewError(err) && - t.CheckError != nil && t.CheckError(err) - - if isErrEligible { - opts = append(opts, tracer.WithError(err)) - } + opts := t.OnFinish(options) t.Span.Finish(opts...) } diff --git a/contrib/datadog/tracing/interceptor_test.go b/contrib/datadog/tracing/interceptor_test.go index 168a519b0..b533b33e1 100644 --- a/contrib/datadog/tracing/interceptor_test.go +++ b/contrib/datadog/tracing/interceptor_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/internal/interceptortest" @@ -111,20 +112,21 @@ func Test_tracerImpl_genSpanID(t1 *testing.T) { }) } } -func Test_ErrCheckFn(t *testing.T) { - // Start the mock tracer. +func Test_OnFinishOption(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - errCheckFn := func(err error) bool { - if strings.Contains(err.Error(), "ignore me") { - return false + onFinish := func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption { + var finishOpts []tracer.FinishOption + + if err := options.Error; strings.Contains(err.Error(), "ignore me") { + finishOpts = append(finishOpts, tracer.WithError(err)) } - return true + return finishOpts } - impl := NewTracer(TracerOptions{CheckError: errCheckFn}) + impl := NewTracer(TracerOptions{OnFinish: onFinish}) trc := testTracer{ Tracer: impl, mt: mt,