A few performance optimizations. (#960)
* Use exponential backoff for task processing
* Do not send shard time sync if a replication task has already been sent out
wxing1292 authored Jul 12, 2018
1 parent 23ba2b9 commit 302f239
Showing 9 changed files with 135 additions and 99 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -119,6 +119,7 @@ var keys = map[Key]string{
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",

// worker settings
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
@@ -278,6 +279,8 @@ const (
MaximumBufferedEventsBatch
// ShardUpdateMinInterval is the minimal time interval which the shard info can be updated
ShardUpdateMinInterval
// ShardSyncMinInterval is the minimal time interval which the shard info should be synced to remote
ShardSyncMinInterval

// key for worker

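For context, the dynamic-config keys above follow a simple pattern: a typed constant plus an entry in a name map that gives the key its config-file name. The sketch below is a minimal, self-contained illustration of that pattern, not the actual cadence constants file; the reduced key set and the String method are assumptions made for the example.

package main

import "fmt"

// Key mirrors the dynamic-config key pattern in constants.go: a typed integer
// constant whose human-readable name lives in a lookup map. The key set here
// is illustrative, not the full cadence list.
type Key int

const (
	unknownKey Key = iota
	ShardUpdateMinInterval
	ShardSyncMinInterval
)

var keys = map[Key]string{
	ShardUpdateMinInterval: "history.shardUpdateMinInterval",
	ShardSyncMinInterval:   "history.shardSyncMinInterval",
}

// String returns the config-file name for a key, so adding a key means adding
// both the constant and its map entry, exactly as the diff does.
func (k Key) String() string {
	if name, ok := keys[k]; ok {
		return name
	}
	return "unknownKey"
}

func main() {
	fmt.Println(ShardSyncMinInterval) // history.shardSyncMinInterval
}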
2 changes: 1 addition & 1 deletion service/history/handler.go
@@ -136,7 +136,7 @@ func (h *Handler) Start() error {
h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger())
h.domainCache.Start()
h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr,
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient(), h.publisher)
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient())
h.metricsClient = h.GetMetricsClient()
h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID)
// events notifier must start before controller
38 changes: 22 additions & 16 deletions service/history/queueProcessor.go
@@ -60,6 +60,7 @@ type (
metricsClient metrics.Client
rateLimiter common.TokenBucket // Read rate limiter
ackMgr queueAckMgr
retryPolicy backoff.RetryPolicy

// worker coroutines notification
workerNotificationChans []chan struct{}
@@ -95,6 +96,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
metricsClient: shard.GetMetricsClient(),
logger: logger,
ackMgr: queueAckMgr,
retryPolicy: common.CreatePersistanceRetryPolicy(),
lastPollTime: time.Time{},
}

@@ -248,8 +250,20 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{},
var logger bark.Logger
var err error
startTime := time.Now()

retryCount := 0
op := func() error {
err = p.processor.process(task)
if err != nil && err != ErrTaskRetry {
retryCount++
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)
}
return err
}

ProcessRetryLoop:
for retryCount := 1; retryCount <= p.options.MaxRetryCount(); {
for retryCount < p.options.MaxRetryCount() {
select {
case <-p.shutdownCh:
return
@@ -260,25 +274,17 @@
default:
}

err = p.processor.process(task)
err = backoff.Retry(op, p.retryPolicy, func(err error) bool {
return err != ErrTaskRetry
})

if err != nil {
if err == ErrTaskRetry {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter)
<-notificationChan
} else {
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)

// it is possible that DomainNotActiveError is thrown
// just keep try for cache.DomainCacheRefreshInterval
// and giveup
if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
backoff := time.Duration(retryCount * 100)
time.Sleep(backoff * time.Millisecond)
retryCount++
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
continue ProcessRetryLoop
}
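The change above replaces the hand-rolled linear sleep (retryCount * 100ms) with a call to the backoff package's Retry helper and a retry policy stored on the processor. The sketch below is a self-contained approximation of that pattern under stated assumptions: retryWithBackoff, errTaskRetry, and the process closure are illustrative stand-ins, not the real backoff.Retry API or the queue processor's types.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errTaskRetry stands in for the processor's ErrTaskRetry sentinel: the task
// belongs to a standby domain and should not be retried by this helper.
var errTaskRetry = errors.New("task retry")

// retryWithBackoff retries op with exponentially growing sleeps until it
// succeeds, the error is flagged non-retryable, or maxAttempts is reached.
func retryWithBackoff(op func() error, isRetryable func(error) bool,
	initial time.Duration, maxAttempts int) error {
	interval := initial
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil || !isRetryable(err) {
			return err
		}
		time.Sleep(interval)
		interval *= 2 // exponential growth instead of the old linear 100ms steps
	}
	return err
}

func main() {
	attempts := 0
	process := func() error { // hypothetical task-processing closure
		attempts++
		if attempts < 3 {
			return errors.New("transient persistence error")
		}
		return nil
	}
	err := retryWithBackoff(process, func(err error) bool {
		return err != errTaskRetry // mirror the diff: ErrTaskRetry is not retried here
	}, 50*time.Millisecond, 5)
	fmt.Println("attempts:", attempts, "err:", err)
}

As in the diff, the ErrTaskRetry case is handled outside the backoff loop (by waiting on the notification channel), so the retryability predicate simply excludes it.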
75 changes: 57 additions & 18 deletions service/history/replicatorQueueProcessor.go
@@ -22,6 +22,8 @@ package history

import (
"errors"
"sync"
"time"

"github.com/uber-common/bark"
h "github.com/uber/cadence/.gen/go/history"
@@ -36,16 +38,20 @@

type (
replicatorQueueProcessorImpl struct {
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
currentClusterNamer string
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
*queueProcessorBase
queueAckMgr

sync.Mutex
lastShardSyncTimestamp time.Time
}
)

@@ -76,14 +82,15 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc
})

processor := &replicatorQueueProcessorImpl{
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
currentClusterNamer: shard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
}

queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetReplicatorAckLevel(), logger)
@@ -166,7 +173,13 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi
},
}

return p.replicator.Publish(replicationTask)
err = p.replicator.Publish(replicationTask)
if err == nil {
p.Lock()
p.lastShardSyncTimestamp = common.NewRealTimeSource().Now()
p.Unlock()
}
return err
}

func (p *replicatorQueueProcessorImpl) readTasks(readLevel int64) ([]queueTaskInfo, bool, error) {
@@ -195,7 +208,33 @@ func (p *replicatorQueueProcessorImpl) completeTask(taskID int64) error {
}

func (p *replicatorQueueProcessorImpl) updateAckLevel(ackLevel int64) error {
return p.shard.UpdateReplicatorAckLevel(ackLevel)
err := p.shard.UpdateReplicatorAckLevel(ackLevel)

// this is a hack, since there is no dedicated ticker on the queue processor
// to periodically send out sync shard message, put it here
now := common.NewRealTimeSource().Now()
sendSyncTask := false
p.Lock()
if p.lastShardSyncTimestamp.Add(p.shard.GetConfig().ShardSyncMinInterval()).Before(now) {
p.lastShardSyncTimestamp = now
sendSyncTask = true
}
p.Unlock()

if sendSyncTask {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(p.currentClusterNamer),
ShardId: common.Int64Ptr(int64(p.shard.GetShardID())),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
p.replicator.Publish(syncStatusTask)
}

return err
}

func (p *replicatorQueueProcessorImpl) getHistory(domainID, workflowID, runID string, firstEventID,
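The replicator queue processor now records when it last published a replication task and only emits a sync-shard status task from updateAckLevel once ShardSyncMinInterval has elapsed; a successful Publish of a history replication task counts as an implicit shard time sync. A minimal sketch of that gating logic, using illustrative names (shardSyncGate, markSent, shouldSync) rather than the real cadence types, and omitting the actual Publish call:

package main

import (
	"fmt"
	"sync"
	"time"
)

// shardSyncGate decides whether a sync-shard status message should be sent,
// mirroring the lastShardSyncTimestamp bookkeeping added in the diff.
type shardSyncGate struct {
	sync.Mutex
	lastSync    time.Time
	minInterval time.Duration
}

// markSent records that a replication task was just published, which counts
// as an implicit shard time sync.
func (g *shardSyncGate) markSent(now time.Time) {
	g.Lock()
	g.lastSync = now
	g.Unlock()
}

// shouldSync reports whether enough time has passed since the last replication
// task (or explicit sync) to justify publishing a sync-shard status task.
func (g *shardSyncGate) shouldSync(now time.Time) bool {
	g.Lock()
	defer g.Unlock()
	if g.lastSync.Add(g.minInterval).Before(now) {
		g.lastSync = now
		return true
	}
	return false
}

func main() {
	gate := &shardSyncGate{minInterval: 5 * time.Minute}
	now := time.Now()
	fmt.Println(gate.shouldSync(now))                      // true: never synced
	gate.markSent(now)                                     // a replication task went out
	fmt.Println(gate.shouldSync(now.Add(time.Minute)))     // false: within the interval
	fmt.Println(gate.shouldSync(now.Add(6 * time.Minute))) // true: interval elapsed
}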
3 changes: 3 additions & 0 deletions service/history/service.go
@@ -99,6 +99,8 @@ type Config struct {

// ShardUpdateMinInterval the minimal time interval which the shard info can be updated
ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
// ShardSyncMinInterval the minimal time interval which the shard info should be synced to remote
ShardSyncMinInterval dynamicconfig.DurationPropertyFn

// Time to hold a poll request before returning an empty response
// right now only used by GetMutableState
@@ -154,6 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50),
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
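ShardSyncMinInterval is registered as a dynamic duration property with a 5-minute default (dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute)), so it can be tuned at runtime rather than compiled in. The sketch below mimics that shape under assumptions: DurationPropertyFn follows the func() time.Duration convention visible in the diff, but durationProperty and the lookup closure are hypothetical stand-ins for dynamicconfig.Collection.GetDurationProperty.

package main

import (
	"fmt"
	"time"
)

// DurationPropertyFn mimics the shape of a dynamic-config duration property: a
// function that is re-evaluated on every call so the value can change at
// runtime without a restart.
type DurationPropertyFn func() time.Duration

// durationProperty wraps a lookup with a fallback default, standing in for the
// real dynamic-config collection.
func durationProperty(lookup func() (time.Duration, bool), defaultValue time.Duration) DurationPropertyFn {
	return func() time.Duration {
		if v, ok := lookup(); ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	override := time.Duration(0) // no override configured yet
	shardSyncMinInterval := durationProperty(func() (time.Duration, bool) {
		return override, override > 0
	}, 5*time.Minute)

	fmt.Println(shardSyncMinInterval()) // 5m0s, the default
	override = 30 * time.Second         // operator changes the knob at runtime
	fmt.Println(shardSyncMinInterval()) // 30s, picked up without restart
}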
27 changes: 6 additions & 21 deletions service/history/shardContext.go
@@ -27,9 +27,7 @@ import (
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/messaging"

"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
@@ -90,14 +88,15 @@ type (
config *Config
logger bark.Logger
metricsClient metrics.Client
messageProducer messaging.Producer

sync.RWMutex
lastUpdated time.Time
shardInfo *persistence.ShardInfo
transferSequenceNumber int64
maxTransferSequenceNumber int64
transferMaxReadLevel int64

// exist only in memory
standbyClusterCurrentTime map[string]time.Time
}
)
@@ -588,26 +587,14 @@ func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) {
}

func (s *shardContextImpl) updateShardInfoLocked() error {
now := time.Now()
var err error
now := common.NewRealTimeSource().Now()
if s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
return nil
}
updatedShardInfo := copyShardInfo(s.shardInfo)

if s.messageProducer != nil {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(s.currentCluster),
ShardId: common.Int64Ptr(int64(s.shardID)),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
s.messageProducer.Publish(syncStatusTask)
}

err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
err = s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
PreviousRangeID: s.shardInfo.RangeID,
})
@@ -677,8 +664,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time {
// TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter.
func acquireShard(shardID int, svc service.Service, shardManager persistence.ShardManager,
historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache,
owner string, closeCh chan<- int, config *Config, logger bark.Logger,
metricsClient metrics.Client, messageProducer messaging.Producer) (ShardContext,
owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext,
error) {
response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID})
if err0 != nil {
@@ -712,7 +698,6 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
shardInfo: updatedShardInfo,
closeCh: closeCh,
metricsClient: metricsClient,
messageProducer: messageProducer,
config: config,
standbyClusterCurrentTime: standbyClusterCurrentTime,
}
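With the sync-shard publish moved to the replicator queue processor, updateShardInfoLocked only rate-limits the persistence write, and it now takes "now" from common.NewRealTimeSource() rather than calling time.Now() directly. A short sketch of that guard with an injectable clock, assuming hypothetical names (the TimeSource interface here is a stand-in, not cadence's actual definition):

package main

import (
	"fmt"
	"time"
)

// TimeSource abstracts "now" so interval checks like the one in
// updateShardInfoLocked can be driven by a fake clock in tests.
type TimeSource interface {
	Now() time.Time
}

type realTimeSource struct{}

func (realTimeSource) Now() time.Time { return time.Now() }

type shardUpdater struct {
	timeSource  TimeSource
	lastUpdated time.Time
	minInterval time.Duration
	updates     int
}

// maybeUpdate persists shard info only if the previous write is older than
// minInterval, mirroring the ShardUpdateMinInterval guard in the diff.
func (s *shardUpdater) maybeUpdate() {
	now := s.timeSource.Now()
	if s.lastUpdated.Add(s.minInterval).After(now) {
		return // updated too recently, skip the write
	}
	s.updates++ // stands in for shardManager.UpdateShard(...)
	s.lastUpdated = now
}

func main() {
	u := &shardUpdater{timeSource: realTimeSource{}, minInterval: 5 * time.Minute}
	u.maybeUpdate()
	u.maybeUpdate() // skipped: within the interval
	fmt.Println("writes:", u.updates) // 1
}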
