Skip to content

Commit

Permalink
Support updates in tracing interceptor (#1595)
Browse files Browse the repository at this point in the history
Support updates in tracing interceptor
  • Loading branch information
Quinn-With-Two-Ns authored Aug 21, 2024
1 parent edc3c6c commit 1fe6141
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 46 deletions.
4 changes: 4 additions & 0 deletions contrib/datadog/tracing/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type TracerOptions struct {
// DisableQueryTracing can be set to disable query tracing.
DisableQueryTracing bool

// DisableUpdateTracing can be set to disable update tracing.
DisableUpdateTracing bool

// OnFinish sets finish options.
// If unset, this will use [tracer.WithError]
// in case [interceptor.TracerFinishSpanOptions.Error] is non-nil and not [workflow.IsContinueAsNewError].
Expand Down Expand Up @@ -78,6 +81,7 @@ func NewTracer(opts TracerOptions) interceptor.Tracer {
opts: TracerOptions{
DisableSignalTracing: opts.DisableSignalTracing,
DisableQueryTracing: opts.DisableQueryTracing,
DisableUpdateTracing: opts.DisableUpdateTracing,
OnFinish: opts.OnFinish,
},
}
Expand Down
4 changes: 3 additions & 1 deletion contrib/datadog/tracing/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestSpanName(t *testing.T) {
}
interceptortest.RunTestWorkflow(t, testTracer)
// Ensure the naming scheme follows "temporal.${operation}"
require.Equal(t, "temporal.RunWorkflow", testTracer.FinishedSpans()[0].Name)
require.Equal(t, "temporal.ValidateUpdate", testTracer.FinishedSpans()[0].Name)
require.Equal(t, "temporal.HandleUpdate", testTracer.FinishedSpans()[1].Name)
require.Equal(t, "temporal.RunWorkflow", testTracer.FinishedSpans()[2].Name)

}
func Test_tracerImpl_genSpanID(t1 *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions contrib/opentelemetry/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package opentelemetry
import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
Expand Down Expand Up @@ -56,6 +57,9 @@ type TracerOptions struct {
// DisableQueryTracing can be set to disable query tracing.
DisableQueryTracing bool

// DisableUpdateTracing can be set to disable update tracing.
DisableUpdateTracing bool

// DisableBaggage can be set to disable baggage propagation.
DisableBaggage bool

Expand Down Expand Up @@ -138,6 +142,7 @@ func (t *tracer) Options() interceptor.TracerOptions {
HeaderKey: t.options.HeaderKey,
DisableSignalTracing: t.options.DisableSignalTracing,
DisableQueryTracing: t.options.DisableQueryTracing,
DisableUpdateTracing: t.options.DisableUpdateTracing,
AllowInvalidParentSpans: t.options.AllowInvalidParentSpans,
}
}
Expand Down
103 changes: 103 additions & 0 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
workflowIDTagKey = "temporalWorkflowID"
runIDTagKey = "temporalRunID"
activityIDTagKey = "temporalActivityID"
updateIDTagKey = "temporalUpdateID"
)

// Tracer is an interface for tracing implementations as used by
Expand Down Expand Up @@ -113,6 +114,9 @@ type TracerOptions struct {
// DisableQueryTracing can be set to disable query tracing.
DisableQueryTracing bool

// DisableUpdateTracing can be set to disable update tracing.
DisableUpdateTracing bool

// AllowInvalidParentSpans will swallow errors interpreting parent
// spans from headers. Useful when migrating from one tracing library
// to another, while workflows/activities may be in progress.
Expand Down Expand Up @@ -348,6 +352,33 @@ func (t *tracingClientOutboundInterceptor) QueryWorkflow(
return val, err
}

func (t *tracingClientOutboundInterceptor) UpdateWorkflow(
ctx context.Context,
in *ClientUpdateWorkflowInput,
) (client.WorkflowUpdateHandle, error) {
// Only add tracing if enabled
if t.root.options.DisableUpdateTracing {
return t.Next.UpdateWorkflow(ctx, in)
}
// Start span and write to header
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
Operation: "UpdateWorkflow",
Name: in.UpdateName,
Tags: map[string]string{workflowIDTagKey: in.WorkflowID},
ToHeader: true,
Time: time.Now(),
})
if err != nil {
return nil, err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

val, err := t.Next.UpdateWorkflow(ctx, in)
finishOpts.Error = err
return val, err
}

type tracingActivityOutboundInterceptor struct {
ActivityOutboundInterceptorBase
root *tracingInterceptor
Expand Down Expand Up @@ -515,6 +546,78 @@ func (t *tracingWorkflowInboundInterceptor) HandleQuery(
return val, err
}

func (t *tracingWorkflowInboundInterceptor) ValidateUpdate(
ctx workflow.Context,
in *UpdateInput,
) error {
// Only add tracing if enabled and not replaying
if t.root.options.DisableUpdateTracing {
return t.Next.ValidateUpdate(ctx, in)
}
// Start span reading from header
info := workflow.GetInfo(ctx)
currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx)
span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{
Operation: "ValidateUpdate",
Name: in.Name,
Tags: map[string]string{
workflowIDTagKey: info.WorkflowExecution.ID,
runIDTagKey: info.WorkflowExecution.RunID,
updateIDTagKey: currentUpdateInfo.ID,
},
FromHeader: true,
Time: time.Now(),
// We intentionally do not set IdempotencyKey here because validation is not run on
// replay. When the tracing interceptor's span counter is reset between workflow
// replays, the validator will not be processed which could result in impotency key
// collisions with other requests.
})
if err != nil {
return err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

err = t.Next.ValidateUpdate(ctx, in)
finishOpts.Error = err
return err
}

func (t *tracingWorkflowInboundInterceptor) ExecuteUpdate(
ctx workflow.Context,
in *UpdateInput,
) (interface{}, error) {
// Only add tracing if enabled and not replaying
if t.root.options.DisableUpdateTracing {
return t.Next.ExecuteUpdate(ctx, in)
}
// Start span reading from header
info := workflow.GetInfo(ctx)
currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx)
span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{
// Using operation name "HandleUpdate" to match other SDKs and by consistence with other operations
Operation: "HandleUpdate",
Name: in.Name,
Tags: map[string]string{
workflowIDTagKey: info.WorkflowExecution.ID,
runIDTagKey: info.WorkflowExecution.RunID,
updateIDTagKey: currentUpdateInfo.ID,
},
FromHeader: true,
Time: time.Now(),
IdempotencyKey: t.newIdempotencyKey(),
})
if err != nil {
return nil, err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

val, err := t.Next.ExecuteUpdate(ctx, in)
finishOpts.Error = err
return val, err
}

