Move the check of the domain of a timer task from the ack manager to the processor; the processor will try to process a standby task indefinitely, until the task is finished.
Wenquan Xing committed Apr 11, 2018
1 parent 1a1f0db commit dcb3011
Showing 19 changed files with 492 additions and 442 deletions.
1 change: 1 addition & 0 deletions common/logging/tags.go
@@ -31,6 +31,7 @@ const (
 	// workflow logging tags
 	TagWorkflowEventID = "wf-event-id"
 	TagWorkflowComponent = "wf-component"
+	TagWorkflowCluster = "wf-cluster"
 	TagWorkflowErr = "wf-error"
 	TagHistoryBuilderAction = "history-builder-action"
 	TagStoreOperation = "store-operation"
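For context on how such tags are consumed: a minimal sketch of attaching the new wf-cluster tag through bark structured logging. The logger construction, the logrus import path, and the field values are illustrative, not part of this commit:

```go
package main

import (
	"github.com/sirupsen/logrus"
	"github.com/uber-common/bark"
)

// Illustrative stand-ins for the constants in common/logging/tags.go.
const (
	TagWorkflowComponent = "wf-component"
	TagWorkflowCluster   = "wf-cluster"
)

func main() {
	logger := bark.NewLoggerFromLogrus(logrus.New())
	// The new wf-cluster tag lets a log line say which cluster's timer
	// queue a task belongs to, e.g. when retrying a standby task.
	logger.WithFields(bark.Fields{
		TagWorkflowComponent: "timer-queue-processor",
		TagWorkflowCluster:   "standby",
	}).Info("processing timer task")
}
```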
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
@@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

 		response.Timers = append(response.Timers, t)
 	}
-	nextPageToken := iter.PageState()
-	response.NextPageToken = make([]byte, len(nextPageToken))
-	copy(response.NextPageToken, nextPageToken)
 
 	if err := iter.Close(); err != nil {
 		if isThrottlingError(err) {
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
@@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
 	err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
 	s.Nil(err2, "No error expected.")
 
-	timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
+	timerTasks, err1 := s.GetTimerIndexTasks()
 	s.Nil(err1, "No error expected.")
 	s.NotNil(timerTasks, "expected valid list of tasks.")
-	s.Equal(0, len(nextToken))
 	s.Equal(3, len(timerTasks))
 	s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
 	s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
@@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
 	err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
 	s.Nil(err2, "No error expected.")
 
-	timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
+	timerTasks2, err2 := s.GetTimerIndexTasks()
 	s.Nil(err2, "No error expected.")
 	s.Empty(timerTasks2, "expected empty task list.")
-	s.Equal(0, len(nextToken))
 }
 
 func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
@@ -645,8 +645,7 @@ type (

 	// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
 	GetTimerIndexTasksResponse struct {
-		Timers        []*TimerTaskInfo
-		NextPageToken []byte
+		Timers []*TimerTaskInfo
 	}
 
 	// SerializedHistoryEventBatch represents a serialized batch of history events
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
@@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
 }
 
 // GetTimerIndexTasks is a utility method to get tasks from transfer task queue
-func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
+func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
 	response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
 		MinTimestamp: time.Time{},
 		MaxTimestamp: time.Unix(0, math.MaxInt64),
 		BatchSize:    10})
 
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	return response.Timers, response.NextPageToken, nil
+	return response.Timers, nil
 }
 
 // CompleteTimerTask is a utility method to complete a timer task
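With NextPageToken gone, a caller reads one bounded batch per call instead of paginating. A hedged sketch under that assumption — readPendingTimers is hypothetical, while ExecutionManager and the request fields follow the interfaces shown above:

```go
package history

import (
	"math"
	"time"

	p "github.com/uber/cadence/common/persistence"
)

// readPendingTimers is a hypothetical caller: after this change the
// response carries only Timers, so there is no pagination loop.
func readPendingTimers(mgr p.ExecutionManager) ([]*p.TimerTaskInfo, error) {
	resp, err := mgr.GetTimerIndexTasks(&p.GetTimerIndexTasksRequest{
		MinTimestamp: time.Time{},                 // from the zero timestamp
		MaxTimestamp: time.Unix(0, math.MaxInt64), // ...to the far future
		BatchSize:    10,
	})
	if err != nil {
		return nil, err
	}
	return resp.Timers, nil
}
```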
14 changes: 5 additions & 9 deletions service/history/MockTimerQueueAckMgr.go
@@ -88,28 +88,24 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
 	return r0, r1, r2, r3
 }
 
-func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
+func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
 	_m.Called(timerTask)
 }
 
-func (_m *MockTimerQueueAckMgr) readRetryTimerTasks() []*persistence.TimerTaskInfo {
+func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
 	ret := _m.Called()
 
-	var r0 []*persistence.TimerTaskInfo
-	if rf, ok := ret.Get(0).(func() []*persistence.TimerTaskInfo); ok {
+	var r0 TimerSequenceID
+	if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
 		r0 = rf()
 	} else {
 		if ret.Get(0) != nil {
-			r0 = ret.Get(0).([]*persistence.TimerTaskInfo)
+			r0 = ret.Get(0).(TimerSequenceID)
 		}
 	}
 	return r0
 }
 
-func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
-	_m.Called(timerTask)
-}
-
 func (_m *MockTimerQueueAckMgr) updateAckLevel() {
 	_m.Called()
 }
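The regenerated mock keeps the usual testify conventions (these generated files embed testify's mock.Mock, as the _m.Called calls suggest), so stubbing the renamed methods in a test could look like the sketch below; the zero-value TimerSequenceID is purely illustrative:

```go
package history

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// Sketch of a test stubbing the regenerated mock.
func TestAckMgrMockSketch(t *testing.T) {
	mockAckMgr := &MockTimerQueueAckMgr{}
	mockAckMgr.On("getAckLevel").Return(TimerSequenceID{})
	mockAckMgr.On("completeTimerTask", mock.Anything)

	_ = mockAckMgr.getAckLevel()      // yields the stubbed TimerSequenceID
	mockAckMgr.completeTimerTask(nil) // recorded for the assertion below
	mockAckMgr.AssertExpectations(t)
}
```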
17 changes: 15 additions & 2 deletions service/history/historyEngineInterfaces.go
@@ -44,6 +44,11 @@ type (
 		isWorkflowRunning bool
 		timestamp time.Time
 	}
+
+	// error which will be thrown if the timer / transfer task should be
+	// retried due to various reasons
+	taskRetryError struct{}
+
 	// Engine represents an interface for managing workflow execution history.
 	Engine interface {
 		common.Daemon
@@ -122,10 +127,9 @@

 	timerQueueAckMgr interface {
 		getFinishedChan() <-chan struct{}
-		readRetryTimerTasks() []*persistence.TimerTaskInfo
 		readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
-		retryTimerTask(timerTask *persistence.TimerTaskInfo)
 		completeTimerTask(timerTask *persistence.TimerTaskInfo)
+		getAckLevel() TimerSequenceID
 		updateAckLevel()
 	}
 
@@ -136,3 +140,12 @@
 		UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
 	}
 )
+
+// newTaskRetryError creates an error indicating the task should be retried
+func newTaskRetryError() *taskRetryError {
+	return &taskRetryError{}
+}
+
+func (e *taskRetryError) Error() string {
+	return "passive task should be retried: the condition in mutable state is not met"
+}
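This sentinel error is what lets the processor, rather than the ack manager, own standby-task retries, per the commit message. A hypothetical sketch of such a loop — processWithRetry and the fixed backoff are illustrative, not code from this commit:

```go
package history

import "time"

// processWithRetry shows how a processor could loop on taskRetryError
// until a standby task's condition in mutable state is met, instead of
// the ack manager filtering the task by domain up front.
func processWithRetry(process func() error, backoff time.Duration) error {
	for {
		err := process()
		if err == nil {
			return nil // task finished
		}
		if _, ok := err.(*taskRetryError); ok {
			time.Sleep(backoff) // condition not met yet; retry indefinitely
			continue
		}
		return err // non-retryable errors surface to the caller
	}
}
```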
26 changes: 20 additions & 6 deletions service/history/historyTestBase.go
@@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error {
 }
 
 // GetTimerAckLevel test implementation
-func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
+func (s *TestShardContext) GetTimerAckLevel() time.Time {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.shardInfo.TimerAckLevel
+}
+
+// UpdateTimerAckLevel test implementation
+func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
+	s.Lock()
+	defer s.Unlock()
+
+	s.shardInfo.TimerAckLevel = ackLevel
+	return nil
+}
+
+// GetTimerClusterAckLevel test implementation
+func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time {
 	s.RLock()
 	defer s.RUnlock()
 
@@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
 	return s.shardInfo.TimerAckLevel
 }
 
-// UpdateTimerAckLevel test implementation
-func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
+// UpdateTimerClusterAckLevel test implementation
+func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
 	s.Lock()
 	defer s.Unlock()
 
-	if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
-		s.shardInfo.TimerAckLevel = ackLevel
-	}
 	s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
 	return nil
 }
78 changes: 41 additions & 37 deletions service/history/service.go
@@ -49,12 +49,14 @@ type Config struct {
 	DefaultStartToCloseActivityTimeoutInSecs int32
 
 	// TimerQueueProcessor settings
-	TimerTaskBatchSize                    int
-	ProcessTimerTaskWorkerCount           int
-	TimerProcessorUpdateFailureRetryCount int
-	TimerProcessorGetFailureRetryCount    int
-	TimerProcessorUpdateAckInterval       time.Duration
-	TimerProcessorForceUpdateInterval     time.Duration
+	TimerTaskBatchSize                           int
+	ProcessTimerTaskWorkerCount                  int
+	TimerProcessorUpdateFailureRetryCount        int
+	TimerProcessorGetFailureRetryCount           int
+	TimerProcessorCompleteTimerFailureRetryCount int
+	TimerProcessorUpdateAckInterval              time.Duration
+	TimerProcessorForceUpdateInterval            time.Duration
+	TimerProcessorCompleteTimerInterval          time.Duration
 
 	// TransferQueueProcessor settings
 	TransferTaskBatchSize int
@@ -86,37 +88,39 @@
 // NewConfig returns new service config with default values
 func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
 	return &Config{
-		NumberOfShards:                              numberOfShards,
-		HistoryCacheInitialSize:                     128,
-		HistoryCacheMaxSize:                         512,
-		HistoryCacheTTL:                             time.Hour,
-		RangeSizeBits:                               20, // 20 bits for sequencer, 2^20 sequence number for any range
-		AcquireShardInterval:                        time.Minute,
-		DefaultScheduleToStartActivityTimeoutInSecs: 10,
-		DefaultScheduleToCloseActivityTimeoutInSecs: 10,
-		DefaultStartToCloseActivityTimeoutInSecs:    10,
-		TimerTaskBatchSize:                          100,
-		ProcessTimerTaskWorkerCount:                 30,
-		TimerProcessorUpdateFailureRetryCount:       5,
-		TimerProcessorGetFailureRetryCount:          5,
-		TimerProcessorUpdateAckInterval:             10 * time.Second,
-		TimerProcessorForceUpdateInterval:           10 * time.Minute,
-		TransferTaskBatchSize:                       10,
-		TransferProcessorMaxPollRPS:                 100,
-		TransferProcessorMaxPollInterval:            60 * time.Second,
-		TransferProcessorUpdateAckInterval:          10 * time.Second,
-		TransferProcessorForceUpdateInterval:        10 * time.Minute,
-		TransferTaskWorkerCount:                     10,
-		TransferTaskMaxRetryCount:                   100,
-		ReplicatorTaskBatchSize:                     10,
-		ReplicatorProcessorMaxPollRPS:               100,
-		ReplicatorProcessorMaxPollInterval:          60 * time.Second,
-		ReplicatorProcessorUpdateAckInterval:        10 * time.Second,
-		ReplicatorProcessorForceUpdateInterval:      10 * time.Minute,
-		ReplicatorTaskWorkerCount:                   10,
-		ReplicatorTaskMaxRetryCount:                 100,
-		ExecutionMgrNumConns:                        100,
-		HistoryMgrNumConns:                          100,
+		NumberOfShards:                               numberOfShards,
+		HistoryCacheInitialSize:                      128,
+		HistoryCacheMaxSize:                          512,
+		HistoryCacheTTL:                              time.Hour,
+		RangeSizeBits:                                20, // 20 bits for sequencer, 2^20 sequence number for any range
+		AcquireShardInterval:                         time.Minute,
+		DefaultScheduleToStartActivityTimeoutInSecs:  10,
+		DefaultScheduleToCloseActivityTimeoutInSecs:  10,
+		DefaultStartToCloseActivityTimeoutInSecs:     10,
+		TimerTaskBatchSize:                           100,
+		ProcessTimerTaskWorkerCount:                  30,
+		TimerProcessorUpdateFailureRetryCount:        5,
+		TimerProcessorGetFailureRetryCount:           5,
+		TimerProcessorCompleteTimerFailureRetryCount: 10,
+		TimerProcessorUpdateAckInterval:              10 * time.Second,
+		TimerProcessorForceUpdateInterval:            10 * time.Minute,
+		TimerProcessorCompleteTimerInterval:          1 * time.Second,
+		TransferTaskBatchSize:                        10,
+		TransferProcessorMaxPollRPS:                  100,
+		TransferProcessorMaxPollInterval:             60 * time.Second,
+		TransferProcessorUpdateAckInterval:           10 * time.Second,
+		TransferProcessorForceUpdateInterval:         10 * time.Minute,
+		TransferTaskWorkerCount:                      10,
+		TransferTaskMaxRetryCount:                    100,
+		ReplicatorTaskBatchSize:                      10,
+		ReplicatorProcessorMaxPollRPS:                100,
+		ReplicatorProcessorMaxPollInterval:           60 * time.Second,
+		ReplicatorProcessorUpdateAckInterval:         10 * time.Second,
+		ReplicatorProcessorForceUpdateInterval:       10 * time.Minute,
+		ReplicatorTaskWorkerCount:                    10,
+		ReplicatorTaskMaxRetryCount:                  100,
+		ExecutionMgrNumConns:                         100,
+		HistoryMgrNumConns:                           100,
 		// history client: client/history/client.go set the client timeout 30s
 		LongPollExpirationInterval: dc.GetDurationProperty(
 			dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
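The two new settings appear to pair up: a completion pass runs every TimerProcessorCompleteTimerInterval, and each pass gives up after TimerProcessorCompleteTimerFailureRetryCount failed attempts. A hypothetical sketch of such a loop — completeTimersLoop and completeTimers are illustrative, not part of this commit:

```go
package history

import "time"

// completeTimersLoop attempts a completion pass on every tick, retrying a
// failed pass a bounded number of times before waiting for the next tick.
func completeTimersLoop(config *Config, completeTimers func() error, done <-chan struct{}) {
	ticker := time.NewTicker(config.TimerProcessorCompleteTimerInterval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			for attempt := 0; attempt < config.TimerProcessorCompleteTimerFailureRetryCount; attempt++ {
				if err := completeTimers(); err == nil {
					break // this pass succeeded; wait for the next tick
				}
			}
		}
	}
}
```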
29 changes: 22 additions & 7 deletions service/history/shardContext.go
@@ -51,6 +51,10 @@ type (
 		UpdateTransferAckLevel(ackLevel int64) error
 		GetReplicatorAckLevel() int64
 		UpdateReplicatorAckLevel(ackLevel int64) error
+		GetTimerAckLevel() time.Time
+		UpdateTimerAckLevel(ackLevel time.Time) error
+		GetTimerClusterAckLevel(cluster string) time.Time
+		UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
 		CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
 			*persistence.CreateWorkflowExecutionResponse, error)
 		UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error
@@ -59,8 +63,6 @@ type (
 		GetConfig() *Config
 		GetLogger() bark.Logger
 		GetMetricsClient() metrics.Client
-		GetTimerAckLevel(cluster string) time.Time
-		UpdateTimerAckLevel(cluster string, ackLevel time.Time) error
 		GetTimeSource() common.TimeSource
 		SetCurrentTime(cluster string, currentTime time.Time)
 		GetCurrentTime(cluster string) time.Time
@@ -164,7 +166,23 @@ func (s *shardContextImpl) UpdateReplicatorAckLevel(ackLevel int64) error {
 	return s.updateShardInfoLocked()
 }
 
-func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
+func (s *shardContextImpl) GetTimerAckLevel() time.Time {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.shardInfo.TimerAckLevel
+}
+
+func (s *shardContextImpl) UpdateTimerAckLevel(ackLevel time.Time) error {
+	s.Lock()
+	defer s.Unlock()
+
+	s.shardInfo.TimerAckLevel = ackLevel
+	s.shardInfo.StolenSinceRenew = 0
+	return s.updateShardInfoLocked()
+}
+
+func (s *shardContextImpl) GetTimerClusterAckLevel(cluster string) time.Time {
 	s.RLock()
 	defer s.RUnlock()
 
@@ -177,13 +195,10 @@ func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
 	return s.shardInfo.TimerAckLevel
 }
 
-func (s *shardContextImpl) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
+func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
 	s.Lock()
 	defer s.Unlock()
 
-	if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
-		s.shardInfo.TimerAckLevel = ackLevel
-	}
 	s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
 	s.shardInfo.StolenSinceRenew = 0
 	return s.updateShardInfoLocked()
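With the per-cluster ack levels split out of the old combined methods, the shard-wide timer ack level can only safely advance to the minimum across all clusters, since a lagging standby processor still needs its tasks. A sketch under that assumption — minTimerAckLevel is illustrative, and ShardContext is assumed to be the name of the interface shown above:

```go
package history

import "time"

// minTimerAckLevel computes the lowest timer ack level across the current
// cluster and every standby cluster; tasks below this point are safe to
// delete.
func minTimerAckLevel(shard ShardContext, standbyClusters []string) time.Time {
	min := shard.GetTimerAckLevel()
	for _, cluster := range standbyClusters {
		if level := shard.GetTimerClusterAckLevel(cluster); level.Before(min) {
			min = level
		}
	}
	return min
}
```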
