Skip to content

Commit

Permalink
Revert "Revert "history service should do event reordering making sur…
Browse files Browse the repository at this point in the history
…e corresponding … (#601)" (#611)" (#612)

This reverts commit dff860e.
  • Loading branch information
wxing1292 authored Mar 16, 2018
1 parent 738d103 commit d486c6c
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ enum TimeoutType {
HEARTBEAT,
}

// whenever this list of decision is changed
// do change the mutableStateBuilder.go
// function shouldBufferEvent
// to make sure wo do the correct event ordering
enum DecisionType {
ScheduleActivityTask,
RequestCancelActivityTask,
Expand Down
27 changes: 22 additions & 5 deletions service/history/historyBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, bufferedEventID, 5, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.Equal(int64(8), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
Expand All @@ -150,6 +152,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, bufferedEventID, 5, 7, activity1Result,
identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result,
identity)
s.Equal(int64(9), s.getNextEventID())
Expand All @@ -166,6 +171,8 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity2StartedEvent := s.addActivityTaskStartedEvent(6, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, bufferedEventID, 6, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activity2StartedEvent, 10, 6, identity)
s.Equal(int64(11), s.getNextEventID())
ai4, activity2Running1 := s.msBuilder.GetActivityInfo(6)
Expand All @@ -175,6 +182,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {

activity2FailedEvent := s.addActivityTaskFailedEvent(6, 10, activity2Reason, activity2Details,
identity)
s.validateActivityTaskFailedEvent(activity2FailedEvent, bufferedEventID, 6, 10, activity2Reason,
activity2Details, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskFailedEvent(activity2FailedEvent, 11, 6, 10, activity2Reason,
activity2Details, identity)
s.Equal(int64(12), s.getNextEventID())
Expand Down Expand Up @@ -387,6 +397,8 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 7 activity1 started
activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, bufferedEventID, 5, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.Equal(int64(8), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
Expand All @@ -396,8 +408,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 8 activity1 completed
activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result,
identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, bufferedEventID, 5, 7, activity1Result, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result, identity)
s.Equal(int64(9), s.getNextEventID())
_, activity1Running2 := s.msBuilder.GetActivityInfo(5)
s.False(activity1Running2)
Expand Down Expand Up @@ -532,9 +545,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowCancellationRequested()
cancellationRequestedEvent := s.addExternalWorkflowExecutionCancelRequested(
5, targetDomain, targetExecution.GetWorkflowId(), targetExecution.GetRunId(),
)
s.validateExternalWorkflowExecutionCancelRequested(
cancellationRequestedEvent, 6, 5, targetDomain, targetExecution,
)
s.validateExternalWorkflowExecutionCancelRequested(cancellationRequestedEvent, bufferedEventID, 5, targetDomain, targetExecution)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateExternalWorkflowExecutionCancelRequested(cancellationRequestedEvent, 6, 5, targetDomain, targetExecution)
s.Equal(int64(7), s.getNextEventID())
}

Expand Down Expand Up @@ -605,6 +618,10 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowCancellationFailed() {
cancellationRequestedEvent := s.addRequestCancelExternalWorkflowExecutionFailedEvent(
4, 5, targetDomain, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), cancellationFailedCause,
)
s.validateRequestCancelExternalWorkflowExecutionFailedEvent(
cancellationRequestedEvent, bufferedEventID, 4, 5, targetDomain, targetExecution, cancellationFailedCause,
)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateRequestCancelExternalWorkflowExecutionFailedEvent(
cancellationRequestedEvent, 6, 4, 5, targetDomain, targetExecution, cancellationFailedCause,
)
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3502,6 +3502,7 @@ func addCompleteWorkflowEvent(builder *mutableStateBuilder, decisionCompletedEve
}

func createMutableState(builder *mutableStateBuilder) *persistence.WorkflowMutableState {
builder.FlushBufferedEvents()
info := copyWorkflowExecutionInfo(builder.executionInfo)
activityInfos := make(map[int64]*persistence.ActivityInfo)
for id, info := range builder.pendingActivityInfoIDs {
Expand Down
50 changes: 39 additions & 11 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,26 +378,54 @@ func (e *mutableStateBuilder) createNewHistoryEvent(eventType workflow.EventType
}

func (e *mutableStateBuilder) shouldBufferEvent(eventType workflow.EventType) bool {
if !e.HasInFlightDecisionTask() {
// do not buffer event if there is no in-flight decision
return false
}

switch eventType {
case workflow.EventTypeDecisionTaskCompleted,
workflow.EventTypeDecisionTaskFailed,
workflow.EventTypeDecisionTaskTimedOut,
case // do not buffer for workflow state change
workflow.EventTypeWorkflowExecutionStarted,
workflow.EventTypeWorkflowExecutionCompleted,
workflow.EventTypeWorkflowExecutionFailed,
workflow.EventTypeWorkflowExecutionTimedOut,
workflow.EventTypeWorkflowExecutionTerminated,
workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeWorkflowExecutionCanceled:
// do not buffer event if it is any type of close decision or close workflow
return false
case // decision event should not be buffered
workflow.EventTypeDecisionTaskScheduled,
workflow.EventTypeDecisionTaskStarted,
workflow.EventTypeDecisionTaskCompleted,
workflow.EventTypeDecisionTaskFailed,
workflow.EventTypeDecisionTaskTimedOut:
return false
case // events generated directly from decisions should not be buffered
// workflow complete, failed, cancelled and continue-as-new events are duplication of above
// just put is here for reference
// workflow.EventTypeWorkflowExecutionCompleted,
// workflow.EventTypeWorkflowExecutionFailed,
// workflow.EventTypeWorkflowExecutionCanceled,
// workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeActivityTaskScheduled,
workflow.EventTypeActivityTaskCancelRequested,
workflow.EventTypeTimerStarted,
// DecisionTypeCancelTimer is an excption. This decision will be mapped
// to either workflow.EventTypeTimerCanceled, or workflow.EventTypeCancelTimerFailed.
// So both should not be buffered. Ref: historyEngine, search for "workflow.DecisionTypeCancelTimer"
workflow.EventTypeTimerCanceled,
workflow.EventTypeCancelTimerFailed,
workflow.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
workflow.EventTypeMarkerRecorded,
workflow.EventTypeStartChildWorkflowExecutionInitiated,
workflow.EventTypeSignalExternalWorkflowExecutionInitiated:
// do not buffer event if event is directly generated from a corresponding decision

// sanity check there is no decision on the fly
if e.HasInFlightDecisionTask() {
msg := fmt.Sprintf("history mutable state is processing event: %v while there is decision pending. "+
"domainID: %v, workflow ID: %v, run ID: %v.", eventType, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID)
panic(msg)
}
return false
default:
return true
}

return true
}

func (e *mutableStateBuilder) createNewHistoryEventWithTimestamp(eventID int64, eventType workflow.EventType,
Expand Down
140 changes: 140 additions & 0 deletions service/history/mutableStateBuilder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"os"
"testing"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/service/dynamicconfig"
)

type (
mutableStateSuite struct {
suite.Suite
msBuilder *mutableStateBuilder
logger bark.Logger
}
)

func TestMutableStateSuite(t *testing.T) {
s := new(mutableStateSuite)
suite.Run(t, s)
}

func (s *mutableStateSuite) SetupSuite() {
if testing.Verbose() {
log.SetOutput(os.Stdout)
}

}

func (s *mutableStateSuite) TearDownSuite() {

}

func (s *mutableStateSuite) SetupTest() {
s.logger = bark.NewLoggerFromLogrus(log.New())
s.msBuilder = newMutableStateBuilder(NewConfig(dynamicconfig.NewNopCollection(), 1), s.logger)
}

func (s *mutableStateSuite) TearDownTest() {

}

func (s *mutableStateSuite) TestShouldBufferEvent() {
// workflow status events will be assign event ID immediately
workflowEvents := map[workflow.EventType]bool{
workflow.EventTypeWorkflowExecutionStarted: true,
workflow.EventTypeWorkflowExecutionCompleted: true,
workflow.EventTypeWorkflowExecutionFailed: true,
workflow.EventTypeWorkflowExecutionTimedOut: true,
workflow.EventTypeWorkflowExecutionTerminated: true,
workflow.EventTypeWorkflowExecutionContinuedAsNew: true,
workflow.EventTypeWorkflowExecutionCanceled: true,
}

// decision events will be assign event ID immediately
decisionTaskEvents := map[workflow.EventType]bool{
workflow.EventTypeDecisionTaskScheduled: true,
workflow.EventTypeDecisionTaskStarted: true,
workflow.EventTypeDecisionTaskCompleted: true,
workflow.EventTypeDecisionTaskFailed: true,
workflow.EventTypeDecisionTaskTimedOut: true,
}

// events corresponding to decisions from client will be assign event ID immediately
decisionEvents := map[workflow.EventType]bool{
workflow.EventTypeWorkflowExecutionCompleted: true,
workflow.EventTypeWorkflowExecutionFailed: true,
workflow.EventTypeWorkflowExecutionCanceled: true,
workflow.EventTypeWorkflowExecutionContinuedAsNew: true,
workflow.EventTypeActivityTaskScheduled: true,
workflow.EventTypeActivityTaskCancelRequested: true,
workflow.EventTypeTimerStarted: true,
workflow.EventTypeTimerCanceled: true,
workflow.EventTypeCancelTimerFailed: true,
workflow.EventTypeRequestCancelExternalWorkflowExecutionInitiated: true,
workflow.EventTypeMarkerRecorded: true,
workflow.EventTypeStartChildWorkflowExecutionInitiated: true,
workflow.EventTypeSignalExternalWorkflowExecutionInitiated: true,
}

// other events will not be assign event ID immediately
otherEvents := map[workflow.EventType]bool{}
OtherEventsLoop:
for _, eventType := range workflow.EventType_Values() {
if _, ok := workflowEvents[eventType]; ok {
continue OtherEventsLoop
}
if _, ok := decisionTaskEvents[eventType]; ok {
continue OtherEventsLoop
}
if _, ok := decisionEvents[eventType]; ok {
continue OtherEventsLoop
}
otherEvents[eventType] = true
}

// test workflowEvents, decisionTaskEvents, decisionEvents will return true
for eventType := range workflowEvents {
s.False(s.msBuilder.shouldBufferEvent(eventType))
}
for eventType := range decisionTaskEvents {
s.False(s.msBuilder.shouldBufferEvent(eventType))
}
for eventType := range decisionEvents {
s.False(s.msBuilder.shouldBufferEvent(eventType))
}
// other events will return false
for eventType := range otherEvents {
s.True(s.msBuilder.shouldBufferEvent(eventType))
}

// +1 is because DecisionTypeCancelTimer will be mapped
// to either workflow.EventTypeTimerCanceled, or workflow.EventTypeCancelTimerFailed.
s.Equal(len(workflow.DecisionType_Values())+1, len(decisionEvents),
"This assertaion will be broken a new decision is added and no corresponding logic added to shouldBufferEvent()")
}
3 changes: 2 additions & 1 deletion service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ func (s *timerQueueProcessorSuite) addDecisionTimer(domainID string, we workflow
builder.Load(state)

di := addDecisionTaskScheduledEvent(builder)
addDecisionTaskStartedEvent(builder, di.ScheduleID, state.ExecutionInfo.TaskList, "identity")
startedEvent := addDecisionTaskStartedEvent(builder, di.ScheduleID, state.ExecutionInfo.TaskList, "identity")

timeOutTask := tb.AddDecisionTimoutTask(di.ScheduleID, di.Attempt, 1)
timerTasks := []persistence.Task{timeOutTask}

s.updateTimerSeqNumbers(timerTasks)

addDecisionTaskCompletedEvent(builder, di.ScheduleID, startedEvent.GetEventId(), nil, "identity")
err2 := s.UpdateWorkflowExecution(state.ExecutionInfo, nil, nil, condition, timerTasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")
return timerTasks
Expand Down

0 comments on commit d486c6c

Please sign in to comment.