Skip to content

Commit

Permalink
Bugfix: failover trigger should also notify activer timer / transfer … (
Browse files Browse the repository at this point in the history
#886)

* bugfix: failover trigger should also notify activer timer / transfer processor

* add metrics to worker retry percentage
  • Loading branch information
wxing1292 authored Jun 25, 2018
1 parent 622d1a0 commit ec2c6da
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 53 deletions.
44 changes: 31 additions & 13 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
// requests using the stale entry from cache upto an hour
DomainCache interface {
common.Daemon
RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn)
RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn)
UnregisterDomainChangeCallback(shard int)
GetDomain(name string) (*DomainCacheEntry, error)
GetDomainByID(id string) (*DomainCacheEntry, error)
Expand All @@ -89,7 +89,8 @@ type (

sync.RWMutex
domainNotificationVersion int64
callbacks map[int]CallbackFn
beforeCallbacks map[int]CallbackFn
afterCallbacks map[int]CallbackFn
}

// DomainCacheEntries is DomainCacheEntry slice
Expand Down Expand Up @@ -126,7 +127,8 @@ func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata clu
clusterMetadata: clusterMetadata,
timeSource: common.NewRealTimeSource(),
logger: logger,
callbacks: make(map[int]CallbackFn),
beforeCallbacks: make(map[int]CallbackFn),
afterCallbacks: make(map[int]CallbackFn),
}
}

Expand Down Expand Up @@ -178,14 +180,17 @@ func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
}

// RegisterDomainChangeCallback set a domain change callback
// WARN: the callback function will be triggered by domain cache when holding the domain cache lock,
// WARN: the beforeCallback function will be triggered by domain cache when holding the domain cache lock,
// make sure the callback function will not call domain cache again in case of dead lock
func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn) {
// afterCallback will be invoked when NOT holding the domain cache lock.
func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn) {
c.Lock()
defer c.Unlock()

c.callbacks[shard] = fn
c.beforeCallbacks[shard] = beforeCallback
c.afterCallbacks[shard] = afterCallback

// this section is trying to make the shard catch up with domain changes
if c.domainNotificationVersion > initialNotificationVersion {
domains := DomainCacheEntries{}
for _, domain := range c.GetAllDomain() {
Expand All @@ -197,7 +202,8 @@ func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificatio
sort.Sort(domains)
for _, domain := range domains {
if domain.notificationVersion >= initialNotificationVersion {
fn(nil, domain)
beforeCallback(nil, domain)
afterCallback(nil, domain)
}
}
}
Expand All @@ -208,7 +214,8 @@ func (c *domainCache) UnregisterDomainChangeCallback(shard int) {
c.Lock()
defer c.Unlock()

delete(c.callbacks, shard)
delete(c.beforeCallbacks, shard)
delete(c.afterCallbacks, shard)
}

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
Expand Down Expand Up @@ -343,9 +350,8 @@ func (c *domainCache) updateIDToDomainCache(id string, record *DomainCacheEntry)
return nil, err
}
entry := elem.(*DomainCacheEntry)
entry.Lock()
defer entry.Unlock()

entry.Lock()
var prevDomain *DomainCacheEntry
triggerCallback := c.clusterMetadata.IsGlobalDomainEnabled() &&
// expiry will be non zero when the entry is initialized / valid
Expand All @@ -368,7 +374,11 @@ func (c *domainCache) updateIDToDomainCache(id string, record *DomainCacheEntry)

nextDomain := entry.duplicate()
if triggerCallback {
c.triggerDomainChangeCallback(prevDomain, nextDomain)
c.triggerDomainBeforeChangeCallback(prevDomain, nextDomain)
}
entry.Unlock()
if triggerCallback {
c.triggerDomainAfterChangeCallback(prevDomain, nextDomain)
}

return nextDomain, nil
Expand Down Expand Up @@ -445,10 +455,18 @@ func (c *domainCache) getDomainByID(id string) (*DomainCacheEntry, error) {
return newEntry, nil
}

func (c *domainCache) triggerDomainChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
func (c *domainCache) triggerDomainBeforeChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
c.RLock()
defer c.RUnlock()
for _, callback := range c.beforeCallbacks {
callback(prevDomain, nextDomain)
}
}

