diff --git a/common/persistence/workflowExecutionInfo.go b/common/persistence/workflowExecutionInfo.go index 45de86430ee..80105bc76f3 100644 --- a/common/persistence/workflowExecutionInfo.go +++ b/common/persistence/workflowExecutionInfo.go @@ -164,6 +164,23 @@ func (e *WorkflowExecutionInfo) UpdateWorkflowStateCloseStatus( } +func (e *WorkflowExecutionInfo) IsRunning() bool { + switch e.State { + case WorkflowStateCreated: + return true + case WorkflowStateRunning: + return true + case WorkflowStateCompleted: + return false + case WorkflowStateZombie: + return false + case WorkflowStateCorrupted: + return false + default: + panic(fmt.Sprintf("unknown workflow state: %v", e.State)) + } +} + // UpdateWorkflowStateCloseStatus update the workflow state func (e *WorkflowExecutionInfo) createInvalidStateTransitionErr( currentState int, diff --git a/service/history/common/type.go b/service/history/common/type.go index eda6a3786a3..def10db08d0 100644 --- a/service/history/common/type.go +++ b/service/history/common/type.go @@ -24,6 +24,7 @@ package common import ( "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/service/history/events" ) type ( @@ -31,6 +32,9 @@ type ( NotifyTaskInfo struct { ExecutionInfo *persistence.WorkflowExecutionInfo Tasks []persistence.Task + VersionHistories *persistence.VersionHistories + Activities map[int64]*persistence.ActivityInfo + History events.PersistedBlobs PersistenceError bool } ) diff --git a/service/history/engine/interface.go b/service/history/engine/interface.go index ab2b295f7ee..bb885e03796 100644 --- a/service/history/engine/interface.go +++ b/service/history/engine/interface.go @@ -83,5 +83,6 @@ type ( NotifyNewTransferTasks(info *hcommon.NotifyTaskInfo) NotifyNewTimerTasks(info *hcommon.NotifyTaskInfo) NotifyNewCrossClusterTasks(info *hcommon.NotifyTaskInfo) + NotifyNewReplicationTasks(info *hcommon.NotifyTaskInfo) } ) diff --git a/service/history/engine/interface_mock.go b/service/history/engine/interface_mock.go index 763e04d2904..c4694b9871b 100644 --- a/service/history/engine/interface_mock.go +++ b/service/history/engine/interface_mock.go @@ -249,6 +249,18 @@ func (mr *MockEngineMockRecorder) NotifyNewHistoryEvent(event interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewHistoryEvent", reflect.TypeOf((*MockEngine)(nil).NotifyNewHistoryEvent), event) } +// NotifyNewReplicationTasks mocks base method. +func (m *MockEngine) NotifyNewReplicationTasks(info *common.NotifyTaskInfo) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "NotifyNewReplicationTasks", info) +} + +// NotifyNewReplicationTasks indicates an expected call of NotifyNewReplicationTasks. +func (mr *MockEngineMockRecorder) NotifyNewReplicationTasks(info interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyNewReplicationTasks", reflect.TypeOf((*MockEngine)(nil).NotifyNewReplicationTasks), info) +} + // NotifyNewTimerTasks mocks base method. func (m *MockEngine) NotifyNewTimerTasks(info *common.NotifyTaskInfo) { m.ctrl.T.Helper() diff --git a/service/history/events/blob.go b/service/history/events/blob.go new file mode 100644 index 00000000000..cfd2f08dfdf --- /dev/null +++ b/service/history/events/blob.go @@ -0,0 +1,53 @@ +// The MIT License (MIT) + +// Copyright (c) 2022 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package events + +import ( + "bytes" + + "github.com/uber/cadence/common/persistence" +) + +type ( + // PersistedBlob is a wrapper on persistence.DataBlob with additional field indicating what was persisted. + // Additional fields are used as an identification key among other blobs. + PersistedBlob struct { + persistence.DataBlob + + BranchToken []byte + FirstEventID int64 + } + // PersistedBlobs is a slice of PersistedBlob + PersistedBlobs []PersistedBlob +) + +// Find searches for persisted event blob. Returns nil when not found. +func (blobs PersistedBlobs) Find(branchToken []byte, firstEventID int64) *persistence.DataBlob { + // Linear search is ok here, as we will only have 1-2 persisted blobs per transaction + for _, blob := range blobs { + if bytes.Equal(blob.BranchToken, branchToken) && blob.FirstEventID == firstEventID { + return &blob.DataBlob + } + } + return nil +} diff --git a/service/history/events/blob_test.go b/service/history/events/blob_test.go new file mode 100644 index 00000000000..d8a2b60c327 --- /dev/null +++ b/service/history/events/blob_test.go @@ -0,0 +1,49 @@ +// The MIT License (MIT) + +// Copyright (c) 2022 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/persistence" +) + +func TestPersistedBlobs_Find(t *testing.T) { + blob1 := persistence.DataBlob{Data: []byte{1, 2, 3}} + blob2 := persistence.DataBlob{Data: []byte{4, 5, 6}} + blob3 := persistence.DataBlob{Data: []byte{7, 8, 9}} + branchA := []byte{11, 11, 11} + branchB := []byte{22, 22, 22} + persistedBlobs := PersistedBlobs{ + PersistedBlob{BranchToken: branchA, FirstEventID: 100, DataBlob: blob1}, + PersistedBlob{BranchToken: branchA, FirstEventID: 105, DataBlob: blob2}, + PersistedBlob{BranchToken: branchB, FirstEventID: 100, DataBlob: blob3}, + } + assert.Equal(t, blob1, *persistedBlobs.Find(branchA, 100)) + assert.Equal(t, blob2, *persistedBlobs.Find(branchA, 105)) + assert.Equal(t, blob3, *persistedBlobs.Find(branchB, 100)) + assert.Nil(t, persistedBlobs.Find(branchB, 105)) + assert.Nil(t, persistedBlobs.Find([]byte{99}, 100)) +} diff --git a/service/history/execution/context.go b/service/history/execution/context.go index b4c139ce61f..e8ecac3bde1 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -98,16 +98,16 @@ type ( PersistStartWorkflowBatchEvents( ctx context.Context, workflowEvents *persistence.WorkflowEvents, - ) (persistence.DataBlob, error) + ) (events.PersistedBlob, error) PersistNonStartWorkflowBatchEvents( ctx context.Context, workflowEvents *persistence.WorkflowEvents, - ) (persistence.DataBlob, error) + ) (events.PersistedBlob, error) CreateWorkflowExecution( ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, - historySize int64, + persistedHistory events.PersistedBlob, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, @@ -337,7 +337,7 @@ func (c *contextImpl) LoadWorkflowExecution( func (c *contextImpl) CreateWorkflowExecution( ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, - historySize int64, + persistedHistory events.PersistedBlob, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, @@ -359,6 +359,7 @@ func (c *contextImpl) CreateWorkflowExecution( DomainName: domainName, } + historySize := int64(len(persistedHistory.Data)) historySize += c.GetHistorySize() c.SetHistorySize(historySize) createRequest.NewWorkflowSnapshot.ExecutionStats = &persistence.ExecutionStats{ @@ -368,12 +369,12 @@ func (c *contextImpl) CreateWorkflowExecution( resp, err := c.createWorkflowExecutionWithRetry(ctx, createRequest) if err != nil { if c.isPersistenceTimeoutError(err) { - c.notifyTasksFromWorkflowSnapshot(newWorkflow, true) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, events.PersistedBlobs{persistedHistory}, true) } return err } - c.notifyTasksFromWorkflowSnapshot(newWorkflow, false) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, events.PersistedBlobs{persistedHistory}, false) // finally emit session stats emitSessionUpdateStats( @@ -410,6 +411,8 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( if err != nil { return err } + + var persistedBlobs events.PersistedBlobs resetHistorySize := c.GetHistorySize() for _, workflowEvents := range resetWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) @@ -417,6 +420,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( return err } resetHistorySize += int64(len(blob.Data)) + persistedBlobs = append(persistedBlobs, blob) } c.SetHistorySize(resetHistorySize) resetWorkflow.ExecutionStats = &persistence.ExecutionStats{ @@ -451,6 +455,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( newWorkflow.ExecutionStats = &persistence.ExecutionStats{ HistorySize: newWorkflowSizeSize, } + persistedBlobs = append(persistedBlobs, blob) } var currentWorkflow *persistence.WorkflowMutation @@ -477,6 +482,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( return err } currentWorkflowSize += int64(len(blob.Data)) + persistedBlobs = append(persistedBlobs, blob) } currentContext.SetHistorySize(currentWorkflowSize) currentWorkflow.ExecutionStats = &persistence.ExecutionStats{ @@ -507,9 +513,9 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( }) if err != nil { if c.isPersistenceTimeoutError(err) { - c.notifyTasksFromWorkflowSnapshot(resetWorkflow, true) - c.notifyTasksFromWorkflowSnapshot(newWorkflow, true) - c.notifyTasksFromWorkflowMutation(currentWorkflow, true) + c.notifyTasksFromWorkflowSnapshot(resetWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowMutation(currentWorkflow, persistedBlobs, true) } return err } @@ -532,9 +538,9 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( workflowCloseState, )) - c.notifyTasksFromWorkflowSnapshot(resetWorkflow, false) - c.notifyTasksFromWorkflowSnapshot(newWorkflow, false) - c.notifyTasksFromWorkflowMutation(currentWorkflow, false) + c.notifyTasksFromWorkflowSnapshot(resetWorkflow, persistedBlobs, false) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, persistedBlobs, false) + c.notifyTasksFromWorkflowMutation(currentWorkflow, persistedBlobs, false) // finally emit session stats domainName := c.GetDomainName() @@ -671,7 +677,7 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks( }) if err != nil { if c.isPersistenceTimeoutError(err) { - c.notifyTasksFromWorkflowMutation(currentWorkflow, true) + c.notifyTasksFromWorkflowMutation(currentWorkflow, nil, true) } return err } @@ -680,7 +686,7 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks( c.updateCondition = currentWorkflow.ExecutionInfo.NextEventID // notify current workflow tasks - c.notifyTasksFromWorkflowMutation(currentWorkflow, false) + c.notifyTasksFromWorkflowMutation(currentWorkflow, nil, false) emitSessionUpdateStats( c.metricsClient, @@ -715,6 +721,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( return err } + var persistedBlobs events.PersistedBlobs currentWorkflowSize := c.GetHistorySize() for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) @@ -722,6 +729,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( return err } currentWorkflowSize += int64(len(blob.Data)) + persistedBlobs = append(persistedBlobs, blob) } c.SetHistorySize(currentWorkflowSize) currentWorkflow.ExecutionStats = &persistence.ExecutionStats{ @@ -748,7 +756,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( newWorkflowSizeSize := newContext.GetHistorySize() startEvents := newWorkflowEventsSeq[0] firstEventID := startEvents.Events[0].ID - var blob persistence.DataBlob + var blob events.PersistedBlob if firstEventID == common.FirstEventID { blob, err = c.PersistStartWorkflowBatchEvents(ctx, startEvents) if err != nil { @@ -762,6 +770,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( } } + persistedBlobs = append(persistedBlobs, blob) newWorkflowSizeSize += int64(len(blob.Data)) newContext.SetHistorySize(newWorkflowSizeSize) newWorkflow.ExecutionStats = &persistence.ExecutionStats{ @@ -798,8 +807,8 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( }) if err != nil { if c.isPersistenceTimeoutError(err) { - c.notifyTasksFromWorkflowMutation(currentWorkflow, true) - c.notifyTasksFromWorkflowSnapshot(newWorkflow, true) + c.notifyTasksFromWorkflowMutation(currentWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, persistedBlobs, true) } return err } @@ -825,10 +834,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( )) // notify current workflow tasks - c.notifyTasksFromWorkflowMutation(currentWorkflow, false) + c.notifyTasksFromWorkflowMutation(currentWorkflow, persistedBlobs, false) // notify new workflow tasks - c.notifyTasksFromWorkflowSnapshot(newWorkflow, false) + c.notifyTasksFromWorkflowSnapshot(newWorkflow, persistedBlobs, false) // finally emit session stats domainName := c.GetDomainName() @@ -859,6 +868,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( func (c *contextImpl) notifyTasksFromWorkflowSnapshot( workflowSnapShot *persistence.WorkflowSnapshot, + history events.PersistedBlobs, persistenceError bool, ) { if workflowSnapShot == nil { @@ -867,15 +877,20 @@ func (c *contextImpl) notifyTasksFromWorkflowSnapshot( c.notifyTasks( workflowSnapShot.ExecutionInfo, + workflowSnapShot.VersionHistories, + activityInfosToMap(workflowSnapShot.ActivityInfos), workflowSnapShot.TransferTasks, workflowSnapShot.TimerTasks, workflowSnapShot.CrossClusterTasks, + workflowSnapShot.ReplicationTasks, + history, persistenceError, ) } func (c *contextImpl) notifyTasksFromWorkflowMutation( workflowMutation *persistence.WorkflowMutation, + history events.PersistedBlobs, persistenceError bool, ) { if workflowMutation == nil { @@ -884,18 +899,34 @@ func (c *contextImpl) notifyTasksFromWorkflowMutation( c.notifyTasks( workflowMutation.ExecutionInfo, + workflowMutation.VersionHistories, + activityInfosToMap(workflowMutation.UpsertActivityInfos), workflowMutation.TransferTasks, workflowMutation.TimerTasks, workflowMutation.CrossClusterTasks, + workflowMutation.ReplicationTasks, + history, persistenceError, ) } +func activityInfosToMap(ais []*persistence.ActivityInfo) map[int64]*persistence.ActivityInfo { + m := make(map[int64]*persistence.ActivityInfo, len(ais)) + for _, ai := range ais { + m[ai.ScheduleID] = ai + } + return m +} + func (c *contextImpl) notifyTasks( executionInfo *persistence.WorkflowExecutionInfo, + versionHistories *persistence.VersionHistories, + activities map[int64]*persistence.ActivityInfo, transferTasks []persistence.Task, timerTasks []persistence.Task, crossClusterTasks []persistence.Task, + replicationTasks []persistence.Task, + history events.PersistedBlobs, persistenceError bool, ) { transferTaskInfo := &hcommon.NotifyTaskInfo{ @@ -913,10 +944,19 @@ func (c *contextImpl) notifyTasks( Tasks: crossClusterTasks, PersistenceError: persistenceError, } + replicationTaskInfo := &hcommon.NotifyTaskInfo{ + ExecutionInfo: executionInfo, + Tasks: replicationTasks, + VersionHistories: versionHistories, + Activities: activities, + History: history, + PersistenceError: persistenceError, + } c.shard.GetEngine().NotifyNewTransferTasks(transferTaskInfo) c.shard.GetEngine().NotifyNewTimerTasks(timerTaskInfo) c.shard.GetEngine().NotifyNewCrossClusterTasks(crossClusterTaskInfo) + c.shard.GetEngine().NotifyNewReplicationTasks(replicationTaskInfo) } func (c *contextImpl) mergeContinueAsNewReplicationTasks( @@ -970,10 +1010,10 @@ func (c *contextImpl) mergeContinueAsNewReplicationTasks( func (c *contextImpl) PersistStartWorkflowBatchEvents( ctx context.Context, workflowEvents *persistence.WorkflowEvents, -) (persistence.DataBlob, error) { +) (events.PersistedBlob, error) { if len(workflowEvents.Events) == 0 { - return persistence.DataBlob{}, &types.InternalServiceError{ + return events.PersistedBlob{}, &types.InternalServiceError{ Message: "cannot persist first workflow events with empty events", } } @@ -981,7 +1021,7 @@ func (c *contextImpl) PersistStartWorkflowBatchEvents( domainID := workflowEvents.DomainID domainName, err := c.shard.GetDomainCache().GetDomainName(domainID) if err != nil { - return persistence.DataBlob{}, err + return events.PersistedBlob{}, err } workflowID := workflowEvents.WorkflowID runID := workflowEvents.RunID @@ -989,8 +1029,6 @@ func (c *contextImpl) PersistStartWorkflowBatchEvents( WorkflowID: workflowEvents.WorkflowID, RunID: workflowEvents.RunID, } - branchToken := workflowEvents.BranchToken - events := workflowEvents.Events resp, err := c.appendHistoryV2EventsWithRetry( ctx, @@ -999,38 +1037,40 @@ func (c *contextImpl) PersistStartWorkflowBatchEvents( &persistence.AppendHistoryNodesRequest{ IsNewBranch: true, Info: persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID), - BranchToken: branchToken, - Events: events, + BranchToken: workflowEvents.BranchToken, + Events: workflowEvents.Events, DomainName: domainName, // TransactionID is set by shard context }, ) if err != nil { - return persistence.DataBlob{}, err + return events.PersistedBlob{}, err } - return resp.DataBlob, nil + return events.PersistedBlob{ + DataBlob: resp.DataBlob, + BranchToken: workflowEvents.BranchToken, + FirstEventID: workflowEvents.Events[0].ID, + }, nil } func (c *contextImpl) PersistNonStartWorkflowBatchEvents( ctx context.Context, workflowEvents *persistence.WorkflowEvents, -) (persistence.DataBlob, error) { +) (events.PersistedBlob, error) { if len(workflowEvents.Events) == 0 { - return persistence.DataBlob{}, nil // allow update workflow without events + return events.PersistedBlob{}, nil // allow update workflow without events } domainID := workflowEvents.DomainID domainName, err := c.shard.GetDomainCache().GetDomainName(domainID) if err != nil { - return persistence.DataBlob{}, err + return events.PersistedBlob{}, err } execution := types.WorkflowExecution{ WorkflowID: workflowEvents.WorkflowID, RunID: workflowEvents.RunID, } - branchToken := workflowEvents.BranchToken - events := workflowEvents.Events resp, err := c.appendHistoryV2EventsWithRetry( ctx, @@ -1038,16 +1078,20 @@ func (c *contextImpl) PersistNonStartWorkflowBatchEvents( execution, &persistence.AppendHistoryNodesRequest{ IsNewBranch: false, - BranchToken: branchToken, - Events: events, + BranchToken: workflowEvents.BranchToken, + Events: workflowEvents.Events, DomainName: domainName, // TransactionID is set by shard context }, ) if err != nil { - return persistence.DataBlob{}, err + return events.PersistedBlob{}, err } - return resp.DataBlob, nil + return events.PersistedBlob{ + DataBlob: resp.DataBlob, + BranchToken: workflowEvents.BranchToken, + FirstEventID: workflowEvents.Events[0].ID, + }, nil } func (c *contextImpl) appendHistoryV2EventsWithRetry( diff --git a/service/history/execution/context_mock.go b/service/history/execution/context_mock.go index 5219cee120a..db3172303c7 100644 --- a/service/history/execution/context_mock.go +++ b/service/history/execution/context_mock.go @@ -35,6 +35,7 @@ import ( persistence "github.com/uber/cadence/common/persistence" types "github.com/uber/cadence/common/types" + events "github.com/uber/cadence/service/history/events" ) // MockContext is a mock of Context interface. @@ -87,17 +88,17 @@ func (mr *MockContextMockRecorder) ConflictResolveWorkflowExecution(ctx, now, co } // CreateWorkflowExecution mocks base method. -func (m *MockContext) CreateWorkflowExecution(ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, historySize int64, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64) error { +func (m *MockContext) CreateWorkflowExecution(ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, persistedHistory events.PersistedBlob, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, newWorkflow, historySize, createMode, prevRunID, prevLastWriteVersion) + ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion) ret0, _ := ret[0].(error) return ret0 } // CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. -func (mr *MockContextMockRecorder) CreateWorkflowExecution(ctx, newWorkflow, historySize, createMode, prevRunID, prevLastWriteVersion interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) CreateWorkflowExecution(ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).CreateWorkflowExecution), ctx, newWorkflow, historySize, createMode, prevRunID, prevLastWriteVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).CreateWorkflowExecution), ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion) } // GetDomainID mocks base method. @@ -230,10 +231,10 @@ func (mr *MockContextMockRecorder) Lock(ctx interface{}) *gomock.Call { } // PersistNonStartWorkflowBatchEvents mocks base method. -func (m *MockContext) PersistNonStartWorkflowBatchEvents(ctx context.Context, workflowEvents *persistence.WorkflowEvents) (persistence.DataBlob, error) { +func (m *MockContext) PersistNonStartWorkflowBatchEvents(ctx context.Context, workflowEvents *persistence.WorkflowEvents) (events.PersistedBlob, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PersistNonStartWorkflowBatchEvents", ctx, workflowEvents) - ret0, _ := ret[0].(persistence.DataBlob) + ret0, _ := ret[0].(events.PersistedBlob) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -245,10 +246,10 @@ func (mr *MockContextMockRecorder) PersistNonStartWorkflowBatchEvents(ctx, workf } // PersistStartWorkflowBatchEvents mocks base method. -func (m *MockContext) PersistStartWorkflowBatchEvents(ctx context.Context, workflowEvents *persistence.WorkflowEvents) (persistence.DataBlob, error) { +func (m *MockContext) PersistStartWorkflowBatchEvents(ctx context.Context, workflowEvents *persistence.WorkflowEvents) (events.PersistedBlob, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PersistStartWorkflowBatchEvents", ctx, workflowEvents) - ret0, _ := ret[0].(persistence.DataBlob) + ret0, _ := ret[0].(events.PersistedBlob) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 37a867f612d..730e5dbd566 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -1547,20 +1547,7 @@ func (e *mutableStateBuilder) GetPreviousStartedEventID() int64 { } func (e *mutableStateBuilder) IsWorkflowExecutionRunning() bool { - switch e.executionInfo.State { - case persistence.WorkflowStateCreated: - return true - case persistence.WorkflowStateRunning: - return true - case persistence.WorkflowStateCompleted: - return false - case persistence.WorkflowStateZombie: - return false - case persistence.WorkflowStateCorrupted: - return false - default: - panic(fmt.Sprintf("unknown workflow state: %v", e.executionInfo.State)) - } + return e.executionInfo.IsRunning() } func (e *mutableStateBuilder) IsWorkflowCompleted() bool { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 169aa99da84..56105f39598 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -25,6 +25,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "time" @@ -743,7 +744,7 @@ func (e *historyEngineImpl) startWorkflowHelper( err = wfContext.CreateWorkflowExecution( ctx, newWorkflow, - int64(len(historyBlob.Data)), + historyBlob, createMode, prevRunID, prevLastWriteVersion, @@ -809,7 +810,7 @@ func (e *historyEngineImpl) startWorkflowHelper( err = wfContext.CreateWorkflowExecution( ctx, newWorkflow, - int64(len(historyBlob.Data)), + historyBlob, createMode, prevRunID, t.LastWriteVersion, @@ -2945,6 +2946,59 @@ func (e *historyEngineImpl) NotifyNewCrossClusterTasks( } } +func (e *historyEngineImpl) NotifyNewReplicationTasks(info *hcommon.NotifyTaskInfo) { + for _, task := range info.Tasks { + hTask, err := hydrateReplicationTask(task, info.ExecutionInfo, info.VersionHistories, info.Activities, info.History) + if err != nil { + e.logger.Error("failed to preemptively hydrate replication task", tag.Error(err)) + continue + } + e.replicationTaskStore.Put(hTask) + } +} + +func hydrateReplicationTask( + task persistence.Task, + exec *persistence.WorkflowExecutionInfo, + versionHistories *persistence.VersionHistories, + activities map[int64]*persistence.ActivityInfo, + history events.PersistedBlobs, +) (*types.ReplicationTask, error) { + info := persistence.ReplicationTaskInfo{ + DomainID: exec.DomainID, + WorkflowID: exec.WorkflowID, + RunID: exec.RunID, + TaskType: task.GetType(), + CreationTime: task.GetVisibilityTimestamp().UnixNano(), + TaskID: task.GetTaskID(), + Version: task.GetVersion(), + } + + switch t := task.(type) { + case *persistence.HistoryReplicationTask: + info.BranchToken = t.BranchToken + info.NewRunBranchToken = t.NewRunBranchToken + info.FirstEventID = t.FirstEventID + info.NextEventID = t.NextEventID + case *persistence.SyncActivityTask: + info.ScheduledID = t.ScheduledID + case *persistence.FailoverMarkerTask: + // No specific fields, but supported + default: + return nil, errors.New("unknown replication task") + } + + hydrator := replication.NewImmediateTaskHydrator( + exec.IsRunning(), + versionHistories, + activities, + history.Find(info.BranchToken, info.FirstEventID), + history.Find(info.NewRunBranchToken, common.FirstEventID), + ) + + return hydrator.Hydrate(context.Background(), info) +} + func (e *historyEngineImpl) ResetTransferQueue( ctx context.Context, clusterName string, diff --git a/service/history/ndc/new_workflow_transaction_manager.go b/service/history/ndc/new_workflow_transaction_manager.go index 574f57f6950..a3c55baff0b 100644 --- a/service/history/ndc/new_workflow_transaction_manager.go +++ b/service/history/ndc/new_workflow_transaction_manager.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/execution" ) @@ -157,7 +158,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsCurrent( return err } - var targetWorkflowHistoryBlob persistence.DataBlob + var targetWorkflowHistoryBlob events.PersistedBlob if len(targetWorkflowEventsSeq[0].Events) > 0 { if targetWorkflowEventsSeq[0].Events[0].GetEventType() == types.EventTypeWorkflowExecutionStarted { targetWorkflowHistoryBlob, err = targetWorkflow.GetContext().PersistStartWorkflowBatchEvents( @@ -187,7 +188,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsCurrent( return targetWorkflow.GetContext().CreateWorkflowExecution( ctx, targetWorkflowSnapshot, - int64(len(targetWorkflowHistoryBlob.Data)), + targetWorkflowHistoryBlob, createMode, prevRunID, prevLastWriteVersion, @@ -201,7 +202,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsCurrent( return targetWorkflow.GetContext().CreateWorkflowExecution( ctx, targetWorkflowSnapshot, - int64(len(targetWorkflowHistoryBlob.Data)), + targetWorkflowHistoryBlob, createMode, prevRunID, prevLastWriteVersion, @@ -235,7 +236,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsZombie( return err } - var targetWorkflowHistoryBlob persistence.DataBlob + var targetWorkflowHistoryBlob events.PersistedBlob if len(targetWorkflowEventsSeq[0].Events) > 0 { if targetWorkflowEventsSeq[0].Events[0].GetEventType() == types.EventTypeWorkflowExecutionStarted { targetWorkflowHistoryBlob, err = targetWorkflow.GetContext().PersistStartWorkflowBatchEvents( @@ -271,7 +272,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsZombie( err = targetWorkflow.GetContext().CreateWorkflowExecution( ctx, targetWorkflowSnapshot, - int64(len(targetWorkflowHistoryBlob.Data)), + targetWorkflowHistoryBlob, createMode, prevRunID, prevLastWriteVersion, diff --git a/service/history/ndc/new_workflow_transaction_manager_test.go b/service/history/ndc/new_workflow_transaction_manager_test.go index eea1ea4355f..339f06b93ff 100644 --- a/service/history/ndc/new_workflow_transaction_manager_test.go +++ b/service/history/ndc/new_workflow_transaction_manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/execution" ) @@ -119,7 +120,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Brand }, }, } - workflowHistorySize := int64(12345) + workflowHistory := events.PersistedBlob{DataBlob: persistence.DataBlob{Data: make([]byte, 12345)}} mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, @@ -136,11 +137,11 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Brand context.EXPECT().PersistStartWorkflowBatchEvents( gomock.Any(), workflowEventsSeq[0], - ).Return(persistence.DataBlob{Data: make([]byte, workflowHistorySize)}, nil).Times(1) + ).Return(workflowHistory, nil).Times(1) context.EXPECT().CreateWorkflowExecution( gomock.Any(), workflowSnapshot, - workflowHistorySize, + workflowHistory, persistence.CreateWorkflowModeBrandNew, "", int64(0), @@ -189,7 +190,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat }, }, } - targetWorkflowHistorySize := int64(12345) + targetWorkflowHistory := events.PersistedBlob{DataBlob: persistence.DataBlob{Data: make([]byte, 12345)}} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, @@ -214,11 +215,11 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat targetContext.EXPECT().PersistNonStartWorkflowBatchEvents( gomock.Any(), targetWorkflowEventsSeq[0], - ).Return(persistence.DataBlob{Data: make([]byte, targetWorkflowHistorySize)}, nil).Times(1) + ).Return(targetWorkflowHistory, nil).Times(1) targetContext.EXPECT().CreateWorkflowExecution( gomock.Any(), targetWorkflowSnapshot, - targetWorkflowHistorySize, + targetWorkflowHistory, persistence.CreateWorkflowModeWorkflowIDReuse, currentRunID, currentLastWriteVersion, @@ -270,7 +271,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat }, }, } - targetWorkflowHistorySize := int64(12345) + targetWorkflowHistory := events.PersistedBlob{DataBlob: persistence.DataBlob{Data: make([]byte, 12345)}} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, @@ -289,11 +290,11 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat targetContext.EXPECT().PersistStartWorkflowBatchEvents( gomock.Any(), targetWorkflowEventsSeq[0], - ).Return(persistence.DataBlob{Data: make([]byte, targetWorkflowHistorySize)}, nil).Times(1) + ).Return(targetWorkflowHistory, nil).Times(1) targetContext.EXPECT().CreateWorkflowExecution( gomock.Any(), targetWorkflowSnapshot, - targetWorkflowHistorySize, + targetWorkflowHistory, persistence.CreateWorkflowModeZombie, "", int64(0), @@ -346,7 +347,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat }, }, } - targetWorkflowHistorySize := int64(12345) + targetWorkflowHistory := events.PersistedBlob{DataBlob: persistence.DataBlob{Data: make([]byte, 12345)}} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, @@ -365,11 +366,11 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat targetContext.EXPECT().PersistNonStartWorkflowBatchEvents( gomock.Any(), targetWorkflowEventsSeq[0], - ).Return(persistence.DataBlob{Data: make([]byte, targetWorkflowHistorySize)}, nil).Times(1) + ).Return(targetWorkflowHistory, nil).Times(1) targetContext.EXPECT().CreateWorkflowExecution( gomock.Any(), targetWorkflowSnapshot, - targetWorkflowHistorySize, + targetWorkflowHistory, persistence.CreateWorkflowModeZombie, "", int64(0), diff --git a/service/history/ndc/transaction_manager_test.go b/service/history/ndc/transaction_manager_test.go index e3585a28170..f18c994d6d3 100644 --- a/service/history/ndc/transaction_manager_test.go +++ b/service/history/ndc/transaction_manager_test.go @@ -40,6 +40,7 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" + "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" @@ -159,7 +160,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Op mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() mutableState.EXPECT().GetDomainEntry().Return(s.domainEntry).AnyTimes() mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{RunID: runID}).Times(1) - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyActive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) @@ -233,7 +234,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Cl DomainName: domainName, }).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) @@ -266,7 +267,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_O mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() mutableState.EXPECT().GetDomainEntry().Return(s.domainEntry).AnyTimes() context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) @@ -313,7 +314,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_C DomainName: domainName, }).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) @@ -367,7 +368,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_NotCurrentWorkflow_Active DomainName: domainName, }).Return(&persistence.GetCurrentExecutionResponse{RunID: currentRunID}, nil).Once() context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) @@ -419,7 +420,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_NotCurrentWorkflow_Passiv DomainName: domainName, }).Return(&persistence.GetCurrentExecutionResponse{RunID: currentRunID}, nil).Once() context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) - context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(persistence.DataBlob{}, nil).Times(1) + context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), ).Return(nil).Times(1) diff --git a/service/history/replication/task_hydrator.go b/service/history/replication/task_hydrator.go index b8fad958cd9..93652d0dfe9 100644 --- a/service/history/replication/task_hydrator.go +++ b/service/history/replication/task_hydrator.go @@ -60,6 +60,14 @@ type ( } ) +// NewImmediateTaskHydrator will enrich replication tasks with additional information that is immediately available. +func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, blob, nextBlob *persistence.DataBlob) TaskHydrator { + return TaskHydrator{ + history: immediateHistoryProvider{blob: blob, nextBlob: nextBlob}, + msProvider: immediateMutableStateProvider{immediateMutableState{isRunning, activities, vh}}, + } +} + // NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, // but is rather loaded in a deferred way later from a database and cache. func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache *execution.Cache, domains domainCache) TaskHydrator { @@ -275,3 +283,47 @@ func (l mutableStateLoader) GetMutableState(ctx context.Context, domainID, workf func timeToUnixNano(t time.Time) *int64 { return common.Int64Ptr(t.UnixNano()) } + +type immediateHistoryProvider struct { + blob *persistence.DataBlob + nextBlob *persistence.DataBlob +} + +func (h immediateHistoryProvider) GetEventBlob(_ context.Context, _ persistence.ReplicationTaskInfo) (*types.DataBlob, error) { + if h.blob == nil { + return nil, errors.New("history blob not set") + } + return h.blob.ToInternal(), nil +} + +func (h immediateHistoryProvider) GetNextRunEventBlob(_ context.Context, _ persistence.ReplicationTaskInfo) (*types.DataBlob, error) { + if h.nextBlob == nil { + return nil, nil // Expected and common + } + return h.nextBlob.ToInternal(), nil +} + +type immediateMutableStateProvider struct { + ms immediateMutableState +} + +func (r immediateMutableStateProvider) GetMutableState(_ context.Context, _, _, _ string) (mutableState, execution.ReleaseFunc, error) { + return r.ms, execution.NoopReleaseFn, nil +} + +type immediateMutableState struct { + isRunning bool + activities map[int64]*persistence.ActivityInfo + versionHistories *persistence.VersionHistories +} + +func (ms immediateMutableState) IsWorkflowExecutionRunning() bool { + return ms.isRunning +} +func (ms immediateMutableState) GetActivityInfo(id int64) (*persistence.ActivityInfo, bool) { + info, ok := ms.activities[id] + return info, ok +} +func (ms immediateMutableState) GetVersionHistories() *persistence.VersionHistories { + return ms.versionHistories +} diff --git a/service/history/replication/task_hydrator_test.go b/service/history/replication/task_hydrator_test.go index 5b47632c258..55b4a7f4971 100644 --- a/service/history/replication/task_hydrator_test.go +++ b/service/history/replication/task_hydrator_test.go @@ -596,6 +596,181 @@ func TestMutableStateLoader_GetMutableState(t *testing.T) { assert.NotNil(t, release) } +func TestImmediateTaskHydrator(t *testing.T) { + activityInfo := persistence.ActivityInfo{ + Version: testVersion, + ScheduleID: testScheduleID, + ScheduledTime: testScheduleTime, + StartedID: testStartedID, + StartedTime: testStartedTime, + DomainID: testDomainID, + LastHeartBeatUpdatedTime: testHeartbeatTime, + Details: testDetails, + Attempt: testAttempt, + LastFailureReason: testLastFailureReason, + LastFailureDetails: testLastFailureDetails, + LastWorkerIdentity: testWorkerIdentity, + } + versionHistories := &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: testBranchTokenVersionHistory, + Items: []*persistence.VersionHistoryItem{ + {EventID: testFirstEventID, Version: testVersion}, + }, + }, + }, + } + + tests := []struct { + name string + versionHistories *persistence.VersionHistories + activities map[int64]*persistence.ActivityInfo + blob *persistence.DataBlob + nextRunBlob *persistence.DataBlob + task persistence.ReplicationTaskInfo + expectResult *types.ReplicationTask + expectErr string + }{ + { + name: "sync activity task - happy path", + versionHistories: versionHistories, + activities: map[int64]*persistence.ActivityInfo{testScheduleID: &activityInfo}, + task: persistence.ReplicationTaskInfo{ + TaskType: persistence.ReplicationTaskTypeSyncActivity, + TaskID: testTaskID, + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + ScheduledID: testScheduleID, + CreationTime: testCreationTime, + }, + expectResult: &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(), + SourceTaskID: testTaskID, + CreationTime: common.Int64Ptr(testCreationTime), + SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + Version: testVersion, + ScheduledID: testScheduleID, + ScheduledTime: common.Int64Ptr(testScheduleTime.UnixNano()), + StartedID: testStartedID, + StartedTime: common.Int64Ptr(testStartedTime.UnixNano()), + LastHeartbeatTime: common.Int64Ptr(testHeartbeatTime.UnixNano()), + Details: testDetails, + Attempt: testAttempt, + LastFailureReason: common.StringPtr(testLastFailureReason), + LastWorkerIdentity: testWorkerIdentity, + LastFailureDetails: testLastFailureDetails, + VersionHistory: &types.VersionHistory{ + Items: []*types.VersionHistoryItem{{EventID: testFirstEventID, Version: testVersion}}, + BranchToken: testBranchTokenVersionHistory, + }, + }, + }, + }, + { + name: "sync activity task - missing activity info", + versionHistories: versionHistories, + activities: map[int64]*persistence.ActivityInfo{}, + task: persistence.ReplicationTaskInfo{ + TaskType: persistence.ReplicationTaskTypeSyncActivity, + ScheduledID: testScheduleID, + }, + expectResult: nil, + }, + { + name: "history task - happy path", + versionHistories: versionHistories, + blob: persistence.NewDataBlobFromInternal(testDataBlob), + nextRunBlob: persistence.NewDataBlobFromInternal(testDataBlobNewRun), + task: persistence.ReplicationTaskInfo{ + TaskType: persistence.ReplicationTaskTypeHistory, + TaskID: testTaskID, + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + FirstEventID: testFirstEventID, + NextEventID: testNextEventID, + BranchToken: testBranchToken, + NewRunBranchToken: testBranchTokenNewRun, + Version: testVersion, + CreationTime: testCreationTime, + }, + expectResult: &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(), + SourceTaskID: testTaskID, + CreationTime: common.Int64Ptr(testCreationTime), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + VersionHistoryItems: []*types.VersionHistoryItem{{EventID: testFirstEventID, Version: testVersion}}, + Events: testDataBlob, + NewRunEvents: testDataBlobNewRun, + }, + }, + }, + { + name: "history task - no next run", + versionHistories: versionHistories, + blob: persistence.NewDataBlobFromInternal(testDataBlob), + task: persistence.ReplicationTaskInfo{ + TaskType: persistence.ReplicationTaskTypeHistory, + TaskID: testTaskID, + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + FirstEventID: testFirstEventID, + NextEventID: testNextEventID, + BranchToken: testBranchToken, + Version: testVersion, + CreationTime: testCreationTime, + }, + expectResult: &types.ReplicationTask{ + TaskType: types.ReplicationTaskTypeHistoryV2.Ptr(), + SourceTaskID: testTaskID, + CreationTime: common.Int64Ptr(testCreationTime), + HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + VersionHistoryItems: []*types.VersionHistoryItem{{EventID: testFirstEventID, Version: testVersion}}, + Events: testDataBlob, + }, + }, + }, + { + name: "history task - missing data blob", + versionHistories: versionHistories, + task: persistence.ReplicationTaskInfo{ + TaskType: persistence.ReplicationTaskTypeHistory, + FirstEventID: testFirstEventID, + Version: testVersion, + BranchToken: testBranchToken, + }, + expectErr: "history blob not set", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := NewImmediateTaskHydrator(true, tt.versionHistories, tt.activities, tt.blob, tt.nextRunBlob) + result, err := h.Hydrate(context.Background(), tt.task) + + if tt.expectErr != "" { + assert.EqualError(t, err, tt.expectErr) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectResult, result) + } + }) + } +} + type fakeMutableStateProvider struct { workflows map[definition.WorkflowIdentifier]mutableState released bool diff --git a/service/history/reset/resetter.go b/service/history/reset/resetter.go index 3ebd4099845..a37301addb7 100644 --- a/service/history/reset/resetter.go +++ b/service/history/reset/resetter.go @@ -283,7 +283,6 @@ func (r *workflowResetterImpl) persistToDB( return err } - resetHistorySize := int64(0) if len(resetWorkflowEventsSeq) != 1 { return &types.InternalServiceError{ Message: "there should be EXACTLY one batch of events for reset", @@ -291,16 +290,15 @@ func (r *workflowResetterImpl) persistToDB( } // reset workflow with decision task failed or timed out - blob, err := resetWorkflow.GetContext().PersistNonStartWorkflowBatchEvents(ctx, resetWorkflowEventsSeq[0]) + resetWorkflowHistory, err := resetWorkflow.GetContext().PersistNonStartWorkflowBatchEvents(ctx, resetWorkflowEventsSeq[0]) if err != nil { return err } - resetHistorySize = int64(len(blob.Data)) return resetWorkflow.GetContext().CreateWorkflowExecution( ctx, resetWorkflowSnapshot, - resetHistorySize, + resetWorkflowHistory, persistence.CreateWorkflowModeContinueAsNew, currentRunID, currentLastWriteVersion, diff --git a/service/history/reset/resetter_test.go b/service/history/reset/resetter_test.go index 73ad91e6d2a..71777086668 100644 --- a/service/history/reset/resetter_test.go +++ b/service/history/reset/resetter_test.go @@ -40,6 +40,7 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" + "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" ) @@ -188,16 +189,16 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() { ID: 123, }}, }} - resetEventsSize := int64(4321) + resetEvents := events.PersistedBlob{DataBlob: persistence.DataBlob{Data: make([]byte, 4321)}} resetMutableState.EXPECT().CloseTransactionAsSnapshot( gomock.Any(), execution.TransactionPolicyActive, ).Return(resetSnapshot, resetEventsSeq, nil).Times(1) - resetContext.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), resetEventsSeq[0]).Return(persistence.DataBlob{Data: make([]byte, resetEventsSize)}, nil).Times(1) + resetContext.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), resetEventsSeq[0]).Return(resetEvents, nil).Times(1) resetContext.EXPECT().CreateWorkflowExecution( gomock.Any(), resetSnapshot, - resetEventsSize, + resetEvents, persistence.CreateWorkflowModeContinueAsNew, s.currentRunID, currentLastWriteVersion, diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index 30541482a7c..2b8ed287f0d 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -93,6 +93,7 @@ func (s *crossClusterSourceTaskExecutorSuite) SetupTest() { s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any()).AnyTimes() + s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).AnyTimes() s.mockShard.SetEngine(s.mockEngine) s.mockDomainCache = s.mockShard.Resource.DomainCache diff --git a/service/history/task/timer_active_task_executor_test.go b/service/history/task/timer_active_task_executor_test.go index 9d32041469d..f7e899ca4dc 100644 --- a/service/history/task/timer_active_task_executor_test.go +++ b/service/history/task/timer_active_task_executor_test.go @@ -122,6 +122,7 @@ func (s *timerActiveTaskExecutorSuite) SetupTest() { s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any()).AnyTimes() + s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).AnyTimes() s.mockShard.SetEngine(s.mockEngine) s.mockDomainCache = s.mockShard.Resource.DomainCache diff --git a/service/history/task/timer_standby_task_executor_test.go b/service/history/task/timer_standby_task_executor_test.go index 98e457b9291..92fdeb5d295 100644 --- a/service/history/task/timer_standby_task_executor_test.go +++ b/service/history/task/timer_standby_task_executor_test.go @@ -124,6 +124,7 @@ func (s *timerStandbyTaskExecutorSuite) SetupTest() { s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any()).AnyTimes() + s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).AnyTimes() s.mockShard.SetEngine(s.mockEngine) s.mockNDCHistoryResender = ndc.NewMockHistoryResender(s.controller) diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index c3b368492f0..61425a0d4f2 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -157,6 +157,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any()).AnyTimes() + s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).AnyTimes() s.mockShard.SetEngine(s.mockEngine) s.mockParentClosePolicyClient = &parentclosepolicy.ClientMock{}