
Move the check of a timer task's domain from the ack manager to the processor; the processor will try to process a standby task indefinitely, until the task is finished.
Wenquan Xing committed Apr 9, 2018
1 parent 4a7c840 commit b3bad03
Showing 16 changed files with 360 additions and 382 deletions.
1 change: 1 addition & 0 deletions common/persistence/cassandraPersistence.go
@@ -2020,6 +2020,7 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
minTimestamp,
maxTimestamp,
request.BatchSize)
query.PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
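The single added line above feeds the caller-supplied page token into the gocql query. As a hedged sketch, not code from this commit, gocql paging usually flows as below; the helper name, statement, and table are placeholders:

```go
package example

import "github.com/gocql/gocql"

// pageTimerTasks is illustrative only: PageState on the query resumes from a
// previously returned token, and the iterator's PageState yields the token
// for the next page.
func pageTimerTasks(session *gocql.Session, pageToken []byte, batchSize int) ([]int64, []byte, error) {
	query := session.Query(`SELECT task_id FROM timer_tasks`).PageSize(batchSize)
	query.PageState(pageToken) // nil or empty token starts from the first page

	iter := query.Iter()
	var taskIDs []int64
	var taskID int64
	for iter.Scan(&taskID) {
		taskIDs = append(taskIDs, taskID)
	}
	nextPageToken := iter.PageState() // empty once the result set is exhausted
	if err := iter.Close(); err != nil {
		return nil, nil, err
	}
	return taskIDs, nextPageToken, nil
}
```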
7 changes: 4 additions & 3 deletions common/persistence/dataInterfaces.go
@@ -638,9 +638,10 @@ type (
// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
NextPageToken []byte
}

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
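A hedged illustration of how a caller might drain timer tasks page by page with the new NextPageToken field. This loop is not in the diff; it assumes GetTimerIndexTasks is exposed on the ExecutionManager interface, that the response carries Timers and NextPageToken fields (the response struct is not shown here), and that the import path is the repository's usual one:

```go
package example

import (
	"time"

	"github.com/uber/cadence/common/persistence"
)

// drainTimerTasks is an illustrative sketch, not part of this commit: it keeps
// requesting pages until the returned page token is empty.
func drainTimerTasks(mgr persistence.ExecutionManager, minTS, maxTS time.Time) ([]*persistence.TimerTaskInfo, error) {
	var all []*persistence.TimerTaskInfo
	request := &persistence.GetTimerIndexTasksRequest{
		MinTimestamp: minTS,
		MaxTimestamp: maxTS,
		BatchSize:    100,
	}
	for {
		resp, err := mgr.GetTimerIndexTasks(request)
		if err != nil {
			return nil, err
		}
		all = append(all, resp.Timers...)
		if len(resp.NextPageToken) == 0 {
			return all, nil // no more pages
		}
		request.NextPageToken = resp.NextPageToken // resume where the last page ended
	}
}
```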
14 changes: 5 additions & 9 deletions service/history/MockTimerQueueAckMgr.go
@@ -88,28 +88,24 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) readRetryTimerTasks() []*persistence.TimerTaskInfo {
func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
ret := _m.Called()

var r0 []*persistence.TimerTaskInfo
if rf, ok := ret.Get(0).(func() []*persistence.TimerTaskInfo); ok {
var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*persistence.TimerTaskInfo)
r0 = ret.Get(0).(TimerSequenceID)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
_m.Called()
}
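Since the mock dispatches through _m.Called in testify style, a test might stub the reworked methods roughly as below. This is a sketch, not a test from this commit, and it assumes the mock embeds mock.Mock and lives in the history package:

```go
package history

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// exampleMockUsage is illustrative only: stub getAckLevel, expect one
// completeTimerTask call, then verify the expectations were met.
func exampleMockUsage(t *testing.T) {
	mockAckMgr := &MockTimerQueueAckMgr{}
	mockAckMgr.On("getAckLevel").Return(TimerSequenceID{})
	mockAckMgr.On("completeTimerTask", mock.Anything).Return().Once()

	// code under test would call these through the timerQueueAckMgr interface
	_ = mockAckMgr.getAckLevel()
	mockAckMgr.completeTimerTask(nil)

	mockAckMgr.AssertExpectations(t)
}
```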
17 changes: 15 additions & 2 deletions service/history/historyEngineInterfaces.go
@@ -44,6 +44,11 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// taskRetryError is the error returned when a timer / transfer task should be
// retried for various reasons
taskRetryError struct{}

// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
@@ -122,10 +127,9 @@ type (

timerQueueAckMgr interface {
getFinishedChan() <-chan struct{}
readRetryTimerTasks() []*persistence.TimerTaskInfo
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
}

@@ -136,3 +140,12 @@ type (
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError creates an error indicating that the task should be retried
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
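A hedged sketch of how the processor side might react to this sentinel error; the helper name, the closure parameter, and the fixed backoff are illustrative and not part of this diff:

```go
package history

import "time"

// processTaskWithRetry is illustrative only: it keeps retrying while the task
// reports taskRetryError (a standby task whose mutable-state condition is not
// yet met) and surfaces any other error to the caller.
func processTaskWithRetry(process func() error, backoff time.Duration) error {
	for {
		err := process()
		if err == nil {
			return nil
		}
		if _, ok := err.(*taskRetryError); ok {
			time.Sleep(backoff) // condition not met yet; try the same task again
			continue
		}
		return err
	}
}
```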
26 changes: 20 additions & 6 deletions service/history/historyTestBase.go
@@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error {
}

// GetTimerAckLevel test implementation
func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
func (s *TestShardContext) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RUnlock()

return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TimerAckLevel = ackLevel
return nil
}

// GetTimerClusterAckLevel test implementation
func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

@@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
// UpdateTimerClusterAckLevel test implementation
func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
return nil
}
78 changes: 41 additions & 37 deletions service/history/service.go
@@ -49,12 +49,14 @@ type Config struct {
DefaultStartToCloseActivityTimeoutInSecs int32

// TimerQueueProcessor settings
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorCompleteTimerFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerProcessorCompleteTimerInterval time.Duration

// TransferQueueProcessor settings
TransferTaskBatchSize int
@@ -86,37 +88,39 @@ type Config struct {
// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorCompleteTimerFailureRetryCount: 10,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TimerProcessorCompleteTimerInterval: 5 * time.Second,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationProperty(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
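The two new settings point to a periodic complete-timer pass with its own retry budget. A hedged sketch of how they might be consumed; the loop and the completeTimers callback are illustrative, not code from this commit:

```go
package history

import "time"

// completeTimerLoop is illustrative only: run a completion pass every
// TimerProcessorCompleteTimerInterval and retry a failed pass up to
// TimerProcessorCompleteTimerFailureRetryCount times before waiting for the
// next tick.
func completeTimerLoop(config *Config, completeTimers func() error, shutdown <-chan struct{}) {
	ticker := time.NewTicker(config.TimerProcessorCompleteTimerInterval)
	defer ticker.Stop()

	for {
		select {
		case <-shutdown:
			return
		case <-ticker.C:
			for attempt := 0; attempt < config.TimerProcessorCompleteTimerFailureRetryCount; attempt++ {
				if err := completeTimers(); err == nil {
					break // this pass succeeded; wait for the next tick
				}
			}
		}
	}
}
```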
29 changes: 22 additions & 7 deletions service/history/shardContext.go
@@ -51,6 +51,10 @@ type (
UpdateTransferAckLevel(ackLevel int64) error
GetReplicatorAckLevel() int64
UpdateReplicatorAckLevel(ackLevel int64) error
GetTimerAckLevel() time.Time
UpdateTimerAckLevel(ackLevel time.Time) error
GetTimerClusterAckLevel(cluster string) time.Time
UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
*persistence.CreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error
@@ -59,8 +63,6 @@ type (
GetConfig() *Config
GetLogger() bark.Logger
GetMetricsClient() metrics.Client
GetTimerAckLevel(cluster string) time.Time
UpdateTimerAckLevel(cluster string, ackLevel time.Time) error
GetTimeSource() common.TimeSource
SetCurrentTime(cluster string, currentTime time.Time)
GetCurrentTime(cluster string) time.Time
@@ -164,7 +166,23 @@ func (s *shardContextImpl) UpdateReplicatorAckLevel(ackLevel int64) error {
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
func (s *shardContextImpl) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RUnlock()

return s.shardInfo.TimerAckLevel
}

func (s *shardContextImpl) UpdateTimerAckLevel(ackLevel time.Time) error {
s.Lock() // exclusive lock: this method mutates shardInfo
defer s.Unlock()

s.shardInfo.TimerAckLevel = ackLevel
s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

@@ -177,13 +195,10 @@ func (s *shardContextImpl) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

func (s *shardContextImpl) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
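With shard-level and per-cluster timer ack levels now split, one plausible usage pattern is for each cluster's processor to advance only its own level while the shard-level value trails the minimum across clusters. The sketch below is illustrative, not code from this commit, and assumes the surrounding interface is named ShardContext:

```go
package history

import "time"

// advanceTimerAckLevels is illustrative only: persist each cluster's ack level
// and then move the shard-level ack level to the minimum across all clusters.
func advanceTimerAckLevels(shard ShardContext, clusterAckLevels map[string]time.Time) error {
	if len(clusterAckLevels) == 0 {
		return nil // nothing to advance
	}
	var shardLevel time.Time
	for cluster, ackLevel := range clusterAckLevels {
		if err := shard.UpdateTimerClusterAckLevel(cluster, ackLevel); err != nil {
			return err
		}
		if shardLevel.IsZero() || ackLevel.Before(shardLevel) {
			shardLevel = ackLevel // shard-level ack = minimum over all clusters
		}
	}
	return shard.UpdateTimerAckLevel(shardLevel)
}
```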