Skip to content

Commit

Permalink
bugfix: when multiple activities time out, there will be at most o…
Browse files Browse the repository at this point in the history
…ne actually deleted in Cassandra (#655)
  • Loading branch information
wxing1292 authored Apr 4, 2018
1 parent 8b6694a commit 49a7202
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 21 deletions.
8 changes: 4 additions & 4 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.createTimerTasks(batch, request.TimerTasks, request.DeleteTimerTask, request.ExecutionInfo.DomainID,
executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp)

d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfo, executionInfo.DomainID,
d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfos, executionInfo.DomainID,
executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateTimerInfos(batch, request.UpserTimerInfos, request.DeleteTimerInfos, executionInfo.DomainID,
Expand Down Expand Up @@ -2229,7 +2229,7 @@ func (d *cassandraPersistence) createTimerTasks(batch *gocql.Batch, timerTasks [
}
}

func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfo *int64,
func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfos []int64,
domainID, workflowID, runID string, condition int64, rangeID int64) {

for _, a := range activityInfos {
Expand Down Expand Up @@ -2262,9 +2262,9 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
condition)
}

if deleteInfo != nil {
for _, deleteInfo := range deleteInfos {
batch.Query(templateDeleteActivityInfoQuery,
*deleteInfo,
deleteInfo,
d.shardID,
rowTypeExecution,
domainID,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
s.Equal(currentTime.Unix(), ai.LastHeartBeatUpdatedTime.Unix())
s.Equal(int32(1), ai.TimerTaskStatus)

err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, common.Int64Ptr(1), nil, nil)
err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, []int64{1}, nil, nil)
s.Nil(err2, "No error expected.")

state, err1 = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ type (

// Mutable state
UpsertActivityInfos []*ActivityInfo
DeleteActivityInfo *int64
DeleteActivityInfos []int64
UpserTimerInfos []*TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*ChildExecutionInfo
Expand Down
16 changes: 8 additions & 8 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co
DeleteTimerTask: nil,
RangeID: s.ShardInfo.RangeID,
UpsertActivityInfos: nil,
DeleteActivityInfo: nil,
DeleteActivityInfos: nil,
UpserTimerInfos: nil,
DeleteTimerInfos: nil,
ContinueAsNew: &CreateWorkflowExecutionRequest{
Expand Down Expand Up @@ -454,10 +454,10 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co
// UpdateWorkflowExecution is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64,
upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64,
upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error {
return s.UpdateWorkflowExecutionWithRangeID(updatedInfo, decisionScheduleIDs, activityScheduleIDs,
s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfo,
s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfos,
upsertTimerInfos, deleteTimerInfos, nil, nil, nil, nil,
nil, nil, nil, "")
}
Expand All @@ -474,7 +474,7 @@ func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *WorkflowExecuti
DeleteTimerTask: nil,
RangeID: s.ShardInfo.RangeID,
UpsertActivityInfos: nil,
DeleteActivityInfo: nil,
DeleteActivityInfos: nil,
UpserTimerInfos: nil,
DeleteTimerInfos: nil,
FinishExecution: true,
Expand Down Expand Up @@ -563,13 +563,13 @@ func (s *TestBase) UpdateWorklowStateAndReplication(updatedInfo *WorkflowExecuti
// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo,
upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo,
deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64,
upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64,
upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64,
upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error {
return s.UpdateWorkflowExecutionWithReplication(updatedInfo, nil, decisionScheduleIDs, activityScheduleIDs, rangeID,
condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, upsertTimerInfos, deleteTimerInfos,
condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfos, upsertTimerInfos, deleteTimerInfos,
upsertChildInfos, deleteChildInfo, upsertCancelInfos, deleteCancelInfo, upsertSignalInfos, deleteSignalInfo,
upsertSignalRequestedIDs, deleteSignalRequestedID)
}
Expand All @@ -578,7 +578,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecu
func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowExecutionInfo,
updatedReplicationState *ReplicationState, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID,
condition int64, timerTasks []Task, txTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo,
deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string,
deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string,
upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo,
deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string,
deleteSignalRequestedID string) error {
Expand Down Expand Up @@ -620,7 +620,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowE
DeleteTimerTask: deleteTimerTask,
RangeID: rangeID,
UpsertActivityInfos: upsertActivityInfos,
DeleteActivityInfo: deleteActivityInfo,
DeleteActivityInfos: deleteActivityInfos,
UpserTimerInfos: upsertTimerInfos,
DeleteTimerInfos: deleteTimerInfos,
UpsertChildExecutionInfos: upsertChildInfos,
Expand Down
11 changes: 6 additions & 5 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityInfoByActivityID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos []*persistence.ActivityInfo // Modified activities from last update.
deleteActivityInfo *int64 // Deleted activities from last update.
deleteActivityInfos []int64 // Deleted activities from last update.

pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
updateTimerInfos []*persistence.TimerInfo // Modified timers from last update.
Expand Down Expand Up @@ -82,7 +82,7 @@ type (
mutableStateSessionUpdates struct {
newEventsBuilder *historyBuilder
updateActivityInfos []*persistence.ActivityInfo
deleteActivityInfo *int64
deleteActivityInfos []int64
updateTimerInfos []*persistence.TimerInfo
deleteTimerInfos []string
updateChildExecutionInfos []*persistence.ChildExecutionInfo
Expand Down Expand Up @@ -115,6 +115,7 @@ func newMutableStateBuilder(config *Config, logger bark.Logger) *mutableStateBui
updateActivityInfos: []*persistence.ActivityInfo{},
pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo),
pendingActivityInfoByActivityID: make(map[string]int64),
deleteActivityInfos: []int64{},
pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo),
updateTimerInfos: []*persistence.TimerInfo{},
deleteTimerInfos: []string{},
Expand Down Expand Up @@ -244,7 +245,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,
updates := &mutableStateSessionUpdates{
newEventsBuilder: e.hBuilder,
updateActivityInfos: e.updateActivityInfos,
deleteActivityInfo: e.deleteActivityInfo,
deleteActivityInfos: e.deleteActivityInfos,
updateTimerInfos: e.updateTimerInfos,
deleteTimerInfos: e.deleteTimerInfos,
updateChildExecutionInfos: e.updateChildExecutionInfos,
Expand All @@ -263,7 +264,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,
// Clear all updates to prepare for the next session
e.hBuilder = newHistoryBuilder(e, e.logger)
e.updateActivityInfos = []*persistence.ActivityInfo{}
e.deleteActivityInfo = nil
e.deleteActivityInfos = []int64{}
e.updateTimerInfos = []*persistence.TimerInfo{}
e.deleteTimerInfos = []string{}
e.updateChildExecutionInfos = []*persistence.ChildExecutionInfo{}
Expand Down Expand Up @@ -675,7 +676,7 @@ func (e *mutableStateBuilder) DeleteActivity(scheduleEventID int64) error {
}
delete(e.pendingActivityInfoByActivityID, a.ActivityID)

e.deleteActivityInfo = common.Int64Ptr(scheduleEventID)
e.deleteActivityInfos = append(e.deleteActivityInfos, scheduleEventID)
return nil
}

Expand Down
72 changes: 71 additions & 1 deletion service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,69 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskHeartBeat_JustStarted()
s.False(running)
}

// TestTimerActivityTask_SameExpiry schedules two activities that share the
// same 1-second schedule-to-start and schedule-to-close timeouts, lets the
// timer queue processor run, and then asserts that checkTimedOutEventFor
// reports neither activity as running and that the reloaded mutable state has
// no pending activity infos left.
func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("activity-timer-same-expiry-test"),
RunId: common.StringPtr(validRunID),
}

taskList := "activity-timer-queue"
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// Create and start the timer queue processor under test.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
p.Start()

// Load the current workflow state into a mutable state builder; the loaded
// NextEventID is passed as the condition to updateHistoryAndTimers below.
state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder := newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

// Schedule two activities with identical timeouts.
ase1, ai1 := builder.AddActivityTaskScheduledEvent(emptyEventID,
&workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("testID-1"),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
})
s.NotNil(ase1)
ase2, ai2 := builder.AddActivityTaskScheduledEvent(emptyEventID,
&workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("testID-2"),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
})
s.NotNil(ase2)

// Create a schedule-to-close timeout for each activity.
tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()})
t, err := tBuilder.AddScheduleToCloseActivityTimeout(ai1)
s.NoError(err)
s.NotNil(t)
t, err = tBuilder.AddScheduleToCloseActivityTimeout(ai2)
s.NoError(err)
s.NotNil(t)
// NOTE(review): t was reassigned above, so only the task returned for ai2 is
// passed on. The assertions below expect exactly one timer to fire while both
// activities are no longer running — presumably a single fired timer handles
// every expired activity; confirm this is intended.
timerTasks := []persistence.Task{t}

