Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear buffers on resetting of mutable state #978

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,17 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

templateClearBufferedReplicationTaskQuery = `UPDATE executions ` +
`SET buffered_replication_tasks_map = {} ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteWorkflowExecutionMutableStateQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -1732,6 +1743,8 @@ func (d *cassandraPersistence) ResetMutableState(request *ResetMutableStateReque
d.resetSignalRequested(batch, request.InsertSignalRequestedIDs, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, request.Condition)

d.resetBufferedEvents(batch, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition)

// Verifies that the RangeID has not changed
batch.Query(templateUpdateLeaseQuery,
request.RangeID,
Expand Down Expand Up @@ -2648,6 +2661,29 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
}
}

func (d *cassandraPersistence) resetBufferedEvents(batch *gocql.Batch, domainID, workflowID, runID string,
condition int64) {
batch.Query(templateDeleteBufferedEventsQuery,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)

batch.Query(templateClearBufferedReplicationTaskQuery,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
}

func (d *cassandraPersistence) resetActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, domainID,
workflowID, runID string, condition int64) {
batch.Query(templateResetActivityInfoQuery,
Expand Down
70 changes: 70 additions & 0 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,52 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
updatedInfo.LastProcessedEvent = int64(2)
currentTime := time.Now().UTC()
expiryTime := currentTime.Add(10 * time.Second)
eventsBatch1 := []*gen.HistoryEvent{
&gen.HistoryEvent{
EventId: common.Int64Ptr(5),
EventType: gen.EventTypeDecisionTaskCompleted.Ptr(),
Version: common.Int64Ptr(11),
DecisionTaskCompletedEventAttributes: &gen.DecisionTaskCompletedEventAttributes{
ScheduledEventId: common.Int64Ptr(2),
StartedEventId: common.Int64Ptr(3),
Identity: common.StringPtr("test_worker"),
},
},
&gen.HistoryEvent{
EventId: common.Int64Ptr(6),
EventType: gen.EventTypeTimerStarted.Ptr(),
Version: common.Int64Ptr(11),
TimerStartedEventAttributes: &gen.TimerStartedEventAttributes{
TimerId: common.StringPtr("ID1"),
StartToFireTimeoutSeconds: common.Int64Ptr(101),
DecisionTaskCompletedEventId: common.Int64Ptr(5),
},
},
}
bufferedTask1 := &BufferedReplicationTask{
FirstEventID: int64(5),
NextEventID: int64(7),
Version: int64(11),
History: s.serializeHistoryEvents(eventsBatch1),
}

eventsBatch2 := []*gen.HistoryEvent{
&gen.HistoryEvent{
EventId: common.Int64Ptr(21),
EventType: gen.EventTypeTimerFired.Ptr(),
Version: common.Int64Ptr(12),
TimerFiredEventAttributes: &gen.TimerFiredEventAttributes{
TimerId: common.StringPtr("2"),
StartedEventId: common.Int64Ptr(3),
},
},
}
bufferedTask2 := &BufferedReplicationTask{
FirstEventID: int64(21),
NextEventID: int64(22),
Version: int64(12),
History: s.serializeHistoryEvents(eventsBatch2),
}
updatedState := &WorkflowMutableState{
ExecutionInfo: updatedInfo,
ActivitInfos: map[int64]*ActivityInfo{
Expand Down Expand Up @@ -2100,6 +2146,24 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
err2 := s.UpdateAllMutableState(updatedState, int64(3))
s.Nil(err2, "No error expected.")

partialState, err2 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err2, "No error expected.")
s.NotNil(partialState, "expected valid state.")
partialInfo := partialState.ExecutionInfo
s.NotNil(partialInfo, "Valid Workflow info expected.")

bufferUpdateInfo := copyWorkflowExecutionInfo(partialInfo)
err2 = s.UpdateWorklowStateAndReplication(bufferUpdateInfo, nil, bufferedTask1, nil, bufferUpdateInfo.NextEventID, nil)
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorklowStateAndReplication(bufferUpdateInfo, nil, bufferedTask2, nil, bufferUpdateInfo.NextEventID, nil)
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID,
s.serializeHistoryEvents(eventsBatch1))
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID,
s.serializeHistoryEvents(eventsBatch2))
s.Nil(err2, "No error expected.")

state1, err1 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err1, "No error expected.")
s.NotNil(state1, "expected valid state.")
Expand Down Expand Up @@ -2195,6 +2259,9 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
_, contains = state1.SignalRequestedIDs["00000000-0000-0000-0000-000000000003"]
s.True(contains)

s.Equal(2, len(state1.BufferedReplicationTasks))
s.Equal(2, len(state1.BufferedEvents))

updatedInfo1 := copyWorkflowExecutionInfo(info1)
updatedInfo1.NextEventID = int64(3)
resetActivityInfos := []*ActivityInfo{
Expand Down Expand Up @@ -2354,6 +2421,9 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
s.Equal([]byte("signal_control_c"), si.Control)

s.Equal(0, len(state4.SignalRequestedIDs))

s.Equal(0, len(state4.BufferedReplicationTasks))
s.Equal(0, len(state4.BufferedEvents))
}

func copyWorkflowExecutionInfo(sourceInfo *WorkflowExecutionInfo) *WorkflowExecutionInfo {
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,19 @@ func (s *TestBase) UpdateWorkflowExecutionForSignal(
})
}

// UpdateWorkflowExecutionForSignal is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
updatedInfo *WorkflowExecutionInfo, rState *ReplicationState, condition int64,
bufferEvents *SerializedHistoryEventBatch) error {
return s.WorkflowMgr.UpdateWorkflowExecution(&UpdateWorkflowExecutionRequest{
ExecutionInfo: updatedInfo,
ReplicationState: rState,
NewBufferedEvents: bufferEvents,
Condition: condition,
RangeID: s.ShardInfo.RangeID,
})
}

// UpdateAllMutableState is a utility method to update workflow execution
func (s *TestBase) UpdateAllMutableState(updatedMutableState *WorkflowMutableState, condition int64) error {
var aInfos []*ActivityInfo
Expand Down