diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go
index 73c850c4e8b..90646c0ad34 100644
--- a/service/history/historyEngine.go
+++ b/service/history/historyEngine.go
@@ -199,9 +199,16 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
 		e.shard.GetShardID(),
 		e.shard.GetDomainCache().GetDomainNotificationVersion(),
 		func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
+			domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
+			shardNotificationVersion := e.shard.GetDomainNotificationVersion()
+			domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName
+
+			e.logger.Infof("Domain Failover Event: Shard: %v, Domain: %v, ID: %v, Failover Notification Version: %v, Active Cluster: %v, Shard Domain Notification Version: %v\n",
+				e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID, domainFailoverNotificationVersion, domainActiveCluster, shardNotificationVersion)
+
 			if nextDomain.IsGlobalDomain() &&
-				nextDomain.GetFailoverNotificationVersion() >= e.shard.GetDomainCache().GetDomainNotificationVersion() &&
-				nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName {
+				domainFailoverNotificationVersion >= shardNotificationVersion &&
+				domainActiveCluster == e.currentClusterName {
 				domainID := prevDomain.GetInfo().ID
 				e.txProcessor.FailoverDomain(domainID)
 				e.timerProcessor.FailoverDomain(domainID)
diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go
index 6f73bfa08c7..aeb4d8986b6 100644
--- a/service/history/timerQueueActiveProcessor.go
+++ b/service/history/timerQueueActiveProcessor.go
@@ -105,6 +105,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE
 	}
 	logger = logger.WithFields(bark.Fields{
 		logging.TagWorkflowCluster: clusterName,
+		logging.TagDomainID:        domainID,
 		logging.TagFailover:        "from: " + standbyClusterName,
 	})
 	timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go
index d6ec1b54734..f50b67709db 100644
--- a/service/history/timerQueueProcessor.go
+++ b/service/history/timerQueueProcessor.go
@@ -135,9 +135,9 @@ func (t *timerQueueProcessorImpl) FailoverDomain(domainID string) {
 			standbyClusterName = cluster
 		}
 	}
-	// the ack manager is exclusive, so just add a cassandra min presicition
 	maxLevel := t.activeTimerProcessor.timerQueueAckMgr.getReadLevel().VisibilityTimestamp.Add(1 * time.Millisecond)
+	t.logger.Infof("Timer Failover Triggered: %v, min level: %v, max level: %v.\n", domainID, minLevel, maxLevel)
 	// we should consider make the failover idempotent
 	failoverTimerProcessor := newTimerQueueFailoverProcessor(t.shard, t.historyService, domainID, standbyClusterName,
 		minLevel, maxLevel, t.matchingClient, t.logger)
diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go
index 84d7affc627..5cc5a07e6b0 100644
--- a/service/history/transferQueueActiveProcessor.go
+++ b/service/history/transferQueueActiveProcessor.go
@@ -151,6 +151,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo
 	currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
 	logger = logger.WithFields(bark.Fields{
 		logging.TagWorkflowCluster: currentClusterName,
+		logging.TagDomainID:        domainID,
 		logging.TagFailover:        "from: " + standbyClusterName,
 	})
 	transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go
index 4843c705e8c..16858db6f1a 100644
--- a/service/history/transferQueueProcessor.go
+++ b/service/history/transferQueueProcessor.go
@@ -147,8 +147,10 @@ func (t *transferQueueProcessorImpl) FailoverDomain(domainID string) {
 			standbyClusterName = cluster
 		}
 	}
+	// the ack manager is exclusive, so add 1
 	maxLevel := t.activeTaskProcessor.getReadLevel() + 1
+	t.logger.Infof("Transfer Failover Triggered: %v, min level: %v, max level: %v.\n", domainID, minLevel, maxLevel)
 	failoverTaskProcessor := newTransferQueueFailoverProcessor(
 		t.shard, t.historyService, t.visibilityMgr, t.matchingClient, t.historyClient,
 		domainID, standbyClusterName, minLevel, maxLevel, t.logger,
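
Note on the condition in `registerDomainFailoverCallback`: a failover is scheduled only when the domain is global, its failover notification version has not yet been processed by this shard, and the domain is now active in the current cluster. The sketch below restates that predicate as a standalone helper; the helper name and the flattened parameters are illustrative assumptions, not code from this change.

```go
package main

import "fmt"

// shouldTriggerFailover mirrors the condition evaluated (and now logged) in the
// domain-change callback: only a global domain whose failover notification version
// is at or beyond the shard's domain notification version, and which is now active
// in this cluster, triggers txProcessor/timerProcessor failover.
func shouldTriggerFailover(
	isGlobalDomain bool,
	failoverNotificationVersion, shardNotificationVersion int64,
	activeCluster, currentCluster string,
) bool {
	return isGlobalDomain &&
		failoverNotificationVersion >= shardNotificationVersion &&
		activeCluster == currentCluster
}

func main() {
	// Domain failed over to this cluster and the shard has not yet processed the event.
	fmt.Println(shouldTriggerFailover(true, 12, 10, "active", "active")) // true
	// Stale notification (already reflected in the shard's notification version): ignored.
	fmt.Println(shouldTriggerFailover(true, 8, 10, "active", "active")) // false
}
```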
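
On the `maxLevel` adjustments in the two `FailoverDomain` methods: the ack manager's read level is exclusive, so the upper bound is widened by the smallest step the queue key supports, +1 for transfer task IDs and +1ms (Cassandra's minimum timestamp precision) for timer visibility timestamps, which keeps the task at the current read level inside the failover scan. The sketch below illustrates the resulting half-open range check under the assumption that the failover processors treat `maxLevel` as exclusive; the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Transfer tasks are keyed by an int64 task ID; bumping the exclusive read level by 1
// keeps the task currently at the read level inside the half-open range [minLevel, maxLevel).
func transferTaskInFailoverRange(taskID, minLevel, maxLevel int64) bool {
	return taskID >= minLevel && taskID < maxLevel
}

// Timer tasks are keyed by a visibility timestamp; bumping the exclusive read level by
// one millisecond (Cassandra's minimum timestamp precision) serves the same purpose.
func timerTaskInFailoverRange(visibility, minLevel, maxLevel time.Time) bool {
	return !visibility.Before(minLevel) && visibility.Before(maxLevel)
}

func main() {
	readLevel := int64(41)
	maxLevel := readLevel + 1 // mirrors: maxLevel := t.activeTaskProcessor.getReadLevel() + 1
	fmt.Println(transferTaskInFailoverRange(readLevel, 0, maxLevel)) // true: the read-level task is covered

	now := time.Now()
	timerMax := now.Add(1 * time.Millisecond) // mirrors the +1ms on the timer read level
	fmt.Println(timerTaskInFailoverRange(now, time.Time{}, timerMax)) // true
}
```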