diff --git a/contrib/datadog/tracing/interceptor.go b/contrib/datadog/tracing/interceptor.go index 70a672a41..6ad5ead74 100644 --- a/contrib/datadog/tracing/interceptor.go +++ b/contrib/datadog/tracing/interceptor.go @@ -46,6 +46,11 @@ type TracerOptions struct { // DisableQueryTracing can be set to disable query tracing. DisableQueryTracing bool + + // OnFinish sets finish options. + // If unset, this will use [tracer.WithError] + // in case [interceptor.TracerFinishSpanOptions.Error] is non-nil and not [workflow.IsContinueAsNewError]. + OnFinish func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption } // NewTracingInterceptor convenience method that wraps a NeTracer() with a tracing interceptor @@ -57,10 +62,23 @@ func NewTracingInterceptor(opts TracerOptions) interceptor.Interceptor { // NewTracer creates an interceptor for setting on client options // that implements Datadog tracing for workflows. func NewTracer(opts TracerOptions) interceptor.Tracer { + if opts.OnFinish == nil { + opts.OnFinish = func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption { + var finishOpts []tracer.FinishOption + + if err := options.Error; err != nil && !workflow.IsContinueAsNewError(err) { + finishOpts = append(finishOpts, tracer.WithError(err)) + } + + return finishOpts + } + } + return &tracerImpl{ opts: TracerOptions{ DisableSignalTracing: opts.DisableSignalTracing, DisableQueryTracing: opts.DisableQueryTracing, + OnFinish: opts.OnFinish, }, } } @@ -114,7 +132,7 @@ func (t *tracerImpl) SpanFromContext(ctx context.Context) interceptor.TracerSpan if !ok { return nil } - return &tracerSpan{Span: span} + return &tracerSpan{OnFinish: t.opts.OnFinish, Span: span} } func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { @@ -174,7 +192,7 @@ func (t *tracerImpl) StartSpan(options *interceptor.TracerStartSpanOptions) (int // Start and return span s := tracer.StartSpan(t.SpanName(options), startOpts...) - return &tracerSpan{Span: s}, nil + return &tracerSpan{OnFinish: t.opts.OnFinish, Span: s}, nil } func (t *tracerImpl) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { @@ -223,6 +241,7 @@ func (r spanContextReader) ForeachKey(handler func(key string, value string) err type tracerSpan struct { ddtrace.Span + OnFinish func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption } type tracerSpanCtx struct { ddtrace.SpanContext @@ -241,9 +260,7 @@ func (t *tracerSpan) ForeachBaggageItem(handler func(k string, v string) bool) { } func (t *tracerSpan) Finish(options *interceptor.TracerFinishSpanOptions) { - var opts []tracer.FinishOption - if err := options.Error; err != nil && !workflow.IsContinueAsNewError(err) { - opts = append(opts, tracer.WithError(err)) - } + opts := t.OnFinish(options) + t.Span.Finish(opts...) } diff --git a/contrib/datadog/tracing/interceptor_test.go b/contrib/datadog/tracing/interceptor_test.go index 9e5ff4c6d..b533b33e1 100644 --- a/contrib/datadog/tracing/interceptor_test.go +++ b/contrib/datadog/tracing/interceptor_test.go @@ -22,10 +22,12 @@ package tracing import ( + "strings" "testing" "github.com/stretchr/testify/require" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "go.temporal.io/sdk/interceptor" "go.temporal.io/sdk/internal/interceptortest" @@ -110,3 +112,30 @@ func Test_tracerImpl_genSpanID(t1 *testing.T) { }) } } +func Test_OnFinishOption(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + onFinish := func(options *interceptor.TracerFinishSpanOptions) []tracer.FinishOption { + var finishOpts []tracer.FinishOption + + if err := options.Error; strings.Contains(err.Error(), "ignore me") { + finishOpts = append(finishOpts, tracer.WithError(err)) + } + + return finishOpts + } + + impl := NewTracer(TracerOptions{OnFinish: onFinish}) + trc := testTracer{ + Tracer: impl, + mt: mt, + } + + interceptortest.RunTestWorkflowWithError(t, trc) + + spans := trc.FinishedSpans() + + require.Len(t, spans, 1) + require.Equal(t, "temporal.RunWorkflow", spans[0].Name) +} diff --git a/internal/interceptortest/tracing.go b/internal/interceptortest/tracing.go index b40ab6ecf..15ab8cc72 100644 --- a/internal/interceptortest/tracing.go +++ b/internal/interceptortest/tracing.go @@ -24,6 +24,7 @@ package interceptortest import ( "context" + "errors" "fmt" "testing" "time" @@ -90,6 +91,27 @@ func RunTestWorkflow(t *testing.T, tracer interceptor.Tracer) { require.Equal(t, "query-response", queryResp) } +func RunTestWorkflowWithError(t *testing.T, tracer interceptor.Tracer) { + var suite testsuite.WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + env.RegisterWorkflow(testWorkflowWithError) + + // Set tracer interceptor + env.SetWorkerOptions(worker.Options{ + Interceptors: []interceptor.WorkerInterceptor{interceptor.NewTracingInterceptor(tracer)}, + }) + + env.SetStartTime(testWorkflowStartTime) + + // Exec + env.ExecuteWorkflow(testWorkflowWithError) + + // Confirm result + require.True(t, env.IsWorkflowCompleted()) + require.Error(t, env.GetWorkflowError()) +} + func AssertSpanPropagation(t *testing.T, tracer TestTracer) { require.Equal(t, []*SpanInfo{ @@ -110,6 +132,10 @@ func AssertSpanPropagation(t *testing.T, tracer TestTracer) { }, tracer.FinishedSpans()) } +func testWorkflowWithError(_ workflow.Context) error { + return errors.New("ignore me") +} + func testWorkflow(ctx workflow.Context) ([]string, error) { // Run code ret, err := workflowInternal(ctx, false)