type tracingWorkflowOutboundInterceptor struct {
WorkflowOutboundInterceptorBase
root *tracingInterceptor
Expand Down
52 changes: 52 additions & 0 deletions internal/interceptortest/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ import (

var testWorkflowStartTime = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC)

type testUpdateCallbacks struct {
AcceptImpl func()
RejectImpl func(err error)
CompleteImpl func(success interface{}, err error)
}

// Accept implements internal.UpdateCallbacks.
func (t *testUpdateCallbacks) Accept() {
}

// Complete implements internal.UpdateCallbacks.
func (t *testUpdateCallbacks) Complete(success interface{}, err error) {
}

// Reject implements internal.UpdateCallbacks.
func (t *testUpdateCallbacks) Reject(err error) {
}

// TestTracer is an interceptor.Tracer that returns finished spans.
type TestTracer interface {
interceptor.Tracer
Expand Down Expand Up @@ -73,6 +91,18 @@ func RunTestWorkflow(t *testing.T, tracer interceptor.Tracer) {

env.SetStartTime(testWorkflowStartTime)

// Send an update
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("testUpdate", "updateID", &testUpdateCallbacks{
RejectImpl: func(err error) {
},
AcceptImpl: func() {
},
CompleteImpl: func(interface{}, error) {
},
})
}, 0*time.Second)

// Exec
env.ExecuteWorkflow(testWorkflow)

Expand Down Expand Up @@ -115,6 +145,12 @@ func RunTestWorkflowWithError(t *testing.T, tracer interceptor.Tracer) {
func AssertSpanPropagation(t *testing.T, tracer TestTracer) {

require.Equal(t, []*SpanInfo{
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "ValidateUpdate", Name: "testUpdate"})),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "HandleUpdate", Name: "testUpdate"}),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivity"}),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivity"}))),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivityLocal"}),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivityLocal"})))),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunWorkflow", Name: "testWorkflow"}),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivity"}),
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivity"}))),
Expand All @@ -137,6 +173,22 @@ func testWorkflowWithError(_ workflow.Context) error {
}

func testWorkflow(ctx workflow.Context) ([]string, error) {
var updateRan bool
err := workflow.SetUpdateHandler(ctx, "testUpdate", func(ctx workflow.Context) (string, error) {
defer func() { updateRan = true }()
_, err := workflowInternal(ctx, false)
if err != nil {
return "", err
}
return "updateID", nil
})
if err != nil {
return nil, err
}
err = workflow.Await(ctx, func() bool { return updateRan })
if err != nil {
return nil, err
}
// Run code
ret, err := workflowInternal(ctx, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (a *Activities) ExternalSignalsAndQueries(ctx context.Context) error {
// Signal with start
workflowOpts := client.StartWorkflowOptions{TaskQueue: activity.GetInfo(ctx).TaskQueue}
run, err := a.client.SignalWithStartWorkflow(ctx, "test-external-signals-and-queries", "start-signal",
"signal-value", workflowOpts, new(Workflows).SignalsAndQueries, false, false)
"signal-value", workflowOpts, new(Workflows).SignalsQueriesAndUpdate, false, false)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 1fe6141

Please sign in to comment.