Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact duplicate code, add more logging #846

Merged
merged 1 commit into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,14 @@ func (t *TransferTaskInfo) GetTaskType() int {
return t.TaskType
}

// String returns a human-readable representation of the transfer task,
// listing every field in "Name: value" form for logging.
func (t *TransferTaskInfo) String() string {
	// Fixed: "TargetWorkflowID %v" was missing its colon, unlike every other field.
	return fmt.Sprintf(
		"{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID: %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}",
		t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version,
	)
}

// GetTaskID returns the task ID for replication task
func (t *ReplicationTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand Down Expand Up @@ -1401,6 +1409,14 @@ func (t *TimerTaskInfo) GetTaskType() int {
return t.TaskType
}

// String returns a human-readable representation of the timer task for logging.
// (The previous comment incorrectly described GetTaskType.)
func (t *TimerTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, VisibilityTimestamp: %v, TaskID: %v, TaskType: %v, TimeoutType: %v, EventID: %v, ScheduleAttempt: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.VisibilityTimestamp, t.TaskID, t.TaskType, t.TimeoutType, t.EventID, t.ScheduleAttempt, t.Version,
)
}

// NewHistoryEventBatch returns a new instance of HistoryEventBatch
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch {
return &HistoryEventBatch{
Expand Down
70 changes: 59 additions & 11 deletions service/history/failoverCheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,85 @@ import (
"github.com/uber/cadence/common/persistence"
)

// verifyTimerTaskVersion returns true if the failover version check is successful
func verifyTransferTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TransferTaskInfo) (bool, error) {
if !shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() {
// verifyActiveTask returns true if the task's domain is active in the current
// cluster, i.e. the active processor should handle the task.
// A domain that cannot be found (possibly deleted) is treated as active so the
// task is still processed.
func verifyActiveTask(shard ShardContext, logger bark.Logger, taskDomainID string, task interface{}) (bool, error) {
	currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
	domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID)
	if err != nil {
		// it is possible that the domain is deleted
		// we should treat that domain as active
		if _, ok := err.(*workflow.EntityNotExistsError); !ok {
			// Fixed: the format string has two verbs but err was not passed as an argument.
			logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID, err)
			return false, err
		}
		logger.Warnf("Cannot find domainID: %v, default to process task: %v.", taskDomainID, task)
		return true, nil
	}
	if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
		// task does not belong to the current cluster
		logger.Debugf("DomainID: %v is not active, skip task: %v.", taskDomainID, task)
		return false, nil
	}
	logger.Debugf("DomainID: %v is active, process task: %v.", taskDomainID, task)
	return true, nil
}

// the first return value is whether this task is valid for further processing
domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID)
// verifyFailoverActiveTask returns true when the task belongs to the domain
// currently being failed over (targetDomainID); only that domain's tasks are
// processed by the failover processor.
func verifyFailoverActiveTask(logger bark.Logger, targetDomainID string, taskDomainID string, task interface{}) (bool, error) {
	if targetDomainID != taskDomainID {
		logger.Debugf("Failover DomainID: %v is not active, skip task: %v.", taskDomainID, task)
		return false, nil
	}
	logger.Debugf("Failover DomainID: %v is active, process task: %v.", taskDomainID, task)
	return true, nil
}

// verifyStandbyTask returns true if the task's domain is a global domain whose
// active cluster is standbyCluster, i.e. the standby processor should handle
// the task. A domain that cannot be found (possibly deleted) is treated as not
// standby, so the task is skipped.
func verifyStandbyTask(shard ShardContext, logger bark.Logger, standbyCluster string, taskDomainID string, task interface{}) (bool, error) {
	domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID)
	if err != nil {
		// it is possible that the domain is deleted
		// we should treat that domain as not active
		if _, ok := err.(*workflow.EntityNotExistsError); !ok {
			// Fixed: the format string has two verbs but err was not passed as an argument.
			logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID, err)
			return false, err
		}
		logger.Warnf("Cannot find domainID: %v, default to not process task: %v.", taskDomainID, task)
		return false, nil
	}
	if !domainEntry.IsGlobalDomain() {
		// non global domain, task does not belong to the standby processor
		logger.Debugf("DomainID: %v is not global, skip task: %v.", taskDomainID, task)
		return false, nil
	}
	// IsGlobalDomain() is already known true here; the redundant re-check was dropped.
	if domainEntry.GetReplicationConfig().ActiveClusterName != standbyCluster {
		// task does not belong here
		logger.Debugf("DomainID: %v is not standby, skip task: %v.", taskDomainID, task)
		return false, nil
	}
	logger.Debugf("DomainID: %v is standby, process task: %v.", taskDomainID, task)
	return true, nil
}

