diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index b032bfdd06e..0acd476baa0 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1830,21 +1830,24 @@ func (e *historyEngineImpl) SignalWorkflowExecution( namespaceID, execution, func(context workflowExecutionContext, mutableState mutableState) (*updateWorkflowAction, error) { - executionInfo := mutableState.GetExecutionInfo() - createWorkflowTask := true - // Do not create workflow task when the workflow is cron and the cron has not been started yet - if mutableState.GetExecutionInfo().CronSchedule != "" && !mutableState.HasProcessedOrPendingWorkflowTask() { - createWorkflowTask = false - } - postActions := &updateWorkflowAction{ - noop: false, - createWorkflowTask: createWorkflowTask, + if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) { + return &updateWorkflowAction{ + noop: true, + createWorkflowTask: false, + }, nil } if !mutableState.IsWorkflowExecutionRunning() { return nil, ErrWorkflowCompleted } + executionInfo := mutableState.GetExecutionInfo() + createWorkflowTask := true + // Do not create workflow task when the workflow is cron and the cron has not been started yet + if executionInfo.CronSchedule != "" && !mutableState.HasProcessedOrPendingWorkflowTask() { + createWorkflowTask = false + } + maxAllowedSignals := e.config.MaximumSignalsPerExecution(namespaceEntry.GetInfo().Name) if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals { e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount), @@ -1863,14 +1866,9 @@ func (e *historyEngineImpl) SignalWorkflowExecution( } } - // deduplicate by request id for signal workflow task - if requestID := request.GetRequestId(); requestID != "" { - if mutableState.IsSignalRequested(requestID) { - return postActions, nil - } - mutableState.AddSignalRequested(requestID) + if request.GetRequestId() != "" { + mutableState.AddSignalRequested(request.GetRequestId()) } - if _, err := mutableState.AddWorkflowExecutionSignaled( request.GetSignalName(), request.GetInput(), @@ -1878,7 +1876,10 @@ func (e *historyEngineImpl) SignalWorkflowExecution( return nil, serviceerror.NewInternal("Unable to signal workflow execution.") } - return postActions, nil + return &updateWorkflowAction{ + noop: false, + createWorkflowTask: createWorkflowTask, + }, nil }) } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 54a2f8b46e3..a903a956782 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -4756,7 +4756,50 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() { gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms} s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil) + + err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.Nil(err) +} + +// Test signal workflow task by dedup request ID & workflow finished +func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() { + signalRequest := &historyservice.SignalWorkflowExecutionRequest{} + err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + + we := commonpb.WorkflowExecution{ + WorkflowId: "wId2", + RunId: testRunID, + } + taskqueue := "testTaskQueue" + identity := "testIdentity" + signalName := "my signal name 2" + input := payloads.EncodeString("test input 2") + requestID := uuid.New() + signalRequest = &historyservice.SignalWorkflowExecutionRequest{ + NamespaceId: testNamespaceID, + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: testNamespaceID, + WorkflowExecution: &we, + Identity: identity, + SignalName: signalName, + Input: input, + RequestId: requestID, + }, + } + + msBuilder := newMutableStateBuilderWithEventV2(s.mockHistoryEngine.shard, s.eventsCache, + log.NewTestLogger(), we.GetRunId()) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowTaskScheduledEvent(msBuilder) + ms := createMutableState(msBuilder) + // assume duplicate request id + ms.SignalRequestedIds = []string{requestID} + ms.ExecutionInfo.NamespaceId = testNamespaceID + ms.ExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED + gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms} + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil) err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest) s.Nil(err)