From a194c5af37c6fe62317de6aa0a5da7e201ece90e Mon Sep 17 00:00:00 2001 From: Zijian Date: Mon, 20 May 2024 23:24:57 +0000 Subject: [PATCH] Enable sanity check fr strong idempotency check --- common/dynamicconfig/constants.go | 13 ++++ service/history/config/config.go | 6 +- service/history/execution/context.go | 72 ++++++++++++++--------- service/history/execution/context_test.go | 72 ++++++++++++++++------- 4 files changed, 110 insertions(+), 53 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 056cc0fea39..59ad94963a7 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2016,6 +2016,13 @@ const ( // Allowed filters: DomainName EnableStrongIdempotency + // EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency + // KeyName: history.enableStrongIdempotencySanityCheck + // Value type: Bool + // Default value: false + // Allowed filters: DomainName + EnableStrongIdempotencySanityCheck + // LastBoolKey must be the last one in this const group LastBoolKey ) @@ -4335,6 +4342,12 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableStrongIdempotency enables strong idempotency for APIs", DefaultValue: false, }, + EnableStrongIdempotencySanityCheck: DynamicBool{ + KeyName: "history.enableStrongIdempotencySanityCheck", + Filters: []Filter{DomainName}, + Description: "EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency", + DefaultValue: false, + }, } var FloatKeys = map[FloatKey]DynamicFloat{ diff --git a/service/history/config/config.go b/service/history/config/config.go index ee72acdc0fe..c44202b1466 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -338,7 +338,8 @@ type Config struct { LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn - EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter + EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter + EnableStrongIdempotencySanityCheck dynamicconfig.BoolPropertyFnWithDomainFilter // HostName for machine running the service HostName string @@ -598,7 +599,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold), LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold), - EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency), + EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency), + EnableStrongIdempotencySanityCheck: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotencySanityCheck), HostName: hostname, } diff --git a/service/history/execution/context.go b/service/history/execution/context.go index edb93fc0781..c093474c79c 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -420,15 +420,17 @@ func (c *contextImpl) CreateWorkflowExecution( c.Clear() } }() - err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode) - if err != nil { - // TODO(CDNC-8519): convert it to an error after verification in production - c.logger.Error("workflow requests and mode validation error", tag.Error(err)) - } domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) if errorDomainName != nil { return errorDomainName } + err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode) + if err != nil { + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return err + } + c.logger.Error("workflow requests and mode validation error", tag.Error(err)) + } createRequest := &persistence.CreateWorkflowExecutionRequest{ // workflow create mode & prev run ID & version Mode: createMode, @@ -484,6 +486,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( return err } + domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) + if errorDomainName != nil { + return errorDomainName + } var persistedBlobs events.PersistedBlobs resetHistorySize := c.GetHistorySize() for _, workflowEvents := range resetWorkflowEventsSeq { @@ -512,8 +518,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( return err } if len(resetWorkflow.WorkflowRequests) != 0 && len(newWorkflow.WorkflowRequests) != 0 { - // TODO(CDNC-8519): convert it to an error after verification in production - c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for ConflictResolveWorkflowExecution") + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution"} + } + c.logger.Error("workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution", tag.Number(int64(len(resetWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests)))) } newWorkflowSizeSize := newContext.GetHistorySize() startEvents := newWorkflowEventsSeq[0] @@ -542,8 +550,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( return err } if len(currentWorkflow.WorkflowRequests) != 0 { - // TODO(CDNC-8519): convert it to an error after verification in production - c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution") + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return &types.InternalServiceError{Message: "workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution"} + } + c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution", tag.Counter(len(currentWorkflow.WorkflowRequests))) } currentWorkflowSize := currentContext.GetHistorySize() for _, workflowEvents := range currentWorkflowEventsSeq { @@ -568,10 +578,6 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( ); err != nil { return err } - domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) - if errorDomainName != nil { - return errorDomainName - } resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{ // RangeID , this is set by shard context Mode: conflictResolveMode, @@ -679,17 +685,19 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks( Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events", } } + domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) + if errorDomainName != nil { + return errorDomainName + } if len(currentWorkflow.WorkflowRequests) != 0 { - // TODO(CDNC-8519): convert it to an error after verification in production - c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests") + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domainName) { + return &types.InternalServiceError{Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests"} + } + c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests", tag.Counter(len(currentWorkflow.WorkflowRequests))) } currentWorkflow.ExecutionStats = &persistence.ExecutionStats{ HistorySize: c.GetHistorySize(), } - domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) - if errorDomainName != nil { - return errorDomainName - } resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{ // RangeID , this is set by shard context Mode: persistence.UpdateWorkflowModeIgnoreCurrent, @@ -729,9 +737,15 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( if err != nil { return err } + domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) + if errorDomainName != nil { + return errorDomainName + } err = validateWorkflowRequestsAndMode(currentWorkflow.WorkflowRequests, workflowRequestMode) if err != nil { - // TODO(CDNC-8519): convert it to an error after verification in production + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return err + } c.logger.Error("workflow requests and mode validation error", tag.Error(err)) } var persistedBlobs events.PersistedBlobs @@ -769,13 +783,17 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( return err } if len(newWorkflow.WorkflowRequests) != 0 && len(currentWorkflow.WorkflowRequests) != 0 { - // TODO(CDNC-8519): convert it to an error after verification in production - c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for UpdateWorkflowExecution") + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution"} + } + c.logger.Error("workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution", tag.Number(int64(len(currentWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests)))) } err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode) if err != nil { - // TODO(CDNC-8519): convert it to an error after verification in production + if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) { + return err + } c.logger.Error("workflow requests and mode validation error", tag.Error(err)) } newWorkflowSizeSize := newContext.GetHistorySize() @@ -810,10 +828,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( if err := c.updateWorkflowExecutionEventReapplyFn(updateMode, currentWorkflowEventsSeq, newWorkflowEventsSeq); err != nil { return err } - domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) - if errorDomainName != nil { - return errorDomainName - } resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{ // RangeID , this is set by shard context Mode: updateMode, @@ -1412,14 +1426,14 @@ func validateWorkflowRequestsAndMode(requests []*persistence.WorkflowRequest, mo return nil } if len(requests) > 2 { - return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."} + return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"} } else if len(requests) == 2 { // SignalWithStartWorkflow API can generate 2 workflow requests if (requests[0].RequestType == persistence.WorkflowRequestTypeStart && requests[1].RequestType == persistence.WorkflowRequestTypeSignal) || (requests[1].RequestType == persistence.WorkflowRequestTypeStart && requests[0].RequestType == persistence.WorkflowRequestTypeSignal) { return nil } - return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."} + return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"} } return nil } diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index 418483c3665..7ba789a1574 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -36,12 +36,14 @@ import ( "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" hcommon "github.com/uber/cadence/service/history/common" + "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/engine" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/resource" @@ -1144,13 +1146,7 @@ func TestCreateWorkflowExecution(t *testing.T) { prevRunID: "test-prev-run-id", prevLastWriteVersion: 123, createWorkflowRequestMode: persistence.CreateWorkflowRequestModeNew, - mockCreateWorkflowExecutionFn: func(context.Context, *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) { - return nil, &types.InternalServiceError{} - }, - mockNotifyTasksFromWorkflowSnapshotFn: func(_ *persistence.WorkflowSnapshot, _ events.PersistedBlobs, persistenceError bool) { - assert.Equal(t, true, persistenceError) - }, - wantErr: true, + wantErr: true, }, { name: "success", @@ -1231,6 +1227,9 @@ func TestCreateWorkflowExecution(t *testing.T) { mockShard := shard.NewMockContext(mockCtrl) mockDomainCache := cache.NewMockDomainCache(mockCtrl) mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockShard.EXPECT().GetConfig().Return(&config.Config{ + EnableStrongIdempotencySanityCheck: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), + }).AnyTimes() mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) ctx := &contextImpl{ logger: testlogger.New(t), @@ -1294,11 +1293,14 @@ func TestUpdateWorkflowExecutionTasks(t *testing.T) { WorkflowRequests: []*persistence.WorkflowRequest{{}}, }, []*persistence.WorkflowEvents{}, nil) mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) - mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("", errors.New("some error")) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().GetConfig().Return(&config.Config{ + EnableStrongIdempotencySanityCheck: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), + }) }, wantErr: true, assertErr: func(t *testing.T, err error) { - assert.Equal(t, errors.New("some error"), err) + assert.Equal(t, &types.InternalServiceError{Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests"}, err) }, }, { @@ -1461,6 +1463,8 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { name: "PersistNonStartWorkflowBatchEvents failed", currentWorkflowTransactionPolicy: TransactionPolicyPassive, mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -1490,6 +1494,8 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { currentWorkflowTransactionPolicy: TransactionPolicyActive, newWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -1521,6 +1527,11 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { currentWorkflowTransactionPolicy: TransactionPolicyActive, newWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().GetConfig().Return(&config.Config{ + EnableStrongIdempotencySanityCheck: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), + }) mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{ WorkflowRequests: []*persistence.WorkflowRequest{{}}, }, []*persistence.WorkflowEvents{ @@ -1551,15 +1562,9 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { return events.PersistedBlob{}, nil }, - mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { - return events.PersistedBlob{}, nil - }, - mockMergeContinueAsNewReplicationTasksFn: func(persistence.UpdateWorkflowMode, *persistence.WorkflowMutation, *persistence.WorkflowSnapshot) error { - return errors.New("some error") - }, wantErr: true, assertErr: func(t *testing.T, err error) { - assert.Equal(t, errors.New("some error"), err) + assert.Equal(t, &types.InternalServiceError{Message: "workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution"}, err) }, }, { @@ -1571,6 +1576,8 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { currentWorkflowTransactionPolicy: TransactionPolicyActive, newWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -1617,6 +1624,8 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { currentWorkflowTransactionPolicy: TransactionPolicyActive, newWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2040,6 +2049,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { { name: "persistNonStartWorkflowEvents failed", mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2066,6 +2077,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { metricsClient: metrics.NewNoopMetricsClient(), }, mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2093,6 +2106,11 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { metricsClient: metrics.NewNoopMetricsClient(), }, mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().GetConfig().Return(&config.Config{ + EnableStrongIdempotencySanityCheck: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), + }) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{ WorkflowRequests: []*persistence.WorkflowRequest{ { @@ -2133,12 +2151,9 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { return events.PersistedBlob{}, nil }, - mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { - return events.PersistedBlob{}, errors.New("some error") - }, wantErr: true, assertErr: func(t *testing.T, err error) { - assert.Equal(t, errors.New("some error"), err) + assert.Equal(t, &types.InternalServiceError{Message: "workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution"}, err) }, }, { @@ -2148,6 +2163,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { metricsClient: metrics.NewNoopMetricsClient(), }, mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2192,6 +2209,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { }, currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2237,6 +2256,11 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { }, currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().GetConfig().Return(&config.Config{ + EnableStrongIdempotencySanityCheck: dynamicconfig.GetBoolPropertyFnFilteredByDomain(true), + }) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2280,14 +2304,14 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { if history.BranchToken[0] == 1 { return events.PersistedBlob{}, nil } - return events.PersistedBlob{}, errors.New("some error") + return events.PersistedBlob{}, nil }, mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { return events.PersistedBlob{}, nil }, wantErr: true, assertErr: func(t *testing.T, err error) { - assert.Equal(t, errors.New("some error"), err) + assert.Equal(t, &types.InternalServiceError{Message: "workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution"}, err) }, }, { @@ -2302,6 +2326,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { }, currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{ @@ -2359,6 +2385,8 @@ func TestConflictResolveWorkflowExecution(t *testing.T) { }, currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ { Events: []*types.HistoryEvent{