diff --git a/common/logging/tags.go b/common/logging/tags.go index 597080794bb..7878b1b92ca 100644 --- a/common/logging/tags.go +++ b/common/logging/tags.go @@ -31,6 +31,7 @@ const ( // workflow logging tags TagWorkflowEventID = "wf-event-id" TagWorkflowComponent = "wf-component" + TagWorkflowCluster = "wf-cluster" TagWorkflowErr = "wf-error" TagHistoryBuilderAction = "history-builder-action" TagStoreOperation = "store-operation" diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 98e64499f79..791f6246a7f 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq response.Timers = append(response.Timers, t) } - nextPageToken := iter.PageState() - response.NextPageToken = make([]byte, len(nextPageToken)) - copy(response.NextPageToken, nextPageToken) if err := iter.Close(); err != nil { if isThrottlingError(err) { diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index e1432d22220..742a162e7ff 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() { err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil) s.Nil(err2, "No error expected.") - timerTasks, nextToken, err1 := s.GetTimerIndexTasks() + timerTasks, err1 := s.GetTimerIndexTasks() s.Nil(err1, "No error expected.") s.NotNil(timerTasks, "expected valid list of tasks.") - s.Equal(0, len(nextToken)) s.Equal(3, len(timerTasks)) s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType) s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType) @@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() { err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID) s.Nil(err2, "No error expected.") - timerTasks2, nextToken, err2 := s.GetTimerIndexTasks() + timerTasks2, err2 := s.GetTimerIndexTasks() s.Nil(err2, "No error expected.") s.Empty(timerTasks2, "expected empty task list.") - s.Equal(0, len(nextToken)) } func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() { diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 2c45b53b2ab..eec063adead 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -645,8 +645,7 @@ type ( // GetTimerIndexTasksResponse is the response for GetTimerIndexTasks GetTimerIndexTasksResponse struct { - Timers []*TimerTaskInfo - NextPageToken []byte + Timers []*TimerTaskInfo } // SerializedHistoryEventBatch represents a serialized batch of history events diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 2934405e98d..df53effdda1 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error { } // GetTimerIndexTasks is a utility method to get tasks from transfer task queue -func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) { +func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) { response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{ MinTimestamp: time.Time{}, MaxTimestamp: time.Unix(0, 
math.MaxInt64), BatchSize: 10}) if err != nil { - return nil, nil, err + return nil, err } - return response.Timers, response.NextPageToken, nil + return response.Timers, nil } // CompleteTimerTask is a utility method to complete a timer task diff --git a/service/history/MockTimerQueueAckMgr.go b/service/history/MockTimerQueueAckMgr.go index c0b29d117b6..30bdc283434 100644 --- a/service/history/MockTimerQueueAckMgr.go +++ b/service/history/MockTimerQueueAckMgr.go @@ -88,28 +88,24 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, return r0, r1, r2, r3 } -func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) { +func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) { _m.Called(timerTask) } -func (_m *MockTimerQueueAckMgr) readRetryTimerTasks() []*persistence.TimerTaskInfo { +func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID { ret := _m.Called() - var r0 []*persistence.TimerTaskInfo - if rf, ok := ret.Get(0).(func() []*persistence.TimerTaskInfo); ok { + var r0 TimerSequenceID + if rf, ok := ret.Get(0).(func() TimerSequenceID); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*persistence.TimerTaskInfo) + r0 = ret.Get(0).(TimerSequenceID) } } return r0 } -func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) { - _m.Called(timerTask) -} - func (_m *MockTimerQueueAckMgr) updateAckLevel() { _m.Called() } diff --git a/service/history/historyEngineInterfaces.go b/service/history/historyEngineInterfaces.go index d71d43c7e90..e8a25768ffd 100644 --- a/service/history/historyEngineInterfaces.go +++ b/service/history/historyEngineInterfaces.go @@ -44,6 +44,11 @@ type ( isWorkflowRunning bool timestamp time.Time } + + // error which will be returned if the timer / transfer task should be + // retried for various reasons + taskRetryError struct{} + + // Engine represents an interface for managing workflow execution history. Engine interface { common.Daemon @@ -122,10 +127,9 @@ type ( timerQueueAckMgr interface { getFinishedChan() <-chan struct{} - readRetryTimerTasks() []*persistence.TimerTaskInfo readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) - retryTimerTask(timerTask *persistence.TimerTaskInfo) completeTimerTask(timerTask *persistence.TimerTaskInfo) + getAckLevel() TimerSequenceID updateAckLevel() } @@ -136,3 +140,12 @@ type ( UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error } ) + +// newTaskRetryError creates an error indicating the task should be retried +func newTaskRetryError() *taskRetryError { + return &taskRetryError{} +} + +func (e *taskRetryError) Error() string { + return "passive task should be retried since the condition in mutable state is not met." 
+} diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 676af99b15b..47f9fe7afa1 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error { } // GetTimerAckLevel test implementation -func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time { +func (s *TestShardContext) GetTimerAckLevel() time.Time { + s.RLock() + defer s.RUnlock() + + return s.shardInfo.TimerAckLevel +} + +// UpdateTimerAckLevel test implementation +func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error { + s.Lock() + defer s.Unlock() + + s.shardInfo.TimerAckLevel = ackLevel + return nil +} + +// GetTimerClusterAckLevel test implementation +func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time { s.RLock() defer s.RUnlock() @@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time { return s.shardInfo.TimerAckLevel } -// UpdateTimerAckLevel test implementation -func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error { +// UpdateTimerClusterAckLevel test implementation +func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error { s.Lock() defer s.Unlock() - if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() { - s.shardInfo.TimerAckLevel = ackLevel - } s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel return nil } diff --git a/service/history/service.go b/service/history/service.go index 7e79a8073ae..794a317b169 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -49,12 +49,15 @@ type Config struct { DefaultStartToCloseActivityTimeoutInSecs int32 // TimerQueueProcessor settings - TimerTaskBatchSize int - ProcessTimerTaskWorkerCount int - TimerProcessorUpdateFailureRetryCount int - TimerProcessorGetFailureRetryCount int - TimerProcessorUpdateAckInterval time.Duration - TimerProcessorForceUpdateInterval time.Duration + TimerTaskBatchSize int + ProcessTimerTaskWorkerCount int + TimerProcessorUpdateFailureRetryCount int + TimerProcessorGetFailureRetryCount int + TimerProcessorCompleteTimerFailureRetryCount int + TimerProcessorUpdateAckInterval time.Duration + TimerProcessorForceUpdateInterval time.Duration + TimerProcessorCompleteTimerInterval time.Duration + TimerProcessorMaxPollInterval time.Duration // TransferQueueProcessor settings TransferTaskBatchSize int @@ -86,37 +89,40 @@ type Config struct { // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { return &Config{ - NumberOfShards: numberOfShards, - HistoryCacheInitialSize: 128, - HistoryCacheMaxSize: 512, - HistoryCacheTTL: time.Hour, - RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range - AcquireShardInterval: time.Minute, - DefaultScheduleToStartActivityTimeoutInSecs: 10, - DefaultScheduleToCloseActivityTimeoutInSecs: 10, - DefaultStartToCloseActivityTimeoutInSecs: 10, - TimerTaskBatchSize: 100, - ProcessTimerTaskWorkerCount: 30, - TimerProcessorUpdateFailureRetryCount: 5, - TimerProcessorGetFailureRetryCount: 5, - TimerProcessorUpdateAckInterval: 10 * time.Second, - TimerProcessorForceUpdateInterval: 10 * time.Minute, - TransferTaskBatchSize: 10, - TransferProcessorMaxPollRPS: 100, - TransferProcessorMaxPollInterval: 60 * time.Second, - TransferProcessorUpdateAckInterval: 10 * time.Second, - 
TransferProcessorForceUpdateInterval: 10 * time.Minute, - TransferTaskWorkerCount: 10, - TransferTaskMaxRetryCount: 100, - ReplicatorTaskBatchSize: 10, - ReplicatorProcessorMaxPollRPS: 100, - ReplicatorProcessorMaxPollInterval: 60 * time.Second, - ReplicatorProcessorUpdateAckInterval: 10 * time.Second, - ReplicatorProcessorForceUpdateInterval: 10 * time.Minute, - ReplicatorTaskWorkerCount: 10, - ReplicatorTaskMaxRetryCount: 100, - ExecutionMgrNumConns: 100, - HistoryMgrNumConns: 100, + NumberOfShards: numberOfShards, + HistoryCacheInitialSize: 128, + HistoryCacheMaxSize: 512, + HistoryCacheTTL: time.Hour, + RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range + AcquireShardInterval: time.Minute, + DefaultScheduleToStartActivityTimeoutInSecs: 10, + DefaultScheduleToCloseActivityTimeoutInSecs: 10, + DefaultStartToCloseActivityTimeoutInSecs: 10, + TimerTaskBatchSize: 100, + ProcessTimerTaskWorkerCount: 30, + TimerProcessorUpdateFailureRetryCount: 5, + TimerProcessorGetFailureRetryCount: 5, + TimerProcessorCompleteTimerFailureRetryCount: 10, + TimerProcessorUpdateAckInterval: 10 * time.Second, + TimerProcessorForceUpdateInterval: 10 * time.Minute, + TimerProcessorCompleteTimerInterval: 1 * time.Second, + TimerProcessorMaxPollInterval: 60 * time.Second, + TransferTaskBatchSize: 10, + TransferProcessorMaxPollRPS: 100, + TransferProcessorMaxPollInterval: 60 * time.Second, + TransferProcessorUpdateAckInterval: 10 * time.Second, + TransferProcessorForceUpdateInterval: 10 * time.Minute, + TransferTaskWorkerCount: 10, + TransferTaskMaxRetryCount: 100, + ReplicatorTaskBatchSize: 10, + ReplicatorProcessorMaxPollRPS: 100, + ReplicatorProcessorMaxPollInterval: 60 * time.Second, + ReplicatorProcessorUpdateAckInterval: 10 * time.Second, + ReplicatorProcessorForceUpdateInterval: 10 * time.Minute, + ReplicatorTaskWorkerCount: 10, + ReplicatorTaskMaxRetryCount: 100, + ExecutionMgrNumConns: 100, + HistoryMgrNumConns: 100, // history client: client/history/client.go set the client timeout 30s LongPollExpirationInterval: dc.GetDurationProperty( dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20, diff --git a/service/history/shardContext.go b/service/history/shardContext.go index b2d29ee0237..1f455e15d6a 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -51,6 +51,10 @@ type ( UpdateTransferAckLevel(ackLevel int64) error GetReplicatorAckLevel() int64 UpdateReplicatorAckLevel(ackLevel int64) error + GetTimerAckLevel() time.Time + UpdateTimerAckLevel(ackLevel time.Time) error + GetTimerClusterAckLevel(cluster string) time.Time + UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) ( *persistence.CreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error @@ -59,8 +63,6 @@ type ( GetConfig() *Config GetLogger() bark.Logger GetMetricsClient() metrics.Client - GetTimerAckLevel(cluster string) time.Time - UpdateTimerAckLevel(cluster string, ackLevel time.Time) error GetTimeSource() common.TimeSource SetCurrentTime(cluster string, currentTime time.Time) GetCurrentTime(cluster string) time.Time @@ -164,7 +166,23 @@ func (s *shardContextImpl) UpdateReplicatorAckLevel(ackLevel int64) error { return s.updateShardInfoLocked() } -func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time { +func (s *shardContextImpl) GetTimerAckLevel() time.Time { + s.RLock() + defer s.RUnlock() + + 
return s.shardInfo.TimerAckLevel +} + +func (s *shardContextImpl) UpdateTimerAckLevel(ackLevel time.Time) error { + s.Lock() + defer s.Unlock() + + s.shardInfo.TimerAckLevel = ackLevel + s.shardInfo.StolenSinceRenew = 0 + return s.updateShardInfoLocked() +} + +func (s *shardContextImpl) GetTimerClusterAckLevel(cluster string) time.Time { s.RLock() defer s.RUnlock() @@ -177,13 +195,10 @@ func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time { return s.shardInfo.TimerAckLevel } -func (s *shardContextImpl) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error { +func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error { s.Lock() defer s.Unlock() - if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() { - s.shardInfo.TimerAckLevel = ackLevel - } s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel s.shardInfo.StolenSinceRenew = 0 return s.updateShardInfoLocked() diff --git a/service/history/timerQueueAckMgr.go b/service/history/timerQueueAckMgr.go index 9da0051fdd7..5be6a2f1db3 100644 --- a/service/history/timerQueueAckMgr.go +++ b/service/history/timerQueueAckMgr.go @@ -38,8 +38,6 @@ var ( type ( timerSequenceIDs []TimerSequenceID - timerTaskPredicate func(timerDomainID string) (bool, error) - timerQueueAckMgrImpl struct { isFailover bool clusterName string @@ -49,8 +47,6 @@ type ( metricsClient metrics.Client lastUpdated time.Time config *Config - // timer predicate for filtering - timerTaskPredicate timerTaskPredicate // immutable max possible timer level maxAckLevel time.Time // isReadFinished indicate timer queue ack manager @@ -64,12 +60,10 @@ type ( sync.Mutex // outstanding timer task -> finished (true) outstandingTasks map[TimerSequenceID]bool - // outstanding timer tasks, which cannot be processed right now - retryTasks []*persistence.TimerTaskInfo // timer task read level readLevel TimerSequenceID // timer task ack level - ackLevel time.Time + ackLevel TimerSequenceID } // for each cluster, the ack level is the point in time when // all timers before the ack level are processed. 
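
For orientation: the shard now tracks one shard-wide timer ack level plus one ack level per cluster. A minimal sketch of how the shard-wide level relates to the per-cluster ones, assuming the service/history package context of this diff; minClusterTimerAckLevel and its clusters argument are illustrative helpers, not part of the change:

// minimal sketch, not part of the diff
func minClusterTimerAckLevel(shard ShardContext, clusters []string) time.Time {
	// start from the maximum possible timer level used by the ack manager
	result := timerQueueAckMgrMaxTimestamp
	for _, cluster := range clusters {
		// the lowest per-cluster ack level bounds the shard-wide level
		if level := shard.GetTimerClusterAckLevel(cluster); level.Before(result) {
			result = level
		}
	}
	return result
}

UpdateTimerAckLevel is expected to persist such a floor, which is the boundary below which completed timer tasks can safely be deleted (see completeTimers in timerQueueProcessor.go further down).
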
@@ -97,72 +91,49 @@ func (t timerSequenceIDs) Less(i, j int) bool { } func newTimerQueueAckMgr(shard ShardContext, metricsClient metrics.Client, clusterName string, logger bark.Logger) *timerQueueAckMgrImpl { - timerTaskPredicate := func(timerDomainID string) (bool, error) { - domainEntry, err := shard.GetDomainCache().GetDomainByID(timerDomainID) - if err != nil { - return false, err - } - if !domainEntry.GetIsGlobalDomain() && - clusterName != shard.GetService().GetClusterMetadata().GetCurrentClusterName() { - // timer task does not belong to cluster name - return false, nil - } else if domainEntry.GetIsGlobalDomain() && - domainEntry.GetReplicationConfig().ActiveClusterName != clusterName { - // timer task does not belong here - return false, nil - } - return true, nil - } - ackLevel := shard.GetTimerAckLevel(clusterName) + ackLevel := TimerSequenceID{VisibilityTimestamp: shard.GetTimerClusterAckLevel(clusterName)} maxAckLevel := timerQueueAckMgrMaxTimestamp timerQueueAckMgrImpl := &timerQueueAckMgrImpl{ - isFailover: false, - clusterName: clusterName, - shard: shard, - executionMgr: shard.GetExecutionManager(), - metricsClient: metricsClient, - logger: logger, - lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time - config: shard.GetConfig(), - outstandingTasks: make(map[TimerSequenceID]bool), - retryTasks: []*persistence.TimerTaskInfo{}, - readLevel: TimerSequenceID{VisibilityTimestamp: ackLevel}, - ackLevel: ackLevel, - maxAckLevel: maxAckLevel, - timerTaskPredicate: timerTaskPredicate, - isReadFinished: false, - finishedChan: make(chan struct{}, 1), + isFailover: false, + clusterName: clusterName, + shard: shard, + executionMgr: shard.GetExecutionManager(), + metricsClient: metricsClient, + logger: logger, + lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time + config: shard.GetConfig(), + outstandingTasks: make(map[TimerSequenceID]bool), + readLevel: ackLevel, + ackLevel: ackLevel, + maxAckLevel: maxAckLevel, + isReadFinished: false, + finishedChan: nil, } return timerQueueAckMgrImpl } -func newTimerQueueFailoverAckMgr(shard ShardContext, metricsClient metrics.Client, domainID string, standbyClusterName string, logger bark.Logger) *timerQueueAckMgrImpl { - timerTaskPredicate := func(timerDomainID string) (bool, error) { - return timerDomainID == domainID, nil - } +func newTimerQueueFailoverAckMgr(shard ShardContext, metricsClient metrics.Client, standbyClusterName string, logger bark.Logger) *timerQueueAckMgrImpl { // failover ack manager will start from the standby cluster's ack level to active cluster's ack level - ackLevel := shard.GetTimerAckLevel(standbyClusterName) - maxAckLevel := shard.GetTimerAckLevel(shard.GetService().GetClusterMetadata().GetCurrentClusterName()) + ackLevel := TimerSequenceID{VisibilityTimestamp: shard.GetTimerClusterAckLevel(standbyClusterName)} + maxAckLevel := shard.GetTimerClusterAckLevel(shard.GetService().GetClusterMetadata().GetCurrentClusterName()) timerQueueAckMgrImpl := &timerQueueAckMgrImpl{ - isFailover: true, - clusterName: standbyClusterName, - shard: shard, - executionMgr: shard.GetExecutionManager(), - metricsClient: metricsClient, - logger: logger, - lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time - config: shard.GetConfig(), - outstandingTasks: make(map[TimerSequenceID]bool), - retryTasks: []*persistence.TimerTaskInfo{}, - readLevel: TimerSequenceID{VisibilityTimestamp: ackLevel}, - ackLevel: ackLevel, - 
maxAckLevel: maxAckLevel, - timerTaskPredicate: timerTaskPredicate, - isReadFinished: false, - finishedChan: make(chan struct{}, 1), + isFailover: true, + clusterName: standbyClusterName, + shard: shard, + executionMgr: shard.GetExecutionManager(), + metricsClient: metricsClient, + logger: logger, + lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time + config: shard.GetConfig(), + outstandingTasks: make(map[TimerSequenceID]bool), + readLevel: ackLevel, + ackLevel: ackLevel, + maxAckLevel: maxAckLevel, + isReadFinished: false, + finishedChan: make(chan struct{}, 1), } return timerQueueAckMgrImpl @@ -175,20 +146,17 @@ func (t *timerQueueAckMgrImpl) getFinishedChan() <-chan struct{} { func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) { t.Lock() readLevel := t.readLevel - timerTaskRetrySize := len(t.retryTasks) t.Unlock() var tasks []*persistence.TimerTaskInfo - morePage := timerTaskRetrySize > 0 + morePage := false var err error - if timerTaskRetrySize < t.config.TimerTaskBatchSize && readLevel.VisibilityTimestamp.Before(t.maxAckLevel) { - var token []byte - tasks, token, err = t.getTimerTasks(readLevel.VisibilityTimestamp, t.maxAckLevel, t.config.TimerTaskBatchSize) + if readLevel.VisibilityTimestamp.Before(t.maxAckLevel) { + tasks, morePage, err = t.getTimerTasks(readLevel.VisibilityTimestamp, t.maxAckLevel, t.config.TimerTaskBatchSize) if err != nil { return nil, nil, false, err } - t.logger.Debugf("readTimerTasks: ReadLevel: (%s) count: %v, next token: %v", readLevel, len(tasks), token) - morePage = len(token) > 0 + t.logger.Debugf("readTimerTasks: ReadLevel: (%s) count: %v, more timer: %v", readLevel, len(tasks), morePage) } // We filter tasks so read only moves to desired timer tasks. @@ -203,8 +171,7 @@ func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, * } // fillin the retry task - filteredTasks := t.retryTasks - t.retryTasks = []*persistence.TimerTaskInfo{} + filteredTasks := []*persistence.TimerTaskInfo{} // since we have already checked that the clusterName is a valid key of clusterReadLevel // there shall be no validation @@ -216,7 +183,7 @@ TaskFilterLoop: _, isLoaded := outstandingTasks[timerSequenceID] if isLoaded { // timer already loaded - t.logger.Infof("Skipping task: %v. WorkflowID: %v, RunID: %v, Type: %v", + t.logger.Debugf("Skipping timer task: %v. 
WorkflowID: %v, RunID: %v, Type: %v", timerSequenceID.String(), task.WorkflowID, task.RunID, task.TaskType) continue TaskFilterLoop } @@ -231,18 +198,6 @@ TaskFilterLoop: timerSequenceID, readLevel) } - ok, err := t.timerTaskPredicate(task.DomainID) - if err != nil { - return nil, nil, false, err - } - if !ok { - // we are not interestes in this timer task, - // however we should update the read level so - // to skip this timer task next time - readLevel = timerSequenceID - continue TaskFilterLoop - } - if !t.isProcessNow(task.VisibilityTimestamp) { lookAheadTask = task break TaskFilterLoop @@ -262,33 +217,18 @@ TaskFilterLoop: return filteredTasks, lookAheadTask, moreTasks, nil } -func (t *timerQueueAckMgrImpl) retryTimerTask(timerTask *persistence.TimerTaskInfo) { - t.Lock() - defer t.Unlock() - - t.retryTasks = append(t.retryTasks, timerTask) -} - -func (t *timerQueueAckMgrImpl) readRetryTimerTasks() []*persistence.TimerTaskInfo { +func (t *timerQueueAckMgrImpl) completeTimerTask(timerTask *persistence.TimerTaskInfo) { + timerSequenceID := TimerSequenceID{VisibilityTimestamp: timerTask.VisibilityTimestamp, TaskID: timerTask.TaskID} t.Lock() defer t.Unlock() - retryTasks := t.retryTasks - t.retryTasks = []*persistence.TimerTaskInfo{} - return retryTasks + t.outstandingTasks[timerSequenceID] = true } -func (t *timerQueueAckMgrImpl) completeTimerTask(timerTask *persistence.TimerTaskInfo) { - timerSequenceID := TimerSequenceID{VisibilityTimestamp: timerTask.VisibilityTimestamp, TaskID: timerTask.TaskID} +func (t *timerQueueAckMgrImpl) getAckLevel() TimerSequenceID { t.Lock() defer t.Unlock() - - t.outstandingTasks[timerSequenceID] = true - if err := t.executionMgr.CompleteTimerTask(&persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timerTask.VisibilityTimestamp, - TaskID: timerTask.TaskID}); err != nil { - t.logger.Warnf("Timer queue ack manager unable to complete timer task: %v; %v", timerSequenceID, err) - } + return t.ackLevel } func (t *timerQueueAckMgrImpl) updateAckLevel() { @@ -311,7 +251,7 @@ MoveAckLevelLoop: for _, current := range sequenceIDs { acked := outstandingTasks[current] if acked { - updatedAckLevel = current.VisibilityTimestamp + updatedAckLevel = current delete(outstandingTasks, current) } else { break MoveAckLevelLoop @@ -336,7 +276,7 @@ MoveAckLevelLoop: // this function does not take cluster name as parameter, due to we only have one timer queue on Cassandra // all timer tasks are in this queue and filter will be applied. 
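
The change that follows drops the Cassandra page token from getTimerTasks; whether more timers remain in the range is now inferred from getting back a full batch. A minimal sketch of the consumer-side loop this enables, using the package names in this diff; drainTimerTasks is a hypothetical helper, not part of the change:

// minimal sketch, not part of the diff
func (t *timerQueueAckMgrImpl) drainTimerTasks(handle func(*persistence.TimerTaskInfo)) error {
	readLevel := t.ackLevel.VisibilityTimestamp
	for {
		timers, more, err := t.getTimerTasks(readLevel, t.maxAckLevel, t.config.TimerTaskBatchSize)
		if err != nil {
			return err
		}
		for _, timer := range timers {
			handle(timer)
			// advance the read position past the timers already seen
			readLevel = timer.VisibilityTimestamp
		}
		if !more {
			// a partial batch means the (readLevel, maxAckLevel) range is drained
			return nil
		}
	}
}
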
-func (t *timerQueueAckMgrImpl) getTimerTasks(minTimestamp time.Time, maxTimestamp time.Time, batchSize int) ([]*persistence.TimerTaskInfo, []byte, error) { +func (t *timerQueueAckMgrImpl) getTimerTasks(minTimestamp time.Time, maxTimestamp time.Time, batchSize int) ([]*persistence.TimerTaskInfo, bool, error) { request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: minTimestamp, MaxTimestamp: maxTimestamp, @@ -347,21 +287,21 @@ func (t *timerQueueAckMgrImpl) getTimerTasks(minTimestamp time.Time, maxTimestam for attempt := 0; attempt < retryCount; attempt++ { response, err := t.executionMgr.GetTimerIndexTasks(request) if err == nil { - return response.Timers, response.NextPageToken, nil + return response.Timers, len(response.Timers) >= batchSize, nil } backoff := time.Duration(attempt * 100) time.Sleep(backoff * time.Millisecond) } - return nil, nil, ErrMaxAttemptsExceeded + return nil, false, ErrMaxAttemptsExceeded } -func (t *timerQueueAckMgrImpl) updateTimerAckLevel(ackLevel time.Time) { +func (t *timerQueueAckMgrImpl) updateTimerAckLevel(ackLevel TimerSequenceID) { t.logger.Debugf("Updating timer ack level: %v", ackLevel) // not failover ack level updating if !t.isFailover { // Always update ackLevel to detect if the shared is stolen - if err := t.shard.UpdateTimerAckLevel(t.clusterName, ackLevel); err != nil { + if err := t.shard.UpdateTimerClusterAckLevel(t.clusterName, ackLevel.VisibilityTimestamp); err != nil { t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.AckLevelUpdateFailedCounter) t.logger.Errorf("Error updating timer ack level for shard: %v", err) } else { @@ -370,7 +310,6 @@ func (t *timerQueueAckMgrImpl) updateTimerAckLevel(ackLevel time.Time) { } else { // TODO failover ack manager should persist failover ack level to Cassandra: issue #646 } - } func (t *timerQueueAckMgrImpl) isProcessNow(expiryTime time.Time) bool { diff --git a/service/history/timerQueueAckMgr_test.go b/service/history/timerQueueAckMgr_test.go index a51bad9483b..91f40f3c211 100644 --- a/service/history/timerQueueAckMgr_test.go +++ b/service/history/timerQueueAckMgr_test.go @@ -50,7 +50,7 @@ type ( mockShardMgr *mocks.ShardManager mockMetadataMgr *mocks.MetadataManager mockHistoryMgr *mocks.HistoryManager - mockShard ShardContext + mockShard *shardContextImpl mockService service.Service mockMessagingClient messaging.Client mockProducer *mocks.KafkaProducer @@ -68,7 +68,7 @@ type ( mockShardMgr *mocks.ShardManager mockMetadataMgr *mocks.MetadataManager mockHistoryMgr *mocks.HistoryManager - mockShard ShardContext + mockShard *shardContextImpl mockService service.Service mockMessagingClient messaging.Client mockProducer *mocks.KafkaProducer @@ -161,7 +161,42 @@ func (s *timerQueueAckMgrSuite) TestIsProcessNow() { s.False(s.timerQueueAckMgr.isProcessNow(timeAfter)) } -func (s *timerQueueAckMgrSuite) TestGetTimerTasks() { +func (s *timerQueueAckMgrSuite) TestGetTimerTasks_More() { + minTimestamp := time.Now().Add(-10 * time.Second) + maxTimestamp := time.Now().Add(10 * time.Second) + batchSize := 1 + + request := &persistence.GetTimerIndexTasksRequest{ + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + BatchSize: batchSize, + } + + response := &persistence.GetTimerIndexTasksResponse{ + Timers: []*persistence.TimerTaskInfo{ + &persistence.TimerTaskInfo{ + DomainID: "some random domain ID", + WorkflowID: "some random workflow ID", + RunID: uuid.New(), + VisibilityTimestamp: time.Now().Add(-5 * time.Second), + TaskID: int64(59), + TaskType: 1, + TimeoutType: 2, + EventID: 
int64(28), + ScheduleAttempt: 0, + }, + }, + } + + s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() + + timers, more, err := s.timerQueueAckMgr.getTimerTasks(minTimestamp, maxTimestamp, batchSize) + s.Nil(err) + s.Equal(response.Timers, timers) + s.True(more) +} + +func (s *timerQueueAckMgrSuite) TestGetTimerTasks_NoMore() { minTimestamp := time.Now().Add(-10 * time.Second) maxTimestamp := time.Now().Add(10 * time.Second) batchSize := 10 @@ -186,15 +221,14 @@ func (s *timerQueueAckMgrSuite) TestGetTimerTasks() { ScheduleAttempt: 0, }, }, - NextPageToken: []byte("some random next page token"), } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() - timers, token, err := s.timerQueueAckMgr.getTimerTasks(minTimestamp, maxTimestamp, batchSize) + timers, more, err := s.timerQueueAckMgr.getTimerTasks(minTimestamp, maxTimestamp, batchSize) s.Nil(err) s.Equal(response.Timers, timers) - s.Equal(response.NextPageToken, token) + s.False(more) } func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoLookAhead_NoNextPage() { @@ -204,19 +238,10 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoLookAhead_NoNextPage() { macAckLevel := s.timerQueueAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), ackLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), readLevel.VisibilityTimestamp) s.Equal(timerQueueAckMgrMaxTimestamp, macAckLevel) - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - // only thing used is the replication config and is global domain - IsGlobalDomain: false, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. - }, - }, nil).Once() - timer := &persistence.TimerTaskInfo{ DomainID: domainID, WorkflowID: "some random workflow ID", @@ -234,8 +259,7 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoLookAhead_NoNextPage() { BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: nil, + Timers: []*persistence.TimerTaskInfo{timer}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() @@ -256,18 +280,10 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoLookAhead_HasNextPage() { macAckLevel := s.timerQueueAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), ackLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), readLevel.VisibilityTimestamp) s.Equal(timerQueueAckMgrMaxTimestamp, macAckLevel) - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - IsGlobalDomain: true, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. 
- }, - }, nil).Once() - timer := &persistence.TimerTaskInfo{ DomainID: domainID, WorkflowID: "some random workflow ID", @@ -280,14 +296,15 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoLookAhead_HasNextPage() { ScheduleAttempt: 0, Version: int64(79), } + // make the batch size == return size to there will be a next page + s.mockShard.config.TimerTaskBatchSize = 1 request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: readLevel.VisibilityTimestamp, MaxTimestamp: macAckLevel, BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: []byte("some random next page token"), + Timers: []*persistence.TimerTaskInfo{timer}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() @@ -308,19 +325,10 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_HasLookAhead_NoNextPage() { macAckLevel := s.timerQueueAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), ackLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), readLevel.VisibilityTimestamp) s.Equal(timerQueueAckMgrMaxTimestamp, macAckLevel) - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - // only thing used is the replication config and is global domain - IsGlobalDomain: false, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. - }, - }, nil).Once() - timer := &persistence.TimerTaskInfo{ DomainID: domainID, WorkflowID: "some random workflow ID", @@ -339,8 +347,7 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_HasLookAhead_NoNextPage() { BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: nil, + Timers: []*persistence.TimerTaskInfo{timer}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() @@ -361,18 +368,10 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_HasLookAhead_HasNextPage() { macAckLevel := s.timerQueueAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), ackLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), readLevel.VisibilityTimestamp) s.Equal(timerQueueAckMgrMaxTimestamp, macAckLevel) - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - IsGlobalDomain: true, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. 
- }, - }, nil).Once() - timer := &persistence.TimerTaskInfo{ DomainID: domainID, WorkflowID: "some random workflow ID", @@ -385,14 +384,15 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_HasLookAhead_HasNextPage() { ScheduleAttempt: 0, Version: int64(79), } + // make the batch size == return size to there will be a next page + s.mockShard.config.TimerTaskBatchSize = 1 request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: readLevel.VisibilityTimestamp, MaxTimestamp: macAckLevel, BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: []byte("some random next page token"), + Timers: []*persistence.TimerTaskInfo{timer}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() @@ -406,25 +406,17 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_HasLookAhead_HasNextPage() { s.Equal(ackLevel, s.timerQueueAckMgr.ackLevel) } -func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoInterestedTask_ReadLevel() { +func (s *timerQueueAckMgrSuite) TestReadTimerTasks_AnyTask_ReadLevel() { domainID := "some random domain ID" ackLevel := s.timerQueueAckMgr.ackLevel readLevel := s.timerQueueAckMgr.readLevel macAckLevel := s.timerQueueAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.clusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), ackLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.clusterName), readLevel.VisibilityTimestamp) s.Equal(timerQueueAckMgrMaxTimestamp, macAckLevel) - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - IsGlobalDomain: true, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: "some other random cluster name", - // Clusters attr is not used. 
- }, - }, nil).Once() - timer := &persistence.TimerTaskInfo{ DomainID: domainID, WorkflowID: "some random workflow ID", @@ -437,23 +429,24 @@ func (s *timerQueueAckMgrSuite) TestReadTimerTasks_NoInterestedTask_ReadLevel() ScheduleAttempt: 0, Version: int64(79), } + // make the batch size == return size to there will be a next page + s.mockShard.config.TimerTaskBatchSize = 1 request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: readLevel.VisibilityTimestamp, MaxTimestamp: macAckLevel, BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: []byte("some random next page token"), + Timers: []*persistence.TimerTaskInfo{timer}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() s.Nil(err) - s.Equal(0, len(filteredTasks)) + s.Equal(1, len(filteredTasks)) s.Nil(lookAheadTask) s.True(moreTasks) timerSequenceID := TimerSequenceID{VisibilityTimestamp: timer.VisibilityTimestamp, TaskID: timer.TaskID} - s.Equal(0, len(s.timerQueueAckMgr.outstandingTasks)) + s.Equal(1, len(s.timerQueueAckMgr.outstandingTasks)) s.Equal(timerSequenceID, s.timerQueueAckMgr.readLevel) s.Equal(ackLevel, s.timerQueueAckMgr.ackLevel) } @@ -463,15 +456,6 @@ func (s *timerQueueAckMgrSuite) TestReadCompleteUpdateTimerTasks() { readLevel := s.timerQueueAckMgr.readLevel macAckLevel := s.timerQueueAckMgr.maxAckLevel - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - // only thing used is the replication config and is global domain - IsGlobalDomain: true, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. 
- }, - }, nil).Once() - // create 3 timers, timer1 < timer2 < timer3 < now timer1 := &persistence.TimerTaskInfo{ DomainID: domainID, @@ -512,8 +496,7 @@ func (s *timerQueueAckMgrSuite) TestReadCompleteUpdateTimerTasks() { BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer1, timer2, timer3}, - NextPageToken: nil, + Timers: []*persistence.TimerTaskInfo{timer1, timer2, timer3}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() @@ -524,110 +507,27 @@ func (s *timerQueueAckMgrSuite) TestReadCompleteUpdateTimerTasks() { // we are not testing shard context s.mockShardMgr.On("UpdateShard", mock.Anything).Return(nil).Once() - s.mockExecutionMgr.On("CompleteTimerTask", &persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timer1.VisibilityTimestamp, - TaskID: timer1.TaskID}).Return(nil).Once() timerSequenceID1 := TimerSequenceID{VisibilityTimestamp: timer1.VisibilityTimestamp, TaskID: timer1.TaskID} s.timerQueueAckMgr.completeTimerTask(timer1) s.True(s.timerQueueAckMgr.outstandingTasks[timerSequenceID1]) s.timerQueueAckMgr.updateAckLevel() - s.Equal(timer1.VisibilityTimestamp, s.mockShard.GetTimerAckLevel(s.clusterName)) + s.Equal(timer1.VisibilityTimestamp, s.mockShard.GetTimerClusterAckLevel(s.clusterName)) // there will be no call to update shard - s.mockExecutionMgr.On("CompleteTimerTask", &persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timer3.VisibilityTimestamp, - TaskID: timer3.TaskID}).Return(nil).Once() timerSequenceID3 := TimerSequenceID{VisibilityTimestamp: timer3.VisibilityTimestamp, TaskID: timer3.TaskID} s.timerQueueAckMgr.completeTimerTask(timer3) s.True(s.timerQueueAckMgr.outstandingTasks[timerSequenceID3]) s.timerQueueAckMgr.updateAckLevel() // ack level remains unchanged - s.Equal(timer1.VisibilityTimestamp, s.mockShard.GetTimerAckLevel(s.clusterName)) + s.Equal(timer1.VisibilityTimestamp, s.mockShard.GetTimerClusterAckLevel(s.clusterName)) // we are not testing shard context s.mockShardMgr.On("UpdateShard", mock.Anything).Return(nil).Once() - s.mockExecutionMgr.On("CompleteTimerTask", &persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timer2.VisibilityTimestamp, - TaskID: timer2.TaskID}).Return(nil).Once() timerSequenceID2 := TimerSequenceID{VisibilityTimestamp: timer2.VisibilityTimestamp, TaskID: timer2.TaskID} s.timerQueueAckMgr.completeTimerTask(timer2) s.True(s.timerQueueAckMgr.outstandingTasks[timerSequenceID2]) s.timerQueueAckMgr.updateAckLevel() - s.Equal(timer3.VisibilityTimestamp, s.mockShard.GetTimerAckLevel(s.clusterName)) -} - -func (s *timerQueueAckMgrSuite) TestReadRetryCompleteUpdateTimerTasks() { - domainID := "some random domain ID" - ackLevel := s.timerQueueAckMgr.ackLevel - readLevel := s.timerQueueAckMgr.readLevel - macAckLevel := s.timerQueueAckMgr.maxAckLevel - - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ - // only thing used is the replication config and is global domain - IsGlobalDomain: true, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.clusterName, - // Clusters attr is not used. 
- }, - }, nil).Once() - - // create 3 timers, timer1 < timer2 < timer3 < now - timer := &persistence.TimerTaskInfo{ - DomainID: domainID, - WorkflowID: "some random workflow ID", - RunID: uuid.New(), - VisibilityTimestamp: time.Now().Add(-5 * time.Second), - TaskID: int64(59), - TaskType: 1, - TimeoutType: 2, - EventID: int64(28), - ScheduleAttempt: 0, - } - request := &persistence.GetTimerIndexTasksRequest{ - MinTimestamp: readLevel.VisibilityTimestamp, - MaxTimestamp: macAckLevel, - BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, - } - response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer}, - NextPageToken: nil, - } - s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() - filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueAckMgr.readTimerTasks() - s.Nil(err) - s.Equal([]*persistence.TimerTaskInfo{timer}, filteredTasks) - s.Nil(lookAheadTask) - s.False(moreTasks) - - timerSequenceID := TimerSequenceID{VisibilityTimestamp: timer.VisibilityTimestamp, TaskID: timer.TaskID} - s.timerQueueAckMgr.retryTimerTask(timer) - // nothing changed to ack level, as well as outstanding task and task to cluster map - s.False(s.timerQueueAckMgr.outstandingTasks[timerSequenceID]) - s.Equal(timerSequenceID, s.timerQueueAckMgr.readLevel) - s.Equal(ackLevel, s.timerQueueAckMgr.ackLevel) - s.Equal([]*persistence.TimerTaskInfo{timer}, s.timerQueueAckMgr.retryTasks) - - // do another round of processing, db layer return nothing - request = &persistence.GetTimerIndexTasksRequest{ - MinTimestamp: s.timerQueueAckMgr.readLevel.VisibilityTimestamp, - MaxTimestamp: macAckLevel, - BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, - } - response = &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{}, - NextPageToken: nil, - } - s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() - filteredTasks, lookAheadTask, moreTasks, err = s.timerQueueAckMgr.readTimerTasks() - s.Nil(err) - s.Equal([]*persistence.TimerTaskInfo{timer}, filteredTasks) - s.Nil(lookAheadTask) - s.False(moreTasks) - // nothing changed to ack level, as well as outstanding task and task to cluster map - s.False(s.timerQueueAckMgr.outstandingTasks[timerSequenceID]) - s.Equal(timerSequenceID, s.timerQueueAckMgr.readLevel) - s.Equal(ackLevel, s.timerQueueAckMgr.ackLevel) - s.Equal([]*persistence.TimerTaskInfo{}, s.timerQueueAckMgr.retryTasks) + s.Equal(timer3.VisibilityTimestamp, s.mockShard.GetTimerClusterAckLevel(s.clusterName)) } // Tests for failover ack manager @@ -683,7 +583,7 @@ func (s *timerQueueFailoverAckMgrSuite) SetupTest() { s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) s.domainID = "some random domain ID" s.standbyClusterName = cluster.TestAlternativeClusterName - s.timerQueueFailoverAckMgr = newTimerQueueFailoverAckMgr(s.mockShard, s.metricsClient, s.domainID, s.standbyClusterName, s.logger) + s.timerQueueFailoverAckMgr = newTimerQueueFailoverAckMgr(s.mockShard, s.metricsClient, s.standbyClusterName, s.logger) } func (s *timerQueueFailoverAckMgrSuite) TearDownTest() { @@ -706,10 +606,12 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_HasNextPage() { macAckLevel := s.timerQueueFailoverAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.standbyClusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.standbyClusterName), readLevel.VisibilityTimestamp) - 
s.Equal(s.mockShard.GetTimerAckLevel(s.mockShard.GetService().GetClusterMetadata().GetCurrentClusterName()), macAckLevel) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.standbyClusterName), ackLevel) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.standbyClusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.mockShard.GetService().GetClusterMetadata().GetCurrentClusterName()), macAckLevel) + // make the batch size == return size to there will be a next page + s.mockShard.config.TimerTaskBatchSize = 1 request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: readLevel.VisibilityTimestamp, MaxTimestamp: macAckLevel, @@ -741,12 +643,10 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_HasNextPage() { } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer1, timer2}, - NextPageToken: []byte("some random next page token"), + Timers: []*persistence.TimerTaskInfo{timer1, timer2}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() - timers, lookAheadTimer, more, err := s.timerQueueFailoverAckMgr.readTimerTasks() s.Nil(err) s.Equal([]*persistence.TimerTaskInfo{timer1}, timers) @@ -762,9 +662,9 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_NoNextPage() { macAckLevel := s.timerQueueFailoverAckMgr.maxAckLevel // test ack && read level is initialized correctly - s.Equal(s.mockShard.GetTimerAckLevel(s.standbyClusterName), ackLevel) - s.Equal(s.mockShard.GetTimerAckLevel(s.standbyClusterName), readLevel.VisibilityTimestamp) - s.Equal(s.mockShard.GetTimerAckLevel(s.mockShard.GetService().GetClusterMetadata().GetCurrentClusterName()), macAckLevel) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.standbyClusterName), ackLevel) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.standbyClusterName), readLevel.VisibilityTimestamp) + s.Equal(s.mockShard.GetTimerClusterAckLevel(s.mockShard.GetService().GetClusterMetadata().GetCurrentClusterName()), macAckLevel) request := &persistence.GetTimerIndexTasksRequest{ MinTimestamp: readLevel.VisibilityTimestamp, @@ -773,8 +673,7 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_NoNextPage() { } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{}, - NextPageToken: nil, + Timers: []*persistence.TimerTaskInfo{}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() @@ -784,8 +683,7 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_NoNextPage() { s.Equal([]*persistence.TimerTaskInfo{}, timers) s.Nil(lookAheadTimer) s.False(more) - timerSequenceID := TimerSequenceID{VisibilityTimestamp: ackLevel} - s.Equal(timerSequenceID, s.timerQueueFailoverAckMgr.readLevel) + s.Equal(ackLevel, s.timerQueueFailoverAckMgr.readLevel) } func (s *timerQueueFailoverAckMgrSuite) TestReadTimerTasks_InTheFuture() { @@ -851,8 +749,7 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadCompleteUpdateTimerTasks() { BatchSize: s.mockShard.GetConfig().TimerTaskBatchSize, } response := &persistence.GetTimerIndexTasksResponse{ - Timers: []*persistence.TimerTaskInfo{timer1, timer3}, - NextPageToken: nil, + Timers: []*persistence.TimerTaskInfo{timer1, timer3}, } s.mockExecutionMgr.On("GetTimerIndexTasks", request).Return(response, nil).Once() filteredTasks, lookAheadTask, moreTasks, err := s.timerQueueFailoverAckMgr.readTimerTasks() @@ -862,9 +759,6 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadCompleteUpdateTimerTasks() { s.False(moreTasks) // there will be no 
call to update shard - s.mockExecutionMgr.On("CompleteTimerTask", &persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timer3.VisibilityTimestamp, - TaskID: timer3.TaskID}).Return(nil).Once() timerSequenceID3 := TimerSequenceID{VisibilityTimestamp: timer3.VisibilityTimestamp, TaskID: timer3.TaskID} s.timerQueueFailoverAckMgr.completeTimerTask(timer3) s.True(s.timerQueueFailoverAckMgr.outstandingTasks[timerSequenceID3]) @@ -876,9 +770,6 @@ func (s *timerQueueFailoverAckMgrSuite) TestReadCompleteUpdateTimerTasks() { } // there will be no call to update shard, see issue #646 - s.mockExecutionMgr.On("CompleteTimerTask", &persistence.CompleteTimerTaskRequest{ - VisibilityTimestamp: timer1.VisibilityTimestamp, - TaskID: timer1.TaskID}).Return(nil).Once() timerSequenceID1 := TimerSequenceID{VisibilityTimestamp: timer1.VisibilityTimestamp, TaskID: timer1.TaskID} s.timerQueueFailoverAckMgr.completeTimerTask(timer1) s.True(s.timerQueueFailoverAckMgr.outstandingTasks[timerSequenceID1]) diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index ca74d70a375..e456a4cb90b 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -26,6 +26,7 @@ import ( "github.com/uber-common/bark" workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" ) @@ -35,8 +36,10 @@ type ( shard ShardContext historyService *historyEngineImpl cache *historyCache + timerTaskFilter timerTaskFilter logger bark.Logger metricsClient metrics.Client + currentClusterName string timerGate LocalTimerGate timerQueueProcessorBase *timerQueueProcessorBase timerQueueAckMgr timerQueueAckMgr @@ -48,13 +51,36 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng timeNow := func() time.Time { return shard.GetCurrentTime(clusterName) } + logger = logger.WithFields(bark.Fields{ + logging.TagWorkflowCluster: clusterName, + }) + timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { + domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) + if err != nil { + // it is possible that domain is deleted, + // we should treat that domain being active + if _, ok := err.(*workflow.EntityNotExistsError); !ok { + return false, err + } + return true, nil + } + if domainEntry.GetIsGlobalDomain() && + clusterName != domainEntry.GetReplicationConfig().ActiveClusterName { + // timer task does not belong to cluster name + return false, nil + } + return true, nil + } + timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, clusterName, logger) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, cache: historyService.historyCache, + timerTaskFilter: timerTaskFilter, logger: logger, metricsClient: historyService.metricsClient, + currentClusterName: clusterName, timerGate: NewLocalTimerGate(), timerQueueProcessorBase: newTimerQueueProcessorBase(shard, historyService, timerQueueAckMgr, timeNow, logger), timerQueueAckMgr: timerQueueAckMgr, @@ -69,11 +95,22 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE // should use current cluster's time when doing domain failover return shard.GetCurrentTime(clusterName) } - timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, domainID, standbyClusterName, logger) + logger = logger.WithFields(bark.Fields{ + 
logging.TagWorkflowCluster: clusterName, + }) + timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { + if timer.DomainID == domainID { + return true, nil + } + return false, nil + } + + timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, logger) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, cache: historyService.historyCache, + timerTaskFilter: timerTaskFilter, logger: logger, metricsClient: historyService.metricsClient, timerGate: NewLocalTimerGate(), @@ -108,12 +145,19 @@ func (t *timerQueueActiveProcessorImpl) notifyNewTimers(timerTasks []persistence } func (t *timerQueueActiveProcessorImpl) process(timerTask *persistence.TimerTaskInfo) error { + ok, err := t.timerTaskFilter(timerTask) + if err != nil { + return err + } else if !ok { + t.timerQueueAckMgr.completeTimerTask(timerTask) + return nil + } + taskID := TimerSequenceID{VisibilityTimestamp: timerTask.VisibilityTimestamp, TaskID: timerTask.TaskID} t.logger.Debugf("Processing timer: (%s), for WorkflowID: %v, RunID: %v, Type: %v, TimeoutType: %v, EventID: %v", taskID, timerTask.WorkflowID, timerTask.RunID, t.timerQueueProcessorBase.getTimerTaskType(timerTask.TaskType), workflow.TimeoutType(timerTask.TimeoutType).String(), timerTask.EventID) - var err error scope := metrics.TimerQueueProcessorScope switch timerTask.TaskType { case persistence.TaskTypeUserTimer: diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 44988af67d7..9b40810d0eb 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -22,6 +22,7 @@ package history import ( "fmt" + "sync/atomic" "time" "github.com/uber-common/bark" @@ -32,11 +33,17 @@ import ( type ( timeNow func() time.Time + timerTaskFilter func(timer *persistence.TimerTaskInfo) (bool, error) timerQueueProcessorImpl struct { currentClusterName string shard ShardContext + config *Config historyService *historyEngineImpl + ackLevel TimerSequenceID logger bark.Logger + isStarted int32 + isStopped int32 + shutdownChan chan struct{} activeTimerProcessor *timerQueueActiveProcessorImpl standbyTimerProcessors map[string]*timerQueueStandbyProcessorImpl } @@ -57,25 +64,37 @@ func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImp return &timerQueueProcessorImpl{ currentClusterName: currentClusterName, shard: shard, + config: shard.GetConfig(), historyService: historyService, + ackLevel: TimerSequenceID{VisibilityTimestamp: shard.GetTimerAckLevel()}, logger: logger, + shutdownChan: make(chan struct{}), activeTimerProcessor: newTimerQueueActiveProcessor(shard, historyService, logger), standbyTimerProcessors: standbyTimerProcessors, } } func (t *timerQueueProcessorImpl) Start() { + if !atomic.CompareAndSwapInt32(&t.isStarted, 0, 1) { + return + } t.activeTimerProcessor.Start() for _, standbyTimerProcessor := range t.standbyTimerProcessors { standbyTimerProcessor.Start() } + + go t.completeTimersLoop() } func (t *timerQueueProcessorImpl) Stop() { + if !atomic.CompareAndSwapInt32(&t.isStopped, 0, 1) { + return + } t.activeTimerProcessor.Stop() for _, standbyTimerProcessor := range t.standbyTimerProcessors { standbyTimerProcessor.Stop() } + close(t.shutdownChan) } // NotifyNewTimers - Notify the processor about the new active / standby timer arrival. 
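
The hunk below adds a background loop that completes (deletes) timer tasks behind the minimum ack level across the active and standby processors, replacing the per-task CompleteTimerTask calls that used to live in the ack manager. It relies on compareTimerIDLess, which is defined elsewhere in this package and not shown in the diff; its expected ordering (visibility timestamp first, then task ID) is roughly:

// reference sketch of the assumed ordering, not part of the diff
func compareTimerIDLess(first *TimerSequenceID, second *TimerSequenceID) bool {
	if first.VisibilityTimestamp.Before(second.VisibilityTimestamp) {
		return true
	}
	if first.VisibilityTimestamp.Equal(second.VisibilityTimestamp) {
		return first.TaskID < second.TaskID
	}
	return false
}
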
@@ -122,3 +141,82 @@ func (t *timerQueueProcessorImpl) getTimerFiredCount(clusterName string) uint64 } return standbyTimerProcessor.getTimerFiredCount() } + +func (t *timerQueueProcessorImpl) completeTimersLoop() { + timer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval) + for { + select { + case <-t.shutdownChan: + // before shutdown, make sure the ack level is up to date + t.completeTimers() + return + case <-timer.C: + CompleteLoop: + for attempt := 0; attempt < t.config.TimerProcessorCompleteTimerFailureRetryCount; attempt++ { + err := t.completeTimers() + if err != nil { + t.logger.Infof("Failed to complete timers: %v.", err) + backoff := time.Duration(attempt * 100) + time.Sleep(backoff * time.Millisecond) + } else { + break CompleteLoop + } + } + timer.Reset(t.config.TimerProcessorCompleteTimerInterval) + } + } +} + +func (t *timerQueueProcessorImpl) completeTimers() error { + lowerAckLevel := t.ackLevel + + upperAckLevel := t.activeTimerProcessor.timerQueueAckMgr.getAckLevel() + for _, standbyTimerProcessor := range t.standbyTimerProcessors { + ackLevel := standbyTimerProcessor.timerQueueAckMgr.getAckLevel() + if !compareTimerIDLess(&upperAckLevel, &ackLevel) { + upperAckLevel = ackLevel + } + } + + if !compareTimerIDLess(&lowerAckLevel, &upperAckLevel) { + return nil + } + + executionMgr := t.shard.GetExecutionManager() + minTimestamp := lowerAckLevel.VisibilityTimestamp + batchSize := t.config.TimerTaskBatchSize + +LoadCompleteLoop: + for { + request := &persistence.GetTimerIndexTasksRequest{ + MinTimestamp: minTimestamp, + MaxTimestamp: timerQueueAckMgrMaxTimestamp, + BatchSize: batchSize, + } + response, err := executionMgr.GetTimerIndexTasks(request) + if err != nil { + return err + } + + more := len(response.Timers) >= batchSize + for _, timer := range response.Timers { + timerSequenceID := TimerSequenceID{VisibilityTimestamp: timer.VisibilityTimestamp, TaskID: timer.TaskID} + if compareTimerIDLess(&upperAckLevel, &timerSequenceID) { + break LoadCompleteLoop + } + minTimestamp = timer.VisibilityTimestamp + if err := executionMgr.CompleteTimerTask(&persistence.CompleteTimerTaskRequest{ + VisibilityTimestamp: timer.VisibilityTimestamp, + TaskID: timer.TaskID}); err != nil { + t.logger.Warnf("Timer queue ack manager unable to complete timer task: %v; %v", timer, err) + } + } + + if !more { + break LoadCompleteLoop + } + } + t.ackLevel = upperAckLevel + t.shard.UpdateTimerAckLevel(t.ackLevel.VisibilityTimestamp) + return nil +} diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index 1674955ae95..019ddc568e6 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -203,8 +203,6 @@ func (s *timerQueueProcessor2Suite) TestTimerUpdateTimesOut() { s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return( &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}, nil) - s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once() - s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(errors.New("FAILED")).Once() s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once() @@ -269,7 +267,6 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms} s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once() - 
s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) { // Done. diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 6b97e561020..2a6944c85be 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -62,6 +62,10 @@ type ( timerProcessor timerProcessor timerQueueAckMgr timerQueueAckMgr + // worker coroutines notification + workerNotificationChans []chan struct{} + + // timer notification newTimerCh chan struct{} newTimeLock sync.Mutex newTime time.Time @@ -72,19 +76,27 @@ func newTimerQueueProcessorBase(shard ShardContext, historyService *historyEngin log := logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueTimerQueueComponent, }) + + workerNotificationChans := []chan struct{}{} + for index := 0; index < shard.GetConfig().ProcessTimerTaskWorkerCount; index++ { + workerNotificationChans = append(workerNotificationChans, make(chan struct{}, 1)) + } + base := &timerQueueProcessorBase{ - shard: shard, - historyService: historyService, - executionManager: shard.GetExecutionManager(), - shutdownCh: make(chan struct{}), - tasksCh: make(chan *persistence.TimerTaskInfo, 10*shard.GetConfig().TimerTaskBatchSize), - config: shard.GetConfig(), - logger: log, - metricsClient: historyService.metricsClient, - now: timeNow, - timerQueueAckMgr: timerQueueAckMgr, - newTimerCh: make(chan struct{}, 1), + shard: shard, + historyService: historyService, + executionManager: shard.GetExecutionManager(), + shutdownCh: make(chan struct{}), + tasksCh: make(chan *persistence.TimerTaskInfo, 10*shard.GetConfig().TimerTaskBatchSize), + config: shard.GetConfig(), + logger: log, + metricsClient: historyService.metricsClient, + now: timeNow, + timerQueueAckMgr: timerQueueAckMgr, + workerNotificationChans: workerNotificationChans, + newTimerCh: make(chan struct{}, 1), } + return base } @@ -94,7 +106,7 @@ func (t *timerQueueProcessorBase) Start() { } t.shutdownWG.Add(1) - go t.processorPump(t.config.ProcessTimerTaskWorkerCount) + go t.processorPump() t.logger.Info("Timer queue processor started.") } @@ -115,15 +127,16 @@ func (t *timerQueueProcessorBase) Stop() { t.logger.Info("Timer queue processor stopped.") } -func (t *timerQueueProcessorBase) processorPump(taskWorkerCount int) { +func (t *timerQueueProcessorBase) processorPump() { defer t.shutdownWG.Done() // Workers to process timer tasks that are expired. 
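
Editor's note on completeTimers above: timer tasks are deleted from persistence only below the lowest ack level reported by the active processor and every standby processor, since anything above that point may still be needed by one of them. A hedged sketch of that minimum computation follows, using an illustrative timerKey in place of TimerSequenceID and compareTimerIDLess.

```go
package main

import (
	"fmt"
	"time"
)

// timerKey mirrors the (visibility timestamp, task ID) ordering used by
// TimerSequenceID; the real comparator is compareTimerIDLess.
type timerKey struct {
	visibilityTimestamp time.Time
	taskID              int64
}

func timerKeyLess(a, b timerKey) bool {
	if a.visibilityTimestamp.Before(b.visibilityTimestamp) {
		return true
	}
	if a.visibilityTimestamp.Equal(b.visibilityTimestamp) {
		return a.taskID < b.taskID
	}
	return false
}

// minAckLevel picks the smallest ack level across the active processor and
// all standby processors: timers strictly below this point are acknowledged
// everywhere and are safe to delete.
func minAckLevel(active timerKey, standby []timerKey) timerKey {
	lowest := active
	for _, level := range standby {
		if timerKeyLess(level, lowest) {
			lowest = level
		}
	}
	return lowest
}

func main() {
	now := time.Now()
	active := timerKey{visibilityTimestamp: now.Add(10 * time.Second), taskID: 5}
	standby := []timerKey{
		{visibilityTimestamp: now.Add(3 * time.Second), taskID: 9},
		{visibilityTimestamp: now.Add(30 * time.Second), taskID: 2},
	}
	fmt.Println(minAckLevel(active, standby))
}
```
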
var workerWG sync.WaitGroup - for i := 0; i < taskWorkerCount; i++ { + for i := 0; i < t.config.ProcessTimerTaskWorkerCount; i++ { workerWG.Add(1) - go t.processTaskWorker(&workerWG) + notificationChan := t.workerNotificationChans[i] + go t.processTaskWorker(&workerWG, notificationChan) } RetryProcessor: @@ -146,7 +159,7 @@ RetryProcessor: t.logger.Info("Timer processor exiting.") } -func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup) { +func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, notificationChan chan struct{}) { defer workerWG.Done() for { select { @@ -156,13 +169,25 @@ func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup) { } UpdateFailureLoop: - for attempt := 1; attempt <= t.config.TimerProcessorUpdateFailureRetryCount; attempt++ { + for attempt := 1; attempt <= t.config.TimerProcessorUpdateFailureRetryCount; { + + // clear the existing notification + select { + case <-notificationChan: + default: + } + err := t.timerProcessor.process(task) if err != nil { - // We will retry until we don't find the timer task any more. - t.logger.Infof("Failed to process timer: %v; %v.", task, err) - backoff := time.Duration(attempt * 100) - time.Sleep(backoff * time.Millisecond) + if _, ok := err.(*taskRetryError); ok { + <-notificationChan + } else { + // We will retry until we don't find the timer task any more. + t.logger.Infof("Failed to process timer: %v; %v.", task, err) + backoff := time.Duration(attempt * 100) + time.Sleep(backoff * time.Millisecond) + attempt++ + } } else { atomic.AddUint64(&t.timerFiredCount, 1) break UpdateFailureLoop @@ -217,6 +242,8 @@ func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task, func (t *timerQueueProcessorBase) internalProcessor() error { timerGate := t.timerProcessor.getTimerGate() + pollTimer := time.NewTimer(t.config.TimerProcessorMaxPollInterval) + defer pollTimer.Stop() updateAckChan := time.NewTicker(t.shard.GetConfig().TimerProcessorUpdateAckInterval).C var nextKeyTask *persistence.TimerTaskInfo @@ -246,6 +273,10 @@ continueProcessor: case <-timerGate.FireChan(): // Timer Fired. 
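
Editor's note on the per-worker notification channels threaded through processTaskWorker above: a worker whose task returned taskRetryError parks on a blocking receive, and the broadcaster (retryTasks, shown a bit further down) does a non-blocking send into each buffered channel, so a wake-up is never lost and never blocks the sender. A minimal sketch of that send/receive pairing, with illustrative names only:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workerCount = 3

	// one buffered channel per worker, capacity 1, so a pending wake-up
	// is remembered even if the worker is not parked yet
	notificationChans := make([]chan struct{}, workerCount)
	for i := range notificationChans {
		notificationChans[i] = make(chan struct{}, 1)
	}

	var wg sync.WaitGroup
	for i, ch := range notificationChans {
		wg.Add(1)
		go func(id int, notificationChan chan struct{}) {
			defer wg.Done()
			// park until someone signals that conditions may have changed
			<-notificationChan
			fmt.Printf("worker %d retrying its task\n", id)
		}(i, ch)
	}

	// broadcast side: non-blocking send, so an already-signaled (full)
	// channel is simply skipped instead of blocking the broadcaster
	for _, ch := range notificationChans {
		select {
		case ch <- struct{}{}:
		default:
		}
	}

	wg.Wait()
}
```
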
+ case <-pollTimer.C: + // forced timer scan + pollTimer.Reset(t.config.TimerProcessorMaxPollInterval) + case <-updateAckChan: t.timerQueueAckMgr.updateAckLevel() continue continueProcessor @@ -307,10 +338,12 @@ func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerT } } -func (t *timerQueueProcessorBase) retryTimerTasks() { - timerTasks := t.timerQueueAckMgr.readRetryTimerTasks() - for _, task := range timerTasks { - t.tasksCh <- task +func (t *timerQueueProcessorBase) retryTasks() { + for _, workerNotificationChan := range t.workerNotificationChans { + select { + case workerNotificationChan <- struct{}{}: + default: + } } } diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index d4cc7662ef0..4d1fd4e3453 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -70,6 +70,9 @@ func (s *timerQueueProcessorSuite) SetupSuite() { historyCache := newHistoryCache(s.ShardContext, s.logger) historyCache.disabled = true + // set the standby cluster's timer ack level to max since we are not testing it + // but we are testing the complete timer functionality + s.ShardContext.UpdateTimerClusterAckLevel(cluster.TestAlternativeClusterName, timerQueueAckMgrMaxTimestamp) s.engineImpl = &historyEngineImpl{ currentclusterName: s.ShardContext.GetService().GetClusterMetadata().GetCurrentClusterName(), shard: s.ShardContext, @@ -242,10 +245,9 @@ func (s *timerQueueProcessorSuite) TestSingleTimerTask() { identity := "testIdentity" _, tt := s.createExecutionWithTimers(domainID, workflowExecution, taskList, identity, []int32{1}) - timerInfo, nextToken, err := s.GetTimerIndexTasks() + timerInfo, err := s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.NotEmpty(timerInfo, "Expected non empty timers list") - s.Equal(0, len(nextToken)) s.Equal(1, len(timerInfo)) processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.logger).(*timerQueueProcessorImpl) @@ -253,7 +255,7 @@ func (s *timerQueueProcessorSuite) TestSingleTimerTask() { processor.NotifyNewTimers(cluster.TestCurrentClusterName, tt) for { - timerInfo, _, err := s.GetTimerIndexTasks() + timerInfo, err := s.GetTimerIndexTasks() s.Nil(err, "No error expected.") if len(timerInfo) == 0 { processor.Stop() @@ -262,10 +264,9 @@ func (s *timerQueueProcessorSuite) TestSingleTimerTask() { time.Sleep(10 * time.Millisecond) } - timerInfo, nextToken, err = s.GetTimerIndexTasks() + timerInfo, err = s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.Equal(0, len(timerInfo)) - s.Equal(0, len(nextToken)) } func (s *timerQueueProcessorSuite) TestManyTimerTasks() { @@ -277,19 +278,17 @@ func (s *timerQueueProcessorSuite) TestManyTimerTasks() { identity := "testIdentity" _, tt := s.createExecutionWithTimers(domainID, workflowExecution, taskList, identity, []int32{1, 2, 3}) - timerInfo, nextToken, err := s.GetTimerIndexTasks() + timerInfo, err := s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.NotEmpty(timerInfo, "Expected non empty timers list") s.Equal(1, len(timerInfo)) - s.Equal(0, len(nextToken)) processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.logger).(*timerQueueProcessorImpl) processor.Start() - processor.NotifyNewTimers(cluster.TestCurrentClusterName, tt) for { - timerInfo, _, err := s.GetTimerIndexTasks() + timerInfo, err := s.GetTimerIndexTasks() s.logger.Infof("TestManyTimerTasks: GetTimerIndexTasks: Response Count: %d \n", len(timerInfo)) s.Nil(err, "No error expected.") if 
len(timerInfo) == 0 { @@ -299,10 +298,9 @@ func (s *timerQueueProcessorSuite) TestManyTimerTasks() { time.Sleep(1000 * time.Millisecond) } - timerInfo, nextToken, err = s.GetTimerIndexTasks() + timerInfo, err = s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.Equal(0, len(timerInfo)) - s.Equal(0, len(nextToken)) s.Equal(uint64(3), processor.getTimerFiredCount(cluster.TestCurrentClusterName)) } @@ -317,10 +315,9 @@ func (s *timerQueueProcessorSuite) TestTimerTaskAfterProcessorStart() { s.createExecutionWithTimers(domainID, workflowExecution, taskList, identity, []int32{}) - timerInfo, nextToken, err := s.GetTimerIndexTasks() + timerInfo, err := s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.Empty(timerInfo, "Expected empty timers list") - s.Equal(0, len(nextToken)) processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.logger).(*timerQueueProcessorImpl) processor.Start() @@ -331,24 +328,23 @@ func (s *timerQueueProcessorSuite) TestTimerTaskAfterProcessorStart() { s.waitForTimerTasksToProcess(processor) - timerInfo, nextToken, err = s.GetTimerIndexTasks() + timerInfo, err = s.GetTimerIndexTasks() s.Nil(err, "No error expected.") s.Equal(0, len(timerInfo)) - s.Equal(0, len(nextToken)) s.Equal(uint64(1), processor.getTimerFiredCount(cluster.TestCurrentClusterName)) } func (s *timerQueueProcessorSuite) waitForTimerTasksToProcess(p timerQueueProcessor) { - for i := 0; i < 10; i++ { - timerInfo, _, err := s.GetTimerIndexTasks() + for { + timerInfo, err := s.GetTimerIndexTasks() s.Nil(err, "No error expected.") if len(timerInfo) == 0 { - p.Stop() break } time.Sleep(1000 * time.Millisecond) } + p.Stop() } func (s *timerQueueProcessorSuite) checkTimedOutEventFor(domainID string, we workflow.WorkflowExecution, @@ -770,7 +766,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() { s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{}) // TimeoutTypeScheduleToClose - Scheduled, started, completed. 
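
Editor's note on the reworked waitForTimerTasksToProcess above: the helper now polls until the timer queue drains and only then stops the processor, rather than giving up after a fixed number of iterations. A throwaway sketch of that polling shape; the task source here is faked and is not the persistence API.

```go
package main

import (
	"fmt"
	"time"
)

// makeGetTimerTasks fakes a task source that drains after a few polls.
func makeGetTimerTasks() func() ([]int, error) {
	remaining := 3
	return func() ([]int, error) {
		if remaining == 0 {
			return nil, nil
		}
		remaining--
		return make([]int, remaining+1), nil
	}
}

// waitUntilDrained polls until the task list comes back empty, then returns
// so the caller can stop the processor.
func waitUntilDrained(getTimerTasks func() ([]int, error), pollInterval time.Duration) error {
	for {
		tasks, err := getTimerTasks()
		if err != nil {
			return err
		}
		if len(tasks) == 0 {
			return nil
		}
		time.Sleep(pollInterval)
	}
}

func main() {
	if err := waitUntilDrained(makeGetTimerTasks(), 10*time.Millisecond); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("timer queue drained")
}
```
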
- p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl) + p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.logger).(*timerQueueProcessorImpl) p.Start() state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution) diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index 0d5cdac3e88..593a0dd8473 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -27,6 +27,7 @@ import ( "github.com/uber-common/bark" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" ) @@ -36,6 +37,7 @@ type ( shard ShardContext historyService *historyEngineImpl cache *historyCache + timerTaskFilter timerTaskFilter logger bark.Logger metricsClient metrics.Client clusterName string @@ -49,11 +51,31 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn timeNow := func() time.Time { return shard.GetCurrentTime(clusterName) } + logger = logger.WithFields(bark.Fields{ + logging.TagWorkflowCluster: clusterName, + }) + timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { + domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) + if err != nil { + return false, err + } + if !domainEntry.GetIsGlobalDomain() { + // non global domain, timer task does not belong here + return false, nil + } else if domainEntry.GetIsGlobalDomain() && + domainEntry.GetReplicationConfig().ActiveClusterName != clusterName { + // timer task does not belong here + return false, nil + } + return true, nil + } + timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, clusterName, logger) processor := &timerQueueStandbyProcessorImpl{ shard: shard, historyService: historyService, cache: historyService.historyCache, + timerTaskFilter: timerTaskFilter, logger: logger, metricsClient: historyService.metricsClient, clusterName: clusterName, @@ -83,7 +105,7 @@ func (t *timerQueueStandbyProcessorImpl) getTimerGate() TimerGate { func (t *timerQueueStandbyProcessorImpl) setCurrentTime(currentTime time.Time) { t.timerGate.SetCurrentTime(currentTime) - t.timerQueueProcessorBase.retryTimerTasks() + t.timerQueueProcessorBase.retryTasks() } // NotifyNewTimers - Notify the processor about the new standby timer events arrival. 
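
Editor's note on the standby processor's timerTaskFilter above: it consults the domain cache and keeps a task only when the owning domain is a global domain whose active cluster matches the cluster this standby processor represents; local domains and domains active elsewhere are dropped and completed. A sketch of that decision with a simplified domain record; the field names are illustrative and not the domainCache API.

```go
package main

import "fmt"

// domainRecord is a simplified stand-in for the cached domain entry;
// the real code goes through shard.GetDomainCache().GetDomainByID.
type domainRecord struct {
	isGlobalDomain    bool
	activeClusterName string
}

// shouldProcessOnStandby mirrors the filter: local domains never belong to a
// standby processor, and global domains belong only to the standby processor
// named after their active cluster.
func shouldProcessOnStandby(domain domainRecord, standbyProcessorCluster string) bool {
	if !domain.isGlobalDomain {
		return false
	}
	return domain.activeClusterName == standbyProcessorCluster
}

func main() {
	standbyCluster := "clusterB" // this standby processor mirrors clusterB

	localDomain := domainRecord{isGlobalDomain: false}
	domainActiveInB := domainRecord{isGlobalDomain: true, activeClusterName: "clusterB"}
	domainActiveInA := domainRecord{isGlobalDomain: true, activeClusterName: "clusterA"}

	fmt.Println(shouldProcessOnStandby(localDomain, standbyCluster))     // false
	fmt.Println(shouldProcessOnStandby(domainActiveInB, standbyCluster)) // true
	fmt.Println(shouldProcessOnStandby(domainActiveInA, standbyCluster)) // false
}
```
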
@@ -93,12 +115,19 @@ func (t *timerQueueStandbyProcessorImpl) notifyNewTimers(timerTasks []persistenc } func (t *timerQueueStandbyProcessorImpl) process(timerTask *persistence.TimerTaskInfo) error { + ok, err := t.timerTaskFilter(timerTask) + if err != nil { + return err + } else if !ok { + t.timerQueueAckMgr.completeTimerTask(timerTask) + return nil + } + taskID := TimerSequenceID{VisibilityTimestamp: timerTask.VisibilityTimestamp, TaskID: timerTask.TaskID} t.logger.Debugf("Processing timer: (%s), for WorkflowID: %v, RunID: %v, Type: %v, TimeoutType: %v, EventID: %v", taskID, timerTask.WorkflowID, timerTask.RunID, t.timerQueueProcessorBase.getTimerTaskType(timerTask.TaskType), workflow.TimeoutType(timerTask.TimeoutType).String(), timerTask.EventID) - var err error scope := metrics.TimerQueueProcessorScope switch timerTask.TaskType { case persistence.TaskTypeUserTimer: @@ -132,6 +161,8 @@ func (t *timerQueueStandbyProcessorImpl) process(timerTask *persistence.TimerTas if err != nil { t.metricsClient.IncCounter(scope, metrics.TaskFailures) } + } else { + t.timerQueueAckMgr.completeTimerTask(timerTask) } return err @@ -166,15 +197,13 @@ func (t *timerQueueStandbyProcessorImpl) processExpiredUserTimer(timerTask *pers // // we do not need to notity new timer to base, since if there is no new event being replicated // checking again if the timer can be completed is meaningless - t.timerQueueAckMgr.retryTimerTask(timerTask) - return nil + return newTaskRetryError() } // since the user timer are already sorted, so if there is one timer which will not expired // all user timer after this timer will not expired break ExpireUserTimers } // if there is no user timer expired, then we are good - t.timerQueueAckMgr.completeTimerTask(timerTask) return nil }) } @@ -208,15 +237,13 @@ func (t *timerQueueStandbyProcessorImpl) processActivityTimeout(timerTask *persi // // we do not need to notity new timer to base, since if there is no new event being replicated // checking again if the timer can be completed is meaningless - t.timerQueueAckMgr.retryTimerTask(timerTask) - return nil + return newTaskRetryError() } // since the activity timer are already sorted, so if there is one timer which will not expired // all activity timer after this timer will not expired break ExpireActivityTimers } // if there is no user timer expired, then we are good - t.timerQueueAckMgr.completeTimerTask(timerTask) return nil }) } @@ -235,10 +262,8 @@ func (t *timerQueueStandbyProcessorImpl) processDecisionTimeout(timerTask *persi // // we do not need to notity new timer to base, since if there is no new event being replicated // checking again if the timer can be completed is meaningless - t.timerQueueAckMgr.retryTimerTask(timerTask) - return nil + return newTaskRetryError() } - t.timerQueueAckMgr.completeTimerTask(timerTask) return nil }) } @@ -251,8 +276,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowTimeout(timerTask *persi return t.processTimer(timerTask, func(msBuilder *mutableStateBuilder) error { // we do not need to notity new timer to base, since if there is no new event being replicated // checking again if the timer can be completed is meaningless - t.timerQueueAckMgr.retryTimerTask(timerTask) - return nil + return newTaskRetryError() }) } @@ -282,7 +306,6 @@ Process_Loop: if !msBuilder.isWorkflowExecutionRunning() { // workflow already finished, no need to process the timer - t.timerQueueAckMgr.completeTimerTask(timerTask) return nil } diff --git a/service/history/timerQueueStandbyProcessor_test.go 
b/service/history/timerQueueStandbyProcessor_test.go index 2a121425f8d..b24df29079c 100644 --- a/service/history/timerQueueStandbyProcessor_test.go +++ b/service/history/timerQueueStandbyProcessor_test.go @@ -93,9 +93,10 @@ func (s *timerQueueStandbyProcessorSuite) SetupTest() { // ack manager will use the domain information s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(&persistence.GetDomainResponse{ // only thing used is the replication config - Config: &persistence.DomainConfig{Retention: 1}, + Config: &persistence.DomainConfig{Retention: 1}, + IsGlobalDomain: true, ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, + ActiveClusterName: cluster.TestAlternativeClusterName, // Clusters attr is not used. }, }, nil).Once() @@ -132,11 +133,8 @@ func (s *timerQueueStandbyProcessorSuite) SetupTest() { hSerializerFactory: persistence.NewHistorySerializerFactory(), metricsClient: s.mockShard.GetMetricsClient(), } - // // this is used by shard context, not relevent to this test, so we do not care how many times "GetCurrentClusterName" os called - // s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - // s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions) s.mockHistoryEngine = h - + s.clusterName = cluster.TestAlternativeClusterName s.timerQueueStandbyProcessor = newTimerQueueStandbyProcessor(s.mockShard, h, s.clusterName, s.logger) s.mocktimerQueueAckMgr = &MockTimerQueueAckMgr{} s.timerQueueStandbyProcessor.timerQueueAckMgr = s.mocktimerQueueAckMgr @@ -149,6 +147,7 @@ func (s *timerQueueStandbyProcessorSuite) TearDownTest() { s.mockVisibilityMgr.AssertExpectations(s.T()) s.mockClusterMetadata.AssertExpectations(s.T()) s.mockProducer.AssertExpectations(s.T()) + s.mocktimerQueueAckMgr.AssertExpectations(s.T()) } func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Pending() { @@ -197,9 +196,10 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Pending() persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mocktimerQueueAckMgr.On("retryTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + err := s.timerQueueStandbyProcessor.process(timerTask) + _, ok := err.(*taskRetryError) + s.True(ok) } func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Success() { @@ -252,7 +252,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Success() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) } func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Multiple() { @@ -311,7 +311,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessExpiredUserTimer_Multiple() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + 
s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) } func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Pending() { @@ -363,9 +363,10 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Pending() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() - s.mocktimerQueueAckMgr.On("retryTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + err := s.timerQueueStandbyProcessor.process(timerTask) + _, ok := err.(*taskRetryError) + s.True(ok) } func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Success() { @@ -424,7 +425,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Success() { s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) } func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Multiple() { @@ -490,7 +491,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessActivityTimeout_Multiple() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) } func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Pending() { @@ -531,9 +532,10 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Pending() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() - s.mocktimerQueueAckMgr.On("retryTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + err := s.timerQueueStandbyProcessor.process(timerTask) + _, ok := err.(*taskRetryError) + s.True(ok) } func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Success() { @@ -578,7 +580,7 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessDecisionTimeout_Success() { s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) } func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Pending() { @@ -620,9 +622,10 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Pending() { persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() - s.mocktimerQueueAckMgr.On("retryTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + err := s.timerQueueStandbyProcessor.process(timerTask) + _, ok := err.(*taskRetryError) + s.True(ok) } func (s *timerQueueStandbyProcessorSuite) 
TestProcessWorkflowTimeout_Success() { @@ -667,5 +670,5 @@ func (s *timerQueueStandbyProcessorSuite) TestProcessWorkflowTimeout_Success() { s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once() s.mocktimerQueueAckMgr.On("completeTimerTask", timerTask).Return(nil).Once() - s.timerQueueStandbyProcessor.process(timerTask) + s.Nil(s.timerQueueStandbyProcessor.process(timerTask)) }
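
Editor's note on the test assertions above: the pending-timer cases now detect the retry signal with a plain type assertion on the error returned by process. A small sketch of defining and detecting such a sentinel error type; the name taskRetrySignal is a stand-in for taskRetryError, and errors.As is shown only as the more general Go idiom, not as something the codebase uses.

```go
package main

import (
	"errors"
	"fmt"
)

// taskRetrySignal plays the role of taskRetryError: an error type whose only
// purpose is to tell the caller "retry later", not to describe a failure.
type taskRetrySignal struct{}

func (e *taskRetrySignal) Error() string {
	return "task should be retried once the mutable state catches up"
}

func process(ready bool) error {
	if !ready {
		return &taskRetrySignal{}
	}
	return nil
}

func main() {
	err := process(false)

	// plain type assertion, as the tests in the diff do
	if _, ok := err.(*taskRetrySignal); ok {
		fmt.Println("type assertion: retry requested")
	}

	// errors.As is the more general form and also matches here
	var retry *taskRetrySignal
	if errors.As(err, &retry) {
		fmt.Println("errors.As: retry requested")
	}

	if process(true) == nil {
		fmt.Println("processed successfully")
	}
}
```
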