Skip to content

Commit

Permalink
Force close decision on limit exceeded during task processing (#971)
Browse files Browse the repository at this point in the history
Transfer/Timer task processing should force close any outstanding
decision if it cannot update execution state due to limit exceeded
error.

fixes #966
  • Loading branch information
samarabbas authored Jul 17, 2018
1 parent b21b809 commit 37c98ee
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 124 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6246,6 +6246,130 @@ func (s *integrationSuite) TestTransientDecisionTimeout() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestTaskProcessingProtectionForRateLimitError() {
id := "integration-task-processing-protection-for-rate-limit-error-test"
wt := "integration-task-processing-protection-for-rate-limit-error-test-type"
tl := "integration-task-processing-protection-for-rate-limit-error-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(601),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(600),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)
workflowExecution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
}

// decider logic
workflowComplete := false
signalCount := 0
createUserTimer := false
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, h *workflow.History) ([]byte, []*workflow.Decision, error) {

if !createUserTimer {
createUserTimer = true

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer),
StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{
TimerId: common.StringPtr("timer-id-1"),
StartToFireTimeoutSeconds: common.Int64Ptr(5),
},
}}, nil
}

// Count signals
for _, event := range h.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeWorkflowExecutionSignaled {
signalCount++
}
}

workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: nil,
logger: s.logger,
suite: s,
}

// Process first decision to create user timer
_, err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Send one signal to create a new decision
for i := 0; i < 1; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Drop decision to cause all events to be buffered from now on
_, err = poller.pollAndProcessDecisionTask(false, true)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Buffered Signals
for i := 1; i < 101; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Rate limitted signals
for i := 0; i < 10; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.NotNil(signalErr)
s.Equal(history.ErrBufferedEventsLimitExceeded, signalErr)
}

// Process signal in decider
_, err = poller.pollAndProcessDecisionTaskWithAttempt(true, false, false, false, 1)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

s.printWorkflowHistory(s.domainName, workflowExecution)

s.True(workflowComplete)
s.Equal(101, signalCount)
}

func (s *integrationSuite) getHistory(domain string, execution *workflow.WorkflowExecution) []*workflow.HistoryEvent {
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ enum DecisionTaskFailedCause {
WORKFLOW_WORKER_UNHANDLED_FAILURE,
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_START_CHILD_EXECUTION_ATTRIBUTES,
FORCE_CLOSE_DECISION,
}

enum CancelExternalWorkflowExecutionFailedCause {
Expand Down
22 changes: 22 additions & 0 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,28 @@ func (_m *mockMutableState) GetHistoryEvent(serializedEvent []byte) (*shared.His
return r0, r1
}

func (_m *mockMutableState) GetInFlightDecisionTask() (*decisionInfo, bool) {
ret := _m.Called()

var r0 *decisionInfo
if rf, ok := ret.Get(0).(func() *decisionInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*decisionInfo)
}
}

var r1 bool
if rf, ok := ret.Get(1).(func() bool); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(bool)
}

return r0, r1
}

