Skip to content

Commit

Permalink
Add context propagators to local activity executions through test env…
Browse files Browse the repository at this point in the history
…ironments (#1291)

Add context propagators to the local activity task handler
  • Loading branch information
DeRauk authored Nov 8, 2023
1 parent 0352634 commit 0e432a9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
11 changes: 7 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
ActivityFn: activityFn,
InputArgs: args,
WorkflowInfo: env.workflowInfo,
Header: env.header,
}
task := &localActivityTask{
activityID: "test-local-activity",
Expand All @@ -648,12 +649,14 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
},
attempt: 1,
scheduledTime: time.Now(),
header: params.Header,
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
metricsHandler: env.metricsHandler,
logger: env.logger,
interceptors: env.registry.interceptors,
userContext: env.workerOptions.BackgroundActivityContext,
metricsHandler: env.metricsHandler,
logger: env.logger,
interceptors: env.registry.interceptors,
contextPropagators: env.contextPropagators,
}

result := taskHandler.executeLocalActivityTask(task)
Expand Down
26 changes: 26 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,32 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowLocalActivityWithMockAndListene
s.True(localActivityFnCanceled.Load())
}

func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityWithHeaderContext() {
// inline activity using value passing through user context.
activityWithUserContext := func(ctx context.Context) (string, error) {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.(string); ok {
return val, nil
}
return "", errors.New("value not found from ctx")
}

env := s.NewTestActivityEnvironment()
env.SetHeader(&commonpb.Header{
Fields: map[string]*commonpb.Payload{
testHeader: encodeString(s.T(), "test-data"),
},
})
env.SetContextPropagators([]ContextPropagator{NewKeysPropagator([]string{testHeader})})

env.RegisterActivity(activityWithUserContext)
blob, err := env.ExecuteLocalActivity(activityWithUserContext)
s.NoError(err)
var value string
_ = blob.Get(&value)
s.Equal("test-data", value)
}

func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflow() {
// This test will send signal from parent to child, and then child will send back signal to ack. No mock is needed.
signalName := "test-signal-name"
Expand Down

0 comments on commit 0e432a9

Please sign in to comment.