Skip to content

Commit

Permalink
add timer failover ack manager & UT, wire failover ack manager to act…
Browse files Browse the repository at this point in the history
…ive timer processor
  • Loading branch information
Wenquan Xing committed Apr 2, 2018
1 parent e6c257a commit 17bf26f
Show file tree
Hide file tree
Showing 17 changed files with 1,330 additions and 141 deletions.
1 change: 1 addition & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,7 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
minTimestamp,
maxTimestamp,
request.BatchSize)
query.PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
Expand Down
7 changes: 4 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,9 +638,10 @@ type (
// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
NextPageToken []byte
}

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
Expand Down
31 changes: 31 additions & 0 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ type MockTimerQueueAckMgr struct {
mock.Mock
}

var _ timerQueueAckMgr = (*MockTimerQueueAckMgr)(nil)

// getFinishedChan is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) getFinishedChan() <-chan struct{} {
ret := _m.Called()

var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}

// readTimerTasks is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
ret := _m.Called()
Expand Down Expand Up @@ -75,6 +92,20 @@ func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskI
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) readRetryTimerTasks() []*persistence.TimerTaskInfo {
ret := _m.Called()

var r0 []*persistence.TimerTaskInfo
if rf, ok := ret.Get(0).(func() []*persistence.TimerTaskInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*persistence.TimerTaskInfo)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
historyEventNotifier: historyEventNotifier,
}
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, executionManager, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, logger)
historyEngImpl.txProcessor = txProcessor
shardWrapper.txProcessor = txProcessor

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *engine2Suite) SetupTest() {
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.logger)
s.historyEngine = h
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ type (
}

timerQueueAckMgr interface {
getFinishedChan() <-chan struct{}
readRetryTimerTasks() []*persistence.TimerTaskInfo
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
updateAckLevel()
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
Expand Down
7 changes: 6 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *engineSuite) SetupTest() {
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.logger)
h.historyEventNotifier.Start()
shardContextWrapper.txProcessor = h.txProcessor
s.mockHistoryEngine = h
Expand Down Expand Up @@ -3507,6 +3507,10 @@ func addTimerStartedEvent(builder *mutableStateBuilder, decisionCompletedEventID
})
}

func addTimerFiredEvent(builder *mutableStateBuilder, scheduleID int64, timerID string) *workflow.HistoryEvent {
return builder.AddTimerFiredEvent(scheduleID, timerID)
}

func addRequestCancelInitiatedEvent(builder *mutableStateBuilder, decisionCompletedEventID int64,
cancelRequestID, domain, workflowID, runID string) *workflow.HistoryEvent {
event, _ := builder.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID,
Expand Down Expand Up @@ -3636,6 +3640,7 @@ func copyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.Activit
HeartbeatTimeout: sourceInfo.HeartbeatTimeout,
CancelRequested: sourceInfo.CancelRequested,
CancelRequestID: sourceInfo.CancelRequestID,
TimerTaskStatus: sourceInfo.TimerTaskStatus,
}
}