// verifyTimerTaskVersion returns true if the failover version check is successful
func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TimerTaskInfo) (bool, error) {
// verifyTaskVersion returns true if the failover version check is successful.
// The check always passes when global domains are disabled or when the domain
// is a local (non-global) domain; otherwise the task's version must equal the
// expected version recorded in mutable state.
func verifyTaskVersion(shard ShardContext, logger bark.Logger, domainID string, version int64, taskVersion int64, task interface{}) (bool, error) {
	if !shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() {
		return true, nil
	}

	// the first return value is whether this task is valid for further processing
	domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID)
	if err != nil {
		// Fixed: err was logged as the second argument instead of task/err mismatch
		// ("err: %v" previously received task).
		logger.Debugf("Cannot find domainID: %v, err: %v.", domainID, err)
		return false, err
	}
	if !domainEntry.IsGlobalDomain() {
		// Fixed: message previously said "is not active" on the not-global branch.
		logger.Debugf("DomainID: %v is not global, task: %v version check pass", domainID, task)
		return true, nil
	} else if version != taskVersion {
		logger.Debugf("DomainID: %v is active, task: %v version != target version: %v.", domainID, task, version)
		return false, nil
	}
	logger.Debugf("DomainID: %v is active, task: %v version == target version: %v.", domainID, task, version)
	return true, nil
}

Expand All @@ -72,6 +116,8 @@ func loadMutableStateForTransferTask(context *workflowExecutionContext, transfer
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing transfer taskID: %v, eventID: %v",
context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), transferTask.TaskID, transferTask.ScheduleID)
return nil, nil
}
return nil, err
Expand Down Expand Up @@ -110,6 +156,8 @@ func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing timer timestamp %v, taskID: %v, eventID: %v",
context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), timerTask.VisibilityTimestamp, timerTask.TaskID, timerTask.EventID)
return nil, nil
}
return nil, err
Expand Down
36 changes: 10 additions & 26 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,34 +51,21 @@ type (
)

func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEngineImpl, matchingClient matching.Client, logger bark.Logger) *timerQueueActiveProcessorImpl {
clusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
timeNow := func() time.Time {
return shard.GetCurrentTime(clusterName)
return shard.GetCurrentTime(currentClusterName)
}
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowCluster: clusterName,
logging.TagWorkflowCluster: currentClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
if err != nil {
// it is possible that domain is deleted,
// we should treat that domain being active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return false, err
}
return true, nil
}
if domainEntry.IsGlobalDomain() && clusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
// timer task does not belong to cluster name
return false, nil
}
return true, nil
return verifyActiveTask(shard, logger, timer.DomainID, timer)
}

timerGate := NewLocalTimerGate()
// this will trigger a timer gate fire event immediately
timerGate.Update(time.Time{})
timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, clusterName, logger)
timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, currentClusterName, logger)
processor := &timerQueueActiveProcessorImpl{
shard: shard,
historyService: historyService,
Expand All @@ -87,7 +74,7 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng
logger: logger,
matchingClient: matchingClient,
metricsClient: historyService.metricsClient,
currentClusterName: clusterName,
currentClusterName: currentClusterName,
timerGate: timerGate,
timerQueueProcessorBase: newTimerQueueProcessorBase(shard, historyService, timerQueueAckMgr, timeNow, logger),
timerQueueAckMgr: timerQueueAckMgr,
Expand All @@ -109,10 +96,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE
logging.TagFailover: "from: " + standbyClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
if timer.DomainID == domainID {
return true, nil
}
return false, nil
return verifyFailoverActiveTask(logger, domainID, timer.DomainID, timer)
}

timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, maxLevel, logger)
Expand Down Expand Up @@ -465,7 +449,7 @@ Update_History_Loop:
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, scheduleID)
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, di.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, di.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -539,7 +523,7 @@ func (t *timerQueueActiveProcessorImpl) processRetryTimer(task *persistence.Time
if !running || task.ScheduleAttempt < int64(ai.Attempt) {
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, ai.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, ai.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -612,7 +596,7 @@ Update_History_Loop:

// do version check for global domain task
if msBuilder.GetReplicationState() != nil {
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetReplicationState().StartVersion, task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetReplicationState().StartVersion, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(task *persistence.Ti
} else if msBuilder == nil {
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetLastWriteVersion(), task)
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetLastWriteVersion(), task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down
17 changes: 3 additions & 14 deletions service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,7 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn
logging.TagWorkflowCluster: clusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
if err != nil {
return false, err
}
if !domainEntry.IsGlobalDomain() {
// non global domain, timer task does not belong here
return false, nil
} else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != clusterName {
// timer task does not belong here
return false, nil
}
return true, nil
return verifyStandbyTask(shard, logger, clusterName, timer.DomainID, timer)
}

timerGate := NewRemoteTimerGate()
Expand Down Expand Up @@ -272,7 +261,7 @@ func (t *timerQueueStandbyProcessorImpl) processDecisionTimeout(timerTask *persi
return nil
}

ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, di.Version, timerTask)
ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, di.Version, timerTask.Version, timerTask)
if err != nil {
return err
} else if !ok {
Expand All @@ -298,7 +287,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowTimeout(timerTask *persi
// we do not need to notity new timer to base, since if there is no new event being replicated
// checking again if the timer can be completed is meaningless

ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask)
ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask.Version, timerTask)
if err != nil {
return err
} else if !ok {
Expand Down
32 changes: 8 additions & 24 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history
logging.TagWorkflowCluster: currentClusterName,
})
transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return false, err
}
return true, nil
}
if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName {
// timer task does not belong to cluster name
return false, nil
}
return true, nil
return verifyActiveTask(shard, logger, task.DomainID, task)
}
maxReadAckLevel := func() int64 {
return shard.GetTransferMaxReadLevel()
Expand Down Expand Up @@ -150,10 +137,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo
logging.TagFailover: "from: " + standbyClusterName,
})
transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
if task.DomainID == domainID {
return true, nil
}
return false, nil
return verifyFailoverActiveTask(logger, domainID, task.DomainID, task)
}
maxReadAckLevel := func() int64 {
return maxLevel // this is a const
Expand Down Expand Up @@ -282,7 +266,7 @@ func (t *transferQueueActiveProcessorImpl) processActivityTask(task *persistence
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID)
return
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ai.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ai.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -340,7 +324,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, di.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, di.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -411,7 +395,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten
return nil
}

ok, err := verifyTransferTaskVersion(t.shard, domainID, msBuilder.GetLastWriteVersion(), task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, msBuilder.GetLastWriteVersion(), task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -525,7 +509,7 @@ func (t *transferQueueActiveProcessorImpl) processCancelExecution(task *persiste
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ri.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ri.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -648,7 +632,7 @@ func (t *transferQueueActiveProcessorImpl) processSignalExecution(task *persiste
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, si.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, si.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down Expand Up @@ -802,7 +786,7 @@ func (t *transferQueueActiveProcessorImpl) processStartChildExecution(task *pers
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeStartChildExecution, task.TaskID, task.ScheduleID)
return nil
}
ok, err := verifyTransferTaskVersion(t.shard, domainID, ci.Version, task)
ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ci.Version, task.Version, task)
if err != nil {
return err
} else if !ok {
Expand Down
Loading