Skip to content

Commit

Permalink
move the check of domain of timer task from ack manager to processor,…
Browse files Browse the repository at this point in the history
… processor will try to process a standby task indefinitely, until task is finished.
  • Loading branch information
Wenquan Xing committed Apr 11, 2018
1 parent 1a1f0db commit f08bd6b
Show file tree
Hide file tree
Showing 19 changed files with 498 additions and 442 deletions.
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
// workflow logging tags
TagWorkflowEventID = "wf-event-id"
TagWorkflowComponent = "wf-component"
TagWorkflowCluster = "wf-cluster"
TagWorkflowErr = "wf-error"
TagHistoryBuilderAction = "history-builder-action"
TagStoreOperation = "store-operation"
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
Expand Down
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
Expand All @@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,7 @@ type (

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
Timers []*TimerTaskInfo
}

// SerializedHistoryEventBatch represents a serialized batch of history events
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, nil, err
return nil, err
}

return response.Timers, response.NextPageToken, nil
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
Expand Down
14 changes: 5 additions & 9 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,24 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) readRetryTimerTasks() []*persistence.TimerTaskInfo {
func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
ret := _m.Called()

var r0 []*persistence.TimerTaskInfo
if rf, ok := ret.Get(0).(func() []*persistence.TimerTaskInfo); ok {
var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*persistence.TimerTaskInfo)
r0 = ret.Get(0).(TimerSequenceID)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
_m.Called()
}
Expand Down
17 changes: 15 additions & 2 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// error which will be thrown if the timer / transfer task should be
// retries due to various of reasons
taskRetryError struct{}

// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
Expand Down Expand Up @@ -122,10 +127,9 @@ type (

timerQueueAckMgr interface {
getFinishedChan() <-chan struct{}
readRetryTimerTasks() []*persistence.TimerTaskInfo
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
}

Expand All @@ -136,3 +140,12 @@ type (
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError create a error which indicate the task should be retry
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
26 changes: 20 additions & 6 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error {
}

// GetTimerAckLevel test implementation
func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
func (s *TestShardContext) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RUnlock()

return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TimerAckLevel = ackLevel
return nil
}

// GetTimerClusterAckLevel test implementation
func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

Expand All @@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
// UpdateTimerClusterAckLevel test implementation
func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
return nil
}
Expand Down
80 changes: 43 additions & 37 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type Config struct {
DefaultStartToCloseActivityTimeoutInSecs int32

// TimerQueueProcessor settings
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorCompleteTimerFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerProcessorCompleteTimerInterval time.Duration
TimerProcessorMaxPollInterval time.Duration

// TransferQueueProcessor settings
TransferTaskBatchSize int
Expand Down Expand Up @@ -86,37 +89,40 @@ type Config struct {
// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorCompleteTimerFailureRetryCount: 10,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TimerProcessorCompleteTimerInterval: 1 * time.Second,
TimerProcessorMaxPollInterval: 60 * time.Second,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationProperty(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
Expand Down
29 changes: 22 additions & 7 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type (
UpdateTransferAckLevel(ackLevel int64) error
GetReplicatorAckLevel() int64
UpdateReplicatorAckLevel(ackLevel int64) error
GetTimerAckLevel() time.Time
UpdateTimerAckLevel(ackLevel time.Time) error
GetTimerClusterAckLevel(cluster string) time.Time
UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
*persistence.CreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error
Expand All @@ -59,8 +63,6 @@ type (
GetConfig() *Config
GetLogger() bark.Logger
GetMetricsClient() metrics.Client
GetTimerAckLevel(cluster string) time.Time
UpdateTimerAckLevel(cluster string, ackLevel time.Time) error
GetTimeSource() common.TimeSource
SetCurrentTime(cluster string, currentTime time.Time)
GetCurrentTime(cluster string) time.Time
Expand Down Expand Up @@ -164,7 +166,23 @@ func (s *shardContextImpl) UpdateReplicatorAckLevel(ackLevel int64) error {
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
func (s *shardContextImpl) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RUnlock()

return s.shardInfo.TimerAckLevel
}

func (s *shardContextImpl) UpdateTimerAckLevel(ackLevel time.Time) error {
s.RLock()
defer s.RUnlock()

s.shardInfo.TimerAckLevel = ackLevel
s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

Expand All @@ -177,13 +195,10 @@ func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

func (s *shardContextImpl) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
Expand Down
Loading

0 comments on commit f08bd6b

Please sign in to comment.