Expand Down
181 changes: 138 additions & 43 deletions service/history/timerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,28 @@ var (
type (
timerSequenceIDs []TimerSequenceID

timerTaskPredicate func(timerDomainID string) (bool, error)

timerQueueAckMgrImpl struct {
isFailover bool
clusterName string
shard ShardContext
executionMgr persistence.ExecutionManager
logger bark.Logger
metricsClient metrics.Client
lastUpdated time.Time
config *Config
// timer predicate for filtering
timerTaskPredicate timerTaskPredicate
// immutable max possible timer level
maxAckLevel time.Time
// isReadFinished indicate timer queue ack manager
// have no more task to send out
isReadFinished bool
// finishedChan will send out signal when timer
// queue ack manager have no more task to send out and all
// tasks sent are finished
finishedChan chan struct{}

sync.Mutex
// outstanding timer task -> finished (true)
Expand All @@ -65,6 +79,8 @@ type (
// TODO this processing logic potentially has bug, refer to #605, #608
)

var _ timerQueueAckMgr = (*timerQueueAckMgrImpl)(nil)

// Len implements sort.Interace
func (t timerSequenceIDs) Len() int {
return len(t)
Expand All @@ -80,26 +96,82 @@ func (t timerSequenceIDs) Less(i, j int) bool {
return compareTimerIDLess(&t[i], &t[j])
}

func newTimerQueueAckMgr(shard ShardContext, metricsClient metrics.Client, executionMgr persistence.ExecutionManager, clusterName string, logger bark.Logger) *timerQueueAckMgrImpl {
config := shard.GetConfig()
func newTimerQueueAckMgr(shard ShardContext, metricsClient metrics.Client, clusterName string, logger bark.Logger) *timerQueueAckMgrImpl {
timerTaskPredicate := func(timerDomainID string) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timerDomainID)
if err != nil {
return false, err
}
if !domainEntry.GetIsGlobalDomain() &&
clusterName != shard.GetService().GetClusterMetadata().GetCurrentClusterName() {
// timer task does not belong to cluster name
return false, nil
} else if domainEntry.GetIsGlobalDomain() &&
domainEntry.GetReplicationConfig().ActiveClusterName != clusterName {
// timer task does not belong here
return false, nil
}
return true, nil
}
ackLevel := shard.GetTimerAckLevel(clusterName)
maxAckLevel := timerQueueAckMgrMaxTimestamp

timerQueueAckMgrImpl := &timerQueueAckMgrImpl{
isFailover: false,
clusterName: clusterName,
shard: shard,
executionMgr: shard.GetExecutionManager(),
metricsClient: metricsClient,
logger: logger,
lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time
config: shard.GetConfig(),
outstandingTasks: make(map[TimerSequenceID]bool),
retryTasks: []*persistence.TimerTaskInfo{},
readLevel: TimerSequenceID{VisibilityTimestamp: ackLevel},
ackLevel: ackLevel,
maxAckLevel: maxAckLevel,
timerTaskPredicate: timerTaskPredicate,
isReadFinished: false,
finishedChan: make(chan struct{}, 1),
}

return timerQueueAckMgrImpl
}

func newTimerQueueFailoverAckMgr(shard ShardContext, metricsClient metrics.Client, domainID string, standbyClusterName string, logger bark.Logger) *timerQueueAckMgrImpl {
timerTaskPredicate := func(timerDomainID string) (bool, error) {
return timerDomainID == domainID, nil
}
// failover ack manager will start from the standby cluster's ack level to active cluster's ack level
ackLevel := shard.GetTimerAckLevel(standbyClusterName)
maxAckLevel := shard.GetTimerAckLevel(shard.GetService().GetClusterMetadata().GetCurrentClusterName())

timerQueueAckMgrImpl := &timerQueueAckMgrImpl{
clusterName: clusterName,
shard: shard,
executionMgr: executionMgr,
metricsClient: metricsClient,
logger: logger,
lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time
config: config,
outstandingTasks: make(map[TimerSequenceID]bool),
retryTasks: []*persistence.TimerTaskInfo{},
readLevel: TimerSequenceID{VisibilityTimestamp: ackLevel},
ackLevel: ackLevel,
isFailover: true,
clusterName: standbyClusterName,
shard: shard,
executionMgr: shard.GetExecutionManager(),
metricsClient: metricsClient,
logger: logger,
lastUpdated: time.Now(), // this has nothing to do with remote cluster, so use the local time
config: shard.GetConfig(),
outstandingTasks: make(map[TimerSequenceID]bool),
retryTasks: []*persistence.TimerTaskInfo{},
readLevel: TimerSequenceID{VisibilityTimestamp: ackLevel},
ackLevel: ackLevel,
maxAckLevel: maxAckLevel,
timerTaskPredicate: timerTaskPredicate,
isReadFinished: false,
finishedChan: make(chan struct{}, 1),
}

return timerQueueAckMgrImpl
}

func (t *timerQueueAckMgrImpl) getFinishedChan() <-chan struct{} {
return t.finishedChan
}

func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
t.Lock()
readLevel := t.readLevel
Expand All @@ -109,9 +181,9 @@ func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, *
var tasks []*persistence.TimerTaskInfo
morePage := timerTaskRetrySize > 0
var err error
if timerTaskRetrySize < t.config.TimerTaskBatchSize {
if timerTaskRetrySize < t.config.TimerTaskBatchSize && readLevel.VisibilityTimestamp.Before(t.maxAckLevel) {
var token []byte
tasks, token, err = t.getTimerTasks(readLevel.VisibilityTimestamp, timerQueueAckMgrMaxTimestamp, t.config.TimerTaskBatchSize)
tasks, token, err = t.getTimerTasks(readLevel.VisibilityTimestamp, t.maxAckLevel, t.config.TimerTaskBatchSize)
if err != nil {
return nil, nil, false, err
}
Expand All @@ -126,6 +198,10 @@ func (t *timerQueueAckMgrImpl) readTimerTasks() ([]*persistence.TimerTaskInfo, *
var lookAheadTask *persistence.TimerTaskInfo
t.Lock()
defer t.Unlock()
if t.isFailover && !morePage {
t.isReadFinished = true
}

// fillin the retry task
filteredTasks := t.retryTasks
t.retryTasks = []*persistence.TimerTaskInfo{}
Expand All @@ -145,24 +221,6 @@ TaskFilterLoop:
continue TaskFilterLoop
}

domainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
return nil, nil, false, err
}
if !domainEntry.GetIsGlobalDomain() {
if t.clusterName != t.shard.GetService().GetClusterMetadata().GetCurrentClusterName() {
// timer task does not belong to cluster name
continue TaskFilterLoop
}
} else {
// global domain we need to check the cluster name
clusterName := domainEntry.GetReplicationConfig().ActiveClusterName
if clusterName != t.clusterName {
// timer task does not belong here
continue TaskFilterLoop
}
}

// TODO potential bug here
// there can be severe case when this readTimerTasks is called multiple times
// and one of the call is really slow, causing the read level updated by other threads,
Expand All @@ -173,6 +231,18 @@ TaskFilterLoop:
timerSequenceID, readLevel)
}

ok, err := t.timerTaskPredicate(task.DomainID)
if err != nil {
return nil, nil, false, err
}
if !ok {
// we are not interestes in this timer task,
// however we should update the read level so
// to skip this timer task next time
readLevel = timerSequenceID
continue TaskFilterLoop
}

if !t.isProcessNow(task.VisibilityTimestamp) {
lookAheadTask = task
break TaskFilterLoop
Expand Down Expand Up @@ -248,22 +318,20 @@ MoveAckLevelLoop:
}
}
t.ackLevel = updatedAckLevel

if t.isFailover && t.isReadFinished && len(outstandingTasks) == 0 {
// this means in failover mode, all possible failover timer tasks
// are processed and we are free to shundown
t.finishedChan <- struct{}{}
}
t.Unlock()

// Do not update Acklevel if nothing changed upto force update interval
if initialAckLevel == updatedAckLevel && time.Since(t.lastUpdated) < t.config.TimerProcessorForceUpdateInterval {
return
}

t.logger.Debugf("Updating timer ack level: %v", updatedAckLevel)

// Always update ackLevel to detect if the shared is stolen
if err := t.shard.UpdateTimerAckLevel(t.clusterName, updatedAckLevel); err != nil {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.AckLevelUpdateFailedCounter)
t.logger.Errorf("Error updating timer ack level for shard: %v", err)
} else {
t.lastUpdated = time.Now() // this has nothing to do with remote cluster, so use the local time
}
t.updateTimerAckLevel(updatedAckLevel)
}