s.updateHistoryAndTimers(builder, timerTasks, condition)
p.NotifyNewTimers(cluster.TestCurrentClusterName, timerTasks)

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.getTimerFiredCount(cluster.TestCurrentClusterName))
running := s.checkTimedOutEventFor(domainID, workflowExecution, *ase1.EventId)
s.False(running)
running = s.checkTimedOutEventFor(domainID, workflowExecution, *ase2.EventId)
s.False(running)

// Reload the workflow state and assert that no pending activity infos remain,
// i.e. every timed-out activity was deleted rather than just one of them.
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
s.Equal(0, len(builder.pendingActivityInfoIDs))
}

func (s *timerQueueProcessorSuite) TestTimerUserTimers() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-test"),
Expand All @@ -781,7 +844,7 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers() {
s.False(running)
}

func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() {
func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-same-expiry-test"),
RunId: common.StringPtr(validRunID)}
Expand Down Expand Up @@ -823,6 +886,13 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() {
s.False(running)
running = s.checkTimedOutEventForUserTimer(domainID, workflowExecution, ti2.TimerID)
s.False(running)

// assert user timer infos are deleted
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
s.Equal(0, len(builder.pendingTimerInfoIDs))
}

func (s *timerQueueProcessorSuite) TestTimersOnClosedWorkflow() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
Condition: c.updateCondition,
DeleteTimerTask: c.deleteTimerTask,
UpsertActivityInfos: updates.updateActivityInfos,
DeleteActivityInfo: updates.deleteActivityInfo,
DeleteActivityInfos: updates.deleteActivityInfos,
UpserTimerInfos: updates.updateTimerInfos,
DeleteTimerInfos: updates.deleteTimerInfos,
UpsertChildExecutionInfos: updates.updateChildExecutionInfos,
Expand Down

0 comments on commit 49a7202

Please sign in to comment.