Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Apr 17, 2018
1 parent a33303d commit 7eaecd8
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 31 deletions.
2 changes: 2 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type (
var _ Engine = (*historyEngineImpl)(nil)

var (
// ErrTaskRetry is the error indicating that the timer / transfer task should be retried.
ErrTaskRetry = errors.New("passive task should retry due to condition in mutable state is not met")
// ErrDuplicate is exported temporarily for integration test
ErrDuplicate = errors.New("Duplicate task, completing it")
// ErrConflict is exported temporarily for integration test
Expand Down
13 changes: 0 additions & 13 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ type (
timestamp time.Time
}

// error which will be thrown if the timer / transfer task should be
// retries due to various of reasons
taskRetryError struct{}

// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
Expand Down Expand Up @@ -140,12 +136,3 @@ type (
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError create a error which indicate the task should be retry
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
1 change: 0 additions & 1 deletion service/history/timerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, *
t.isReadFinished = true
}

// fillin the retry task
filteredTasks := []*persistence.TimerTaskInfo{}

// since we have already checked that the clusterName is a valid key of clusterReadLevel
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, no

err := t.timerProcessor.process(task)
if err != nil {
if _, ok := err.(*taskRetryError); ok {
if err == ErrTaskRetry {
<-notificationChan
} else {
// We will retry until we don't find the timer task any more.
Expand Down
8 changes: 4 additions & 4 deletions service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (t *timerQueueStandbyProcessorImpl) processExpiredUserTimer(timerTask *pers
//
// we do not need to notity new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless
return newTaskRetryError()
return ErrTaskRetry
}
// since the user timer are already sorted, so if there is one timer which will not expired
// all user timer after this timer will not expired
Expand Down Expand Up @@ -237,7 +237,7 @@ func (t *timerQueueStandbyProcessorImpl) processActivityTimeout(timerTask *persi
//
// we do not need to notity new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless
return newTaskRetryError()
return ErrTaskRetry
}
// since the activity timer are already sorted, so if there is one timer which will not expired
// all activity timer after this timer will not expired
Expand All @@ -262,7 +262,7 @@ func (t *timerQueueStandbyProcessorImpl) processDecisionTimeout(timerTask *persi
//
// we do not need to notity new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless
return newTaskRetryError()
return ErrTaskRetry
}
return nil
})
Expand All @@ -276,7 +276,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowTimeout(timerTask *persi
return t.processTimer(timerTask, func(msBuilder *mutableStateBuilder) error {
// we do not need to notity new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless
return newTaskRetryError()
return ErrTaskRetry
})
}

Expand Down
16 changes: 4 additions & 12 deletions service/history/timerQueueStandbyProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Pending()
persistenceMutableState := createMutableState(msBuilder)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

err := s.timerQueueStandbyProcessor.process(timerTask)
_, ok := err.(*taskRetryError)
s.True(ok)
s.Equal(ErrTaskRetry, s.timerQueueStandbyProcessor.process(timerTask))
}

func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Success() {
Expand Down Expand Up @@ -364,9 +362,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Pending() {
persistenceMutableState := createMutableState(msBuilder)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()

err := s.timerQueueStandbyProcessor.process(timerTask)
_, ok := err.(*taskRetryError)
s.True(ok)
s.Equal(ErrTaskRetry, s.timerQueueStandbyProcessor.process(timerTask))
}

func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Success() {
Expand Down Expand Up @@ -533,9 +529,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Pending() {
persistenceMutableState := createMutableState(msBuilder)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()

err := s.timerQueueStandbyProcessor.process(timerTask)
_, ok := err.(*taskRetryError)
s.True(ok)
s.Equal(ErrTaskRetry, s.timerQueueStandbyProcessor.process(timerTask))
}

func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Success() {
Expand Down Expand Up @@ -623,9 +617,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Pending() {
persistenceMutableState := createMutableState(msBuilder)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()

err := s.timerQueueStandbyProcessor.process(timerTask)
_, ok := err.(*taskRetryError)
s.True(ok)
s.Equal(ErrTaskRetry, s.timerQueueStandbyProcessor.process(timerTask))
}

func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Success() {
Expand Down

0 comments on commit 7eaecd8

Please sign in to comment.