// this function does not take cluster name as parameter, due to we only have one timer queue on Cassandra
Expand All @@ -287,6 +355,33 @@ func (t *timerQueueAckMgrImpl) getTimerTasks(minTimestamp time.Time, maxTimestam
return nil, nil, ErrMaxAttemptsExceeded
}

func (t *timerQueueAckMgrImpl) updateTimerAckLevel(ackLevel time.Time) {
t.logger.Debugf("Updating timer ack level: %v", ackLevel)

// not failover ack level updating
if !t.isFailover {
// Always update ackLevel to detect if the shared is stolen
if err := t.shard.UpdateTimerAckLevel(t.clusterName, ackLevel); err != nil {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.AckLevelUpdateFailedCounter)
t.logger.Errorf("Error updating timer ack level for shard: %v", err)
} else {
t.lastUpdated = time.Now() // this has nothing to do with remote cluster, so use the local time
}
} else {
// TODO failover ack manager should persist failover ack level to Cassandra: issue #646
}

}

func (t *timerQueueAckMgrImpl) isProcessNow(expiryTime time.Time) bool {
return !expiryTime.IsZero() && expiryTime.UnixNano() <= t.shard.GetCurrentTime(t.clusterName).UnixNano()
var now time.Time
if !t.isFailover {
// not failover, use the cluster's local time
now = t.shard.GetCurrentTime(t.clusterName)
} else {
// if ack manager is a failover manager, we need to use the current local time
now = t.shard.GetCurrentTime(t.shard.GetService().GetClusterMetadata().GetCurrentClusterName())
}

return !expiryTime.IsZero() && expiryTime.UnixNano() <= now.UnixNano()
}
Loading

0 comments on commit 17bf26f

Please sign in to comment.