Dynamically set the retry count for retryable errors on the worker processor (#883)
* Dynamically set the retry count for retryable errors on the worker processor
wxing1292 authored Jun 23, 2018
1 parent 5530848 commit 5cdfeb2
Showing 1 changed file with 138 additions and 39 deletions.
177 changes: 138 additions & 39 deletions service/worker/processor.go
@@ -21,6 +21,8 @@
package worker

import (
"errors"
"math"
"sync"
"sync/atomic"
"time"
@@ -43,6 +45,32 @@ import (
"go.uber.org/yarpc/yarpcerrors"
)

type workerStatus int

var errMaxAttemptReached = errors.New("Maximum attempts exceeded")

const (
workerStatusRunning workerStatus = iota
workerStatusPendingRetry
)

const (
	// Below are the max retry counts for a worker, chosen by the fraction of workers currently in retry.

	// [0, 0.6) of workers in retry: max retry count
	retryCountInfinity int64 = math.MaxInt64 // using int64 max as infinity
	// [0.6, 0.7) of workers in retry: max retry count
	retryCount70PercentInRetry int64 = 256
	// [0.7, 0.8) of workers in retry: max retry count
	retryCount80PercentInRetry int64 = 128
	// [0.8, 0.9) of workers in retry: max retry count
	retryCount90PercentInRetry int64 = 64
	// [0.9, 0.95) of workers in retry: max retry count
	retryCount95PercentInRetry int64 = 32
	// [0.95, 1] of workers in retry: max retry count
	retryCount100PercentInRetry int64 = 8
)
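To make the thresholds concrete, assume ReplicatorConcurrency is 100, so roughly 100 worker goroutines: with fewer than 60 workers in retry a task may keep retrying indefinitely; with 60 to 69 in retry the budget drops to 256 attempts; with 70 to 79 it drops to 128; with 80 to 89 to 64; with 90 to 94 to 32; and once 95 or more workers are in retry only 8 attempts remain. The concurrency value is only an illustration; the mapping itself is implemented in getRemainingRetryCount below.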

type (
// DomainReplicator is the interface which can replicate the domain
DomainReplicator interface {
@@ -65,6 +93,11 @@ type (
metricsClient metrics.Client
domainReplicator DomainReplicator
historyClient history.Client

		// workerInRetryCount tracks how many workers are currently retrying a task.
		// It helps the replicator / underlying processor understand the overall
		// situation and give up retrying when too many workers are already in retry.
		workerInRetryCount int32
}
)

@@ -97,9 +130,10 @@ func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string,
logging.TagSourceCluster: sourceCluster,
logging.TagConsumerName: consumer,
}),
-		metricsClient:    metricsClient,
-		domainReplicator: domainReplicator,
-		historyClient:    historyClient,
+		metricsClient:      metricsClient,
+		domainReplicator:   domainReplicator,
+		historyClient:      historyClient,
+		workerInRetryCount: 0,
}
}

@@ -145,13 +179,52 @@ func (p *replicationTaskProcessor) Stop() {
}
}

func (p *replicationTaskProcessor) updateWorkerRetryStatus(isInRetry bool) {
if isInRetry {
atomic.AddInt32(&p.workerInRetryCount, 1)
} else {
atomic.AddInt32(&p.workerInRetryCount, -1)
}
}
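As an aside, the bookkeeping behind updateWorkerRetryStatus relies on each worker entering the "in retry" state at most once per message and leaving it exactly once on exit, so workerInRetryCount always equals the number of workers currently stuck retrying. Below is a minimal, self-contained sketch of that pattern (not part of this diff; the names workersInRetry and tasksNeedingRetry are made up for illustration):

// Sketch only: mirrors the enter-once / leave-once counting pattern used above.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var workersInRetry int32 // plays the role of workerInRetryCount
	var wg sync.WaitGroup

	tasksNeedingRetry := []bool{true, false, true, true} // hypothetical workload
	for _, needsRetry := range tasksNeedingRetry {
		wg.Add(1)
		go func(needsRetry bool) {
			defer wg.Done()

			isInRetry := false
			defer func() {
				if isInRetry {
					atomic.AddInt32(&workersInRetry, -1) // leave the "in retry" state exactly once
				}
			}()

			if needsRetry && !isInRetry {
				isInRetry = true
				atomic.AddInt32(&workersInRetry, 1) // enter the "in retry" state at most once
			}
			// ... the real worker would retry the task here ...
		}(needsRetry)
	}

	wg.Wait()
	fmt.Println("workers still in retry:", atomic.LoadInt32(&workersInRetry)) // prints 0
}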

// getRemainingRetryCount caps the remaining retry count based on how many workers are currently in retry
func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount int64) int64 {
workerInRetry := float64(atomic.LoadInt32(&p.workerInRetryCount))
numWorker := float64(p.config.ReplicatorConcurrency)
retryPercentage := workerInRetry / numWorker

min := func(i int64, j int64) int64 {
if i < j {
return i
}
return j
}

if retryPercentage < 0.6 {
return min(remainingRetryCount, retryCountInfinity)
}
if retryPercentage < 0.7 {
return min(remainingRetryCount, retryCount70PercentInRetry)
}
if retryPercentage < 0.8 {
return min(remainingRetryCount, retryCount80PercentInRetry)
}
if retryPercentage < 0.9 {
return min(remainingRetryCount, retryCount90PercentInRetry)
}
if retryPercentage < 0.95 {
return min(remainingRetryCount, retryCount95PercentInRetry)
}
return min(remainingRetryCount, retryCount100PercentInRetry)
}
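For illustration, the sketch below (not part of this diff) mirrors the bucketing logic of getRemainingRetryCount in isolation; the helper name retryCapFor and the concurrency value 100 are assumptions made up for this example. Note that the real method additionally takes the minimum with the caller's current remainingRetryCount, so a worker's budget only ever shrinks.

// Sketch only: maps the fraction of workers in retry to a retry cap, as in getRemainingRetryCount.
package main

import (
	"fmt"
	"math"
)

func retryCapFor(workersInRetry, numWorkers int) int64 {
	p := float64(workersInRetry) / float64(numWorkers)
	switch {
	case p < 0.6:
		return math.MaxInt64 // effectively unlimited
	case p < 0.7:
		return 256
	case p < 0.8:
		return 128
	case p < 0.9:
		return 64
	case p < 0.95:
		return 32
	default:
		return 8
	}
}

func main() {
	const numWorkers = 100 // e.g. ReplicatorConcurrency
	for _, inRetry := range []int{10, 65, 75, 85, 92, 99} {
		fmt.Printf("%d/%d workers in retry -> retry cap %d\n", inRetry, numWorkers, retryCapFor(inRetry, numWorkers))
	}
}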

func (p *replicationTaskProcessor) processorPump() {
defer p.shutdownWG.Done()

var workerWG sync.WaitGroup
-	for i := 0; i < p.config.ReplicatorConcurrency; i++ {
+	for workerID := 0; workerID < p.config.ReplicatorConcurrency; workerID++ {
		workerWG.Add(1)
-		go p.worker(&workerWG)
+		go p.messageProcessLoop(&workerWG, workerID)
}

select {
@@ -166,7 +239,7 @@ func (p *replicationTaskProcessor) processorPump() {
}
}

-func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
+func (p *replicationTaskProcessor) messageProcessLoop(workerWG *sync.WaitGroup, workerID int) {
defer workerWG.Done()

for {
@@ -176,44 +249,70 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
p.logger.Info("Worker for replication task processor shutting down.")
return // channel closed
}
p.processWithRetry(msg, workerID)
case <-p.consumer.Closed():
p.logger.Info("Consumer closed. Processor shutting down.")
return
}
}
}

-			var err error
-		ProcessRetryLoop:
-			for {
-				select {
-				case <-p.shutdownCh:
-					return
-				default:
-					op := func() error {
-						return p.process(msg)
-					}
-					err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
-					if err != nil && p.isTransientRetryableError(err) {
-						// Keep on retrying transient errors forever
-						continue ProcessRetryLoop
-					}
-				}
-				break ProcessRetryLoop
-			}
-
-			if err == nil {
-				// Successfully processed replication task. Ack message to move the cursor forward.
-				msg.Ack()
-			} else {
-				// Task still failed after all retries. This is most probably due to a bug in replication code.
-				// Nack the task to move it to DLQ to not block replication for other workflow executions.
-				p.logger.WithFields(bark.Fields{
-					logging.TagErr:          err,
-					logging.TagPartitionKey: msg.Partition(),
-					logging.TagOffset:       msg.Offset(),
-				}).Error("Error processing replication task.")
-				msg.Nack()
-			}
-		case <-p.consumer.Closed():
-			p.logger.Info("Consumer closed. Processor shutting down.")
-			return
-		}
+func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID int) {
+	var err error
+
+	isInRetry := false
+	remainingRetryCount := retryCountInfinity
+	defer func() {
+		if isInRetry {
+			p.updateWorkerRetryStatus(false)
+		}
+	}()
+
+ProcessRetryLoop:
+	for {
+		select {
+		case <-p.shutdownCh:
+			return
+		default:
+			op := func() error {
+				remainingRetryCount--
+				if remainingRetryCount <= 0 {
+					return errMaxAttemptReached
+				}
+
+				errMsg := p.process(msg)
+				if errMsg != nil && p.isTransientRetryableError(errMsg) {
+					// Keep retrying transient errors while the retry budget allows
+					if !isInRetry {
+						isInRetry = true
+						p.updateWorkerRetryStatus(true)
+					}
+					remainingRetryCount = p.getRemainingRetryCount(remainingRetryCount)
+				}
+				return errMsg
+			}
+
+			err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
+			if err != nil && p.isTransientRetryableError(err) {
+				// Keep retrying transient errors until the budget runs out
+				continue ProcessRetryLoop
+			}
+		}
+		break ProcessRetryLoop
+	}
+
+	if err == nil {
+		// Successfully processed replication task. Ack message to move the cursor forward.
+		msg.Ack()
+	} else {
+		// Task still failed after all retries. This is most probably due to a bug in replication code.
+		// Nack the task to move it to DLQ to not block replication for other workflow executions.
+		p.logger.WithFields(bark.Fields{
+			logging.TagErr:          err,
+			logging.TagPartitionKey: msg.Partition(),
+			logging.TagOffset:       msg.Offset(),
+		}).Error("Error processing replication task.")
+		msg.Nack()
+	}
}
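The way the loop terminates is worth spelling out: op decrements the budget before each attempt, and once remainingRetryCount reaches zero it returns errMaxAttemptReached, which isTransientRetryableError does not classify as retryable, so backoff.Retry returns that error, the loop breaks, and the message is Nacked to the DLQ. The sketch below (not part of this diff) shows that control flow with a simplified, hypothetical retryWhile helper standing in for backoff.Retry (no backoff intervals or expiration):

// Sketch only: a spent retry budget surfaces as a non-retryable sentinel error, ending the retries.
package main

import (
	"errors"
	"fmt"
)

var errMaxAttemptReached = errors.New("Maximum attempts exceeded")
var errTransient = errors.New("transient failure") // stand-in for a retryable service error

// retryWhile is a simplified stand-in for backoff.Retry: keep calling op while the error is retryable.
func retryWhile(op func() error, isRetryable func(error) bool) error {
	for {
		err := op()
		if err == nil || !isRetryable(err) {
			return err
		}
	}
}

func main() {
	remaining := int64(3) // pretend getRemainingRetryCount has already tightened the budget
	op := func() error {
		remaining--
		if remaining <= 0 {
			return errMaxAttemptReached // budget spent: not retryable, so the retries stop
		}
		return errTransient // keeps failing transiently until the budget runs out
	}
	isRetryable := func(err error) bool { return errors.Is(err, errTransient) }

	if err := retryWhile(op, isRetryable); err != nil {
		fmt.Println("giving up, message would be Nacked to the DLQ:", err)
	}
}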

@@ -337,9 +436,9 @@ func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool {
return true
case *shared.RetryTaskError:
return true
+	default:
+		return false
	}
-
-	return false
}

func deserialize(payload []byte) (*replicator.ReplicationTask, error) {
