From 209c69d5f901e7b75922c20b3173c2091cc67dda Mon Sep 17 00:00:00 2001 From: Zijian Date: Mon, 29 Apr 2024 19:16:11 +0000 Subject: [PATCH] Add staleness check to RecordChildExecutionCompleted --- .../engine/engineimpl/historyEngine.go | 42 +++- .../engine/engineimpl/historyEngine_test.go | 219 ++++++++++++++++++ 2 files changed, 255 insertions(+), 6 deletions(-) diff --git a/service/history/engine/engineimpl/historyEngine.go b/service/history/engine/engineimpl/historyEngine.go index 590f1d644bd..9467de2b33c 100644 --- a/service/history/engine/engineimpl/historyEngine.go +++ b/service/history/engine/engineimpl/historyEngine.go @@ -127,6 +127,8 @@ type ( failoverMarkerNotifier failover.MarkerNotifier wfIDCache workflowcache.WFCache ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter + + updateWithActionFn func(context.Context, *execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error } ) @@ -242,6 +244,7 @@ func NewEngineWithShardContext( shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()), wfIDCache: wfIDCache, ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID, + updateWithActionFn: workflow.UpdateWithAction, } historyEngImpl.decisionHandler = decision.NewHandler( shard, @@ -2826,27 +2829,55 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( domainID := domainEntry.GetInfo().ID workflowExecution := types.WorkflowExecution{ - WorkflowID: completionRequest.WorkflowExecution.WorkflowID, - RunID: completionRequest.WorkflowExecution.RunID, + WorkflowID: completionRequest.WorkflowExecution.GetWorkflowID(), + RunID: completionRequest.WorkflowExecution.GetRunID(), } - return workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, true, e.timeSource.Now(), + return e.updateWithActionFn(ctx, e.executionCache, domainID, workflowExecution, true, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return workflow.ErrNotExists } initiatedID := completionRequest.InitiatedID + startedID := completionRequest.StartedID completedExecution := completionRequest.CompletedExecution completionEvent := completionRequest.CompletionEvent // Check mutable state to make sure child execution is in pending child executions ci, isRunning := mutableState.GetChildExecutionInfo(initiatedID) - if !isRunning || ci.StartedID == common.EmptyEventID { + if !isRunning { + if initiatedID >= mutableState.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordChildExecutionCompletedScope, metrics.StaleMutableStateCounter) + e.logger.Error("Encounter stale mutable state in RecordChildExecutionCompleted", + tag.WorkflowDomainName(domainEntry.GetInfo().Name), + tag.WorkflowID(workflowExecution.GetWorkflowID()), + tag.WorkflowRunID(workflowExecution.GetRunID()), + tag.WorkflowInitiatedID(initiatedID), + tag.WorkflowStartedID(startedID), + tag.WorkflowNextEventID(mutableState.GetNextEventID()), + ) + return workflow.ErrStaleState + } return &types.EntityNotExistsError{Message: "Pending child execution not found."} } + if ci.StartedID == common.EmptyEventID { + if startedID >= mutableState.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordChildExecutionCompletedScope, metrics.StaleMutableStateCounter) + e.logger.Error("Encounter stale mutable state in RecordChildExecutionCompleted", + tag.WorkflowDomainName(domainEntry.GetInfo().Name), + tag.WorkflowID(workflowExecution.GetWorkflowID()), + tag.WorkflowRunID(workflowExecution.GetRunID()), + tag.WorkflowInitiatedID(initiatedID), + tag.WorkflowStartedID(startedID), + tag.WorkflowNextEventID(mutableState.GetNextEventID()), + ) + return workflow.ErrStaleState + } + return &types.EntityNotExistsError{Message: "Pending child execution not started."} + } if ci.StartedWorkflowID != completedExecution.GetWorkflowID() { - return &types.EntityNotExistsError{Message: "Pending child execution not found."} + return &types.EntityNotExistsError{Message: "Pending child execution workflowID mismatch."} } switch *completionEvent.EventType { @@ -2866,7 +2897,6 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted( attributes := completionEvent.WorkflowExecutionTimedOutEventAttributes _, err = mutableState.AddChildWorkflowExecutionTimedOutEvent(initiatedID, completedExecution, attributes) } - return err }) } diff --git a/service/history/engine/engineimpl/historyEngine_test.go b/service/history/engine/engineimpl/historyEngine_test.go index 8270e4bf811..aed2550367f 100644 --- a/service/history/engine/engineimpl/historyEngine_test.go +++ b/service/history/engine/engineimpl/historyEngine_test.go @@ -31,9 +31,11 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally" "go.uber.org/yarpc/api/encoding" "go.uber.org/yarpc/api/transport" @@ -46,6 +48,7 @@ import ( "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" @@ -5245,3 +5248,219 @@ func (s *engineSuite) getActivityScheduledEvent( func (s *engineSuite) printHistory(builder execution.MutableState) string { return thrift.FromHistory(builder.GetHistoryBuilder().GetHistory()).String() } + +func TestRecordChildExecutionCompleted(t *testing.T) { + testCases := []struct { + name string + request *types.RecordChildExecutionCompletedRequest + mockSetup func(*execution.MockMutableState) + wantErr bool + assertErr func(t *testing.T, err error) + }{ + { + name: "workflow not running", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 1, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(false) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, workflow.ErrNotExists, err) + }, + }, + { + name: "stale mutable state - not containing initiated event", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(nil, false) + ms.EXPECT().GetNextEventID().Return(int64(10)).Times(2) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, workflow.ErrStaleState, err) + }, + }, + { + name: "pending child execution not found", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(nil, false) + ms.EXPECT().GetNextEventID().Return(int64(11)).Times(1) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, &types.EntityNotExistsError{Message: "Pending child execution not found."}, err) + }, + }, + { + name: "stale mutable state - not containing started event", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + StartedID: 11, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(&persistence.ChildExecutionInfo{StartedID: common.EmptyEventID}, true) + ms.EXPECT().GetNextEventID().Return(int64(11)).Times(2) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, workflow.ErrStaleState, err) + }, + }, + { + name: "pending child execution not started", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + StartedID: 11, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(&persistence.ChildExecutionInfo{StartedID: common.EmptyEventID}, true) + ms.EXPECT().GetNextEventID().Return(int64(12)).Times(1) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, &types.EntityNotExistsError{Message: "Pending child execution not started."}, err) + }, + }, + { + name: "pending child execution workflowID mismatch", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + StartedID: 11, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(&persistence.ChildExecutionInfo{StartedID: 11, StartedWorkflowID: "wid0"}, true) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, &types.EntityNotExistsError{Message: "Pending child execution workflowID mismatch."}, err) + }, + }, + { + name: "success - child workflow completed", + request: &types.RecordChildExecutionCompletedRequest{ + DomainUUID: "58f7998e-9c00-4827-bbd3-6a891b3ca0ca", + InitiatedID: 10, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: "wid", + RunID: "4353ddce-ca34-4c78-9785-dc0b83af4bbc", + }, + CompletedExecution: &types.WorkflowExecution{ + WorkflowID: "wid1", + RunID: "312b2440-2859-4e50-a59f-d92a300a072d", + }, + CompletionEvent: &types.HistoryEvent{ + EventType: types.EventTypeWorkflowExecutionCompleted.Ptr(), + WorkflowExecutionCompletedEventAttributes: &types.WorkflowExecutionCompletedEventAttributes{ + Result: []byte("success"), + }, + }, + StartedID: 11, + }, + mockSetup: func(ms *execution.MockMutableState) { + ms.EXPECT().IsWorkflowExecutionRunning().Return(true) + ms.EXPECT().GetChildExecutionInfo(int64(10)).Return(&persistence.ChildExecutionInfo{StartedID: 11, StartedWorkflowID: "wid1"}, true) + ms.EXPECT().AddChildWorkflowExecutionCompletedEvent(int64(10), gomock.Any(), gomock.Any()).Return(nil, nil) + }, + wantErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockShard := shard.NewTestContext(t, ctrl, &persistence.ShardInfo{}, config.NewForTest()) + mockDomainCache := mockShard.Resource.DomainCache + testDomainEntry := cache.NewLocalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: constants.TestDomainID}, &persistence.DomainConfig{}, "", + ) + mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(testDomainEntry, nil).AnyTimes() + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(constants.TestDomainID, nil).AnyTimes() + ms := execution.NewMockMutableState(ctrl) + tc.mockSetup(ms) + + historyEngine := &historyEngineImpl{ + shard: mockShard, + clusterMetadata: mockShard.GetClusterMetadata(), + timeSource: mockShard.GetTimeSource(), + metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), + logger: mockShard.GetLogger(), + updateWithActionFn: func(_ context.Context, _ *execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error { + return actionFn(nil, ms) + }, + } + + err := historyEngine.RecordChildExecutionCompleted(context.Background(), tc.request) + if tc.wantErr { + tc.assertErr(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}