Skip to content

Commit

Permalink
bugfix: should use shard's domain notification version (#818)
Browse files Browse the repository at this point in the history
* bugfix: should use shard's domain notification version
  • Loading branch information
wxing1292 authored Jun 6, 2018
1 parent 9eaa3dc commit b947875
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 3 deletions.
11 changes: 9 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,16 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
e.shard.GetShardID(),
e.shard.GetDomainCache().GetDomainNotificationVersion(),
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
shardNotificationVersion := e.shard.GetDomainNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

e.logger.Infof("Domain Failover Event: Shard: %v, Domain: %v, ID: %v, Failover Notification Version: %v, Active Cluster: %v, Shard Domain Notification Version: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID, domainFailoverNotificationVersion, domainActiveCluster, shardNotificationVersion)

if nextDomain.IsGlobalDomain() &&
nextDomain.GetFailoverNotificationVersion() >= e.shard.GetDomainCache().GetDomainNotificationVersion() &&
nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName {
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster == e.currentClusterName {
domainID := prevDomain.GetInfo().ID
e.txProcessor.FailoverDomain(domainID)
e.timerProcessor.FailoverDomain(domainID)
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE
}
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowCluster: clusterName,
logging.TagDomainID: domainID,
logging.TagFailover: "from: " + standbyClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ func (t *timerQueueProcessorImpl) FailoverDomain(domainID string) {
standbyClusterName = cluster
}
}

// the ack manager is exclusive, so just add a cassandra min presicition
maxLevel := t.activeTimerProcessor.timerQueueAckMgr.getReadLevel().VisibilityTimestamp.Add(1 * time.Millisecond)
t.logger.Infof("Timer Failover Triggered: %v, min level: %v, max level: %v.\n", domainID, minLevel, maxLevel)
// we should consider make the failover idempotent
failoverTimerProcessor := newTimerQueueFailoverProcessor(t.shard, t.historyService, domainID,
standbyClusterName, minLevel, maxLevel, t.matchingClient, t.logger)
Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowCluster: currentClusterName,
logging.TagDomainID: domainID,
logging.TagFailover: "from: " + standbyClusterName,
})
transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) {
Expand Down
2 changes: 2 additions & 0 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ func (t *transferQueueProcessorImpl) FailoverDomain(domainID string) {
standbyClusterName = cluster
}
}

// the ack manager is exclusive, so add 1
maxLevel := t.activeTaskProcessor.getReadLevel() + 1
t.logger.Infof("Transfer Failover Triggered: %v, min level: %v, max level: %v.\n", domainID, minLevel, maxLevel)
failoverTaskProcessor := newTransferQueueFailoverProcessor(
t.shard, t.historyService, t.visibilityMgr, t.matchingClient, t.historyClient,
domainID, standbyClusterName, minLevel, maxLevel, t.logger,
Expand Down

0 comments on commit b947875

Please sign in to comment.