diff --git a/service/worker/processor.go b/service/worker/processor.go
index a688539f530..d3eaa8498f4 100644
--- a/service/worker/processor.go
+++ b/service/worker/processor.go
@@ -21,6 +21,8 @@ package worker
 
 import (
+	"errors"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -43,6 +45,32 @@ import (
 	"go.uber.org/yarpc/yarpcerrors"
 )
 
+type workerStatus int
+
+var errMaxAttemptReached = errors.New("Maximum attempts exceeded")
+
+const (
+	workerStatusRunning workerStatus = iota
+	workerStatusPendingRetry
+)
+
+const (
+	// below are the max retry counts for workers
+
+	// [0, 0.6) percentage max retry count
+	retryCountInfinity int64 = math.MaxInt64 // using int64 max as infinity
+	// [0.6, 0.7) percentage max retry count
+	retryCount70PercentInRetry int64 = 256
+	// [0.7, 0.8) percentage max retry count
+	retryCount80PercentInRetry int64 = 128
+	// [0.8, 0.9) percentage max retry count
+	retryCount90PercentInRetry int64 = 64
+	// [0.9, 0.95) percentage max retry count
+	retryCount95PercentInRetry int64 = 32
+	// [0.95, 1] percentage max retry count
+	retryCount100PercentInRetry int64 = 8
+)
+
 type (
 	// DomainReplicator is the interface which can replicate the domain
 	DomainReplicator interface {
@@ -65,6 +93,11 @@ type (
 		metricsClient     metrics.Client
 		domainReplicator  DomainReplicator
 		historyClient     history.Client
+
+		// workerInRetryCount is used by the underlying processor when retrying a task.
+		// It helps the replicator / underlying processor understand the overall
+		// situation and give up retrying when necessary.
+		workerInRetryCount int32
 	}
 )
 
@@ -97,9 +130,10 @@ func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string,
 			logging.TagSourceCluster: sourceCluster,
 			logging.TagConsumerName:  consumer,
 		}),
-		metricsClient:    metricsClient,
-		domainReplicator: domainReplicator,
-		historyClient:    historyClient,
+		metricsClient:      metricsClient,
+		domainReplicator:   domainReplicator,
+		historyClient:      historyClient,
+		workerInRetryCount: 0,
 	}
 }
 
@@ -145,13 +179,52 @@ func (p *replicationTaskProcessor) Stop() {
 	}
 }
 
+func (p *replicationTaskProcessor) updateWorkerRetryStatus(isInRetry bool) {
+	if isInRetry {
+		atomic.AddInt32(&p.workerInRetryCount, 1)
+	} else {
+		atomic.AddInt32(&p.workerInRetryCount, -1)
+	}
+}
+
+// getRemainingRetryCount returns the max retry count at the moment
+func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount int64) int64 {
+	workerInRetry := float64(atomic.LoadInt32(&p.workerInRetryCount))
+	numWorker := float64(p.config.ReplicatorConcurrency)
+	retryPercentage := workerInRetry / numWorker
+
+	min := func(i int64, j int64) int64 {
+		if i < j {
+			return i
+		}
+		return j
+	}
+
+	if retryPercentage < 0.6 {
+		return min(remainingRetryCount, retryCountInfinity)
+	}
+	if retryPercentage < 0.7 {
+		return min(remainingRetryCount, retryCount70PercentInRetry)
+	}
+	if retryPercentage < 0.8 {
+		return min(remainingRetryCount, retryCount80PercentInRetry)
+	}
+	if retryPercentage < 0.9 {
+		return min(remainingRetryCount, retryCount90PercentInRetry)
+	}
+	if retryPercentage < 0.95 {
+		return min(remainingRetryCount, retryCount95PercentInRetry)
+	}
+	return min(remainingRetryCount, retryCount100PercentInRetry)
+}
+
 func (p *replicationTaskProcessor) processorPump() {
 	defer p.shutdownWG.Done()
 
 	var workerWG sync.WaitGroup
-	for i := 0; i < p.config.ReplicatorConcurrency; i++ {
+	for workerID := 0; workerID < p.config.ReplicatorConcurrency; workerID++ {
 		workerWG.Add(1)
-		go p.worker(&workerWG)
+		go p.messageProcessLoop(&workerWG, workerID)
 	}
 
 	select {
@@ -166,7 +239,7 @@ func (p *replicationTaskProcessor) processorPump() {
 	}
 }
 
-func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
+func (p *replicationTaskProcessor) messageProcessLoop(workerWG *sync.WaitGroup, workerID int) {
 	defer workerWG.Done()
 
 	for {
@@ -176,44 +249,70 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
 				p.logger.Info("Worker for replication task processor shutting down.")
 				return // channel closed
 			}
+			p.processWithRetry(msg, workerID)
+		case <-p.consumer.Closed():
+			p.logger.Info("Consumer closed. Processor shutting down.")
+			return
+		}
+	}
+}
 
-			var err error
-		ProcessRetryLoop:
-			for {
-				select {
-				case <-p.shutdownCh:
-					return
-				default:
-					op := func() error {
-						return p.process(msg)
-					}
-					err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
-					if err != nil && p.isTransientRetryableError(err) {
-						// Keep on retrying transient errors for ever
-						continue ProcessRetryLoop
-					}
+func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID int) {
+	var err error
+
+	isInRetry := false
+	remainingRetryCount := retryCountInfinity
+	defer func() {
+		if isInRetry {
+			p.updateWorkerRetryStatus(false)
+		}
+	}()
+
+ProcessRetryLoop:
+	for {
+		select {
+		case <-p.shutdownCh:
+			return
+		default:
+			op := func() error {
+				remainingRetryCount--
+				if remainingRetryCount <= 0 {
+					return errMaxAttemptReached
 				}
-				break ProcessRetryLoop
+				errMsg := p.process(msg)
+				if errMsg != nil && p.isTransientRetryableError(errMsg) {
+					// Keep on retrying transient errors forever
+					if !isInRetry {
+						isInRetry = true
+						p.updateWorkerRetryStatus(true)
+					}
+					remainingRetryCount = p.getRemainingRetryCount(remainingRetryCount)
+				}
+				return errMsg
 			}
-			if err == nil {
-				// Successfully processed replication task. Ack message to move the cursor forward.
-				msg.Ack()
-			} else {
-				// Task still failed after all retries. This is most probably due to a bug in replication code.
-				// Nack the task to move it to DLQ to not block replication for other workflow executions.
-				p.logger.WithFields(bark.Fields{
-					logging.TagErr:          err,
-					logging.TagPartitionKey: msg.Partition(),
-					logging.TagOffset:       msg.Offset(),
-				}).Error("Error processing replication task.")
-				msg.Nack()
+
+			err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
+			if err != nil && p.isTransientRetryableError(err) {
+				// Keep on retrying transient errors forever
+				continue ProcessRetryLoop
 			}
-		case <-p.consumer.Closed():
-			p.logger.Info("Consumer closed. Processor shutting down.")
-			return
 		}
+		break ProcessRetryLoop
+	}
+
+	if err == nil {
+		// Successfully processed replication task. Ack message to move the cursor forward.
+		msg.Ack()
+	} else {
+		// Task still failed after all retries. This is most probably due to a bug in replication code.
+		// Nack the task to move it to DLQ to not block replication for other workflow executions.
+		p.logger.WithFields(bark.Fields{
+			logging.TagErr:          err,
+			logging.TagPartitionKey: msg.Partition(),
+			logging.TagOffset:       msg.Offset(),
+		}).Error("Error processing replication task.")
		msg.Nack()
 	}
 }
 
@@ -337,9 +436,9 @@ func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool {
 		return true
 	case *shared.RetryTaskError:
 		return true
+	default:
+		return false
 	}
-
-	return false
 }
 
 func deserialize(payload []byte) (*replicator.ReplicationTask, error) {
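The constants and getRemainingRetryCount above implement load-based retry throttling: once more than 60% of the replicator's workers are stuck retrying, each task's remaining retry budget is clamped to progressively smaller caps (256, 128, 64, 32, and finally 8 attempts at 95% and above). A minimal standalone sketch of that clamping follows; the helper name budgetFor and the demo in main are hypothetical and not taken from the repository, only the thresholds and caps mirror the constants introduced in the diff.

// Sketch (not part of the diff above): illustrates the load-based retry
// throttling idea behind getRemainingRetryCount. budgetFor and the demo
// values are hypothetical.
package main

import (
	"fmt"
	"math"
)

// budgetFor clamps a task's remaining retry budget based on the fraction of
// workers currently in retry: the busier the pool, the smaller the cap.
func budgetFor(remaining int64, workersInRetry, totalWorkers int) int64 {
	p := float64(workersInRetry) / float64(totalWorkers)

	var limit int64
	switch {
	case p < 0.6:
		limit = math.MaxInt64 // effectively unlimited
	case p < 0.7:
		limit = 256
	case p < 0.8:
		limit = 128
	case p < 0.9:
		limit = 64
	case p < 0.95:
		limit = 32
	default:
		limit = 8
	}
	if remaining < limit {
		return remaining
	}
	return limit
}

func main() {
	// With 10 workers, the budget shrinks as more of them are retrying.
	for _, inRetry := range []int{0, 6, 7, 8, 9, 10} {
		fmt.Printf("workers in retry: %2d -> retry budget: %d\n",
			inRetry, budgetFor(math.MaxInt64, inRetry, 10))
	}
}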