implemented functionality in timer ack manager & timer processor base to allow failover #648

Merged: 5 commits, Apr 18, 2018

Changes from 3 commits
1 change: 1 addition & 0 deletions common/logging/tags.go
@@ -31,6 +31,7 @@ const (
// workflow logging tags
TagWorkflowEventID = "wf-event-id"
TagWorkflowComponent = "wf-component"
TagWorkflowCluster = "wf-cluster"
TagWorkflowErr = "wf-error"
TagHistoryBuilderAction = "history-builder-action"
TagStoreOperation = "store-operation"
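As an aside for readers of this diff, here is a rough sketch of how the new TagWorkflowCluster tag might be attached to a log entry alongside the existing tags, using the bark logger this codebase relies on; the component value and the helper function are illustrative, not taken from the PR.

```go
package example

import (
	"github.com/uber-common/bark"

	"github.com/uber/cadence/common/logging"
)

// logProcessorStart shows the intended use of the new wf-cluster tag:
// identifying which cluster a timer/transfer processor instance serves.
func logProcessorStart(logger bark.Logger, clusterName string) {
	logger.WithFields(bark.Fields{
		logging.TagWorkflowComponent: "timer-queue-processor", // illustrative value
		logging.TagWorkflowCluster:   clusterName,
	}).Info("Processor started.")
}
```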
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
@@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
Contributor:

Any specific reason to remove pagination here? I think paginating through tasks is much more efficient than issuing a limit query each time.

Contributor (Author):

Because the existing query already uses LIMIT:
https://github.com/uber/cadence/blob/master/common/persistence/cassandraPersistence.go#L546

We can switch to pagination later; this is not a blocking issue.
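For context on the trade-off being discussed, a minimal sketch of the two read patterns with a plain gocql session; the table, columns, and function names are illustrative and not the actual Cadence statements.

```go
package example

import "github.com/gocql/gocql"

// limitQuery issues a bounded query each time; this mirrors the pattern the
// current GetTimerIndexTasks CQL uses (a LIMIT in the statement text).
func limitQuery(session *gocql.Session, shardID, batchSize int) *gocql.Iter {
	return session.Query(
		`SELECT task_id FROM timer_tasks WHERE shard_id = ? LIMIT ?`,
		shardID, batchSize,
	).Iter()
}

// pagedQuery resumes from a previously returned page state, the alternative
// the reviewer is suggesting; the caller passes the token back on each call.
func pagedQuery(session *gocql.Session, shardID, pageSize int, pageToken []byte) ([]byte, error) {
	iter := session.Query(
		`SELECT task_id FROM timer_tasks WHERE shard_id = ?`, shardID,
	).PageSize(pageSize).PageState(pageToken).Iter()

	var taskID int64
	for iter.Scan(&taskID) {
		// process taskID ...
	}
	nextPageToken := iter.PageState() // hand back for the next page
	return nextPageToken, iter.Close()
}
```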

response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
@@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
@@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
@@ -645,8 +645,7 @@

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
Timers []*TimerTaskInfo
}

// SerializedHistoryEventBatch represents a serialized batch of history events
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
@@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from the timer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, nil, err
return nil, err
}

return response.Timers, response.NextPageToken, nil
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
33 changes: 30 additions & 3 deletions service/history/MockTimerQueueAckMgr.go
@@ -32,6 +32,23 @@ type MockTimerQueueAckMgr struct {
mock.Mock
}

var _ timerQueueAckMgr = (*MockTimerQueueAckMgr)(nil)

// getFinishedChan is mock implementation for getFinishedChan of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) getFinishedChan() <-chan struct{} {
ret := _m.Called()

var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}

// readTimerTasks is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
ret := _m.Called()
@@ -71,12 +88,22 @@ func (_m *MockTimerQueueAckMgr) readTimerTasks() ([]*persistence.TimerTaskInfo,
return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
ret := _m.Called()

var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(TimerSequenceID)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
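As a usage sketch (not part of this diff), this is roughly how a test in the history package might stub the two new methods on the mock with testify; the zero ack level and the raw channel are placeholders.

```go
// newMockAckMgrForTest wires up the new timerQueueAckMgr mock methods.
func newMockAckMgrForTest() (*MockTimerQueueAckMgr, chan struct{}) {
	finishedCh := make(chan struct{})

	var ackLevel TimerSequenceID // zero value as a placeholder
	mockAckMgr := &MockTimerQueueAckMgr{}
	// testify hands back exactly the value stored by Return(), so the channel
	// has to be converted to the receive-only type the interface declares.
	mockAckMgr.On("getFinishedChan").Return((<-chan struct{})(finishedCh))
	mockAckMgr.On("getAckLevel").Return(ackLevel)

	// close(finishedCh) later to simulate the ack manager finishing, e.g.
	// after the failover path drains its queue.
	return mockAckMgr, finishedCh
}
```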
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
@@ -135,7 +135,7 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
historyEventNotifier: historyEventNotifier,
}
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, executionManager, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, logger)
historyEngImpl.txProcessor = txProcessor
shardWrapper.txProcessor = txProcessor

18 changes: 2 additions & 16 deletions service/history/historyEngine2_test.go
@@ -133,6 +133,7 @@ func (s *engine2Suite) SetupTest() {
// this is used by shard context, not relevant to this test, so we do not care how many times "GetCurrentClusterName" is called
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false)
h := &historyEngineImpl{
currentclusterName: mockShard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: mockShard,
Expand All @@ -145,7 +146,7 @@ func (s *engine2Suite) SetupTest() {
hSerializerFactory: persistence.NewHistorySerializerFactory(),
}
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.logger)
s.historyEngine = h
}

@@ -303,15 +304,13 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedConflictOnUpdate() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&persistence.ConditionFailedError{}).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

ms2 := createMutableState(msBuilder)
gwmsResponse2 := &persistence.GetWorkflowExecutionResponse{State: ms2}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse2, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

response, err := s.historyEngine.RecordDecisionTaskStarted(&h.RecordDecisionTaskStartedRequest{
DomainUUID: common.StringPtr("domainId"),
@@ -350,7 +349,6 @@ func (s *engine2Suite) TestRecordDecisionTaskRetrySameRequest() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&persistence.ConditionFailedError{}).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

startedEventID := addDecisionTaskStartedEventWithRequestID(msBuilder, int64(2), requestID, tl, identity)
ms2 := createMutableState(msBuilder)
@@ -394,7 +392,6 @@ func (s *engine2Suite) TestRecordDecisionTaskRetryDifferentRequest() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&persistence.ConditionFailedError{}).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

// Add event.
addDecisionTaskStartedEventWithRequestID(msBuilder, int64(2), "some_other_req", tl, identity)
@@ -443,7 +440,6 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedMaxAttemptsExceeded() {
conditionalRetryCount)
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(
&persistence.ConditionFailedError{}).Times(conditionalRetryCount)
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Times(conditionalRetryCount)

response, err := s.historyEngine.RecordDecisionTaskStarted(&h.RecordDecisionTaskStartedRequest{
DomainUUID: common.StringPtr("domainId"),
@@ -479,7 +475,6 @@ func (s *engine2Suite) TestRecordDecisionTaskSuccess() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

response, err := s.historyEngine.RecordDecisionTaskStarted(&h.RecordDecisionTaskStartedRequest{
DomainUUID: common.StringPtr("domainId"),
@@ -558,7 +553,6 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse1, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

response, err := s.historyEngine.RecordActivityTaskStarted(&h.RecordActivityTaskStartedRequest{
DomainUUID: common.StringPtr("domainId"),
@@ -597,7 +591,6 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse1, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

err := s.historyEngine.RequestCancelWorkflowExecution(&h.RequestCancelWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
@@ -703,7 +696,6 @@ func (s *engine2Suite) TestRespondDecisionTaskCompletedRecordMarkerDecision() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

err := s.historyEngine.RespondDecisionTaskCompleted(context.Background(), &h.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(domainID),
@@ -731,7 +723,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
@@ -769,7 +760,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
}).Once()
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()

s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()
resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
@@ -806,7 +796,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
}).Once()
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()

s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()
resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
@@ -865,7 +854,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()
}

s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()
resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
@@ -942,7 +930,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()
}

s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()
resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
@@ -1002,7 +989,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(sRequest)
s.Nil(err)
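The test edits above all follow one pattern: the per-test IsGlobalDomainEnabled expectations are dropped because SetupTest now registers that expectation once, without a call-count limit. A small illustration of the testify behavior this relies on, using a minimal stand-in mock rather than the real cluster metadata mock:

```go
package example

import "github.com/stretchr/testify/mock"

// clusterMetadataMock is a minimal stand-in, not the real Cadence mock.
type clusterMetadataMock struct{ mock.Mock }

func (m *clusterMetadataMock) IsGlobalDomainEnabled() bool {
	return m.Called().Bool(0)
}

func exampleExpectations() {
	m := &clusterMetadataMock{}

	// Registered with Once(): satisfied by exactly one call; extra calls fail.
	m.On("IsGlobalDomainEnabled").Return(false).Once()

	// Registered without a count: satisfies any number of calls, which is why
	// a single expectation in SetupTest can cover the whole suite.
	m.On("IsGlobalDomainEnabled").Return(false)
}
```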
22 changes: 19 additions & 3 deletions service/history/historyEngineInterfaces.go
@@ -44,6 +44,11 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// error which will be thrown if the timer / transfer task should be
// retried due to various reasons
taskRetryError struct{}
Contributor:

It is weird to use an empty struct as an error. Can you instead use errors.New?

Contributor (Author):

No, I need to use a dedicated error type so the worker can detect it and retry accordingly.


// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
Expand Down Expand Up @@ -105,6 +110,9 @@ type (
NotifyNewTask()
}

// TODO: the timer queue processor and the timer processor below are
// confusing in combination; we should consider a better naming convention,
// or at least come up with a better name for this case.
timerQueueProcessor interface {
common.Daemon
NotifyNewTimers(clusterName string, timerTask []persistence.Task)
@@ -118,12 +126,11 @@
}

timerQueueAckMgr interface {
readRetryTimerTasks() []*persistence.TimerTaskInfo
getFinishedChan() <-chan struct{}
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
@@ -133,3 +140,12 @@
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError creates an error which indicates the task should be retried
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
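To make the design choice from the earlier thread concrete, a rough sketch (not from this diff) of how a worker could branch on the dedicated error type; processOnce is a hypothetical helper.

```go
// processWithRetry retries while the task reports the dedicated retry error,
// i.e. the standby task's condition in mutable state is not met yet.
func processWithRetry(task *persistence.TimerTaskInfo, processOnce func(*persistence.TimerTaskInfo) error) error {
	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err := processOnce(task)
		if _, shouldRetry := err.(*taskRetryError); shouldRetry {
			continue // real code would back off or re-enqueue rather than spin
		}
		return err // nil on success, or a non-retryable error
	}
	return newTaskRetryError()
}
```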