func (c *domainCache) triggerDomainAfterChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
c.RLock()
defer c.RUnlock()
for _, callback := range c.callbacks {
for _, callback := range c.afterCallbacks {
callback(prevDomain, nextDomain)
}
}
Expand Down
6 changes: 3 additions & 3 deletions common/cache/domainCache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (_m *DomainCacheMock) GetDomainNotificationVersion() int64 {
return r0
}

// RegisterDomainChangeCallback provides a mock function with given fields: shard, initialNotificationVersion, fn
func (_m *DomainCacheMock) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn) {
_m.Called(shard, initialNotificationVersion, fn)
// RegisterDomainChangeCallback provides a mock function with given fields: shard, initialNotificationVersion, beforeCallback, afterCallback
func (_m *DomainCacheMock) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn) {
_m.Called(shard, initialNotificationVersion, beforeCallback, afterCallback)
}

// Start provides a mock function with given fields:
Expand Down
81 changes: 58 additions & 23 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,26 @@ func (s *domainCacheSuite) TestRegisterCallback_CatchUp() {
s.Nil(s.domainCache.refreshDomains())
s.Equal(domainNotificationVersion, s.domainCache.GetDomainNotificationVersion())

entriesNotification := []*DomainCacheEntry{}
entriesNotificationBefore := []*DomainCacheEntry{}
entriesNotificationAfter := []*DomainCacheEntry{}
// we are not testing catching up, so make this really large
currentDomainNotificationVersion := int64(0)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotification = append(entriesNotification, nextDomain)
})
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotificationBefore = append(entriesNotificationBefore, nextDomain)
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotificationAfter = append(entriesNotificationAfter, nextDomain)
},
)

// the order matters here, should be ordered by notification version
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotification)
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotificationBefore)
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotificationAfter)
}

func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
Expand Down Expand Up @@ -377,16 +387,28 @@ func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
entry1New := s.buildEntryFromRecord(domainRecord1New)
domainNotificationVersion++

entriesOld := []*DomainCacheEntry{}
entriesNew := []*DomainCacheEntry{}
entriesOldBefore := []*DomainCacheEntry{}
entriesNewBefore := []*DomainCacheEntry{}
entriesOldAfter := []*DomainCacheEntry{}
entriesNewAfter := []*DomainCacheEntry{}
// we are not testing catching up, so make this really large
currentDomainNotificationVersion := int64(9999999)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOld = append(entriesOld, prevDomain)
entriesNew = append(entriesNew, nextDomain)
})
s.Empty(entriesOld)
s.Empty(entriesNew)
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOldBefore = append(entriesOldBefore, prevDomain)
entriesNewBefore = append(entriesNewBefore, nextDomain)
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOldAfter = append(entriesOldAfter, prevDomain)
entriesNewAfter = append(entriesNewAfter, nextDomain)
},
)
s.Empty(entriesOldBefore)
s.Empty(entriesNewBefore)
s.Empty(entriesOldAfter)
s.Empty(entriesNewAfter)

s.metadataMgr.On("GetMetadata").Return(&persistence.GetMetadataResponse{NotificationVersion: domainNotificationVersion}, nil).Once()
s.metadataMgr.On("ListDomains", &persistence.ListDomainsRequest{
Expand All @@ -402,8 +424,10 @@ func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
// the record 1 got updated later, thus a higher notification version.
// making sure notifying from lower to higher version helps the shard to keep track the
// domain change events
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOld)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNew)
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOldBefore)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNewBefore)
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOldAfter)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNewAfter)
}

func (s *domainCacheSuite) TestUpdateCache_GetNotTrigger() {
Expand Down Expand Up @@ -442,19 +466,30 @@ func (s *domainCacheSuite) TestUpdateCache_GetNotTrigger() {
s.Nil(err)
s.Equal(entryOld, entry)

callbackInvoked := false
callbackBeforeInvoked := false
callbackAfterInvoked := false
// we are not testing catching up, so make this really large
currentDomainNotificationVersion := int64(9999999)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackInvoked = true
})
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackBeforeInvoked = true
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackAfterInvoked = true
},
)

