Skip to content

Commit

Permalink
implemented functionality in timer ack manager & timer processor base…
Browse files Browse the repository at this point in the history
… to allow failover (#648)

* add timer failover ack manager & UT, wire failover ack manager to active timer processor

* move the check of domain of timer task from ack manager to processor, processor will try to process a standby task indefinitely, until task is finished.
  • Loading branch information
wxing1292 authored Apr 18, 2018
1 parent c543b1c commit c4a6e1d
Show file tree
Hide file tree
Showing 23 changed files with 1,689 additions and 421 deletions.
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
// workflow logging tags
TagWorkflowEventID = "wf-event-id"
TagWorkflowComponent = "wf-component"
TagWorkflowCluster = "wf-cluster"
TagWorkflowErr = "wf-error"
TagHistoryBuilderAction = "history-builder-action"
TagStoreOperation = "store-operation"
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
Expand Down
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
Expand All @@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,7 @@ type (

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
Timers []*TimerTaskInfo
}

// SerializedHistoryEventBatch represents a serialized batch of history events
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from the timer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, nil, err
return nil, err
}

return response.Timers, response.NextPageToken, nil
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
Expand Down
33 changes: 30 additions & 3 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ type MockTimerQueueAckMgr struct {
mock.Mock
}

var _ timerQueueAckMgr = (*MockTimerQueueAckMgr)(nil)

// getFinishedChan is mock implementation for getFinishedChan of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) getFinishedChan() <-chan struct{} {
	ret := _m.Called()

	// The configured return value is either a provider function or the
	// channel itself; a provider is invoked to obtain the channel.
	first := ret.Get(0)
	if provider, ok := first.(func() <-chan struct{}); ok {
		return provider()
	}

	// Nothing configured: hand back a nil channel.
	if first == nil {
		return nil
	}
	return first.(<-chan struct{})
}

// readTimerTasks is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
ret := _m.Called()
Expand Down Expand Up @@ -71,12 +88,22 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
// getAckLevel is mock implementation for getAckLevel of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
	ret := _m.Called()

	// The configured return value is either a provider function or the
	// TimerSequenceID itself; a provider is invoked to obtain the value.
	first := ret.Get(0)
	if provider, ok := first.(func() TimerSequenceID); ok {
		return provider()
	}

	// Fall back to the zero value when nothing was configured.
	var level TimerSequenceID
	if first != nil {
		level = first.(TimerSequenceID)
	}
	return level
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
Expand Down
4 changes: 3 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type (
var _ Engine = (*historyEngineImpl)(nil)

var (
// ErrTaskRetry is the error indicating that the timer / transfer task should be retried.
ErrTaskRetry = errors.New("passive task should retry due to condition in mutable state is not met")
// ErrDuplicate is exported temporarily for integration test
ErrDuplicate = errors.New("Duplicate task, completing it")
// ErrConflict is exported temporarily for integration test
Expand Down Expand Up @@ -135,7 +137,7 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
historyEventNotifier: historyEventNotifier,
}
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, executionManager, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, logger)
historyEngImpl.txProcessor = txProcessor
shardWrapper.txProcessor = txProcessor

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *engine2Suite) SetupTest() {
hSerializerFactory: persistence.NewHistorySerializerFactory(),
}
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.logger)
s.historyEngine = h
}

Expand Down
9 changes: 6 additions & 3 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
Expand Down Expand Up @@ -105,6 +106,9 @@ type (
NotifyNewTask()
}

// TODO the timer queue processor and the one below, timer processor,
// in combination are confusing; we should consider a better naming
// convention, or at least come up with a better name for this case.
timerQueueProcessor interface {
common.Daemon
NotifyNewTimers(clusterName string, timerTask []persistence.Task)
Expand All @@ -118,12 +122,11 @@ type (
}

timerQueueAckMgr interface {
readRetryTimerTasks() []*persistence.TimerTaskInfo
getFinishedChan() <-chan struct{}
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
Expand Down
7 changes: 6 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *engineSuite) SetupTest() {
historyEventNotifier: historyEventNotifier,
}
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.logger)
h.historyEventNotifier.Start()
shardContextWrapper.txProcessor = h.txProcessor
s.mockHistoryEngine = h
Expand Down Expand Up @@ -3503,6 +3503,10 @@ func addTimerStartedEvent(builder *mutableStateBuilder, decisionCompletedEventID
})
}

// addTimerFiredEvent is a test helper that forwards scheduleID and timerID to
// builder.AddTimerFiredEvent and returns the event it produces.
func addTimerFiredEvent(builder *mutableStateBuilder, scheduleID int64, timerID string) *workflow.HistoryEvent {
	firedEvent := builder.AddTimerFiredEvent(scheduleID, timerID)
	return firedEvent
}

func addRequestCancelInitiatedEvent(builder *mutableStateBuilder, decisionCompletedEventID int64,
cancelRequestID, domain, workflowID, runID string) *workflow.HistoryEvent {
event, _ := builder.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID,
Expand Down Expand Up @@ -3632,6 +3636,7 @@ func copyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.Activit
HeartbeatTimeout: sourceInfo.HeartbeatTimeout,
CancelRequested: sourceInfo.CancelRequested,
CancelRequestID: sourceInfo.CancelRequestID,
TimerTaskStatus: sourceInfo.TimerTaskStatus,
}
}

Expand Down
26 changes: 20 additions & 6 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error {
}

// GetTimerAckLevel test implementation
func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
func (s *TestShardContext) GetTimerAckLevel() time.Time {
	// Hold the read lock for the duration of the read; the deferred unlock
	// releases it on every exit path.
	s.RLock()
	defer s.RUnlock()

	ackLevel := s.shardInfo.TimerAckLevel
	return ackLevel
}

// UpdateTimerAckLevel test implementation. It stores ackLevel in
// shardInfo.TimerAckLevel while holding the write lock and always returns nil.
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
	// Take the write lock; the deferred unlock releases it on return.
	s.Lock()
	defer s.Unlock()

	// Only the in-memory shard info is updated here.
	s.shardInfo.TimerAckLevel = ackLevel

	return nil
}

// GetTimerClusterAckLevel test implementation
func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

Expand All @@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
// UpdateTimerClusterAckLevel test implementation
func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
return nil
}
Expand Down
80 changes: 43 additions & 37 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type Config struct {
DefaultStartToCloseActivityTimeoutInSecs int32

// TimerQueueProcessor settings
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorCompleteTimerFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerProcessorCompleteTimerInterval time.Duration
TimerProcessorMaxPollInterval time.Duration

// TransferQueueProcessor settings
TransferTaskBatchSize int
Expand Down Expand Up @@ -86,37 +89,40 @@ type Config struct {
// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorCompleteTimerFailureRetryCount: 10,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TimerProcessorCompleteTimerInterval: 1 * time.Second,
TimerProcessorMaxPollInterval: 60 * time.Second,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationProperty(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
Expand Down
Loading

0 comments on commit c4a6e1d

Please sign in to comment.