Skip to content

Commit

Permalink
Merge pull request #4 from uber-go/master
Browse files Browse the repository at this point in the history
Syncing with upstream
  • Loading branch information
bbassingthwaite authored Jul 14, 2020
2 parents fd06acb + 3ac9595 commit 4f146ab
Show file tree
Hide file tree
Showing 25 changed files with 476 additions and 84 deletions.
2 changes: 1 addition & 1 deletion evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
ret := &m.PollForDecisionTaskResponse{
TaskToken: make([]byte, 5),
WorkflowExecution: &m.WorkflowExecution{WorkflowId: workflowID, RunId: runID},
WorkflowType: &m.WorkflowType{Name: common.StringPtr("testReplayWorkflow")},
WorkflowType: &m.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/evictiontest.testReplayWorkflow")},
History: &m.History{Events: testEvents},
PreviousStartedEventId: common.Int64Ptr(5)}
return ret, nil
Expand Down
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,14 @@ type (

// RegisterActivityOptions consists of options for registering an activity
RegisterActivityOptions struct {
// When an activity is a function the name is an actual activity type name.
// When an activity is part of a structure then each member of the structure becomes an activity with
// this Name as a prefix + activity function name.
Name string
// Activity type name is equal to function name instead of fully qualified
// name including function package (and struct type if used).
// This option has no effect when explicit Name is provided.
EnableShortName bool
DisableAlreadyRegisteredCheck bool
}

Expand Down
1 change: 1 addition & 0 deletions internal/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (g jsonEncoding) Marshal(objs []interface{}) ([]byte, error) {
// Unmarshal decodes a byte array into the passed in objects
func (g jsonEncoding) Unmarshal(data []byte, objs []interface{}) error {
dec := json.NewDecoder(bytes.NewBuffer(data))
dec.UseNumber()
for i, obj := range objs {
if err := dec.Decode(obj); err != nil {
return fmt.Errorf(
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type (
DataConverter DataConverter
Attempt int32
ScheduledTime time.Time
Header *shared.Header
}

// asyncActivityClient for requesting activity execution
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type (
attempt int32 // attempt starting from 0
retryPolicy *RetryPolicy
expireTime time.Time
header *shared.Header
}

localActivityMarkerData struct {
Expand Down Expand Up @@ -503,6 +504,7 @@ func newLocalActivityTask(params executeLocalActivityParams, callback laResultHa
callback: callback,
retryPolicy: params.RetryPolicy,
attempt: params.Attempt,
header: params.Header,
}

if params.RetryPolicy != nil && params.RetryPolicy.ExpirationInterval > 0 {
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *s.PollForDecisio
ParentWorkflowExecution: parentWorkflowExecution,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
RetryPolicy: attributes.RetryPolicy,
}

wfStartTime := time.Unix(0, h.Events[0].GetTimestamp())
Expand Down Expand Up @@ -1497,6 +1498,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
Header: contErr.params.header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
RetryPolicy: workflowContext.workflowInfo.RetryPolicy,
}
} else if workflowContext.err != nil {
// Workflow failures
Expand Down
12 changes: 10 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() {
t.NotNil(response)
t.Equal(1, len(response.Decisions))
t.Equal(s.DecisionTypeCompleteWorkflowExecution, response.Decisions[0].GetDecisionType())
checksumsJSON := string(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes.Result)
checksumsBytes := response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes.Result
var checksums []string
json.Unmarshal([]byte(checksumsJSON), &checksums)
json.Unmarshal(checksumsBytes, &checksums)
t.Equal(3, len(checksums))
t.Equal("chck1", checksums[0])
t.Equal("chck2", checksums[1])
Expand Down Expand Up @@ -858,6 +858,12 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
workflowType := "GetWorkflowInfoWorkflow"
lastCompletionResult, err := getDefaultDataConverter().ToData("lastCompletionData")
t.NoError(err)
retryPolicy := &s.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
BackoffCoefficient: common.Float64Ptr(1.0),
MaximumIntervalInSeconds: common.Int32Ptr(1),
MaximumAttempts: common.Int32Ptr(3),
}
startedEventAttributes := &s.WorkflowExecutionStartedEventAttributes{
Input: lastCompletionResult,
TaskList: &s.TaskList{Name: &taskList},
Expand All @@ -869,6 +875,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
ExecutionStartToCloseTimeoutSeconds: &executionTimeout,
TaskStartToCloseTimeoutSeconds: &taskTimeout,
LastCompletionResult: lastCompletionResult,
RetryPolicy: retryPolicy,
}
testEvents := []*s.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, startedEventAttributes),
Expand Down Expand Up @@ -904,6 +911,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
t.EqualValues(taskTimeout, result.TaskStartToCloseTimeoutSeconds)
t.EqualValues(workflowType, result.WorkflowType.Name)
t.EqualValues(testDomain, result.Domain)
t.EqualValues(retryPolicy, result.RetryPolicy)
}

func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() {
Expand Down
13 changes: 13 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,19 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
attempt: task.attempt,
})

// propagate context information into the local activity activity context from the headers
for _, ctxProp := range lath.contextPropagators {
var err error
if ctx, err = ctxProp.Extract(ctx, NewHeaderReader(task.header)); err != nil {
result = &localActivityResult{
task: task,
result: nil,
err: fmt.Errorf("unable to propagate context %v", err),
}
return result
}
}

// panic handler
defer func() {
if p := recover(); p != nil {
Expand Down
19 changes: 13 additions & 6 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,19 @@ func isError(inType reflect.Type) bool {
}

func getFunctionName(i interface{}) string {
fullName, ok := i.(string)
if !ok {
fullName = runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
elements := strings.Split(fullName, ".")
return elements[len(elements)-1]
if fullName, ok := i.(string); ok {
return fullName
}
fullName := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
// This allows to call activities by method pointer
// Compiler adds -fm suffix to a function name which has a receiver
// Note that this works even if struct pointer used to get the function is nil
// It is possible because nil receivers are allowed.
// For example:
// var a *Activities
// ExecuteActivity(ctx, a.Foo)
// will call this function which is going to return "Foo"
return strings.TrimSuffix(fullName, "-fm")
}

func getActivityFunctionName(r *registry, i interface{}) string {
Expand Down
10 changes: 5 additions & 5 deletions internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func querySignalWorkflowFunc(ctx Context, numSignals int) error {
return nil
}

func binaryChecksumWorkflowFunc(ctx Context) ([]*string, error) {
var result []*string
result = append(result, GetWorkflowInfo(ctx).BinaryChecksum)
func binaryChecksumWorkflowFunc(ctx Context) ([]string, error) {
var result []string
result = append(result, GetWorkflowInfo(ctx).GetBinaryChecksum())
Sleep(ctx, time.Hour)
result = append(result, GetWorkflowInfo(ctx).BinaryChecksum)
result = append(result, GetWorkflowInfo(ctx).GetBinaryChecksum())
Sleep(ctx, time.Hour)
result = append(result, GetWorkflowInfo(ctx).BinaryChecksum)
result = append(result, GetWorkflowInfo(ctx).GetBinaryChecksum())
return result, nil
}

Expand Down
18 changes: 9 additions & 9 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
taskList := "taskList1"
testEvents := []*shared.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("testReplayWorkflow")},
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflow")},
TaskList: &shared.TaskList{Name: common.StringPtr(taskList)},
Input: testEncodeFunctionArgs(getDefaultDataConverter()),
}),
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
taskList := "taskList1"
testEvents := []*shared.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("testReplayWorkflowLocalActivity")},
WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflowLocalActivity")},
TaskList: &shared.TaskList{Name: common.StringPtr(taskList)},
Input: testEncodeFunctionArgs(getDefaultDataConverter()),
}),
Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *internalWorkerTestSuite) testDecisionTaskHandlerHelper(params workerExe
createTestEventDecisionTaskStarted(3),
}

workflowType := "testReplayWorkflow"
workflowType := "go.uber.org/cadence/internal.testReplayWorkflow"
workflowID := "testID"
runID := "testRunID"

Expand Down Expand Up @@ -799,30 +799,30 @@ func (w activitiesCallingOptionsWorkflow) Execute(ctx Context, input []byte) (re
require.True(w.t, **rStruct2Ptr == testActivityResult{Index: 10})

// By names.
err = ExecuteActivity(ctx, "testActivityByteArgs", input).Get(ctx, nil)
err = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityByteArgs", input).Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteActivity(ctx, "testActivityMultipleArgs", 2, []string{"test"}, true).Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteActivity(ctx, "testActivityNoResult", 2, "test").Get(ctx, nil)
err = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityNoResult", 2, "test").Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteActivity(ctx, "testActivityNoContextArg", 2, "test").Get(ctx, nil)
err = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityNoContextArg", 2, "test").Get(ctx, nil)
require.NoError(w.t, err, err)

f = ExecuteActivity(ctx, "testActivityReturnString")
f = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityReturnString")
err = f.Get(ctx, &rString)
require.NoError(w.t, err, err)
require.Equal(w.t, "testActivity", rString, rString)

f = ExecuteActivity(ctx, "testActivityReturnEmptyString")
f = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityReturnEmptyString")
var r2sString string
err = f.Get(ctx, &r2String)
require.NoError(w.t, err, err)
require.Equal(w.t, "", r2sString)

f = ExecuteActivity(ctx, "testActivityReturnEmptyStruct")
f = ExecuteActivity(ctx, "go.uber.org/cadence/internal.testActivityReturnEmptyStruct")
err = f.Get(ctx, &r2Struct)
require.NoError(w.t, err, err)
require.Equal(w.t, testActivityResult{}, r2Struct)
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func returnPanicWorkflow(ctx Context) (err error) {

func (s *WorkflowUnitTest) Test_SplitJoinActivityWorkflow() {
env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflowWithOptions(splitJoinActivityWorkflow, RegisterWorkflowOptions{Name: "splitJoinActivityWorkflow"})
env.RegisterActivityWithOptions(testAct, RegisterActivityOptions{Name: "testActivityWithOptions"})
env.OnActivity(testAct, mock.Anything).Return(func(ctx context.Context) (string, error) {
activityID := GetActivityInfo(ctx).ActivityID
Expand Down
10 changes: 5 additions & 5 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_OnActivityStartedListener() {

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(workflowFn)
env.RegisterActivity(testActivityHello)
env.RegisterActivityWithOptions(testActivityHello, RegisterActivityOptions{Name: "testActivityHello"})

var activityCalls []string
env.SetOnActivityStartedListener(func(activityInfo *ActivityInfo, ctx context.Context, args Values) {
Expand Down Expand Up @@ -745,7 +745,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Mock_Panic_GetChildWorkfl
}

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(testWorkflowHello)
env.RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"})
env.RegisterWorkflow(workflowFn)
env.OnWorkflow(testWorkflowHello, mock.Anything, mock.Anything, mock.Anything).
Return("mock_result", nil, "extra_argument") // extra arg causes panic
Expand Down Expand Up @@ -803,7 +803,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Listener() {

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(workflowFn)
env.RegisterWorkflow(testWorkflowHello)
env.RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"})
env.RegisterActivity(testActivityHello)

var childWorkflowName, childWorkflowResult string
Expand Down Expand Up @@ -1524,7 +1524,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowFriendlyName() {

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(workflowFn)
env.RegisterWorkflow(testWorkflowHello)
env.RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"})
env.RegisterActivity(testActivityHello)
var called []string
env.SetOnChildWorkflowStartedListener(func(workflowInfo *WorkflowInfo, ctx Context, args Values) {
Expand Down Expand Up @@ -2147,7 +2147,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_Channel() {
}

// continue as new
return NewContinueAsNewError(ctx, "this-workflow-fn")
return NewContinueAsNewError(ctx, "this-workflow")
}

for i := range fanoutChs {
Expand Down
43 changes: 32 additions & 11 deletions internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (r *registry) RegisterWorkflowWithOptions(
fnName := getFunctionName(wf)
alias := options.Name
registerName := fnName

if options.EnableShortName {
registerName = getShortFunctionName(fnName)
}
if len(alias) > 0 {
registerName = alias
}
Expand All @@ -93,8 +97,8 @@ func (r *registry) RegisterWorkflowWithOptions(
}
}
r.workflowFuncMap[registerName] = wf
if len(alias) > 0 {
r.workflowAliasMap[fnName] = alias
if len(alias) > 0 || options.EnableShortName {
r.workflowAliasMap[fnName] = registerName
}
}

Expand Down Expand Up @@ -125,6 +129,10 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
fnName := getFunctionName(af)
alias := options.Name
registerName := fnName

if options.EnableShortName {
registerName = getShortFunctionName(fnName)
}
if len(alias) > 0 {
registerName = alias
}
Expand All @@ -138,8 +146,8 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af}
if len(alias) > 0 {
r.activityAliasMap[fnName] = alias
if len(alias) > 0 || options.EnableShortName {
r.activityAliasMap[fnName] = registerName
}

return nil
Expand All @@ -159,22 +167,30 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
if method.PkgPath != "" {
continue
}
name := method.Name
methodName := getFunctionName(method.Func.Interface())
if err := validateFnFormat(method.Type, false); err != nil {
return fmt.Errorf("failed to register activity method %v of %v: %e", name, structType.Name(), err)
return fmt.Errorf("failed to register activity method %v of %v: %e", methodName, structType.Name(), err)
}
prefix := options.Name
registerName := name
if len(prefix) == 0 {
prefix = structType.Elem().Name() + "_"

structPrefix := options.Name
registerName := methodName

if options.EnableShortName {
registerName = getShortFunctionName(methodName)
}
if len(structPrefix) > 0 {
registerName = structPrefix + getShortFunctionName(methodName)
}
registerName = prefix + name

if !options.DisableAlreadyRegisteredCheck {
if _, ok := r.getActivityNoLock(registerName); ok {
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface()}
if len(structPrefix) > 0 || options.EnableShortName {
r.activityAliasMap[methodName] = registerName
}
count++
}

Expand All @@ -185,6 +201,11 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
return nil
}

func getShortFunctionName(fnName string) string {
elements := strings.Split(fnName, ".")
return elements[len(elements)-1]
}

func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
r.Lock() // do not defer for Unlock to call next.getWorkflowAlias without lock
alias, ok := r.workflowAliasMap[fnName]
Expand Down
Loading

0 comments on commit 4f146ab

Please sign in to comment.