entry, err = s.domainCache.updateIDToDomainCache(domainRecordNew.Info.ID, entryNew)
s.Nil(err)
s.Equal(entryNew, entry)
s.False(callbackInvoked)
s.False(callbackBeforeInvoked)
s.False(callbackAfterInvoked)
}

func (s *domainCacheSuite) TestGetUpdateCache_ConcurrentAccess() {
Expand Down
8 changes: 5 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ const (
ReplicatorMessages = iota + NumCommonMetrics
ReplicatorFailures
ReplicatorLatency
ReplicatorRetryPercentage
)

// MetricDefs record the metrics for all services
Expand Down Expand Up @@ -801,9 +802,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
},
Worker: {
ReplicatorMessages: {metricName: "replicator.messages"},
ReplicatorFailures: {metricName: "replicator.errors"},
ReplicatorLatency: {metricName: "replicator.latency"},
ReplicatorMessages: {metricName: "replicator.messages"},
ReplicatorFailures: {metricName: "replicator.errors"},
ReplicatorLatency: {metricName: "replicator.latency"},
ReplicatorRetryPercentage: {metricName: "replicator.retry-percentage", metricType: Gauge},
},
}

Expand Down
48 changes: 37 additions & 11 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,26 +194,52 @@ func (e *historyEngineImpl) Stop() {
}

func (e *historyEngineImpl) registerDomainFailoverCallback() {

failoverPredicate := func(nextDomain *cache.DomainCacheEntry, action func()) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
shardNotificationVersion := e.shard.GetDomainNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

if nextDomain.IsGlobalDomain() &&
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster == e.currentClusterName {
action()
}
}

// first set the failover callback
e.shard.GetDomainCache().RegisterDomainChangeCallback(
e.shard.GetShardID(),
e.shard.GetDomainCache().GetDomainNotificationVersion(),
// before the domain change, this will be invoked when (most of time) domain cache is locked
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
shardNotificationVersion := e.shard.GetDomainNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

e.logger.Infof("Domain Change Event: Shard: %v, Domain: %v, ID: %v, Failover Notification Version: %v, Active Cluster: %v, Shard Domain Notification Version: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID, domainFailoverNotificationVersion, domainActiveCluster, shardNotificationVersion)
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID,
nextDomain.GetFailoverNotificationVersion(), nextDomain.GetReplicationConfig().ActiveClusterName, e.shard.GetDomainNotificationVersion())

if nextDomain.IsGlobalDomain() &&
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster == e.currentClusterName {
domainID := prevDomain.GetInfo().ID
failoverPredicate(nextDomain, func() {
e.logger.Infof("Domain Failover Start: Shard: %v, Domain: %v, ID: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID)

domainID := nextDomain.GetInfo().ID
e.txProcessor.FailoverDomain(domainID)
e.timerProcessor.FailoverDomain(domainID)
}

})
},
// after the domain change, this will be invoked when domain cache is NOT locked
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
failoverPredicate(nextDomain, func() {
e.logger.Infof("Domain Failover Notify Active: Shard: %v, Domain: %v, ID: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID)

now := e.shard.GetTimeSource().Now()
// the fake tasks will not be actually used, we just need to make sure
// its length > 0 and has correct timestamp, to trkgger a db scan
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{VisibilityTimestamp: now}}
e.txProcessor.NotifyNewTask(e.currentClusterName, now, fakeDecisionTask)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, now, fakeDecisionTimeoutTask)
})
e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion() + 1)
},
)
Expand Down
2 changes: 2 additions & 0 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount in
numWorker := float64(p.config.ReplicatorConcurrency)
retryPercentage := workerInRetry / numWorker

p.metricsClient.UpdateGauge(metrics.ReplicatorScope, metrics.ReplicatorRetryPercentage, retryPercentage)

min := func(i int64, j int64) int64 {
if i < j {
return i
Expand Down

0 comments on commit ec2c6da

Please sign in to comment.