Skip to content

Commit

Permalink
TimerQueueProcessor stuck fix on large backlog on available timers (#460
Browse files Browse the repository at this point in the history
)

TimerQueueProcessor queries the DB for timers whose visibilityTime is '>='
the read level.  This can result in reading the same timer multiple times,
which tricks the processor into thinking it has fired all available timers,
because the number of tasks returned to the caller is then less than the
batch size.
Updated the logic to return an explicit flag to the caller so that it
can immediately call back again when there are more timers to fire.

fixes #461.
  • Loading branch information
samarabbas authored Dec 12, 2017
1 parent c79581f commit 963d13c
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ continueProcessor:
ProcessPendingTimers:
for {
// Get next set of timer tasks.
timerTasks, lookAheadTask, err := t.getTasksAndNextKey()
timerTasks, lookAheadTask, moreTasks, err := t.getTasksAndNextKey()
if err != nil {
return err
}
Expand All @@ -355,7 +355,7 @@ continueProcessor:
tasksCh <- task
}

if lookAheadTask != nil || len(timerTasks) < t.config.TimerTaskBatchSize {
if !moreTasks {
// We have processed all the tasks.
nextKeyTask = lookAheadTask
break ProcessPendingTimers
Expand All @@ -375,12 +375,13 @@ func (t *timerQueueProcessorImpl) isProcessNow(expiryTime time.Time) bool {
return !expiryTime.IsZero() && expiryTime.UnixNano() <= time.Now().UnixNano()
}

func (t *timerQueueProcessorImpl) getTasksAndNextKey() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, error) {
tasks, lookAheadTask, err := t.ackMgr.readTimerTasks()
func (t *timerQueueProcessorImpl) getTasksAndNextKey() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool,
error) {
tasks, lookAheadTask, moreTasks, err := t.ackMgr.readTimerTasks()
if err != nil {
return nil, nil, err
return nil, nil, false, err
}
return tasks, lookAheadTask, nil
return tasks, lookAheadTask, moreTasks, nil
}

func (t *timerQueueProcessorImpl) getTimerTasks(
Expand Down Expand Up @@ -958,17 +959,18 @@ func newTimerAckMgr(processor *timerQueueProcessorImpl, shard ShardContext, exec
}
}

func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, error) {
func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
t.RLock()
rLevel := t.readLevel
t.RUnlock()

tasks, err := t.processor.getTimerTasks(rLevel.VisibilityTimestamp, maxTimestamp, t.processor.config.TimerTaskBatchSize)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}

t.logger.Debugf("readTimerTasks: ReadLevel: (%s) count: %v", rLevel, len(tasks))
taskCount := len(tasks)
t.logger.Debugf("readTimerTasks: ReadLevel: (%s) count: %v", rLevel, taskCount)

// We filter tasks so read only moves to desired timer tasks.
// We also get a look ahead task but it doesn't move the read level, this is for timer
Expand All @@ -981,6 +983,8 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten
for _, task := range tasks {
taskSeq := SequenceID{VisibilityTimestamp: task.VisibilityTimestamp, TaskID: task.TaskID}
if _, ok := t.outstandingTasks[taskSeq]; ok {
t.logger.Infof("Skipping task: %v. WorkflowID: %v, RunID: %v, Type: %v", taskSeq.String(), task.WorkflowID,
task.RunID, task.TaskType)
continue
}
if task.VisibilityTimestamp.Before(t.readLevel.VisibilityTimestamp) {
Expand All @@ -1001,7 +1005,11 @@ func (t *timerAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persisten
}
t.Unlock()

return filteredTasks, lookAheadTask, nil
// We may have large number of timers which need to be fired immediately. Return true in such case so the pump
// can call back immediately to retrieve more tasks
moreTasks := lookAheadTask == nil && taskCount == t.processor.config.TimerTaskBatchSize

return filteredTasks, lookAheadTask, moreTasks, nil
}

func (t *timerAckMgr) completeTimerTask(taskID SequenceID) {
Expand Down

0 comments on commit 963d13c

Please sign in to comment.