From 4a2de7fb79f63d974513847d962383bc5c7b27b5 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Fri, 29 Mar 2024 15:41:26 -0700 Subject: [PATCH 1/5] Update client wrappers with new async APIs --- client/client.go | 18 + internal/client.go | 18 + internal/internal_workflow_client.go | 414 ++++++++++++++-------- internal/internal_workflow_client_test.go | 289 ++++++++++++--- internal/workflow.go | 5 + mocks/Client.go | 250 +++++++++++-- mocks/README.md | 23 ++ workflow/workflow.go | 3 + 8 files changed, 810 insertions(+), 210 deletions(-) create mode 100644 mocks/README.md diff --git a/client/client.go b/client/client.go index 6265852b5..f67ff0f88 100644 --- a/client/client.go +++ b/client/client.go @@ -104,6 +104,15 @@ type ( // - InternalServiceError StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflowFunc interface{}, args ...interface{}) (*workflow.Execution, error) + // StartWorkflowAsync behaves like StartWorkflow except that the request is first queued and then processed asynchronously. + // See StartWorkflow for parameter details. + // The returned AsyncWorkflowExecution doesn't contain run ID, because the workflow hasn't started yet. + // The errors it can return: + // - EntityNotExistsError, if domain does not exists + // - BadRequestError + // - InternalServiceError + StartWorkflowAsync(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*workflow.ExecutionAsync, error) + // ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error // The user can use this to start using a function or workflow type name. // Either by @@ -169,6 +178,15 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*workflow.Execution, error) + // SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is first queued and then processed asynchronously. + // See SignalWithStartWorkflow for parameter details. + // The errors it can return: + // - EntityNotExistsError, if domain does not exist + // - BadRequestError + // - InternalServiceError + SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, + options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*workflow.ExecutionAsync, error) + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. diff --git a/internal/client.go b/internal/client.go index 0c05e3ad6..3b7b293b5 100644 --- a/internal/client.go +++ b/internal/client.go @@ -93,6 +93,15 @@ type ( // subjected to change in the future. StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error) + // StartWorkflowAsync behaves like StartWorkflow except that the request is first queued and then processed asynchronously. + // See StartWorkflow for parameter details. + // The returned AsyncWorkflowExecution doesn't contain run ID, because the workflow hasn't started yet. + // The errors it can return: + // - EntityNotExistsError, if domain does not exists + // - BadRequestError + // - InternalServiceError + StartWorkflowAsync(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecutionAsync, error) + // ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error // The user can use this to start using a function or workflow type name. // Either by @@ -160,6 +169,15 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error) + // SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is first queued and then processed asynchronously. + // See SignalWithStartWorkflow for parameter details. + // The errors it can return: + // - EntityNotExistsError, if domain does not exist + // - BadRequestError + // - InternalServiceError + SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, + options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecutionAsync, error) + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 80d2ecc45..012744f44 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -155,90 +155,57 @@ func (wc *workflowClient) StartWorkflow( workflowFunc interface{}, args ...interface{}, ) (*WorkflowExecution, error) { - workflowID := options.ID - if len(workflowID) == 0 { - workflowID = uuid.NewRandom().String() - } - - if options.TaskList == "" { - return nil, errors.New("missing TaskList") + startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflow", options, workflowFunc, args...) + if err != nil { + return nil, err } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } + var response *s.StartWorkflowExecutionResponse - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } + // Start creating workflow request. + err = backoff.Retry(ctx, + func() error { + tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) + defer cancel() - // Validate type and its arguments. - workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) - if err != nil { - return nil, err - } + var err1 error + response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest, opt...) + return err1 + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) if err != nil { return nil, err } - searchAttr, err := serializeSearchAttributes(options.SearchAttributes) - if err != nil { - return nil, err + if wc.metricsScope != nil { + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, startRequest.WorkflowType.GetName()) + scope.Counter(metrics.WorkflowStartCounter).Inc(1) } - delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) - if delayStartSeconds < 0 { - return nil, errors.New("Invalid DelayStart option") + executionInfo := &WorkflowExecution{ + ID: *startRequest.WorkflowId, + RunID: response.GetRunId(), } + return executionInfo, nil +} - jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) - if jitterStartSeconds < 0 { - return nil, errors.New("Invalid JitterStart option") +// StartWorkflowAsync queues a workflow execution which is going to be picked up and started by Cadence backend asynchronously. +// See StartWorkflow for details about inputs and usage. +func (wc *workflowClient) StartWorkflowAsync( + ctx context.Context, + options StartWorkflowOptions, + workflowFunc interface{}, + args ...interface{}, +) (*WorkflowExecutionAsync, error) { + startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflowAsync", options, workflowFunc, args...) + if err != nil { + return nil, err } - // create a workflow start span and attach it to the context object. - // N.B. we need to finish this immediately as jaeger does not give us a way - // to recreate a span given a span context - which means we will run into - // issues during replay. we work around this by creating and ending the - // workflow start span and passing in that context to the workflow. So - // everything beginning with the StartWorkflowExecutionRequest will be - // parented by the created start workflow span. - ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("StartWorkflow-%s", workflowType.Name), workflowID) - span.Finish() - - // get workflow headers from the context - header := wc.getWorkflowHeader(ctx) - - // run propagators to extract information about tracing and other stuff, store in headers field - startRequest := &s.StartWorkflowExecutionRequest{ - Domain: common.StringPtr(wc.domain), - RequestId: common.StringPtr(uuid.New()), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: workflowTypePtr(*workflowType), - TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), - Input: input, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), - Identity: common.StringPtr(wc.identity), - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: common.StringPtr(options.CronSchedule), - Memo: memo, - SearchAttributes: searchAttr, - Header: header, - DelayStartSeconds: common.Int32Ptr(delayStartSeconds), - JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + asyncStartRequest := &s.StartWorkflowExecutionAsyncRequest{ + Request: startRequest, } - var response *s.StartWorkflowExecutionResponse - // Start creating workflow request. err = backoff.Retry(ctx, func() error { @@ -246,7 +213,7 @@ func (wc *workflowClient) StartWorkflow( defer cancel() var err1 error - response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest, opt...) + _, err1 = wc.workflowService.StartWorkflowExecutionAsync(tchCtx, asyncStartRequest, opt...) return err1 }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) @@ -255,13 +222,13 @@ func (wc *workflowClient) StartWorkflow( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, workflowType.Name) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, startRequest.WorkflowType.GetName()) scope.Counter(metrics.WorkflowStartCounter).Inc(1) } - executionInfo := &WorkflowExecution{ - ID: workflowID, - RunID: response.GetRunId()} + executionInfo := &WorkflowExecutionAsync{ + ID: *startRequest.WorkflowId, + } return executionInfo, nil } @@ -343,100 +310,76 @@ func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string, // SignalWithStartWorkflow sends a signal to a running workflow. // If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction. -func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, - options StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error) { +func (wc *workflowClient) SignalWithStartWorkflow( + ctx context.Context, + workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*WorkflowExecution, error) { - signalInput, err := encodeArg(wc.dataConverter, signalArg) + signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) if err != nil { return nil, err } - if workflowID == "" { - workflowID = uuid.NewRandom().String() - } - - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } + var response *s.StartWorkflowExecutionResponse - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } + // Start creating workflow request. + err = backoff.Retry(ctx, + func() error { + tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) + defer cancel() - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } + var err1 error + response, err1 = wc.workflowService.SignalWithStartWorkflowExecution(tchCtx, signalWithStartRequest, opt...) + return err1 + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - // Validate type and its arguments. - workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) if err != nil { return nil, err } - memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) - if err != nil { - return nil, err + if wc.metricsScope != nil { + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, signalWithStartRequest.WorkflowType.GetName()) + scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) } - searchAttr, err := serializeSearchAttributes(options.SearchAttributes) - if err != nil { - return nil, err + executionInfo := &WorkflowExecution{ + ID: options.ID, + RunID: response.GetRunId(), } + return executionInfo, nil +} - delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) - if delayStartSeconds < 0 { - return nil, errors.New("Invalid DelayStart option") - } +// SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is queued and processed by Cadence backend asynchronously. +// See SignalWithStartWorkflow for details about inputs and usage. +func (wc *workflowClient) SignalWithStartWorkflowAsync( + ctx context.Context, + workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*WorkflowExecutionAsync, error) { - jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) - if jitterStartSeconds < 0 { - return nil, errors.New("Invalid JitterStart option") + signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) + if err != nil { + return nil, err } - // create a workflow start span and attach it to the context object. finish it immediately - ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("SignalWithStartWorkflow-%s", workflowType.Name), workflowID) - span.Finish() - - // get workflow headers from the context - header := wc.getWorkflowHeader(ctx) - - signalWithStartRequest := &s.SignalWithStartWorkflowExecutionRequest{ - Domain: common.StringPtr(wc.domain), - RequestId: common.StringPtr(uuid.New()), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: workflowTypePtr(*workflowType), - TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), - Input: input, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), - SignalName: common.StringPtr(signalName), - SignalInput: signalInput, - Identity: common.StringPtr(wc.identity), - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: common.StringPtr(options.CronSchedule), - Memo: memo, - SearchAttributes: searchAttr, - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), - Header: header, - DelayStartSeconds: common.Int32Ptr(delayStartSeconds), - JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + asyncSignalWithStartRequest := &s.SignalWithStartWorkflowExecutionAsyncRequest{ + Request: signalWithStartRequest, } - var response *s.StartWorkflowExecutionResponse - - // Start creating workflow request. err = backoff.Retry(ctx, func() error { tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) defer cancel() var err1 error - response, err1 = wc.workflowService.SignalWithStartWorkflowExecution(tchCtx, signalWithStartRequest, opt...) + _, err1 = wc.workflowService.SignalWithStartWorkflowExecutionAsync(tchCtx, asyncSignalWithStartRequest, opt...) return err1 }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) @@ -445,13 +388,13 @@ func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, workflowType.Name) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, signalWithStartRequest.WorkflowType.GetName()) scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) } - executionInfo := &WorkflowExecution{ - ID: options.ID, - RunID: response.GetRunId()} + executionInfo := &WorkflowExecutionAsync{ + ID: options.ID, + } return executionInfo, nil } @@ -1060,6 +1003,191 @@ func (wc *workflowClient) getWorkflowHeader(ctx context.Context) *s.Header { return header } +func (wc *workflowClient) getWorkflowStartRequest( + ctx context.Context, + tracePrefix string, + options StartWorkflowOptions, + workflowFunc interface{}, + args ...interface{}, +) (*s.StartWorkflowExecutionRequest, error) { + workflowID := options.ID + if len(workflowID) == 0 { + workflowID = uuid.NewRandom().String() + } + + if options.TaskList == "" { + return nil, errors.New("missing TaskList") + } + + executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + if executionTimeout <= 0 { + return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") + } + + decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + if decisionTaskTimeout < 0 { + return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") + } + if decisionTaskTimeout == 0 { + decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs + } + + // Validate type and its arguments. + workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) + if err != nil { + return nil, err + } + + memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(options.SearchAttributes) + if err != nil { + return nil, err + } + + delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) + if delayStartSeconds < 0 { + return nil, errors.New("Invalid DelayStart option") + } + + jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) + if jitterStartSeconds < 0 { + return nil, errors.New("Invalid JitterStart option") + } + + // create a workflow start span and attach it to the context object. + // N.B. we need to finish this immediately as jaeger does not give us a way + // to recreate a span given a span context - which means we will run into + // issues during replay. we work around this by creating and ending the + // workflow start span and passing in that context to the workflow. So + // everything beginning with the StartWorkflowExecutionRequest will be + // parented by the created start workflow span. + ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID) + span.Finish() + + // get workflow headers from the context + header := wc.getWorkflowHeader(ctx) + + // run propagators to extract information about tracing and other stuff, store in headers field + startRequest := &s.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(wc.domain), + RequestId: common.StringPtr(uuid.New()), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: workflowTypePtr(*workflowType), + TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), + Input: input, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), + Identity: common.StringPtr(wc.identity), + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: common.StringPtr(options.CronSchedule), + Memo: memo, + SearchAttributes: searchAttr, + Header: header, + DelayStartSeconds: common.Int32Ptr(delayStartSeconds), + JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + } + + return startRequest, nil +} + +func (wc *workflowClient) getSignalWithStartRequest( + ctx context.Context, + tracePrefix, workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*s.SignalWithStartWorkflowExecutionRequest, error) { + + signalInput, err := encodeArg(wc.dataConverter, signalArg) + if err != nil { + return nil, err + } + + if workflowID == "" { + workflowID = uuid.NewRandom().String() + } + + if options.TaskList == "" { + return nil, errors.New("missing TaskList") + } + + executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + if executionTimeout <= 0 { + return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") + } + + decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + if decisionTaskTimeout < 0 { + return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") + } + if decisionTaskTimeout == 0 { + decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs + } + + // Validate type and its arguments. + workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) + if err != nil { + return nil, err + } + + memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(options.SearchAttributes) + if err != nil { + return nil, err + } + + delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) + if delayStartSeconds < 0 { + return nil, errors.New("Invalid DelayStart option") + } + + jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) + if jitterStartSeconds < 0 { + return nil, errors.New("Invalid JitterStart option") + } + + // create a workflow start span and attach it to the context object. finish it immediately + ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID) + span.Finish() + + // get workflow headers from the context + header := wc.getWorkflowHeader(ctx) + + signalWithStartRequest := &s.SignalWithStartWorkflowExecutionRequest{ + Domain: common.StringPtr(wc.domain), + RequestId: common.StringPtr(uuid.New()), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: workflowTypePtr(*workflowType), + TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), + Input: input, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), + SignalName: common.StringPtr(signalName), + SignalInput: signalInput, + Identity: common.StringPtr(wc.identity), + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: common.StringPtr(options.CronSchedule), + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), + Header: header, + DelayStartSeconds: common.Int32Ptr(delayStartSeconds), + JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + } + + return signalWithStartRequest, nil +} + // Register a domain with cadence server // The errors it can throw: // - DomainAlreadyExistsError diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 96642e40e..7f71e67ca 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -204,7 +204,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError() { iter := s.wfClient.GetWorkflowHistory(context.Background(), workflowID, runID, true, shared.HistoryEventFilterTypeAllEvent) for iter.HasNext() { event, err := iter.Next() - s.Nil(err) + s.NoError(err) events = append(events, event) } s.Equal(3, len(events)) @@ -255,7 +255,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError_EmptyPage() { iter := s.wfClient.GetWorkflowHistory(context.Background(), workflowID, runID, true, shared.HistoryEventFilterTypeAllEvent) for iter.HasNext() { event, err := iter.Next() - s.Nil(err) + s.NoError(err) events = append(events, event) } s.Equal(2, len(events)) @@ -283,14 +283,14 @@ func (s *historyEventIteratorSuite) TestIterator_Error() { s.True(iter.HasNext()) event, err := iter.Next() s.NotNil(event) - s.Nil(err) + s.NoError(err) s.workflowServiceClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), request2, gomock.Any()).Return(nil, &shared.EntityNotExistsError{}).Times(1) s.True(iter.HasNext()) event, err = iter.Next() s.Nil(event) - s.NotNil(err) + s.Error(err) } func (s *historyEventIteratorSuite) TestIterator_StopsTryingNearTimeout() { @@ -433,12 +433,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Success() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -481,12 +481,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_RawHistory_Success() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -533,12 +533,12 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -589,12 +589,12 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -639,11 +639,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) s.Equal(workflowRun.GetID(), *wid) } @@ -692,11 +692,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions_RawHistory() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) s.Equal(workflowRun.GetID(), *wid) } @@ -737,12 +737,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Cancelled() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.NotNil(err) + s.Error(err) _, ok := err.(*CanceledError) s.True(ok) s.Equal(time.Minute, decodedResult) @@ -787,7 +787,7 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Failed() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute @@ -828,7 +828,7 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Terminated() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute @@ -872,12 +872,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_TimedOut() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.NotNil(err) + s.Error(err) _, ok := err.(*TimeoutError) s.True(ok) s.Equal(timeType, err.(*TimeoutError).TimeoutType()) @@ -939,12 +939,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_ContinueAsNew() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -981,7 +981,7 @@ func (s *workflowRunSuite) TestGetWorkflow() { s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err := workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -1049,12 +1049,12 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) resp, err = s.client.SignalWithStartWorkflow(context.Background(), "", signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1063,15 +1063,19 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { signalInput := []byte("my signal input") options := StartWorkflowOptions{} - resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + resp, err := s.client.SignalWithStartWorkflow(ctx, workflowID, signalName, signalInput, options, workflowType) s.Equal(errors.New("missing TaskList"), err) s.Nil(resp) + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() options.TaskList = tasklist - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.NotNil(err) + resp, err = s.client.SignalWithStartWorkflow(ctx, workflowID, signalName, signalInput, options, workflowType) + s.Error(err) s.Nil(resp) options.ExecutionStartToCloseTimeout = timeoutInSeconds @@ -1081,7 +1085,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { s.service.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1105,7 +1109,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) s.Equal(getDefaultDataConverter(), client.dataConverter) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1134,7 +1138,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1169,7 +1173,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() { resp, err := client.StartWorkflow(context.Background(), options, f1, input) s.Equal(newTestDataConverter(), client.dataConverter) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1204,10 +1208,12 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { s.NoError(err) s.Equal("attr value", resultAttr) }) - s.client.StartWorkflow(context.Background(), options, wf) + + _, err := s.client.StartWorkflow(context.Background(), options, wf) + s.NoError(err) } -func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_WithMemoAndSearchAttr() { memo := map[string]interface{}{ "testMemo": "memo value", } @@ -1239,7 +1245,196 @@ func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() s.NoError(err) s.Equal("attr value", resultAttr) }) - s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + + _, err := s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_WithMemoAndSearchAttr() { + memo := map[string]interface{}{ + "testMemo": "memo value", + } + searchAttributes := map[string]interface{}{ + "testAttr": "attr value", + } + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.SignalWithStartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + var resultMemo, resultAttr string + err := json.Unmarshal(req.Memo.Fields["testMemo"], &resultMemo) + s.NoError(err) + s.Equal("memo value", resultMemo) + + err = json.Unmarshal(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + s.NoError(err) + s.Equal("attr value", resultAttr) + }) + + _, err := s.client.SignalWithStartWorkflowAsync(context.Background(), "wid", "signal", "value", options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.SignalWithStartWorkflowAsync(ctx, "wid", "signal", "value", options, wf) + s.Error(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflow_Error() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + startResp := &shared.StartWorkflowExecutionResponse{} + + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).AnyTimes() + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.StartWorkflow(ctx, options, wf) + s.Error(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync() { + client, ok := s.client.(*workflowClient) + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + + _, err := client.StartWorkflowAsync(context.Background(), options, f1, []byte("test")) + s.Equal(getDefaultDataConverter(), client.dataConverter) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithDataConverter() { + dc := newTestDataConverter() + s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) + client, ok := s.client.(*workflowClient) + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + input := []byte("test") + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.StartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + dc := client.dataConverter + encodedArg, _ := dc.ToData(input) + s.Equal(req.Input, encodedArg) + var decodedArg []byte + dc.FromData(req.Input, &decodedArg) + s.Equal(input, decodedArg) + }) + + _, err := client.StartWorkflowAsync(context.Background(), options, f1, input) + s.Equal(newTestDataConverter(), client.dataConverter) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithMemoAndSearchAttr() { + memo := map[string]interface{}{ + "testMemo": "memo value", + } + searchAttributes := map[string]interface{}{ + "testAttr": "attr value", + } + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.StartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + var resultMemo, resultAttr string + err := json.Unmarshal(req.Memo.Fields["testMemo"], &resultMemo) + s.NoError(err) + s.Equal("memo value", resultMemo) + + err = json.Unmarshal(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + s.NoError(err) + s.Equal("attr value", resultAttr) + }) + + _, err := s.client.StartWorkflowAsync(context.Background(), options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_Error() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.StartWorkflowAsync(ctx, options, wf) + s.Error(err) } func (s *workflowClientTestSuite) TestGetWorkflowMemo() { @@ -1302,7 +1497,7 @@ func (s *workflowClientTestSuite) TestListWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.ListWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1311,7 +1506,7 @@ func (s *workflowClientTestSuite) TestListWorkflow() { Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) { s.Equal("another", request.GetDomain()) }) - resp, err = s.client.ListWorkflow(context.Background(), request) + _, err = s.client.ListWorkflow(context.Background(), request) s.Equal(responseErr, err) } @@ -1325,7 +1520,7 @@ func (s *workflowClientTestSuite) TestListArchivedWorkflow() { ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() resp, err := s.client.ListArchivedWorkflow(ctxWithTimeout, request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1346,7 +1541,7 @@ func (s *workflowClientTestSuite) TestScanWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.ScanWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1367,7 +1562,7 @@ func (s *workflowClientTestSuite) TestCountWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.CountWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1384,7 +1579,7 @@ func (s *workflowClientTestSuite) TestGetSearchAttributes() { response := &shared.GetSearchAttributesResponse{} s.service.EXPECT().GetSearchAttributes(gomock.Any(), gomock.Any()).Return(response, nil) resp, err := s.client.GetSearchAttributes(context.Background()) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1408,7 +1603,7 @@ func (s *workflowClientTestSuite) TestCancelWorkflow() { err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun", WithCancelReason("test reason")) - s.Nil(err) + s.NoError(err) } func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() { @@ -1416,7 +1611,7 @@ func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() { err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun") - s.Nil(err) + s.NoError(err) } type PartialCancelRequestMatcher struct { diff --git a/internal/workflow.go b/internal/workflow.go index 5568e1608..a7a1a3bab 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -400,6 +400,11 @@ type ( RunID string } + // WorkflowExecutuonAsync Details. + WorkflowExecutionAsync struct { + ID string + } + // EncodedValue is type alias used to encapsulate/extract encoded result from workflow/activity. EncodedValue struct { value []byte diff --git a/mocks/Client.go b/mocks/Client.go index 1f02bda80..f05f803b9 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -48,6 +48,10 @@ func (_m *Client) CancelWorkflow(ctx context.Context, workflowID string, runID s _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for CancelWorkflow") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, ...internal.Option) error); ok { r0 = rf(ctx, workflowID, runID, opts...) @@ -62,6 +66,10 @@ func (_m *Client) CancelWorkflow(ctx context.Context, workflowID string, runID s func (_m *Client) CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error { ret := _m.Called(ctx, taskToken, result, err) + if len(ret) == 0 { + panic("no return value specified for CompleteActivity") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []byte, interface{}, error) error); ok { r0 = rf(ctx, taskToken, result, err) @@ -76,6 +84,10 @@ func (_m *Client) CompleteActivity(ctx context.Context, taskToken []byte, result func (_m *Client) CompleteActivityByID(ctx context.Context, domain string, workflowID string, runID string, activityID string, result interface{}, err error) error { ret := _m.Called(ctx, domain, workflowID, runID, activityID, result, err) + if len(ret) == 0 { + panic("no return value specified for CompleteActivityByID") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, interface{}, error) error); ok { r0 = rf(ctx, domain, workflowID, runID, activityID, result, err) @@ -90,7 +102,15 @@ func (_m *Client) CompleteActivityByID(ctx context.Context, domain string, workf func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkflowExecutionsRequest) (*shared.CountWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for CountWorkflow") + } + var r0 *shared.CountWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.CountWorkflowExecutionsRequest) (*shared.CountWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.CountWorkflowExecutionsRequest) *shared.CountWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -99,7 +119,6 @@ func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkfl } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.CountWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -113,7 +132,15 @@ func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkfl func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklistType shared.TaskListType) (*shared.DescribeTaskListResponse, error) { ret := _m.Called(ctx, tasklist, tasklistType) + if len(ret) == 0 { + panic("no return value specified for DescribeTaskList") + } + var r0 *shared.DescribeTaskListResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, shared.TaskListType) (*shared.DescribeTaskListResponse, error)); ok { + return rf(ctx, tasklist, tasklistType) + } if rf, ok := ret.Get(0).(func(context.Context, string, shared.TaskListType) *shared.DescribeTaskListResponse); ok { r0 = rf(ctx, tasklist, tasklistType) } else { @@ -122,7 +149,6 @@ func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklis } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, shared.TaskListType) error); ok { r1 = rf(ctx, tasklist, tasklistType) } else { @@ -136,7 +162,15 @@ func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklis func (_m *Client) DescribeWorkflowExecution(ctx context.Context, workflowID string, runID string) (*shared.DescribeWorkflowExecutionResponse, error) { ret := _m.Called(ctx, workflowID, runID) + if len(ret) == 0 { + panic("no return value specified for DescribeWorkflowExecution") + } + var r0 *shared.DescribeWorkflowExecutionResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*shared.DescribeWorkflowExecutionResponse, error)); ok { + return rf(ctx, workflowID, runID) + } if rf, ok := ret.Get(0).(func(context.Context, string, string) *shared.DescribeWorkflowExecutionResponse); ok { r0 = rf(ctx, workflowID, runID) } else { @@ -145,7 +179,6 @@ func (_m *Client) DescribeWorkflowExecution(ctx context.Context, workflowID stri } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, workflowID, runID) } else { @@ -162,7 +195,15 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor _ca = append(_ca, args...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for ExecuteWorkflow") + } + var r0 internal.WorkflowRun + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (internal.WorkflowRun, error)); ok { + return rf(ctx, options, workflow, args...) + } if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) internal.WorkflowRun); ok { r0 = rf(ctx, options, workflow, args...) } else { @@ -171,7 +212,6 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, options, workflow, args...) } else { @@ -185,7 +225,15 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAttributesResponse, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for GetSearchAttributes") + } + var r0 *shared.GetSearchAttributesResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*shared.GetSearchAttributesResponse, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) *shared.GetSearchAttributesResponse); ok { r0 = rf(ctx) } else { @@ -194,7 +242,6 @@ func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAtt } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -208,6 +255,10 @@ func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAtt func (_m *Client) GetWorkflow(ctx context.Context, workflowID string, runID string) internal.WorkflowRun { ret := _m.Called(ctx, workflowID, runID) + if len(ret) == 0 { + panic("no return value specified for GetWorkflow") + } + var r0 internal.WorkflowRun if rf, ok := ret.Get(0).(func(context.Context, string, string) internal.WorkflowRun); ok { r0 = rf(ctx, workflowID, runID) @@ -224,6 +275,10 @@ func (_m *Client) GetWorkflow(ctx context.Context, workflowID string, runID stri func (_m *Client) GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType shared.HistoryEventFilterType) internal.HistoryEventIterator { ret := _m.Called(ctx, workflowID, runID, isLongPoll, filterType) + if len(ret) == 0 { + panic("no return value specified for GetWorkflowHistory") + } + var r0 internal.HistoryEventIterator if rf, ok := ret.Get(0).(func(context.Context, string, string, bool, shared.HistoryEventFilterType) internal.HistoryEventIterator); ok { r0 = rf(ctx, workflowID, runID, isLongPoll, filterType) @@ -240,7 +295,15 @@ func (_m *Client) GetWorkflowHistory(ctx context.Context, workflowID string, run func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.ListArchivedWorkflowExecutionsRequest) (*shared.ListArchivedWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ListArchivedWorkflow") + } + var r0 *shared.ListArchivedWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) (*shared.ListArchivedWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) *shared.ListArchivedWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -249,7 +312,6 @@ func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.List } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -263,7 +325,15 @@ func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.List func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListClosedWorkflowExecutionsRequest) (*shared.ListClosedWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ListClosedWorkflow") + } + var r0 *shared.ListClosedWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) (*shared.ListClosedWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) *shared.ListClosedWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -272,7 +342,6 @@ func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListCl } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -286,7 +355,15 @@ func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListCl func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpenWorkflowExecutionsRequest) (*shared.ListOpenWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ListOpenWorkflow") + } + var r0 *shared.ListOpenWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) (*shared.ListOpenWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) *shared.ListOpenWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -295,7 +372,6 @@ func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpen } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -309,7 +385,15 @@ func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpen func (_m *Client) ListWorkflow(ctx context.Context, request *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ListWorkflow") + } + var r0 *shared.ListWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) *shared.ListWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -318,7 +402,6 @@ func (_m *Client) ListWorkflow(ctx context.Context, request *shared.ListWorkflow } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -335,7 +418,15 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st _ca = append(_ca, args...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for QueryWorkflow") + } + var r0 internal.Value + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, ...interface{}) (internal.Value, error)); ok { + return rf(ctx, workflowID, runID, queryType, args...) + } if rf, ok := ret.Get(0).(func(context.Context, string, string, string, ...interface{}) internal.Value); ok { r0 = rf(ctx, workflowID, runID, queryType, args...) } else { @@ -344,7 +435,6 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, string, ...interface{}) error); ok { r1 = rf(ctx, workflowID, runID, queryType, args...) } else { @@ -358,7 +448,15 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st func (_m *Client) QueryWorkflowWithOptions(ctx context.Context, request *internal.QueryWorkflowWithOptionsRequest) (*internal.QueryWorkflowWithOptionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for QueryWorkflowWithOptions") + } + var r0 *internal.QueryWorkflowWithOptionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) (*internal.QueryWorkflowWithOptionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) *internal.QueryWorkflowWithOptionsResponse); ok { r0 = rf(ctx, request) } else { @@ -367,7 +465,6 @@ func (_m *Client) QueryWorkflowWithOptions(ctx context.Context, request *interna } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -384,6 +481,10 @@ func (_m *Client) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, _ca = append(_ca, details...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for RecordActivityHeartbeat") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []byte, ...interface{}) error); ok { r0 = rf(ctx, taskToken, details...) @@ -401,6 +502,10 @@ func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, domain string _ca = append(_ca, details...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for RecordActivityHeartbeatByID") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, ...interface{}) error); ok { r0 = rf(ctx, domain, workflowID, runID, activityID, details...) @@ -415,6 +520,10 @@ func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, domain string func (_m *Client) RefreshWorkflowTasks(ctx context.Context, workflowID string, runID string) error { ret := _m.Called(ctx, workflowID, runID) + if len(ret) == 0 { + panic("no return value specified for RefreshWorkflowTasks") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, workflowID, runID) @@ -429,7 +538,15 @@ func (_m *Client) RefreshWorkflowTasks(ctx context.Context, workflowID string, r func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkflowExecutionRequest) (*shared.ResetWorkflowExecutionResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ResetWorkflow") + } + var r0 *shared.ResetWorkflowExecutionResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ResetWorkflowExecutionRequest) (*shared.ResetWorkflowExecutionResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ResetWorkflowExecutionRequest) *shared.ResetWorkflowExecutionResponse); ok { r0 = rf(ctx, request) } else { @@ -438,7 +555,6 @@ func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkfl } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ResetWorkflowExecutionRequest) error); ok { r1 = rf(ctx, request) } else { @@ -452,7 +568,15 @@ func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkfl func (_m *Client) ScanWorkflow(ctx context.Context, request *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) + if len(ret) == 0 { + panic("no return value specified for ScanWorkflow") + } + var r0 *shared.ListWorkflowExecutionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) *shared.ListWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -461,7 +585,6 @@ func (_m *Client) ScanWorkflow(ctx context.Context, request *shared.ListWorkflow } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -478,7 +601,15 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string _ca = append(_ca, workflowArgs...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for SignalWithStartWorkflow") + } + var r0 *internal.WorkflowExecution + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecution, error)); ok { + return rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) + } if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecution); ok { r0 = rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) } else { @@ -487,7 +618,6 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) } else { @@ -497,10 +627,47 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string return r0, r1 } +// SignalWithStartWorkflowAsync provides a mock function with given fields: ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs +func (_m *Client) SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options internal.StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*internal.WorkflowExecutionAsync, error) { + var _ca []interface{} + _ca = append(_ca, ctx, workflowID, signalName, signalArg, options, workflow) + _ca = append(_ca, workflowArgs...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for SignalWithStartWorkflowAsync") + } + + var r0 *internal.WorkflowExecutionAsync + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecutionAsync, error)); ok { + return rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { + r0 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internal.WorkflowExecutionAsync) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { + r1 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SignalWorkflow provides a mock function with given fields: ctx, workflowID, runID, signalName, arg func (_m *Client) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error { ret := _m.Called(ctx, workflowID, runID, signalName, arg) + if len(ret) == 0 { + panic("no return value specified for SignalWorkflow") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, interface{}) error); ok { r0 = rf(ctx, workflowID, runID, signalName, arg) @@ -518,7 +685,15 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf _ca = append(_ca, args...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for StartWorkflow") + } + var r0 *internal.WorkflowExecution + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecution, error)); ok { + return rf(ctx, options, workflowFunc, args...) + } if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecution); ok { r0 = rf(ctx, options, workflowFunc, args...) } else { @@ -527,7 +702,6 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, options, workflowFunc, args...) } else { @@ -537,10 +711,47 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf return r0, r1 } +// StartWorkflowAsync provides a mock function with given fields: ctx, options, workflow, args +func (_m *Client) StartWorkflowAsync(ctx context.Context, options internal.StartWorkflowOptions, workflow interface{}, args ...interface{}) (*internal.WorkflowExecutionAsync, error) { + var _ca []interface{} + _ca = append(_ca, ctx, options, workflow) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for StartWorkflowAsync") + } + + var r0 *internal.WorkflowExecutionAsync + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecutionAsync, error)); ok { + return rf(ctx, options, workflow, args...) + } + if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { + r0 = rf(ctx, options, workflow, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internal.WorkflowExecutionAsync) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { + r1 = rf(ctx, options, workflow, args...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // TerminateWorkflow provides a mock function with given fields: ctx, workflowID, runID, reason, details func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error { ret := _m.Called(ctx, workflowID, runID, reason, details) + if len(ret) == 0 { + panic("no return value specified for TerminateWorkflow") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, []byte) error); ok { r0 = rf(ctx, workflowID, runID, reason, details) @@ -551,13 +762,12 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } -type mockConstructorTestingTNewClient interface { +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewClient(t mockConstructorTestingTNewClient) *Client { +}) *Client { mock := &Client{} mock.Mock.Test(t) diff --git a/mocks/README.md b/mocks/README.md new file mode 100644 index 000000000..7d533dae5 --- /dev/null +++ b/mocks/README.md @@ -0,0 +1,23 @@ +# Mocks + +These mocks are generated using mockery. +Prereq: +``` +brew install mockery +``` + +Example command: +``` +mockery --dir=client \ + --name=Client \ + --filename=Client.go \ + --output=mocks \ + --outpkg=mocks +``` + +Flags: +— dir flag is directory of interfaces +— name flag is to generate mock for +— fileName flag is name of generated file +— output flag is directory to output mocks +— outpkg flag is generated file package name diff --git a/workflow/workflow.go b/workflow/workflow.go index b1df0af10..a6437edd4 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -39,6 +39,9 @@ type ( // Execution Details. Execution = internal.WorkflowExecution + // ExecutionAsync Details. + ExecutionAsync = internal.WorkflowExecutionAsync + // Version represents a change version. See GetVersion call. Version = internal.Version From e1be2008b8373e8c86291b415870b8bda1075fa6 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 1 Apr 2024 08:57:50 -0700 Subject: [PATCH 2/5] write tests for getWorkflowStartRequest --- internal/internal_workflow_client_test.go | 236 ++++++++++++++++++++-- 1 file changed, 224 insertions(+), 12 deletions(-) diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 7f71e67ca..fab1fde7d 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -31,6 +31,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" @@ -1213,6 +1214,45 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { s.NoError(err) } +func (s *workflowClientTestSuite) TestStartWorkflow_RequestCreationFails() { + client, ok := s.client.(*workflowClient) + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + + _, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) + s.Equal(getDefaultDataConverter(), client.dataConverter) + s.Error(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflow_Error() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + startResp := &shared.StartWorkflowExecutionResponse{} + + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).AnyTimes() + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.StartWorkflow(ctx, options, wf) + s.Error(err) +} + func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_WithMemoAndSearchAttr() { memo := map[string]interface{}{ "testMemo": "memo value", @@ -1287,10 +1327,10 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_WithMemoAndSe s.NoError(err) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_RequestCreationFails() { options := StartWorkflowOptions{ ID: workflowID, - TaskList: tasklist, + TaskList: "", // this causes error ExecutionStartToCloseTimeout: timeoutInSeconds, DecisionTaskStartToCloseTimeout: timeoutInSeconds, } @@ -1298,17 +1338,26 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { return "result" } - s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), - gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() + _, err := s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + s.Error(err) +} - // Pass a context with a deadline so error retry doesn't take forever - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - _, err := s.client.SignalWithStartWorkflowAsync(ctx, "wid", "signal", "value", options, wf) +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RequestCreationFails() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + _, err := s.client.SignalWithStartWorkflowAsync(context.Background(), "wid", "signal", "value", options, wf) s.Error(err) } -func (s *workflowClientTestSuite) TestStartWorkflow_Error() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1318,14 +1367,14 @@ func (s *workflowClientTestSuite) TestStartWorkflow_Error() { wf := func(ctx Context) string { return "result" } - startResp := &shared.StartWorkflowExecutionResponse{} - s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).AnyTimes() + s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() // Pass a context with a deadline so error retry doesn't take forever ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, err := s.client.StartWorkflow(ctx, options, wf) + _, err := s.client.SignalWithStartWorkflowAsync(ctx, "wid", "signal", "value", options, wf) s.Error(err) } @@ -1417,6 +1466,22 @@ func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithMemoAndSearchAttr() s.NoError(err) } +func (s *workflowClientTestSuite) TestStartWorkflowAsync_RequestCreationFails() { + client, ok := s.client.(*workflowClient) + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + _, err := client.StartWorkflowAsync(context.Background(), options, f1, []byte("test")) + s.Error(err) +} + func (s *workflowClientTestSuite) TestStartWorkflowAsync_Error() { options := StartWorkflowOptions{ ID: workflowID, @@ -1638,3 +1703,150 @@ func (m *PartialCancelRequestMatcher) Matches(a interface{}) bool { func (m *PartialCancelRequestMatcher) String() string { return "partial cancellation request matcher matches cause and wfId fields" } + +func TestGetWorkflowStartRequest(t *testing.T) { + tests := []struct { + name string + options StartWorkflowOptions + workflowFunc interface{} + args []interface{} + wantRequest *shared.StartWorkflowExecutionRequest + wantErr string + }{ + { + name: "success", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantRequest: &shared.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domain), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &shared.WorkflowType{ + Name: common.StringPtr("go.uber.org/cadence/internal.TestGetWorkflowStartRequest.func1"), + }, + TaskList: &shared.TaskList{ + Name: common.StringPtr(tasklist), + }, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(5), + DelayStartSeconds: common.Int32Ptr(0), + JitterStartSeconds: common.Int32Ptr(0), + CronSchedule: common.StringPtr(""), + Header: &shared.Header{Fields: map[string][]byte{}}, + WorkflowIdReusePolicy: shared.WorkflowIdReusePolicyAllowDuplicateFailedOnly.Ptr(), + }, + }, + { + name: "missing TaskList", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "missing TaskList", + }, + { + name: "invalid ExecutionStartToCloseTimeout", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 0 * time.Second, // this causes error + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "missing or invalid ExecutionStartToCloseTimeout", + }, + { + name: "negative DecisionTaskStartToCloseTimeout", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: -1 * time.Second, // this causes error + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "negative DecisionTaskStartToCloseTimeout provided", + }, + { + name: "negative DelayStart", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: -1 * time.Second, // this causes error + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "Invalid DelayStart option", + }, + { + name: "negative JitterStart", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: -1 * time.Second, // this causes error + }, + workflowFunc: func(ctx Context) {}, + wantErr: "Invalid JitterStart option", + }, + { + name: "invalid workflow func", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context, a, b int) {}, // this causes error because args not provided + wantErr: "expected 2 args for function", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + service := workflowservicetest.NewMockClient(mockCtrl) + wc, ok := NewClient(service, domain, &ClientOptions{ + Identity: "test-identity", + }).(*workflowClient) + + if !ok { + t.Fatalf("expected NewClient to return a *workflowClient, but got %T", wc) + } + + gotReq, err := wc.getWorkflowStartRequest(context.Background(), "", tc.options, tc.workflowFunc, tc.args...) + if tc.wantErr != "" { + assert.ErrorContains(t, err, tc.wantErr) + return + } + + assert.NoError(t, err) + + // set the fields that are not set in the expected request + tc.wantRequest.Identity = &wc.identity + tc.wantRequest.RequestId = gotReq.RequestId + + assert.Equal(t, tc.wantRequest, gotReq) + }) + } +} From 701e780df3bdbc14eb451f27f10e2040fd5974ee Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 1 Apr 2024 14:38:15 -0700 Subject: [PATCH 3/5] rebase --- mocks/Client.go | 202 ++++++------------------------------------------ mocks/README.md | 23 ------ 2 files changed, 22 insertions(+), 203 deletions(-) delete mode 100644 mocks/README.md diff --git a/mocks/Client.go b/mocks/Client.go index f05f803b9..d9e87872d 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -48,10 +48,6 @@ func (_m *Client) CancelWorkflow(ctx context.Context, workflowID string, runID s _ca = append(_ca, _va...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for CancelWorkflow") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, ...internal.Option) error); ok { r0 = rf(ctx, workflowID, runID, opts...) @@ -66,10 +62,6 @@ func (_m *Client) CancelWorkflow(ctx context.Context, workflowID string, runID s func (_m *Client) CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error { ret := _m.Called(ctx, taskToken, result, err) - if len(ret) == 0 { - panic("no return value specified for CompleteActivity") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, []byte, interface{}, error) error); ok { r0 = rf(ctx, taskToken, result, err) @@ -84,10 +76,6 @@ func (_m *Client) CompleteActivity(ctx context.Context, taskToken []byte, result func (_m *Client) CompleteActivityByID(ctx context.Context, domain string, workflowID string, runID string, activityID string, result interface{}, err error) error { ret := _m.Called(ctx, domain, workflowID, runID, activityID, result, err) - if len(ret) == 0 { - panic("no return value specified for CompleteActivityByID") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, interface{}, error) error); ok { r0 = rf(ctx, domain, workflowID, runID, activityID, result, err) @@ -102,15 +90,7 @@ func (_m *Client) CompleteActivityByID(ctx context.Context, domain string, workf func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkflowExecutionsRequest) (*shared.CountWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for CountWorkflow") - } - var r0 *shared.CountWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.CountWorkflowExecutionsRequest) (*shared.CountWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.CountWorkflowExecutionsRequest) *shared.CountWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -119,6 +99,7 @@ func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkfl } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.CountWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -132,15 +113,7 @@ func (_m *Client) CountWorkflow(ctx context.Context, request *shared.CountWorkfl func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklistType shared.TaskListType) (*shared.DescribeTaskListResponse, error) { ret := _m.Called(ctx, tasklist, tasklistType) - if len(ret) == 0 { - panic("no return value specified for DescribeTaskList") - } - var r0 *shared.DescribeTaskListResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, shared.TaskListType) (*shared.DescribeTaskListResponse, error)); ok { - return rf(ctx, tasklist, tasklistType) - } if rf, ok := ret.Get(0).(func(context.Context, string, shared.TaskListType) *shared.DescribeTaskListResponse); ok { r0 = rf(ctx, tasklist, tasklistType) } else { @@ -149,6 +122,7 @@ func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklis } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, shared.TaskListType) error); ok { r1 = rf(ctx, tasklist, tasklistType) } else { @@ -162,15 +136,7 @@ func (_m *Client) DescribeTaskList(ctx context.Context, tasklist string, tasklis func (_m *Client) DescribeWorkflowExecution(ctx context.Context, workflowID string, runID string) (*shared.DescribeWorkflowExecutionResponse, error) { ret := _m.Called(ctx, workflowID, runID) - if len(ret) == 0 { - panic("no return value specified for DescribeWorkflowExecution") - } - var r0 *shared.DescribeWorkflowExecutionResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (*shared.DescribeWorkflowExecutionResponse, error)); ok { - return rf(ctx, workflowID, runID) - } if rf, ok := ret.Get(0).(func(context.Context, string, string) *shared.DescribeWorkflowExecutionResponse); ok { r0 = rf(ctx, workflowID, runID) } else { @@ -179,6 +145,7 @@ func (_m *Client) DescribeWorkflowExecution(ctx context.Context, workflowID stri } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, workflowID, runID) } else { @@ -195,15 +162,7 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor _ca = append(_ca, args...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for ExecuteWorkflow") - } - var r0 internal.WorkflowRun - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (internal.WorkflowRun, error)); ok { - return rf(ctx, options, workflow, args...) - } if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) internal.WorkflowRun); ok { r0 = rf(ctx, options, workflow, args...) } else { @@ -212,6 +171,7 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, options, workflow, args...) } else { @@ -225,15 +185,7 @@ func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWor func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAttributesResponse, error) { ret := _m.Called(ctx) - if len(ret) == 0 { - panic("no return value specified for GetSearchAttributes") - } - var r0 *shared.GetSearchAttributesResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*shared.GetSearchAttributesResponse, error)); ok { - return rf(ctx) - } if rf, ok := ret.Get(0).(func(context.Context) *shared.GetSearchAttributesResponse); ok { r0 = rf(ctx) } else { @@ -242,6 +194,7 @@ func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAtt } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -255,10 +208,6 @@ func (_m *Client) GetSearchAttributes(ctx context.Context) (*shared.GetSearchAtt func (_m *Client) GetWorkflow(ctx context.Context, workflowID string, runID string) internal.WorkflowRun { ret := _m.Called(ctx, workflowID, runID) - if len(ret) == 0 { - panic("no return value specified for GetWorkflow") - } - var r0 internal.WorkflowRun if rf, ok := ret.Get(0).(func(context.Context, string, string) internal.WorkflowRun); ok { r0 = rf(ctx, workflowID, runID) @@ -275,10 +224,6 @@ func (_m *Client) GetWorkflow(ctx context.Context, workflowID string, runID stri func (_m *Client) GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType shared.HistoryEventFilterType) internal.HistoryEventIterator { ret := _m.Called(ctx, workflowID, runID, isLongPoll, filterType) - if len(ret) == 0 { - panic("no return value specified for GetWorkflowHistory") - } - var r0 internal.HistoryEventIterator if rf, ok := ret.Get(0).(func(context.Context, string, string, bool, shared.HistoryEventFilterType) internal.HistoryEventIterator); ok { r0 = rf(ctx, workflowID, runID, isLongPoll, filterType) @@ -295,15 +240,7 @@ func (_m *Client) GetWorkflowHistory(ctx context.Context, workflowID string, run func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.ListArchivedWorkflowExecutionsRequest) (*shared.ListArchivedWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ListArchivedWorkflow") - } - var r0 *shared.ListArchivedWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) (*shared.ListArchivedWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) *shared.ListArchivedWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -312,6 +249,7 @@ func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.List } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListArchivedWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -325,15 +263,7 @@ func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.List func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListClosedWorkflowExecutionsRequest) (*shared.ListClosedWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ListClosedWorkflow") - } - var r0 *shared.ListClosedWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) (*shared.ListClosedWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) *shared.ListClosedWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -342,6 +272,7 @@ func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListCl } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListClosedWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -355,15 +286,7 @@ func (_m *Client) ListClosedWorkflow(ctx context.Context, request *shared.ListCl func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpenWorkflowExecutionsRequest) (*shared.ListOpenWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ListOpenWorkflow") - } - var r0 *shared.ListOpenWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) (*shared.ListOpenWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) *shared.ListOpenWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -372,6 +295,7 @@ func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpen } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListOpenWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -385,15 +309,7 @@ func (_m *Client) ListOpenWorkflow(ctx context.Context, request *shared.ListOpen func (_m *Client) ListWorkflow(ctx context.Context, request *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ListWorkflow") - } - var r0 *shared.ListWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) *shared.ListWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -402,6 +318,7 @@ func (_m *Client) ListWorkflow(ctx context.Context, request *shared.ListWorkflow } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -418,15 +335,7 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st _ca = append(_ca, args...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for QueryWorkflow") - } - var r0 internal.Value - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, ...interface{}) (internal.Value, error)); ok { - return rf(ctx, workflowID, runID, queryType, args...) - } if rf, ok := ret.Get(0).(func(context.Context, string, string, string, ...interface{}) internal.Value); ok { r0 = rf(ctx, workflowID, runID, queryType, args...) } else { @@ -435,6 +344,7 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, string, ...interface{}) error); ok { r1 = rf(ctx, workflowID, runID, queryType, args...) } else { @@ -448,15 +358,7 @@ func (_m *Client) QueryWorkflow(ctx context.Context, workflowID string, runID st func (_m *Client) QueryWorkflowWithOptions(ctx context.Context, request *internal.QueryWorkflowWithOptionsRequest) (*internal.QueryWorkflowWithOptionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for QueryWorkflowWithOptions") - } - var r0 *internal.QueryWorkflowWithOptionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) (*internal.QueryWorkflowWithOptionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) *internal.QueryWorkflowWithOptionsResponse); ok { r0 = rf(ctx, request) } else { @@ -465,6 +367,7 @@ func (_m *Client) QueryWorkflowWithOptions(ctx context.Context, request *interna } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *internal.QueryWorkflowWithOptionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -481,10 +384,6 @@ func (_m *Client) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, _ca = append(_ca, details...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for RecordActivityHeartbeat") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, []byte, ...interface{}) error); ok { r0 = rf(ctx, taskToken, details...) @@ -502,10 +401,6 @@ func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, domain string _ca = append(_ca, details...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for RecordActivityHeartbeatByID") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, ...interface{}) error); ok { r0 = rf(ctx, domain, workflowID, runID, activityID, details...) @@ -520,10 +415,6 @@ func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, domain string func (_m *Client) RefreshWorkflowTasks(ctx context.Context, workflowID string, runID string) error { ret := _m.Called(ctx, workflowID, runID) - if len(ret) == 0 { - panic("no return value specified for RefreshWorkflowTasks") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { r0 = rf(ctx, workflowID, runID) @@ -538,15 +429,7 @@ func (_m *Client) RefreshWorkflowTasks(ctx context.Context, workflowID string, r func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkflowExecutionRequest) (*shared.ResetWorkflowExecutionResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ResetWorkflow") - } - var r0 *shared.ResetWorkflowExecutionResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ResetWorkflowExecutionRequest) (*shared.ResetWorkflowExecutionResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ResetWorkflowExecutionRequest) *shared.ResetWorkflowExecutionResponse); ok { r0 = rf(ctx, request) } else { @@ -555,6 +438,7 @@ func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkfl } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ResetWorkflowExecutionRequest) error); ok { r1 = rf(ctx, request) } else { @@ -568,15 +452,7 @@ func (_m *Client) ResetWorkflow(ctx context.Context, request *shared.ResetWorkfl func (_m *Client) ScanWorkflow(ctx context.Context, request *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) - if len(ret) == 0 { - panic("no return value specified for ScanWorkflow") - } - var r0 *shared.ListWorkflowExecutionsResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) (*shared.ListWorkflowExecutionsResponse, error)); ok { - return rf(ctx, request) - } if rf, ok := ret.Get(0).(func(context.Context, *shared.ListWorkflowExecutionsRequest) *shared.ListWorkflowExecutionsResponse); ok { r0 = rf(ctx, request) } else { @@ -585,6 +461,7 @@ func (_m *Client) ScanWorkflow(ctx context.Context, request *shared.ListWorkflow } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, *shared.ListWorkflowExecutionsRequest) error); ok { r1 = rf(ctx, request) } else { @@ -601,15 +478,7 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string _ca = append(_ca, workflowArgs...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for SignalWithStartWorkflow") - } - var r0 *internal.WorkflowExecution - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecution, error)); ok { - return rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) - } if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecution); ok { r0 = rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) } else { @@ -618,6 +487,7 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) } else { @@ -634,15 +504,7 @@ func (_m *Client) SignalWithStartWorkflowAsync(ctx context.Context, workflowID s _ca = append(_ca, workflowArgs...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for SignalWithStartWorkflowAsync") - } - var r0 *internal.WorkflowExecutionAsync - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecutionAsync, error)); ok { - return rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) - } if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { r0 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) } else { @@ -651,6 +513,7 @@ func (_m *Client) SignalWithStartWorkflowAsync(ctx context.Context, workflowID s } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) } else { @@ -664,10 +527,6 @@ func (_m *Client) SignalWithStartWorkflowAsync(ctx context.Context, workflowID s func (_m *Client) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error { ret := _m.Called(ctx, workflowID, runID, signalName, arg) - if len(ret) == 0 { - panic("no return value specified for SignalWorkflow") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, interface{}) error); ok { r0 = rf(ctx, workflowID, runID, signalName, arg) @@ -685,15 +544,7 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf _ca = append(_ca, args...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for StartWorkflow") - } - var r0 *internal.WorkflowExecution - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecution, error)); ok { - return rf(ctx, options, workflowFunc, args...) - } if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecution); ok { r0 = rf(ctx, options, workflowFunc, args...) } else { @@ -702,6 +553,7 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, options, workflowFunc, args...) } else { @@ -718,15 +570,7 @@ func (_m *Client) StartWorkflowAsync(ctx context.Context, options internal.Start _ca = append(_ca, args...) ret := _m.Called(_ca...) - if len(ret) == 0 { - panic("no return value specified for StartWorkflowAsync") - } - var r0 *internal.WorkflowExecutionAsync - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) (*internal.WorkflowExecutionAsync, error)); ok { - return rf(ctx, options, workflow, args...) - } if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { r0 = rf(ctx, options, workflow, args...) } else { @@ -735,6 +579,7 @@ func (_m *Client) StartWorkflowAsync(ctx context.Context, options internal.Start } } + var r1 error if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { r1 = rf(ctx, options, workflow, args...) } else { @@ -748,10 +593,6 @@ func (_m *Client) StartWorkflowAsync(ctx context.Context, options internal.Start func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error { ret := _m.Called(ctx, workflowID, runID, reason, details) - if len(ret) == 0 { - panic("no return value specified for TerminateWorkflow") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, []byte) error); ok { r0 = rf(ctx, workflowID, runID, reason, details) @@ -762,12 +603,13 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } -// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewClient(t interface { +type mockConstructorTestingTNewClient interface { mock.TestingT Cleanup(func()) -}) *Client { +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewClient(t mockConstructorTestingTNewClient) *Client { mock := &Client{} mock.Mock.Test(t) diff --git a/mocks/README.md b/mocks/README.md deleted file mode 100644 index 7d533dae5..000000000 --- a/mocks/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Mocks - -These mocks are generated using mockery. -Prereq: -``` -brew install mockery -``` - -Example command: -``` -mockery --dir=client \ - --name=Client \ - --filename=Client.go \ - --output=mocks \ - --outpkg=mocks -``` - -Flags: -— dir flag is directory of interfaces -— name flag is to generate mock for -— fileName flag is name of generated file -— output flag is directory to output mocks -— outpkg flag is generated file package name From e4758f3ad8444fe419a83eef7f3822b779b995a4 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 1 Apr 2024 19:02:00 -0700 Subject: [PATCH 4/5] address PR comments, switch to async counters --- internal/common/metrics/constants.go | 28 +++++++++++++++------------- internal/internal_workflow_client.go | 14 +++++++------- internal/workflow.go | 2 +- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 136e674a5..33081e059 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -22,19 +22,21 @@ package metrics // Workflow Creation metrics const ( - CadenceMetricsPrefix = "cadence-" - WorkflowStartCounter = CadenceMetricsPrefix + "workflow-start" - WorkflowCompletedCounter = CadenceMetricsPrefix + "workflow-completed" - WorkflowCanceledCounter = CadenceMetricsPrefix + "workflow-canceled" - WorkflowFailedCounter = CadenceMetricsPrefix + "workflow-failed" - WorkflowContinueAsNewCounter = CadenceMetricsPrefix + "workflow-continue-as-new" - WorkflowEndToEndLatency = CadenceMetricsPrefix + "workflow-endtoend-latency" // measure workflow execution from start to close - WorkflowGetHistoryCounter = CadenceMetricsPrefix + "workflow-get-history-total" - WorkflowGetHistoryFailedCounter = CadenceMetricsPrefix + "workflow-get-history-failed" - WorkflowGetHistorySucceedCounter = CadenceMetricsPrefix + "workflow-get-history-succeed" - WorkflowGetHistoryLatency = CadenceMetricsPrefix + "workflow-get-history-latency" - WorkflowSignalWithStartCounter = CadenceMetricsPrefix + "workflow-signal-with-start" - DecisionTimeoutCounter = CadenceMetricsPrefix + "decision-timeout" + CadenceMetricsPrefix = "cadence-" + WorkflowStartCounter = CadenceMetricsPrefix + "workflow-start" + WorkflowStartAsyncCounter = CadenceMetricsPrefix + "workflow-start-async" + WorkflowCompletedCounter = CadenceMetricsPrefix + "workflow-completed" + WorkflowCanceledCounter = CadenceMetricsPrefix + "workflow-canceled" + WorkflowFailedCounter = CadenceMetricsPrefix + "workflow-failed" + WorkflowContinueAsNewCounter = CadenceMetricsPrefix + "workflow-continue-as-new" + WorkflowEndToEndLatency = CadenceMetricsPrefix + "workflow-endtoend-latency" // measure workflow execution from start to close + WorkflowGetHistoryCounter = CadenceMetricsPrefix + "workflow-get-history-total" + WorkflowGetHistoryFailedCounter = CadenceMetricsPrefix + "workflow-get-history-failed" + WorkflowGetHistorySucceedCounter = CadenceMetricsPrefix + "workflow-get-history-succeed" + WorkflowGetHistoryLatency = CadenceMetricsPrefix + "workflow-get-history-latency" + WorkflowSignalWithStartCounter = CadenceMetricsPrefix + "workflow-signal-with-start" + WorkflowSignalWithStartAsyncCounter = CadenceMetricsPrefix + "workflow-signal-with-start-async" + DecisionTimeoutCounter = CadenceMetricsPrefix + "decision-timeout" DecisionPollCounter = CadenceMetricsPrefix + "decision-poll-total" DecisionPollFailedCounter = CadenceMetricsPrefix + "decision-poll-failed" diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 012744f44..94078469f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -178,7 +178,7 @@ func (wc *workflowClient) StartWorkflow( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, startRequest.WorkflowType.GetName()) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name) scope.Counter(metrics.WorkflowStartCounter).Inc(1) } @@ -189,7 +189,7 @@ func (wc *workflowClient) StartWorkflow( return executionInfo, nil } -// StartWorkflowAsync queues a workflow execution which is going to be picked up and started by Cadence backend asynchronously. +// StartWorkflowAsync behaves like StartWorkflow except that the request is queued and processed by Cadence backend asynchronously. // See StartWorkflow for details about inputs and usage. func (wc *workflowClient) StartWorkflowAsync( ctx context.Context, @@ -222,8 +222,8 @@ func (wc *workflowClient) StartWorkflowAsync( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, startRequest.WorkflowType.GetName()) - scope.Counter(metrics.WorkflowStartCounter).Inc(1) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowStartAsyncCounter).Inc(1) } executionInfo := &WorkflowExecutionAsync{ @@ -342,7 +342,7 @@ func (wc *workflowClient) SignalWithStartWorkflow( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, signalWithStartRequest.WorkflowType.GetName()) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name) scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) } @@ -388,8 +388,8 @@ func (wc *workflowClient) SignalWithStartWorkflowAsync( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, signalWithStartRequest.WorkflowType.GetName()) - scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowSignalWithStartAsyncCounter).Inc(1) } executionInfo := &WorkflowExecutionAsync{ diff --git a/internal/workflow.go b/internal/workflow.go index a7a1a3bab..e467fa663 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -400,7 +400,7 @@ type ( RunID string } - // WorkflowExecutuonAsync Details. + // WorkflowExecutionAsync Details. WorkflowExecutionAsync struct { ID string } From 2a5e110f058f36b345c528caae1c2c6a537043ce Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 1 Apr 2024 19:32:21 -0700 Subject: [PATCH 5/5] test improvements --- internal/internal_workflow_client_test.go | 49 ++++++++++------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index fab1fde7d..18e2cbabc 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -262,7 +262,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError_EmptyPage() { s.Equal(2, len(events)) } -func (s *historyEventIteratorSuite) TestIterator_Error() { +func (s *historyEventIteratorSuite) TestIterator_RPCError() { filterType := shared.HistoryEventFilterTypeAllEvent request1 := getGetWorkflowExecutionHistoryRequest(filterType) response1 := &shared.GetWorkflowExecutionHistoryResponse{ @@ -1059,7 +1059,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { s.Equal(createResponse.GetRunId(), resp.RunID) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_RPCError() { signalName := "my signal" signalInput := []byte("my signal input") options := StartWorkflowOptions{} @@ -1091,8 +1091,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { } func (s *workflowClientTestSuite) TestStartWorkflow() { - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1116,8 +1115,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { s.client = NewClient(s.service, domain, &ClientOptions{ContextPropagators: []ContextPropagator{NewStringMapPropagator([]string{testHeader})}}) - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1146,8 +1144,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() { dc := newTestDataConverter() s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1215,8 +1212,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { } func (s *workflowClientTestSuite) TestStartWorkflow_RequestCreationFails() { - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: "", // this causes error @@ -1228,11 +1224,10 @@ func (s *workflowClientTestSuite) TestStartWorkflow_RequestCreationFails() { } _, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) - s.Equal(getDefaultDataConverter(), client.dataConverter) - s.Error(err) + s.ErrorContains(err, "missing TaskList") } -func (s *workflowClientTestSuite) TestStartWorkflow_Error() { +func (s *workflowClientTestSuite) TestStartWorkflow_RPCError() { options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1244,7 +1239,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_Error() { } startResp := &shared.StartWorkflowExecutionResponse{} - s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).AnyTimes() + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).MinTimes(1) // Pass a context with a deadline so error retry doesn't take forever ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1339,7 +1334,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_RequestCreationFai } _, err := s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) - s.Error(err) + s.ErrorContains(err, "missing TaskList") } func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RequestCreationFails() { @@ -1354,10 +1349,10 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RequestCreati } _, err := s.client.SignalWithStartWorkflowAsync(context.Background(), "wid", "signal", "value", options, wf) - s.Error(err) + s.ErrorContains(err, "missing TaskList") } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RPCError() { options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1369,7 +1364,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { } s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), - gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).MinTimes(1) // Pass a context with a deadline so error retry doesn't take forever ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1379,8 +1374,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_Error() { } func (s *workflowClientTestSuite) TestStartWorkflowAsync() { - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1401,8 +1395,7 @@ func (s *workflowClientTestSuite) TestStartWorkflowAsync() { func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithDataConverter() { dc := newTestDataConverter() s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1467,8 +1460,7 @@ func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithMemoAndSearchAttr() } func (s *workflowClientTestSuite) TestStartWorkflowAsync_RequestCreationFails() { - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: "", // this causes error @@ -1479,10 +1471,10 @@ func (s *workflowClientTestSuite) TestStartWorkflowAsync_RequestCreationFails() return "result" } _, err := client.StartWorkflowAsync(context.Background(), options, f1, []byte("test")) - s.Error(err) + s.ErrorContains(err, "missing TaskList") } -func (s *workflowClientTestSuite) TestStartWorkflowAsync_Error() { +func (s *workflowClientTestSuite) TestStartWorkflowAsync_RPCError() { options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1493,7 +1485,7 @@ func (s *workflowClientTestSuite) TestStartWorkflowAsync_Error() { return "result" } - s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).AnyTimes() + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).MinTimes(1) // Pass a context with a deadline so error retry doesn't take forever ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1818,6 +1810,7 @@ func TestGetWorkflowStartRequest(t *testing.T) { JitterStart: 0 * time.Second, }, workflowFunc: func(ctx Context, a, b int) {}, // this causes error because args not provided + args: []interface{}{}, wantErr: "expected 2 args for function", }, } @@ -1842,7 +1835,7 @@ func TestGetWorkflowStartRequest(t *testing.T) { assert.NoError(t, err) - // set the fields that are not set in the expected request + // set the randomized fields in the expected request before comparison tc.wantRequest.Identity = &wc.identity tc.wantRequest.RequestId = gotReq.RequestId