Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented functionality in timer ack manager & timer processor base to allow failover #648

Merged
merged 5 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
// workflow logging tags
TagWorkflowEventID = "wf-event-id"
TagWorkflowComponent = "wf-component"
TagWorkflowCluster = "wf-cluster"
TagWorkflowErr = "wf-error"
TagHistoryBuilderAction = "history-builder-action"
TagStoreOperation = "store-operation"
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason to remove pagination here? I think pagination through tasks is much more efficient then issuing a limit query each time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the existing query is using limit
https://github.com/uber/cadence/blob/master/common/persistence/cassandraPersistence.go#L546

we can change to use the pagination later, this is not a blocking issue

response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
Expand Down
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
Expand All @@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,7 @@ type (

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
Timers []*TimerTaskInfo
}

// SerializedHistoryEventBatch represents a serialized batch of history events
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, nil, err
return nil, err
}

return response.Timers, response.NextPageToken, nil
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
Expand Down
33 changes: 30 additions & 3 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ type MockTimerQueueAckMgr struct {
mock.Mock
}

var _ timerQueueAckMgr = (*MockTimerQueueAckMgr)(nil)

// getFinishedChan is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) getFinishedChan() <-chan struct{} {
ret := _m.Called()

var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}

// readTimerTasks is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
ret := _m.Called()
Expand Down Expand Up @@ -71,12 +88,22 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
ret := _m.Called()

var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(TimerSequenceID)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
historyEventNotifier: historyEventNotifier,
}
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, executionManager, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, logger)
historyEngImpl.txProcessor = txProcessor
shardWrapper.txProcessor = txProcessor

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *engine2Suite) SetupTest() {
hSerializerFactory: persistence.NewHistorySerializerFactory(),
}
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.logger)
s.historyEngine = h
}

Expand Down
22 changes: 19 additions & 3 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// error which will be thrown if the timer / transfer task should be
// retries due to various of reasons
taskRetryError struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is weird to use an empty struct as an error. Can you instead use errors.New?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nop, i need to use a dedicated error so the worker can do retry accordingly.


// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
Expand Down Expand Up @@ -105,6 +110,9 @@ type (
NotifyNewTask()
}

// TODO the timer quque processor and the one below, timer processor
// in combination are confusing, we should consider a better naming
// convention, or at least come with a better name for this case.
timerQueueProcessor interface {
common.Daemon
NotifyNewTimers(clusterName string, timerTask []persistence.Task)
Expand All @@ -118,12 +126,11 @@ type (
}

timerQueueAckMgr interface {
readRetryTimerTasks() []*persistence.TimerTaskInfo
getFinishedChan() <-chan struct{}
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
Expand All @@ -133,3 +140,12 @@ type (
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError create a error which indicate the task should be retry
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
7 changes: 6 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *engineSuite) SetupTest() {
historyEventNotifier: historyEventNotifier,
}
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.logger)
h.historyEventNotifier.Start()
shardContextWrapper.txProcessor = h.txProcessor
s.mockHistoryEngine = h
Expand Down Expand Up @@ -3546,6 +3546,10 @@ func addTimerStartedEvent(builder *mutableStateBuilder, decisionCompletedEventID
})
}

func addTimerFiredEvent(builder *mutableStateBuilder, scheduleID int64, timerID string) *workflow.HistoryEvent {
return builder.AddTimerFiredEvent(scheduleID, timerID)
}

func addRequestCancelInitiatedEvent(builder *mutableStateBuilder, decisionCompletedEventID int64,
cancelRequestID, domain, workflowID, runID string) *workflow.HistoryEvent {
event, _ := builder.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID,
Expand Down Expand Up @@ -3675,6 +3679,7 @@ func copyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.Activit
HeartbeatTimeout: sourceInfo.HeartbeatTimeout,
CancelRequested: sourceInfo.CancelRequested,
CancelRequestID: sourceInfo.CancelRequestID,
TimerTaskStatus: sourceInfo.TimerTaskStatus,
}
}

Expand Down
26 changes: 20 additions & 6 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,24 @@ func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error {
}

// GetTimerAckLevel test implementation
func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
func (s *TestShardContext) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RUnlock()

return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TimerAckLevel = ackLevel
return nil
}

// GetTimerClusterAckLevel test implementation
func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time {
s.RLock()
defer s.RUnlock()

Expand All @@ -203,14 +220,11 @@ func (s *TestShardContext) GetTimerAckLevel(cluster string) time.Time {
return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(cluster string, ackLevel time.Time) error {
// UpdateTimerClusterAckLevel test implementation
func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
s.Lock()
defer s.Unlock()

if cluster == s.GetService().GetClusterMetadata().GetCurrentClusterName() {
s.shardInfo.TimerAckLevel = ackLevel
}
s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
return nil
}
Expand Down
80 changes: 43 additions & 37 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type Config struct {
DefaultStartToCloseActivityTimeoutInSecs int32

// TimerQueueProcessor settings
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerTaskBatchSize int
ProcessTimerTaskWorkerCount int
TimerProcessorUpdateFailureRetryCount int
TimerProcessorGetFailureRetryCount int
TimerProcessorCompleteTimerFailureRetryCount int
TimerProcessorUpdateAckInterval time.Duration
TimerProcessorForceUpdateInterval time.Duration
TimerProcessorCompleteTimerInterval time.Duration
TimerProcessorMaxPollInterval time.Duration

// TransferQueueProcessor settings
TransferTaskBatchSize int
Expand Down Expand Up @@ -86,37 +89,40 @@ type Config struct {
// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
DefaultScheduleToStartActivityTimeoutInSecs: 10,
DefaultScheduleToCloseActivityTimeoutInSecs: 10,
DefaultStartToCloseActivityTimeoutInSecs: 10,
TimerTaskBatchSize: 100,
ProcessTimerTaskWorkerCount: 30,
TimerProcessorUpdateFailureRetryCount: 5,
TimerProcessorGetFailureRetryCount: 5,
TimerProcessorCompleteTimerFailureRetryCount: 10,
TimerProcessorUpdateAckInterval: 10 * time.Second,
TimerProcessorForceUpdateInterval: 10 * time.Minute,
TimerProcessorCompleteTimerInterval: 1 * time.Second,
TimerProcessorMaxPollInterval: 60 * time.Second,
TransferTaskBatchSize: 10,
TransferProcessorMaxPollRPS: 100,
TransferProcessorMaxPollInterval: 60 * time.Second,
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
TransferTaskMaxRetryCount: 100,
ReplicatorTaskBatchSize: 10,
ReplicatorProcessorMaxPollRPS: 100,
ReplicatorProcessorMaxPollInterval: 60 * time.Second,
ReplicatorProcessorUpdateAckInterval: 10 * time.Second,
ReplicatorProcessorForceUpdateInterval: 10 * time.Minute,
ReplicatorTaskWorkerCount: 10,
ReplicatorTaskMaxRetryCount: 100,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationProperty(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
Expand Down
Loading