// GetLastFirstEventID provides a mock function with given fields:
func (_m *mockMutableState) GetLastFirstEventID() int64 {
ret := _m.Called()
Expand Down
21 changes: 18 additions & 3 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type (
GetExecutionInfo() *persistence.WorkflowExecutionInfo
GetHistoryBuilder() *historyBuilder
GetHistoryEvent(serializedEvent []byte) (*workflow.HistoryEvent, bool)
GetInFlightDecisionTask() (*decisionInfo, bool)
GetLastFirstEventID() int64
GetLastUpdatedTimestamp() int64
GetLastWriteVersion() int64
Expand Down Expand Up @@ -1110,9 +1111,8 @@ func (e *mutableStateBuilder) DeleteUserTimer(timerID string) {
e.deleteTimerInfos[timerID] = struct{}{}
}

// GetPendingDecision returns details about the in-progress decision task
func (e *mutableStateBuilder) GetPendingDecision(scheduleEventID int64) (*decisionInfo, bool) {
di := &decisionInfo{
func (e *mutableStateBuilder) getDecisionInfo() *decisionInfo {
return &decisionInfo{
Version: e.executionInfo.DecisionVersion,
ScheduleID: e.executionInfo.DecisionScheduleID,
StartedID: e.executionInfo.DecisionStartedID,
Expand All @@ -1121,6 +1121,11 @@ func (e *mutableStateBuilder) GetPendingDecision(scheduleEventID int64) (*decisi
Attempt: e.executionInfo.DecisionAttempt,
Timestamp: e.executionInfo.DecisionTimestamp,
}
}

// GetPendingDecision returns details about the in-progress decision task
func (e *mutableStateBuilder) GetPendingDecision(scheduleEventID int64) (*decisionInfo, bool) {
di := e.getDecisionInfo()
if scheduleEventID == di.ScheduleID {
return di, true
}
Expand All @@ -1147,6 +1152,16 @@ func (e *mutableStateBuilder) HasInFlightDecisionTask() bool {
return e.executionInfo.DecisionStartedID > 0
}

func (e *mutableStateBuilder) GetInFlightDecisionTask() (*decisionInfo, bool) {
if e.executionInfo.DecisionScheduleID == common.EmptyEventID ||
e.executionInfo.DecisionStartedID == common.EmptyEventID {
return nil, false
}

di := e.getDecisionInfo()
return di, true
}

func (e *mutableStateBuilder) HasBufferedEvents() bool {
if len(e.bufferedEvents) > 0 || e.updateBufferedEvents != nil {
return true
Expand Down
55 changes: 43 additions & 12 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,19 +659,12 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
) error {
executionInfo := msBuilder.GetExecutionInfo()
var transferTasks []persistence.Task
var err error
if scheduleNewDecision {
// Schedule a new decision.
di := msBuilder.AddDecisionTaskScheduledEvent()
transferTasks = []persistence.Task{&persistence.DecisionTask{
DomainID: executionInfo.DomainID,
TaskList: di.TaskList,
ScheduleID: di.ScheduleID,
}}
if msBuilder.IsStickyTaskListEnabled() {
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)
stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt,
executionInfo.StickyScheduleToStartTimeout)
timerTasks = append(timerTasks, stickyTaskTimeoutTimer)
transferTasks, timerTasks, err = context.scheduleNewDecision(transferTasks, timerTasks)
if err != nil {
return err
}
}

Expand All @@ -691,13 +684,51 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
return err1
}

err := context.updateWorkflowExecutionWithDeleteTask(transferTasks, timerTasks, clearTimerTask, transactionID)
err = context.updateWorkflowExecutionWithDeleteTask(transferTasks, timerTasks, clearTimerTask, transactionID)
if err != nil {
if isShardOwnershiptLostError(err) {
// Shard is stolen. Stop timer processing to reduce duplicates
t.timerQueueProcessorBase.Stop()
return err
}

// Check if the processing is blocked due to limit exceeded error and fail any outstanding decision to
// unblock processing
if err == ErrBufferedEventsLimitExceeded {
context.clear()

var err1 error
// Reload workflow execution so we can apply the decision task failure event
msBuilder, err1 = context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if di, ok := msBuilder.GetInFlightDecisionTask(); ok {
msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID,
workflow.DecisionTaskFailedCauseForceCloseDecision, nil, identityHistoryService)

var transT, timerT []persistence.Task
transT, timerT, err1 = context.scheduleNewDecision(transT, timerT)
if err1 != nil {
return err1
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
err1 = context.updateWorkflowExecution(transT, timerT, transactionID)
if err1 != nil {
return err1
}
}

return err
}
}

t.notifyNewTimers(timerTasks)
return err
}
Loading

0 comments on commit 37c98ee

Please sign in to comment.