Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some behavior change on worker #847

Merged
merged 5 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ var (
// ErrRetryEntityNotExists is returned to indicate workflow execution is not created yet and replicator should
// try this task again after a small delay.
ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "workflow execution not found"}
// ErrRetryExecutionAlreadyStarted is returned to indicate that another workflow execution has already started.
// This error can be returned if we encounter a race condition, e.g. terminating the target workflow while
// the target workflow has already done continue-as-new. The replicator should
// try this task again after a small delay.
ErrRetryExecutionAlreadyStarted = &shared.RetryTaskError{Message: "another workflow execution is running"}
// ErrMissingReplicationInfo is returned when replication task is missing replication information from source cluster
ErrMissingReplicationInfo = &shared.BadRequestError{Message: "replication task is missing cluster replication info"}
// ErrCorruptedReplicationInfo is returned when replication task has corrupted replication information from source cluster
Expand Down Expand Up @@ -99,9 +104,15 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl,

func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retError error) {
defer func() {
if _, ok := retError.(*shared.EntityNotExistsError); ok {
r.logger.Warnf("Encounter EntityNotExistsError: %v", retError)
retError = ErrRetryEntityNotExists
if retError != nil {
switch retError.(type) {
case *shared.EntityNotExistsError:
r.logger.Warnf("Encounter EntityNotExistsError: %v", retError)
retError = ErrRetryEntityNotExists
case *shared.WorkflowExecutionAlreadyStartedError:
r.logger.Warnf("Encounter WorkflowExecutionAlreadyStartedError: %v", retError)
retError = ErrRetryExecutionAlreadyStarted
}
}
}()

Expand Down
19 changes: 5 additions & 14 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {

var err error
ProcessRetryLoop:
for retryCount := 1; retryCount <= p.config.ReplicatorMaxRetryCount; {
for {
select {
case <-p.shutdownCh:
return
Expand All @@ -188,20 +188,9 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
return p.process(msg)
}
err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
if err != nil {
// Check if this is an explicit ask to retry the task by handler
if _, ok := err.(*shared.RetryTaskError); ok {
// Increment the retryCount, as we will retry the error up to ReplicatorMaxRetryCount times before moving
// it to the DLQ
retryCount++
time.Sleep(p.config.ReplicatorRetryDelay)
continue ProcessRetryLoop
}

if err != nil && p.isTransientRetryableError(err) {
// Keep retrying transient errors forever
if p.isTransientRetryableError(err) {
continue ProcessRetryLoop
}
continue ProcessRetryLoop
}
}

Expand Down Expand Up @@ -331,6 +320,8 @@ func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool {
return true
case *shared.InternalServiceError:
return true
case *shared.RetryTaskError:
return true
}

return false
Expand Down
7 changes: 1 addition & 6 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package worker

import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand All @@ -45,7 +43,6 @@ type (
// Replicator settings
ReplicatorConcurrency int
ReplicatorMaxRetryCount int
ReplicatorRetryDelay time.Duration
}
)

Expand All @@ -61,9 +58,7 @@ func NewService(params *service.BootstrapParams) common.Daemon {
// NewConfig builds the new Config for cadence-worker service
func NewConfig() *Config {
return &Config{
ReplicatorConcurrency: 10,
ReplicatorMaxRetryCount: 20,
ReplicatorRetryDelay: 50 * time.Millisecond,
ReplicatorConcurrency: 1000,
}
}

Expand Down