Separate transfer queue into active and standby processors
Separate queue ack mgr into its own file
Wenquan Xing committed Apr 11, 2018
1 parent 41fcd03 commit c42b31d
Showing 27 changed files with 2,757 additions and 1,663 deletions.
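
As the title and the diffs below indicate, this commit makes transfer-task notifications cluster-aware: NotifyNewTask now takes a cluster name, and the shard wrapper passes its current cluster name when new transfer tasks are persisted. The Go sketch that follows is illustrative only and is not code from this commit; the names (clusterProcessor, transferQueueProcessorSketch, standbyProcessors) are hypothetical, and it only shows the presumed routing idea: an active processor for tasks generated by the current cluster and standby processors for tasks owned by other clusters.

package sketch

// clusterProcessor stands in for the per-cluster worker that actually
// polls and processes transfer tasks.
type clusterProcessor interface {
	notifyNewTask()
}

// transferQueueProcessorSketch routes task notifications by cluster name.
type transferQueueProcessorSketch struct {
	currentClusterName string
	activeProcessor    clusterProcessor            // handles tasks owned by the current cluster
	standbyProcessors  map[string]clusterProcessor // keyed by remote cluster name
}

// NotifyNewTask wakes the active processor for local tasks and the matching
// standby processor for tasks owned by another cluster.
func (p *transferQueueProcessorSketch) NotifyNewTask(clusterName string) {
	if clusterName == p.currentClusterName {
		p.activeProcessor.notifyNewTask()
		return
	}
	if standby, ok := p.standbyProcessors[clusterName]; ok {
		standby.notifyNewTask()
	}
}
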
1 change: 1 addition & 0 deletions common/logging/tags.go
@@ -31,6 +31,7 @@ const (
// workflow logging tags
TagWorkflowEventID = "wf-event-id"
TagWorkflowComponent = "wf-component"
TagWorkflowCluster = "wf-cluster"
TagWorkflowErr = "wf-error"
TagHistoryBuilderAction = "history-builder-action"
TagStoreOperation = "store-operation"
3 changes: 0 additions & 3 deletions common/persistence/cassandraPersistence.go
@@ -2037,9 +2037,6 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
6 changes: 2 additions & 4 deletions common/persistence/cassandraPersistence_test.go
@@ -877,10 +877,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
@@ -895,10 +894,9 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
3 changes: 1 addition & 2 deletions common/persistence/dataInterfaces.go
@@ -645,8 +645,7 @@ type (

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
Timers []*TimerTaskInfo
}

// SerializedHistoryEventBatch represents a serialized batch of history events
6 changes: 3 additions & 3 deletions common/persistence/persistenceTestBase.go
@@ -748,17 +748,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, nil, err
return nil, err
}

return response.Timers, response.NextPageToken, nil
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
29 changes: 16 additions & 13 deletions service/history/historyEngine.go
@@ -50,7 +50,7 @@ const (

type (
historyEngineImpl struct {
currentclusterName string
currentClusterName string
shard ShardContext
historyMgr persistence.HistoryManager
executionManager persistence.ExecutionManager
@@ -69,6 +69,7 @@ type (
// shardContextWrapper wraps ShardContext to notify transferQueueProcessor on new tasks.
// TODO: use to notify timerQueueProcessor as well.
shardContextWrapper struct {
currentClusterName string
ShardContext
txProcessor transferQueueProcessor
replcatorProcessor queueProcessor
@@ -110,7 +111,9 @@ var (
// NewEngineWithShardContext creates an instance of history engine
func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.VisibilityManager,
matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier, publisher messaging.Producer) Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
shardWrapper := &shardContextWrapper{
currentClusterName: currentClusterName,
ShardContext: shard,
historyEventNotifier: historyEventNotifier,
}
@@ -121,7 +124,7 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
historyCache := newHistoryCache(shard, logger)
historySerializerFactory := persistence.NewHistorySerializerFactory()
historyEngImpl := &historyEngineImpl{
currentclusterName: shard.GetService().GetClusterMetadata().GetCurrentClusterName(),
currentClusterName: currentClusterName,
shard: shard,
historyMgr: historyManager,
executionManager: executionManager,
@@ -134,15 +137,15 @@ func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.Vis
metricsClient: shard.GetMetricsClient(),
historyEventNotifier: historyEventNotifier,
}
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, executionManager, logger)
txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, logger)
historyEngImpl.txProcessor = txProcessor
shardWrapper.txProcessor = txProcessor

// Only start the replicator processor if valid publisher is passed in
if publisher != nil {
replicatorProcessor := newReplicatorQueueProcessor(shard, publisher, executionManager, historyManager,
historySerializerFactory)
historySerializerFactory, logger)
historyEngImpl.replicatorProcessor = replicatorProcessor
shardWrapper.replcatorProcessor = replicatorProcessor
historyEngImpl.replicator = newHistoryReplicator(shard, historyCache, shard.GetDomainCache(), historyManager,
@@ -385,7 +388,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}

if err == nil {
e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)

return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(resultRunID),
@@ -661,7 +664,7 @@ Update_History_Loop:
// Start a timer for the decision task.
timeOutTask := tBuilder.AddDecisionTimoutTask(scheduleID, di.Attempt, di.DecisionTimeout)
timerTasks := []persistence.Task{timeOutTask}
defer e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
defer e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)

// Generate a transaction ID for appending events to history
transactionID, err2 := e.shard.GetNextTransferTaskID()
@@ -1266,7 +1269,7 @@ Update_History_Loop:
// add continueAsNewTimerTask
timerTasks = append(timerTasks, continueAsNewTimerTasks...)
// Inform timer about the new ones.
e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)

return err
}
@@ -1704,7 +1707,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ
}
return nil, err
}
e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)
return &workflow.StartWorkflowExecutionResponse{RunId: context.workflowExecution.RunId}, nil
} // end for Just_Signal_Loop
if attempt == conditionalRetryCount {
@@ -1826,7 +1829,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ
// try to create the workflow execution
resultRunID, err := createWorkflow(isBrandNew, prevRunID) // (true, "") or (false, "prevRunID")
if err == nil {
e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)

return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(resultRunID),
@@ -2035,7 +2038,7 @@ Update_History_Loop:
}
return err
}
e.timerProcessor.NotifyNewTimers(e.currentclusterName, timerTasks)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, timerTasks)
return nil
}
return ErrMaxAttemptsExceeded
@@ -2132,7 +2135,7 @@ func (s *shardContextWrapper) UpdateWorkflowExecution(request *persistence.Updat
err := s.ShardContext.UpdateWorkflowExecution(request)
if err == nil {
if len(request.TransferTasks) > 0 {
s.txProcessor.NotifyNewTask()
s.txProcessor.NotifyNewTask(s.currentClusterName)
}
if len(request.ReplicationTasks) > 0 {
s.replcatorProcessor.NotifyNewTask()
@@ -2146,7 +2149,7 @@ func (s *shardContextWrapper) CreateWorkflowExecution(request *persistence.Creat
resp, err := s.ShardContext.CreateWorkflowExecution(request)
if err == nil {
if len(request.TransferTasks) > 0 {
s.txProcessor.NotifyNewTask()
s.txProcessor.NotifyNewTask(s.currentClusterName)
}
if len(request.ReplicationTasks) > 0 {
s.replcatorProcessor.NotifyNewTask()
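
The shardContextWrapper changes above follow a notify-after-persist pattern: queue processors are woken only after the persistence call succeeds and actually produced tasks, and the notification now carries the current cluster name. A minimal sketch of that pattern, using simplified stand-in types (store, wrapper, updateRequest) rather than the real ShardContext and persistence request types:

package sketch

// notifier stands in for the transfer queue processor's notification hook.
type notifier interface {
	NotifyNewTask(clusterName string)
}

// updateRequest is a stand-in for the real persistence update request.
type updateRequest struct {
	TransferTasks []interface{} // stand-in for []persistence.Task
}

// store is a stand-in for the underlying shard/persistence layer.
type store interface {
	UpdateWorkflowExecution(req *updateRequest) error
}

// wrapper mimics shardContextWrapper: persist first, then notify.
type wrapper struct {
	store
	currentClusterName string
	txProcessor        notifier
}

func (w *wrapper) UpdateWorkflowExecution(req *updateRequest) error {
	err := w.store.UpdateWorkflowExecution(req)
	if err == nil && len(req.TransferTasks) > 0 {
		// tasks were written on behalf of this shard's current cluster;
		// wake the transfer processor so the active side picks them up promptly
		w.txProcessor.NotifyNewTask(w.currentClusterName)
	}
	return err
}
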
11 changes: 5 additions & 6 deletions service/history/historyEngine2_test.go
@@ -112,6 +112,8 @@ func (s *engine2Suite) SetupTest() {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.logger)
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)

domainCache := cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, s.logger)
mockShard := &shardContextImpl{
@@ -130,11 +132,8 @@
}

historyCache := newHistoryCache(mockShard, s.logger)
// this is used by shard context, not relevent to this test, so we do not care how many times "GetCurrentClusterName" os called
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)
h := &historyEngineImpl{
currentclusterName: mockShard.GetService().GetClusterMetadata().GetCurrentClusterName(),
currentClusterName: mockShard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: mockShard,
executionManager: s.mockExecutionMgr,
historyMgr: s.mockHistoryMgr,
@@ -144,8 +143,8 @@
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
}
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.logger)
s.historyEngine = h
}

29 changes: 21 additions & 8 deletions service/history/historyEngineInterfaces.go
@@ -44,6 +44,11 @@ type (
isWorkflowRunning bool
timestamp time.Time
}

// error which will be returned if the timer / transfer task should be
// retried for various reasons
taskRetryError struct{}

// Engine represents an interface for managing workflow execution history.
Engine interface {
common.Daemon
@@ -94,15 +99,15 @@
}

processor interface {
GetName() string
Process(task queueTaskInfo) error
ReadTasks(readLevel int64) ([]queueTaskInfo, error)
CompleteTask(taskID int64) error
process(task queueTaskInfo) error
readTasks(readLevel int64) ([]queueTaskInfo, bool, error)
completeTask(taskID int64) error
updateAckLevel(taskID int64) error
}

transferQueueProcessor interface {
common.Daemon
NotifyNewTask()
NotifyNewTask(clusterName string)
}

timerQueueProcessor interface {
@@ -118,12 +123,11 @@
}

timerQueueAckMgr interface {
readRetryTimerTasks() []*persistence.TimerTaskInfo
getFinishedChan() <-chan struct{}
readTimerTasks() ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() TimerSequenceID
updateAckLevel()
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
Expand All @@ -133,3 +137,12 @@ type (
UnwatchHistoryEvent(identifier *workflowIdentifier, subscriberID string) error
}
)

// newTaskRetryError creates an error indicating the task should be retried
func newTaskRetryError() *taskRetryError {
return &taskRetryError{}
}

func (e *taskRetryError) Error() string {
return "passive task should retry due to condition in mutable state is not met."
}
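
The new processor interface and taskRetryError above suggest a generic processing loop: read a batch of tasks, process each one, complete it, and advance the ack level, retrying tasks that fail because the (standby) mutable state has not caught up yet. The sketch below is an illustration under those assumptions, not the actual queue processor or ack manager code in this commit; the retry bound and the errors.As check are choices made for the example, and queueTaskInfo/processBatch are hypothetical names.

package sketch

import "errors"

// queueTaskInfo is a stand-in for the real task info; only the ID is needed here.
type queueTaskInfo interface {
	GetTaskID() int64
}

// processor mirrors the interface in the diff above: read, process, complete, ack.
type processor interface {
	process(task queueTaskInfo) error
	readTasks(readLevel int64) ([]queueTaskInfo, bool, error)
	completeTask(taskID int64) error
	updateAckLevel(taskID int64) error
}

// taskRetryError mirrors the error type added by this commit.
type taskRetryError struct{}

func (e *taskRetryError) Error() string { return "task should be retried" }

// arbitrary bound for the sketch; the real retry policy lives elsewhere
const maxRetryAttempts = 3

// processBatch drains one batch of tasks and returns the new read level.
func processBatch(p processor, readLevel int64) (int64, error) {
	tasks, _, err := p.readTasks(readLevel)
	if err != nil {
		return readLevel, err
	}
	for _, task := range tasks {
		var processErr error
		for attempt := 0; attempt < maxRetryAttempts; attempt++ {
			processErr = p.process(task)
			var retryErr *taskRetryError
			if processErr == nil || !errors.As(processErr, &retryErr) {
				break
			}
			// condition in mutable state not met yet; retry the same task
		}
		if processErr != nil {
			return readLevel, processErr
		}
		if err := p.completeTask(task.GetTaskID()); err != nil {
			return readLevel, err
		}
		readLevel = task.GetTaskID()
	}
	// acknowledge everything processed so far
	return readLevel, p.updateAckLevel(readLevel)
}
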
13 changes: 7 additions & 6 deletions service/history/historyEngine_test.go
@@ -114,6 +114,8 @@ func (s *engineSuite) SetupTest() {
s.mockMetricClient = metrics.NewClient(tally.NoopScope, metrics.History)
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.logger)
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)

historyEventNotifier := newHistoryEventNotifier(
s.mockMetricClient,
@@ -136,17 +138,16 @@
logger: s.logger,
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
}
currentClusterName := s.mockService.GetClusterMetadata().GetCurrentClusterName()
shardContextWrapper := &shardContextWrapper{
currentClusterName: currentClusterName,
ShardContext: mockShard,
historyEventNotifier: historyEventNotifier,
}

historyCache := newHistoryCache(shardContextWrapper, s.logger)
// this is used by shard context, not relevent to this test, so we do not care how many times "GetCurrentClusterName" os called
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestAllClusterFailoverVersions)
h := &historyEngineImpl{
currentclusterName: shardContextWrapper.GetService().GetClusterMetadata().GetCurrentClusterName(),
currentClusterName: currentClusterName,
shard: shardContextWrapper,
executionManager: s.mockExecutionMgr,
historyMgr: s.mockHistoryMgr,
@@ -157,8 +158,8 @@
hSerializerFactory: persistence.NewHistorySerializerFactory(),
historyEventNotifier: historyEventNotifier,
}
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockExecutionMgr, s.logger)
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, s.logger)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.logger)
h.historyEventNotifier.Start()
shardContextWrapper.txProcessor = h.txProcessor
s.mockHistoryEngine = h
(Diffs for the remaining changed files are not shown.)
