compact duplicate code, add more logging (#846)
wxing1292 authored Jun 13, 2018
1 parent 4f8c0c3 commit a8958f0
Showing 7 changed files with 104 additions and 94 deletions.
16 changes: 16 additions & 0 deletions common/persistence/dataInterfaces.go
@@ -1371,6 +1371,14 @@ func (t *TransferTaskInfo) GetTaskType() int {
return t.TaskType
}

// String returns a string representation of the transfer task
func (t *TransferTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version,
)
}

// GetTaskID returns the task ID for replication task
func (t *ReplicationTaskInfo) GetTaskID() int64 {
return t.TaskID
@@ -1401,6 +1409,14 @@ func (t *TimerTaskInfo) GetTaskType() int {
return t.TaskType
}

// String returns a string representation of the timer task
func (t *TimerTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, VisibilityTimestamp: %v, TaskID: %v, TaskType: %v, TimeoutType: %v, EventID: %v, ScheduleAttempt: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.VisibilityTimestamp, t.TaskID, t.TaskType, t.TimeoutType, t.EventID, t.ScheduleAttempt, t.Version,
)
}
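
These two String methods make TransferTaskInfo and TimerTaskInfo satisfy fmt.Stringer, which is what lets the new Debugf/Warnf calls later in this commit render a whole task through a plain %v verb. A minimal sketch of that mechanism, using an illustrative placeholder struct rather than the real Cadence types:

```go
package main

import "fmt"

// taskInfo is a stand-in for TransferTaskInfo/TimerTaskInfo.
type taskInfo struct {
	DomainID string
	TaskID   int64
	Version  int64
}

// String makes *taskInfo a fmt.Stringer, mirroring the methods added above.
func (t *taskInfo) String() string {
	return fmt.Sprintf("{DomainID: %v, TaskID: %v, Version: %v.}", t.DomainID, t.TaskID, t.Version)
}

func main() {
	task := &taskInfo{DomainID: "d1", TaskID: 42, Version: 7}
	// %v sees a fmt.Stringer and calls String() automatically, so log lines
	// like "skip task: %v" print the structured form without extra work.
	fmt.Printf("skip task: %v\n", task) // skip task: {DomainID: d1, TaskID: 42, Version: 7.}
}
```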

// NewHistoryEventBatch returns a new instance of HistoryEventBatch
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch {
return &HistoryEventBatch{
70 changes: 59 additions & 11 deletions service/history/failoverCheck.go
@@ -27,41 +27,85 @@ import (
"github.com/uber/cadence/common/persistence"
)

// verifyTransferTaskVersion will return true if the failover version check is successful
func verifyTransferTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TransferTaskInfo) (bool, error) {
// verifyActiveTask will return true if the task activeness check is successful
func verifyActiveTask(shard ShardContext, logger bark.Logger, taskDomainID string, task interface{}) (bool, error) {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID)
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID, err)
return false, err
}
logger.Warnf("Cannot find domainID: %v, default to process task: %v.", taskDomainID, task)
return true, nil
}
if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
// task does not belong to the current cluster
logger.Debugf("DomainID: %v is not active, skip task: %v.", taskDomainID, task)
return false, nil
}
logger.Debugf("DomainID: %v is active, process task: %v.", taskDomainID, task)
return true, nil
}

// verifyFailoverActiveTask will return true if the task activeness check is successful
func verifyFailoverActiveTask(logger bark.Logger, targetDomainID string, taskDomainID string, task interface{}) (bool, error) {
if targetDomainID == taskDomainID {
logger.Debugf("Failover DomainID: %v is active, process task: %v.", taskDomainID, task)
return true, nil
}
logger.Debugf("Failover DomainID: %v is not active, skip task: %v.", taskDomainID, task)
return false, nil
}

// verifyStandbyTask will return true if the task standbyness check is successful
func verifyStandbyTask(shard ShardContext, logger bark.Logger, standbyCluster string, taskDomainID string, task interface{}) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID)
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as not active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID, err)
return false, err
}
logger.Warnf("Cannot find domainID: %v, default to not process task: %v.", taskDomainID, task)
return false, nil
}
if !domainEntry.IsGlobalDomain() {
// non global domain, timer task does not belong here
logger.Debugf("DomainID: %v is not global, skip task: %v.", taskDomainID, task)
return false, nil
} else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != standbyCluster {
// timer task does not belong here
logger.Debugf("DomainID: %v is not standby, skip task: %v.", taskDomainID, task)
return false, nil
}
logger.Debugf("DomainID: %v is standby, process task: %v.", taskDomainID, task)
return true, nil
}
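
Note the opposite defaults when the domain lookup fails: verifyActiveTask treats a deleted domain as active and processes the task, while verifyStandbyTask treats it as not standby and skips it. A condensed, runnable sketch of that shared branch, using a local stand-in for workflow.EntityNotExistsError:

```go
package main

import "fmt"

// entityNotExistsError is a local stand-in for workflow.EntityNotExistsError.
type entityNotExistsError struct{}

func (e *entityNotExistsError) Error() string { return "entity not exists" }

// onDomainLookupErr condenses the error handling shared by verifyActiveTask
// and verifyStandbyTask: only a "domain deleted" error is tolerated, and the
// two checks fall back to opposite defaults.
func onDomainLookupErr(err error, isActiveCheck bool) (bool, error) {
	if _, ok := err.(*entityNotExistsError); !ok {
		return false, err // unexpected error: surface it to the caller
	}
	return isActiveCheck, nil // deleted domain: process on active, skip on standby
}

func main() {
	var deleted error = &entityNotExistsError{}
	ok, _ := onDomainLookupErr(deleted, true)
	fmt.Println(ok) // true: the active check defaults to processing the task
	ok, _ = onDomainLookupErr(deleted, false)
	fmt.Println(ok) // false: the standby check defaults to skipping it
}
```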

// verifyTimerTaskVersion will return true if the failover version check is successful
func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TimerTaskInfo) (bool, error) {
// verifyTaskVersion will return true if the failover version check is successful
func verifyTaskVersion(shard ShardContext, logger bark.Logger, domainID string, version int64, taskVersion int64, task interface{}) (bool, error) {
if !shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() {
return true, nil
}

// the first return value is whether this task is valid for further processing
domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
logger.Debugf("Cannot find domainID: %v, err: %v.", domainID, err)
return false, err
}
if !domainEntry.IsGlobalDomain() {
logger.Debugf("DomainID: %v is not global, task: %v version check pass.", domainID, task)
return true, nil
} else if version != taskVersion {
logger.Debugf("DomainID: %v is global, task: %v version != target version: %v.", domainID, task, version)
return false, nil
}
logger.Debugf("DomainID: %v is global, task: %v version == target version: %v.", domainID, task, version)
return true, nil
}
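
verifyTaskVersion replaces the type-specific verifyTransferTaskVersion/verifyTimerTaskVersion pair by taking the task's version as a plain int64 and the task itself as interface{} for logging only. The gate it implements can be condensed as follows (names and signature here are illustrative, not the commit's):

```go
package main

import "fmt"

// versionGate sketches verifyTaskVersion's decision: with global domains
// enabled, a task in a global domain is processed only when the version
// recorded in mutable state matches the version stamped on the task, so
// tasks queued before a failover are dropped as stale.
func versionGate(globalDomainsEnabled, isGlobalDomain bool, stateVersion, taskVersion int64) bool {
	if !globalDomainsEnabled || !isGlobalDomain {
		return true // version bookkeeping only applies to global domains
	}
	return stateVersion == taskVersion
}

func main() {
	fmt.Println(versionGate(true, true, 7, 7))  // true: versions match, process
	fmt.Println(versionGate(true, true, 8, 7))  // false: stale task, skip
	fmt.Println(versionGate(true, false, 8, 7)) // true: non-global domain, no check
}
```

Call sites then thread in whichever mutable-state version applies (di.Version for decisions, ai.Version for activities, the replication start version or last write version for workflow-level tasks), as the hunks below show.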

@@ -72,6 +116,8 @@ func loadMutableStateForTransferTask(context *workflowExecutionContext, transfer
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing transfer taskID: %v, eventID: %v",
context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), transferTask.TaskID, transferTask.ScheduleID)
return nil, nil
}
return nil, err
@@ -110,6 +156,8 @@ func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing timer timestamp %v, taskID: %v, eventID: %v",
context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), timerTask.VisibilityTimestamp, timerTask.TaskID, timerTask.EventID)
return nil, nil
}
return nil, err
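
Both loaders use (nil, nil) as a sentinel for "execution no longer exists", which the new debug lines record just before the task is silently dropped as a duplicate. A runnable sketch of that caller contract (all names here are placeholders):

```go
package main

import "fmt"

type mutableState struct{ lastEventID int64 }

// loadForTask mimics the loader contract in this file: (nil, nil) means the
// execution was not found, which is expected for duplicate task deliveries.
func loadForTask(found bool) (*mutableState, error) {
	if !found {
		fmt.Println("Cannot find execution, dropping task") // the new debug log
		return nil, nil
	}
	return &mutableState{lastEventID: 10}, nil
}

func process(found bool) error {
	ms, err := loadForTask(found)
	if err != nil {
		return err
	} else if ms == nil {
		return nil // duplicate processing; the execution already completed
	}
	fmt.Println("processing with last event", ms.lastEventID)
	return nil
}

func main() {
	_ = process(false) // dropped silently, now with a log line
	_ = process(true)  // processed normally
}
```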
36 changes: 10 additions & 26 deletions service/history/timerQueueActiveProcessor.go
@@ -51,34 +51,21 @@
)

func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEngineImpl, matchingClient matching.Client, logger bark.Logger) *timerQueueActiveProcessorImpl {
clusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
timeNow := func() time.Time {
return shard.GetCurrentTime(clusterName)
return shard.GetCurrentTime(currentClusterName)
}
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowCluster: clusterName,
logging.TagWorkflowCluster: currentClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
if err != nil {
// it is possible that domain is deleted,
// we should treat that domain being active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return false, err
}
return true, nil
}
if domainEntry.IsGlobalDomain() && clusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
// timer task does not belong to cluster name
return false, nil
}
return true, nil
return verifyActiveTask(shard, logger, timer.DomainID, timer)
}
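
After the refactor, each processor builds its task filter as a one-line closure that just binds its environment and delegates to the shared helper. A self-contained sketch of that shape (the closure signature mirrors the diff; the helper body is stubbed with a simple lookup instead of the real domain-cache logic):

```go
package main

import "fmt"

// timerTaskInfo is a placeholder for persistence.TimerTaskInfo.
type timerTaskInfo struct{ DomainID string }

// timerTaskFilter mirrors the filter signature used by the processors.
type timerTaskFilter func(timer *timerTaskInfo) (bool, error)

// newActiveFilter returns the post-refactor one-liner: the closure captures
// its dependencies and defers the actual check to a shared helper.
func newActiveFilter(activeDomains map[string]bool) timerTaskFilter {
	verifyActiveTask := func(domainID string) (bool, error) {
		return activeDomains[domainID], nil // stand-in for the shared helper
	}
	return func(timer *timerTaskInfo) (bool, error) {
		return verifyActiveTask(timer.DomainID)
	}
}

func main() {
	filter := newActiveFilter(map[string]bool{"d1": true})
	ok, _ := filter(&timerTaskInfo{DomainID: "d1"})
	fmt.Println(ok) // true: domain active in this cluster, process the task
	ok, _ = filter(&timerTaskInfo{DomainID: "d2"})
	fmt.Println(ok) // false: not active here, skip
}
```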

timerGate := NewLocalTimerGate()
// this will trigger a timer gate fire event immediately
timerGate.Update(time.Time{})
timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, clusterName, logger)
timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, currentClusterName, logger)
processor := &timerQueueActiveProcessorImpl{
shard: shard,
historyService: historyService,
@@ -87,7 +74,7 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng
logger: logger,
matchingClient: matchingClient,
metricsClient: historyService.metricsClient,
currentClusterName: clusterName,
currentClusterName: currentClusterName,
timerGate: timerGate,
timerQueueProcessorBase: newTimerQueueProcessorBase(shard, historyService, timerQueueAckMgr, timeNow, logger),
timerQueueAckMgr: timerQueueAckMgr,
Expand All @@ -109,10 +96,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE
logging.TagFailover: "from: " + standbyClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
if timer.DomainID == domainID {
return true, nil
}
return false, nil
return verifyFailoverActiveTask(logger, domainID, timer.DomainID, timer)
}

timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, maxLevel, logger)
@@ -465,7 +449,7 @@ Update_History_Loop:
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, scheduleID)
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, di.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, di.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -539,7 +523,7 @@ func (t *timerQueueActiveProcessorImpl) processRetryTimer(task *persistence.Time
if !running || task.ScheduleAttempt < int64(ai.Attempt) {
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, ai.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, ai.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -612,7 +596,7 @@ Update_History_Loop:

// do version check for global domain task
if msBuilder.GetReplicationState() != nil {
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetReplicationState().StartVersion, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetReplicationState().StartVersion, task.Version, task)
if err != nil {
return err
} else if !ok {
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
@@ -386,7 +386,7 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(task *persistence.Ti
} else if msBuilder == nil {
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetLastWriteVersion(), task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetLastWriteVersion(), task.Version, task)
if err != nil {
return err
} else if !ok {
17 changes: 3 additions & 14 deletions service/history/timerQueueStandbyProcessor.go
@@ -55,18 +55,7 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn
logging.TagWorkflowCluster: clusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
if err != nil {
return false, err
}
if !domainEntry.IsGlobalDomain() {
// non global domain, timer task does not belong here
return false, nil
} else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != clusterName {
// timer task does not belong here
return false, nil
}
return true, nil
return verifyStandbyTask(shard, logger, clusterName, timer.DomainID, timer)
}

timerGate := NewRemoteTimerGate()
@@ -272,7 +261,7 @@ func (t *timerQueueStandbyProcessorImpl) processDecisionTimeout(timerTask *persi
return nil
}

ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, di.Version, timerTask)
ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, di.Version, timerTask.Version, timerTask)
if err != nil {
return err
} else if !ok {
@@ -298,7 +287,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowTimeout(timerTask *persi
// we do not need to notify new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless

ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask)
ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask.Version, timerTask)
if err != nil {
return err
} else if !ok {
32 changes: 8 additions & 24 deletions service/history/transferQueueActiveProcessor.go
@@ -84,20 +84,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history
logging.TagWorkflowCluster: currentClusterName,
})
transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return false, err
}
return true, nil
}
if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
// timer task does not belong to cluster name
return false, nil
}
return true, nil
return verifyActiveTask(shard, logger, task.DomainID, task)
}
maxReadAckLevel := func() int64 {
return shard.GetTransferMaxReadLevel()
@@ -150,10 +137,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo
logging.TagFailover: "from: " + standbyClusterName,
})
transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
if task.DomainID == domainID {
return true, nil
}
return false, nil
return verifyFailoverActiveTask(logger, domainID, task.DomainID, task)
}
maxReadAckLevel := func() int64 {
return maxLevel // this is a const
@@ -282,7 +266,7 @@ func (t *transferQueueActiveProcessorImpl) processActivityTask(task *persistence
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID)
return
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ai.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ai.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -340,7 +324,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, di.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, di.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -411,7 +395,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten
return nil
}

ok, err := verifyTransferTaskVersion(t.shard, domainID, msBuilder.GetLastWriteVersion(), task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, msBuilder.GetLastWriteVersion(), task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -525,7 +509,7 @@ func (t *transferQueueActiveProcessorImpl) processCancelExecution(task *persiste
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ri.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ri.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -648,7 +632,7 @@ func (t *transferQueueActiveProcessorImpl) processSignalExecution(task *persiste
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, si.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, si.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
@@ -802,7 +786,7 @@ func (t *transferQueueActiveProcessorImpl) processStartChildExecution(task *pers
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeStartChildExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ci.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ci.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
