diff --git a/client/client.go b/client/client.go index 6265852b5..f67ff0f88 100644 --- a/client/client.go +++ b/client/client.go @@ -104,6 +104,15 @@ type ( // - InternalServiceError StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflowFunc interface{}, args ...interface{}) (*workflow.Execution, error) + // StartWorkflowAsync behaves like StartWorkflow except that the request is first queued and then processed asynchronously. + // See StartWorkflow for parameter details. + // The returned AsyncWorkflowExecution doesn't contain run ID, because the workflow hasn't started yet. + // The errors it can return: + // - EntityNotExistsError, if domain does not exists + // - BadRequestError + // - InternalServiceError + StartWorkflowAsync(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*workflow.ExecutionAsync, error) + // ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error // The user can use this to start using a function or workflow type name. // Either by @@ -169,6 +178,15 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*workflow.Execution, error) + // SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is first queued and then processed asynchronously. + // See SignalWithStartWorkflow for parameter details. + // The errors it can return: + // - EntityNotExistsError, if domain does not exist + // - BadRequestError + // - InternalServiceError + SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, + options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*workflow.ExecutionAsync, error) + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. diff --git a/internal/client.go b/internal/client.go index 0c05e3ad6..3b7b293b5 100644 --- a/internal/client.go +++ b/internal/client.go @@ -93,6 +93,15 @@ type ( // subjected to change in the future. StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error) + // StartWorkflowAsync behaves like StartWorkflow except that the request is first queued and then processed asynchronously. + // See StartWorkflow for parameter details. + // The returned AsyncWorkflowExecution doesn't contain run ID, because the workflow hasn't started yet. + // The errors it can return: + // - EntityNotExistsError, if domain does not exists + // - BadRequestError + // - InternalServiceError + StartWorkflowAsync(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecutionAsync, error) + // ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error // The user can use this to start using a function or workflow type name. // Either by @@ -160,6 +169,15 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error) + // SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is first queued and then processed asynchronously. + // See SignalWithStartWorkflow for parameter details. + // The errors it can return: + // - EntityNotExistsError, if domain does not exist + // - BadRequestError + // - InternalServiceError + SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, + options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecutionAsync, error) + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 136e674a5..33081e059 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -22,19 +22,21 @@ package metrics // Workflow Creation metrics const ( - CadenceMetricsPrefix = "cadence-" - WorkflowStartCounter = CadenceMetricsPrefix + "workflow-start" - WorkflowCompletedCounter = CadenceMetricsPrefix + "workflow-completed" - WorkflowCanceledCounter = CadenceMetricsPrefix + "workflow-canceled" - WorkflowFailedCounter = CadenceMetricsPrefix + "workflow-failed" - WorkflowContinueAsNewCounter = CadenceMetricsPrefix + "workflow-continue-as-new" - WorkflowEndToEndLatency = CadenceMetricsPrefix + "workflow-endtoend-latency" // measure workflow execution from start to close - WorkflowGetHistoryCounter = CadenceMetricsPrefix + "workflow-get-history-total" - WorkflowGetHistoryFailedCounter = CadenceMetricsPrefix + "workflow-get-history-failed" - WorkflowGetHistorySucceedCounter = CadenceMetricsPrefix + "workflow-get-history-succeed" - WorkflowGetHistoryLatency = CadenceMetricsPrefix + "workflow-get-history-latency" - WorkflowSignalWithStartCounter = CadenceMetricsPrefix + "workflow-signal-with-start" - DecisionTimeoutCounter = CadenceMetricsPrefix + "decision-timeout" + CadenceMetricsPrefix = "cadence-" + WorkflowStartCounter = CadenceMetricsPrefix + "workflow-start" + WorkflowStartAsyncCounter = CadenceMetricsPrefix + "workflow-start-async" + WorkflowCompletedCounter = CadenceMetricsPrefix + "workflow-completed" + WorkflowCanceledCounter = CadenceMetricsPrefix + "workflow-canceled" + WorkflowFailedCounter = CadenceMetricsPrefix + "workflow-failed" + WorkflowContinueAsNewCounter = CadenceMetricsPrefix + "workflow-continue-as-new" + WorkflowEndToEndLatency = CadenceMetricsPrefix + "workflow-endtoend-latency" // measure workflow execution from start to close + WorkflowGetHistoryCounter = CadenceMetricsPrefix + "workflow-get-history-total" + WorkflowGetHistoryFailedCounter = CadenceMetricsPrefix + "workflow-get-history-failed" + WorkflowGetHistorySucceedCounter = CadenceMetricsPrefix + "workflow-get-history-succeed" + WorkflowGetHistoryLatency = CadenceMetricsPrefix + "workflow-get-history-latency" + WorkflowSignalWithStartCounter = CadenceMetricsPrefix + "workflow-signal-with-start" + WorkflowSignalWithStartAsyncCounter = CadenceMetricsPrefix + "workflow-signal-with-start-async" + DecisionTimeoutCounter = CadenceMetricsPrefix + "decision-timeout" DecisionPollCounter = CadenceMetricsPrefix + "decision-poll-total" DecisionPollFailedCounter = CadenceMetricsPrefix + "decision-poll-failed" diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 80d2ecc45..94078469f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -155,90 +155,57 @@ func (wc *workflowClient) StartWorkflow( workflowFunc interface{}, args ...interface{}, ) (*WorkflowExecution, error) { - workflowID := options.ID - if len(workflowID) == 0 { - workflowID = uuid.NewRandom().String() - } - - if options.TaskList == "" { - return nil, errors.New("missing TaskList") + startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflow", options, workflowFunc, args...) + if err != nil { + return nil, err } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } + var response *s.StartWorkflowExecutionResponse - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } + // Start creating workflow request. + err = backoff.Retry(ctx, + func() error { + tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) + defer cancel() - // Validate type and its arguments. - workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) - if err != nil { - return nil, err - } + var err1 error + response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest, opt...) + return err1 + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) if err != nil { return nil, err } - searchAttr, err := serializeSearchAttributes(options.SearchAttributes) - if err != nil { - return nil, err + if wc.metricsScope != nil { + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowStartCounter).Inc(1) } - delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) - if delayStartSeconds < 0 { - return nil, errors.New("Invalid DelayStart option") + executionInfo := &WorkflowExecution{ + ID: *startRequest.WorkflowId, + RunID: response.GetRunId(), } + return executionInfo, nil +} - jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) - if jitterStartSeconds < 0 { - return nil, errors.New("Invalid JitterStart option") +// StartWorkflowAsync behaves like StartWorkflow except that the request is queued and processed by Cadence backend asynchronously. +// See StartWorkflow for details about inputs and usage. +func (wc *workflowClient) StartWorkflowAsync( + ctx context.Context, + options StartWorkflowOptions, + workflowFunc interface{}, + args ...interface{}, +) (*WorkflowExecutionAsync, error) { + startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflowAsync", options, workflowFunc, args...) + if err != nil { + return nil, err } - // create a workflow start span and attach it to the context object. - // N.B. we need to finish this immediately as jaeger does not give us a way - // to recreate a span given a span context - which means we will run into - // issues during replay. we work around this by creating and ending the - // workflow start span and passing in that context to the workflow. So - // everything beginning with the StartWorkflowExecutionRequest will be - // parented by the created start workflow span. - ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("StartWorkflow-%s", workflowType.Name), workflowID) - span.Finish() - - // get workflow headers from the context - header := wc.getWorkflowHeader(ctx) - - // run propagators to extract information about tracing and other stuff, store in headers field - startRequest := &s.StartWorkflowExecutionRequest{ - Domain: common.StringPtr(wc.domain), - RequestId: common.StringPtr(uuid.New()), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: workflowTypePtr(*workflowType), - TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), - Input: input, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), - Identity: common.StringPtr(wc.identity), - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: common.StringPtr(options.CronSchedule), - Memo: memo, - SearchAttributes: searchAttr, - Header: header, - DelayStartSeconds: common.Int32Ptr(delayStartSeconds), - JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + asyncStartRequest := &s.StartWorkflowExecutionAsyncRequest{ + Request: startRequest, } - var response *s.StartWorkflowExecutionResponse - // Start creating workflow request. err = backoff.Retry(ctx, func() error { @@ -246,7 +213,7 @@ func (wc *workflowClient) StartWorkflow( defer cancel() var err1 error - response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest, opt...) + _, err1 = wc.workflowService.StartWorkflowExecutionAsync(tchCtx, asyncStartRequest, opt...) return err1 }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) @@ -255,13 +222,13 @@ func (wc *workflowClient) StartWorkflow( } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, workflowType.Name) - scope.Counter(metrics.WorkflowStartCounter).Inc(1) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowStartAsyncCounter).Inc(1) } - executionInfo := &WorkflowExecution{ - ID: workflowID, - RunID: response.GetRunId()} + executionInfo := &WorkflowExecutionAsync{ + ID: *startRequest.WorkflowId, + } return executionInfo, nil } @@ -343,100 +310,76 @@ func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string, // SignalWithStartWorkflow sends a signal to a running workflow. // If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction. -func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, - options StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error) { +func (wc *workflowClient) SignalWithStartWorkflow( + ctx context.Context, + workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*WorkflowExecution, error) { - signalInput, err := encodeArg(wc.dataConverter, signalArg) + signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) if err != nil { return nil, err } - if workflowID == "" { - workflowID = uuid.NewRandom().String() - } - - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } + var response *s.StartWorkflowExecutionResponse - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } + // Start creating workflow request. + err = backoff.Retry(ctx, + func() error { + tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) + defer cancel() - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } + var err1 error + response, err1 = wc.workflowService.SignalWithStartWorkflowExecution(tchCtx, signalWithStartRequest, opt...) + return err1 + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - // Validate type and its arguments. - workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) if err != nil { return nil, err } - memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) - if err != nil { - return nil, err + if wc.metricsScope != nil { + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) } - searchAttr, err := serializeSearchAttributes(options.SearchAttributes) - if err != nil { - return nil, err + executionInfo := &WorkflowExecution{ + ID: options.ID, + RunID: response.GetRunId(), } + return executionInfo, nil +} - delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) - if delayStartSeconds < 0 { - return nil, errors.New("Invalid DelayStart option") - } +// SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is queued and processed by Cadence backend asynchronously. +// See SignalWithStartWorkflow for details about inputs and usage. +func (wc *workflowClient) SignalWithStartWorkflowAsync( + ctx context.Context, + workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*WorkflowExecutionAsync, error) { - jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) - if jitterStartSeconds < 0 { - return nil, errors.New("Invalid JitterStart option") + signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...) + if err != nil { + return nil, err } - // create a workflow start span and attach it to the context object. finish it immediately - ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("SignalWithStartWorkflow-%s", workflowType.Name), workflowID) - span.Finish() - - // get workflow headers from the context - header := wc.getWorkflowHeader(ctx) - - signalWithStartRequest := &s.SignalWithStartWorkflowExecutionRequest{ - Domain: common.StringPtr(wc.domain), - RequestId: common.StringPtr(uuid.New()), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: workflowTypePtr(*workflowType), - TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), - Input: input, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), - SignalName: common.StringPtr(signalName), - SignalInput: signalInput, - Identity: common.StringPtr(wc.identity), - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: common.StringPtr(options.CronSchedule), - Memo: memo, - SearchAttributes: searchAttr, - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), - Header: header, - DelayStartSeconds: common.Int32Ptr(delayStartSeconds), - JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + asyncSignalWithStartRequest := &s.SignalWithStartWorkflowExecutionAsyncRequest{ + Request: signalWithStartRequest, } - var response *s.StartWorkflowExecutionResponse - - // Start creating workflow request. err = backoff.Retry(ctx, func() error { tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) defer cancel() var err1 error - response, err1 = wc.workflowService.SignalWithStartWorkflowExecution(tchCtx, signalWithStartRequest, opt...) + _, err1 = wc.workflowService.SignalWithStartWorkflowExecutionAsync(tchCtx, asyncSignalWithStartRequest, opt...) return err1 }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) @@ -445,13 +388,13 @@ func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI } if wc.metricsScope != nil { - scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, workflowType.Name) - scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1) + scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name) + scope.Counter(metrics.WorkflowSignalWithStartAsyncCounter).Inc(1) } - executionInfo := &WorkflowExecution{ - ID: options.ID, - RunID: response.GetRunId()} + executionInfo := &WorkflowExecutionAsync{ + ID: options.ID, + } return executionInfo, nil } @@ -1060,6 +1003,191 @@ func (wc *workflowClient) getWorkflowHeader(ctx context.Context) *s.Header { return header } +func (wc *workflowClient) getWorkflowStartRequest( + ctx context.Context, + tracePrefix string, + options StartWorkflowOptions, + workflowFunc interface{}, + args ...interface{}, +) (*s.StartWorkflowExecutionRequest, error) { + workflowID := options.ID + if len(workflowID) == 0 { + workflowID = uuid.NewRandom().String() + } + + if options.TaskList == "" { + return nil, errors.New("missing TaskList") + } + + executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + if executionTimeout <= 0 { + return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") + } + + decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + if decisionTaskTimeout < 0 { + return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") + } + if decisionTaskTimeout == 0 { + decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs + } + + // Validate type and its arguments. + workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) + if err != nil { + return nil, err + } + + memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(options.SearchAttributes) + if err != nil { + return nil, err + } + + delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) + if delayStartSeconds < 0 { + return nil, errors.New("Invalid DelayStart option") + } + + jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) + if jitterStartSeconds < 0 { + return nil, errors.New("Invalid JitterStart option") + } + + // create a workflow start span and attach it to the context object. + // N.B. we need to finish this immediately as jaeger does not give us a way + // to recreate a span given a span context - which means we will run into + // issues during replay. we work around this by creating and ending the + // workflow start span and passing in that context to the workflow. So + // everything beginning with the StartWorkflowExecutionRequest will be + // parented by the created start workflow span. + ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID) + span.Finish() + + // get workflow headers from the context + header := wc.getWorkflowHeader(ctx) + + // run propagators to extract information about tracing and other stuff, store in headers field + startRequest := &s.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(wc.domain), + RequestId: common.StringPtr(uuid.New()), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: workflowTypePtr(*workflowType), + TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), + Input: input, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), + Identity: common.StringPtr(wc.identity), + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: common.StringPtr(options.CronSchedule), + Memo: memo, + SearchAttributes: searchAttr, + Header: header, + DelayStartSeconds: common.Int32Ptr(delayStartSeconds), + JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + } + + return startRequest, nil +} + +func (wc *workflowClient) getSignalWithStartRequest( + ctx context.Context, + tracePrefix, workflowID, signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*s.SignalWithStartWorkflowExecutionRequest, error) { + + signalInput, err := encodeArg(wc.dataConverter, signalArg) + if err != nil { + return nil, err + } + + if workflowID == "" { + workflowID = uuid.NewRandom().String() + } + + if options.TaskList == "" { + return nil, errors.New("missing TaskList") + } + + executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + if executionTimeout <= 0 { + return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") + } + + decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + if decisionTaskTimeout < 0 { + return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") + } + if decisionTaskTimeout == 0 { + decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs + } + + // Validate type and its arguments. + workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) + if err != nil { + return nil, err + } + + memo, err := getWorkflowMemo(options.Memo, wc.dataConverter) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(options.SearchAttributes) + if err != nil { + return nil, err + } + + delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds()) + if delayStartSeconds < 0 { + return nil, errors.New("Invalid DelayStart option") + } + + jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds()) + if jitterStartSeconds < 0 { + return nil, errors.New("Invalid JitterStart option") + } + + // create a workflow start span and attach it to the context object. finish it immediately + ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID) + span.Finish() + + // get workflow headers from the context + header := wc.getWorkflowHeader(ctx) + + signalWithStartRequest := &s.SignalWithStartWorkflowExecutionRequest{ + Domain: common.StringPtr(wc.domain), + RequestId: common.StringPtr(uuid.New()), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: workflowTypePtr(*workflowType), + TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}), + Input: input, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTaskTimeout), + SignalName: common.StringPtr(signalName), + SignalInput: signalInput, + Identity: common.StringPtr(wc.identity), + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: common.StringPtr(options.CronSchedule), + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(), + Header: header, + DelayStartSeconds: common.Int32Ptr(delayStartSeconds), + JitterStartSeconds: common.Int32Ptr(jitterStartSeconds), + } + + return signalWithStartRequest, nil +} + // Register a domain with cadence server // The errors it can throw: // - DomainAlreadyExistsError diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 96642e40e..18e2cbabc 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -31,6 +31,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" @@ -204,7 +205,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError() { iter := s.wfClient.GetWorkflowHistory(context.Background(), workflowID, runID, true, shared.HistoryEventFilterTypeAllEvent) for iter.HasNext() { event, err := iter.Next() - s.Nil(err) + s.NoError(err) events = append(events, event) } s.Equal(3, len(events)) @@ -255,13 +256,13 @@ func (s *historyEventIteratorSuite) TestIterator_NoError_EmptyPage() { iter := s.wfClient.GetWorkflowHistory(context.Background(), workflowID, runID, true, shared.HistoryEventFilterTypeAllEvent) for iter.HasNext() { event, err := iter.Next() - s.Nil(err) + s.NoError(err) events = append(events, event) } s.Equal(2, len(events)) } -func (s *historyEventIteratorSuite) TestIterator_Error() { +func (s *historyEventIteratorSuite) TestIterator_RPCError() { filterType := shared.HistoryEventFilterTypeAllEvent request1 := getGetWorkflowExecutionHistoryRequest(filterType) response1 := &shared.GetWorkflowExecutionHistoryResponse{ @@ -283,14 +284,14 @@ func (s *historyEventIteratorSuite) TestIterator_Error() { s.True(iter.HasNext()) event, err := iter.Next() s.NotNil(event) - s.Nil(err) + s.NoError(err) s.workflowServiceClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), request2, gomock.Any()).Return(nil, &shared.EntityNotExistsError{}).Times(1) s.True(iter.HasNext()) event, err = iter.Next() s.Nil(event) - s.NotNil(err) + s.Error(err) } func (s *historyEventIteratorSuite) TestIterator_StopsTryingNearTimeout() { @@ -433,12 +434,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Success() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -481,12 +482,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_RawHistory_Success() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -533,12 +534,12 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -589,12 +590,12 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -639,11 +640,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) s.Equal(workflowRun.GetID(), *wid) } @@ -692,11 +693,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions_RawHistory() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) s.Equal(workflowRun.GetID(), *wid) } @@ -737,12 +738,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Cancelled() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.NotNil(err) + s.Error(err) _, ok := err.(*CanceledError) s.True(ok) s.Equal(time.Minute, decodedResult) @@ -787,7 +788,7 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Failed() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute @@ -828,7 +829,7 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Terminated() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute @@ -872,12 +873,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_TimedOut() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.NotNil(err) + s.Error(err) _, ok := err.(*TimeoutError) s.True(ok) s.Equal(timeType, err.(*TimeoutError).TimeoutType()) @@ -939,12 +940,12 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_ContinueAsNew() { WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) - s.Nil(err) + s.NoError(err) s.Equal(workflowRun.GetID(), workflowID) s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err = workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -981,7 +982,7 @@ func (s *workflowRunSuite) TestGetWorkflow() { s.Equal(workflowRun.GetRunID(), runID) decodedResult := time.Minute err := workflowRun.Get(context.Background(), &decodedResult) - s.Nil(err) + s.NoError(err) s.Equal(workflowResult, decodedResult) } @@ -1049,29 +1050,33 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) resp, err = s.client.SignalWithStartWorkflow(context.Background(), "", signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_RPCError() { signalName := "my signal" signalInput := []byte("my signal input") options := StartWorkflowOptions{} - resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + resp, err := s.client.SignalWithStartWorkflow(ctx, workflowID, signalName, signalInput, options, workflowType) s.Equal(errors.New("missing TaskList"), err) s.Nil(resp) + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() options.TaskList = tasklist - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.NotNil(err) + resp, err = s.client.SignalWithStartWorkflow(ctx, workflowID, signalName, signalInput, options, workflowType) + s.Error(err) s.Nil(resp) options.ExecutionStartToCloseTimeout = timeoutInSeconds @@ -1081,13 +1086,12 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { s.service.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, options, workflowType) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } func (s *workflowClientTestSuite) TestStartWorkflow() { - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1105,14 +1109,13 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) s.Equal(getDefaultDataConverter(), client.dataConverter) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { s.client = NewClient(s.service, domain, &ClientOptions{ContextPropagators: []ContextPropagator{NewStringMapPropagator([]string{testHeader})}}) - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1134,15 +1137,14 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() { dc := newTestDataConverter() s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) - client, ok := s.client.(*workflowClient) - s.True(ok) + client := s.client.(*workflowClient) options := StartWorkflowOptions{ ID: workflowID, TaskList: tasklist, @@ -1169,7 +1171,7 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() { resp, err := client.StartWorkflow(context.Background(), options, f1, input) s.Equal(newTestDataConverter(), client.dataConverter) - s.Nil(err) + s.NoError(err) s.Equal(createResponse.GetRunId(), resp.RunID) } @@ -1204,10 +1206,49 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { s.NoError(err) s.Equal("attr value", resultAttr) }) - s.client.StartWorkflow(context.Background(), options, wf) + + _, err := s.client.StartWorkflow(context.Background(), options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflow_RequestCreationFails() { + client := s.client.(*workflowClient) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + + _, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) + s.ErrorContains(err, "missing TaskList") } -func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() { +func (s *workflowClientTestSuite) TestStartWorkflow_RPCError() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + startResp := &shared.StartWorkflowExecutionResponse{} + + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, errors.New("failed")).MinTimes(1) + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.StartWorkflow(ctx, options, wf) + s.Error(err) +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_WithMemoAndSearchAttr() { memo := map[string]interface{}{ "testMemo": "memo value", } @@ -1239,7 +1280,218 @@ func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() s.NoError(err) s.Equal("attr value", resultAttr) }) - s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + + _, err := s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_WithMemoAndSearchAttr() { + memo := map[string]interface{}{ + "testMemo": "memo value", + } + searchAttributes := map[string]interface{}{ + "testAttr": "attr value", + } + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.SignalWithStartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + var resultMemo, resultAttr string + err := json.Unmarshal(req.Memo.Fields["testMemo"], &resultMemo) + s.NoError(err) + s.Equal("memo value", resultMemo) + + err = json.Unmarshal(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + s.NoError(err) + s.Equal("attr value", resultAttr) + }) + + _, err := s.client.SignalWithStartWorkflowAsync(context.Background(), "wid", "signal", "value", options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_RequestCreationFails() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + _, err := s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) + s.ErrorContains(err, "missing TaskList") +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RequestCreationFails() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + _, err := s.client.SignalWithStartWorkflowAsync(context.Background(), "wid", "signal", "value", options, wf) + s.ErrorContains(err, "missing TaskList") +} + +func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAsync_RPCError() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).MinTimes(1) + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.SignalWithStartWorkflowAsync(ctx, "wid", "signal", "value", options, wf) + s.Error(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync() { + client := s.client.(*workflowClient) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + + _, err := client.StartWorkflowAsync(context.Background(), options, f1, []byte("test")) + s.Equal(getDefaultDataConverter(), client.dataConverter) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithDataConverter() { + dc := newTestDataConverter() + s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) + client := s.client.(*workflowClient) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + input := []byte("test") + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.StartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + dc := client.dataConverter + encodedArg, _ := dc.ToData(input) + s.Equal(req.Input, encodedArg) + var decodedArg []byte + dc.FromData(req.Input, &decodedArg) + s.Equal(input, decodedArg) + }) + + _, err := client.StartWorkflowAsync(context.Background(), options, f1, input) + s.Equal(newTestDataConverter(), client.dataConverter) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_WithMemoAndSearchAttr() { + memo := map[string]interface{}{ + "testMemo": "memo value", + } + searchAttributes := map[string]interface{}{ + "testAttr": "attr value", + } + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil). + Do(func(_ interface{}, asyncReq *shared.StartWorkflowExecutionAsyncRequest, _ ...interface{}) { + req := asyncReq.Request + var resultMemo, resultAttr string + err := json.Unmarshal(req.Memo.Fields["testMemo"], &resultMemo) + s.NoError(err) + s.Equal("memo value", resultMemo) + + err = json.Unmarshal(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + s.NoError(err) + s.Equal("attr value", resultAttr) + }) + + _, err := s.client.StartWorkflowAsync(context.Background(), options, wf) + s.NoError(err) +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_RequestCreationFails() { + client := s.client.(*workflowClient) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) string { + return "result" + } + _, err := client.StartWorkflowAsync(context.Background(), options, f1, []byte("test")) + s.ErrorContains(err, "missing TaskList") +} + +func (s *workflowClientTestSuite) TestStartWorkflowAsync_RPCError() { + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + wf := func(ctx Context) string { + return "result" + } + + s.service.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("failed")).MinTimes(1) + + // Pass a context with a deadline so error retry doesn't take forever + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := s.client.StartWorkflowAsync(ctx, options, wf) + s.Error(err) } func (s *workflowClientTestSuite) TestGetWorkflowMemo() { @@ -1302,7 +1554,7 @@ func (s *workflowClientTestSuite) TestListWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.ListWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1311,7 +1563,7 @@ func (s *workflowClientTestSuite) TestListWorkflow() { Do(func(_ interface{}, req *shared.ListWorkflowExecutionsRequest, _ ...interface{}) { s.Equal("another", request.GetDomain()) }) - resp, err = s.client.ListWorkflow(context.Background(), request) + _, err = s.client.ListWorkflow(context.Background(), request) s.Equal(responseErr, err) } @@ -1325,7 +1577,7 @@ func (s *workflowClientTestSuite) TestListArchivedWorkflow() { ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() resp, err := s.client.ListArchivedWorkflow(ctxWithTimeout, request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1346,7 +1598,7 @@ func (s *workflowClientTestSuite) TestScanWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.ScanWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1367,7 +1619,7 @@ func (s *workflowClientTestSuite) TestCountWorkflow() { s.Equal(domain, request.GetDomain()) }) resp, err := s.client.CountWorkflow(context.Background(), request) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1384,7 +1636,7 @@ func (s *workflowClientTestSuite) TestGetSearchAttributes() { response := &shared.GetSearchAttributesResponse{} s.service.EXPECT().GetSearchAttributes(gomock.Any(), gomock.Any()).Return(response, nil) resp, err := s.client.GetSearchAttributes(context.Background()) - s.Nil(err) + s.NoError(err) s.Equal(response, resp) responseErr := &shared.BadRequestError{} @@ -1408,7 +1660,7 @@ func (s *workflowClientTestSuite) TestCancelWorkflow() { err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun", WithCancelReason("test reason")) - s.Nil(err) + s.NoError(err) } func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() { @@ -1416,7 +1668,7 @@ func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() { err := s.client.CancelWorkflow(context.Background(), "testWf", "testRun") - s.Nil(err) + s.NoError(err) } type PartialCancelRequestMatcher struct { @@ -1443,3 +1695,151 @@ func (m *PartialCancelRequestMatcher) Matches(a interface{}) bool { func (m *PartialCancelRequestMatcher) String() string { return "partial cancellation request matcher matches cause and wfId fields" } + +func TestGetWorkflowStartRequest(t *testing.T) { + tests := []struct { + name string + options StartWorkflowOptions + workflowFunc interface{} + args []interface{} + wantRequest *shared.StartWorkflowExecutionRequest + wantErr string + }{ + { + name: "success", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantRequest: &shared.StartWorkflowExecutionRequest{ + Domain: common.StringPtr(domain), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: &shared.WorkflowType{ + Name: common.StringPtr("go.uber.org/cadence/internal.TestGetWorkflowStartRequest.func1"), + }, + TaskList: &shared.TaskList{ + Name: common.StringPtr(tasklist), + }, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(5), + DelayStartSeconds: common.Int32Ptr(0), + JitterStartSeconds: common.Int32Ptr(0), + CronSchedule: common.StringPtr(""), + Header: &shared.Header{Fields: map[string][]byte{}}, + WorkflowIdReusePolicy: shared.WorkflowIdReusePolicyAllowDuplicateFailedOnly.Ptr(), + }, + }, + { + name: "missing TaskList", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: "", // this causes error + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "missing TaskList", + }, + { + name: "invalid ExecutionStartToCloseTimeout", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 0 * time.Second, // this causes error + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "missing or invalid ExecutionStartToCloseTimeout", + }, + { + name: "negative DecisionTaskStartToCloseTimeout", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: -1 * time.Second, // this causes error + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "negative DecisionTaskStartToCloseTimeout provided", + }, + { + name: "negative DelayStart", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: -1 * time.Second, // this causes error + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context) {}, + wantErr: "Invalid DelayStart option", + }, + { + name: "negative JitterStart", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: -1 * time.Second, // this causes error + }, + workflowFunc: func(ctx Context) {}, + wantErr: "Invalid JitterStart option", + }, + { + name: "invalid workflow func", + options: StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: 10 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + DelayStart: 0 * time.Second, + JitterStart: 0 * time.Second, + }, + workflowFunc: func(ctx Context, a, b int) {}, // this causes error because args not provided + args: []interface{}{}, + wantErr: "expected 2 args for function", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + service := workflowservicetest.NewMockClient(mockCtrl) + wc, ok := NewClient(service, domain, &ClientOptions{ + Identity: "test-identity", + }).(*workflowClient) + + if !ok { + t.Fatalf("expected NewClient to return a *workflowClient, but got %T", wc) + } + + gotReq, err := wc.getWorkflowStartRequest(context.Background(), "", tc.options, tc.workflowFunc, tc.args...) + if tc.wantErr != "" { + assert.ErrorContains(t, err, tc.wantErr) + return + } + + assert.NoError(t, err) + + // set the randomized fields in the expected request before comparison + tc.wantRequest.Identity = &wc.identity + tc.wantRequest.RequestId = gotReq.RequestId + + assert.Equal(t, tc.wantRequest, gotReq) + }) + } +} diff --git a/internal/workflow.go b/internal/workflow.go index 5568e1608..e467fa663 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -400,6 +400,11 @@ type ( RunID string } + // WorkflowExecutionAsync Details. + WorkflowExecutionAsync struct { + ID string + } + // EncodedValue is type alias used to encapsulate/extract encoded result from workflow/activity. EncodedValue struct { value []byte diff --git a/mocks/Client.go b/mocks/Client.go index 1f02bda80..d9e87872d 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -497,6 +497,32 @@ func (_m *Client) SignalWithStartWorkflow(ctx context.Context, workflowID string return r0, r1 } +// SignalWithStartWorkflowAsync provides a mock function with given fields: ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs +func (_m *Client) SignalWithStartWorkflowAsync(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options internal.StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*internal.WorkflowExecutionAsync, error) { + var _ca []interface{} + _ca = append(_ca, ctx, workflowID, signalName, signalArg, options, workflow) + _ca = append(_ca, workflowArgs...) + ret := _m.Called(_ca...) + + var r0 *internal.WorkflowExecutionAsync + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { + r0 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internal.WorkflowExecutionAsync) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { + r1 = rf(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SignalWorkflow provides a mock function with given fields: ctx, workflowID, runID, signalName, arg func (_m *Client) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error { ret := _m.Called(ctx, workflowID, runID, signalName, arg) @@ -537,6 +563,32 @@ func (_m *Client) StartWorkflow(ctx context.Context, options internal.StartWorkf return r0, r1 } +// StartWorkflowAsync provides a mock function with given fields: ctx, options, workflow, args +func (_m *Client) StartWorkflowAsync(ctx context.Context, options internal.StartWorkflowOptions, workflow interface{}, args ...interface{}) (*internal.WorkflowExecutionAsync, error) { + var _ca []interface{} + _ca = append(_ca, ctx, options, workflow) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + var r0 *internal.WorkflowExecutionAsync + if rf, ok := ret.Get(0).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) *internal.WorkflowExecutionAsync); ok { + r0 = rf(ctx, options, workflow, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internal.WorkflowExecutionAsync) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, internal.StartWorkflowOptions, interface{}, ...interface{}) error); ok { + r1 = rf(ctx, options, workflow, args...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // TerminateWorkflow provides a mock function with given fields: ctx, workflowID, runID, reason, details func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error { ret := _m.Called(ctx, workflowID, runID, reason, details) diff --git a/workflow/workflow.go b/workflow/workflow.go index b1df0af10..a6437edd4 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -39,6 +39,9 @@ type ( // Execution Details. Execution = internal.WorkflowExecution + // ExecutionAsync Details. + ExecutionAsync = internal.WorkflowExecutionAsync + // Version represents a change version. See GetVersion call. Version = internal.Version