Make worker retry on 2 levels, make history replicator allow option to force buffer events (#894)
wxing1292 authored Jun 26, 2018
1 parent 515df70 commit e5f07f2
Showing 7 changed files with 120 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .gen/go/history/idl.go

(Large generated diff not shown.)

40 changes: 38 additions & 2 deletions .gen/go/history/types.go

(Generated file; diff not shown.)

3 changes: 2 additions & 1 deletion idl/github.com/uber/cadence/history.thrift
@@ -216,7 +216,7 @@ struct ReplicationInfo {
}

struct ReplicateEventsRequest {
10: optional string sourceCluster
20: optional string domainUUID
30: optional shared.WorkflowExecution workflowExecution
40: optional i64 (js.type = "Long") firstEventId
@@ -225,6 +225,7 @@ struct ReplicateEventsRequest {
70: optional map<string, ReplicationInfo> replicationInfo
80: optional shared.History history
90: optional shared.History newRunHistory
100: optional bool forceBufferEvents
}

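As context for the new field (not part of the commit): a minimal sketch of how forceBufferEvents surfaces on the generated Go type, assuming the repository's generated-code import paths. Left unset, the generated getter returns the zero value, so existing callers keep the old behavior.

```go
package main

import (
	"fmt"

	h "github.com/uber/cadence/.gen/go/history"
	"github.com/uber/cadence/common"
)

func main() {
	// Field left unset: the generated getter falls back to the zero value,
	// so replication requests from older callers are unaffected.
	req := &h.ReplicateEventsRequest{}
	fmt.Println(req.GetForceBufferEvents()) // false

	// A retrying caller opts in to buffering out-of-order events.
	req.ForceBufferEvents = common.BoolPtr(true)
	fmt.Println(req.GetForceBufferEvents()) // true
}
```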
11 changes: 10 additions & 1 deletion service/history/historyReplicator.go
@@ -64,6 +64,10 @@ var (
// ErrRetryEntityNotExists is returned to indicate workflow execution is not created yet and replicator should
// try this task again after a small delay.
ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "workflow execution not found"}
// ErrRetryExistingWorkflow is returned when events arrive out of order and another workflow with the same version is running
ErrRetryExistingWorkflow = &shared.RetryTaskError{Message: "workflow with same version is running"}
// ErrRetryBufferEvents is returned when events arrive out of order; the caller should retry, or set forceBufferEvents to force-apply them
ErrRetryBufferEvents = &shared.RetryTaskError{Message: "retry on applying buffer events"}
// ErrRetryExecutionAlreadyStarted is returned to indicate another workflow execution already started,
// this error can be return if we encounter race condition, i.e. terminating the target workflow while
// the target workflow has done continue as new.
@@ -296,6 +300,11 @@ func (r *historyReplicator) ApplyOtherEvents(context *workflowExecutionContext,
// out of order replication task and store it in the buffer
logger.Debugf("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v",
msBuilder.GetNextEventID(), firstEventID)

if !request.GetForceBufferEvents() {
return ErrRetryBufferEvents
}

r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.BufferedReplicationTaskCounter)
err = msBuilder.BufferReplicationTask(request)
if err != nil {
@@ -589,7 +598,7 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC
return nil
}
if currentStartVersion == incomingVersion {
-		return ErrRetryEntityNotExists
+		return ErrRetryExistingWorkflow
}

// currentStartVersion < incomingVersion && current workflow still running
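To see the contract these errors define, here is a simplified, self-contained model (stand-in names, not the Cadence source): ApplyOtherEvents now rejects events that arrive ahead of the next expected event ID with ErrRetryBufferEvents, unless the request sets forceBufferEvents, in which case the task is buffered instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetryBufferEvents stands in for ErrRetryBufferEvents.
var errRetryBufferEvents = errors.New("retry on applying buffer events")

// applyEvents mimics the new ApplyOtherEvents branch: out-of-order events
// fail fast unless the caller sets forceBuffer.
func applyEvents(nextEventID, firstEventID int64, forceBuffer bool) error {
	if firstEventID > nextEventID {
		if !forceBuffer {
			return errRetryBufferEvents
		}
		// With forceBuffer set, the real code stores the task via
		// msBuilder.BufferReplicationTask instead of failing.
		fmt.Printf("buffering out-of-order events starting at %d\n", firstEventID)
	}
	return nil
}

func main() {
	fmt.Println(applyEvents(10, 14, false)) // retry on applying buffer events
	fmt.Println(applyEvents(10, 14, true))  // buffers, then <nil>
}
```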
41 changes: 38 additions & 3 deletions service/history/historyReplicator_test.go
@@ -431,12 +431,11 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingEqualToCurrent() {
// TODO
}

-func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent() {
+func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_NoForceBuffer() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

-	currentSourceCluster := "some random current source cluster"
currentVersion := int64(4096)
currentNextEventID := int64(10)

@@ -460,6 +459,42 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent
NextEventId: common.Int64Ptr(incomingNextEventID),
}

msBuilder.On("GetNextEventID").Return(currentNextEventID)

err := s.historyReplicator.ApplyOtherEvents(context, msBuilder, request, s.logger)
s.Equal(ErrRetryBufferEvents, err)
}

func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_ForceBuffer() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

currentSourceCluster := "some random current source cluster"
currentVersion := int64(4096)
currentNextEventID := int64(10)

incomingSourceCluster := "some random incoming source cluster"
incomingVersion := currentVersion * 2
incomingFirstEventID := currentNextEventID + 4
incomingNextEventID := incomingFirstEventID + 4

context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
context.updateCondition = currentNextEventID
msBuilder := &mockMutableState{}
context.msBuilder = msBuilder

request := &h.ReplicateEventsRequest{
SourceCluster: common.StringPtr(incomingSourceCluster),
Version: common.Int64Ptr(incomingVersion),
FirstEventId: common.Int64Ptr(incomingFirstEventID),
NextEventId: common.Int64Ptr(incomingNextEventID),
ForceBufferEvents: common.BoolPtr(true),
}

serializedHistoryBatch := &persistence.SerializedHistoryEventBatch{
EncodingType: common.EncodingTypeJSON,
Version: 144,
@@ -1388,7 +1423,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, errRet).Once()

err := s.historyReplicator.replicateWorkflowStarted(context, msBuilder, di, sourceCluster, history, sBuilder, s.logger)
-	s.Equal(ErrRetryEntityNotExists, err)
+	s.Equal(ErrRetryExistingWorkflow, err)
s.Equal(1, len(transferTasks))
s.Equal(version, transferTasks[0].GetVersion())
s.Equal(1, len(timerTasks))
38 changes: 26 additions & 12 deletions service/worker/processor.go
@@ -69,6 +69,8 @@ const (
retryCount95PercentInRetry int64 = 32
// [0.95, 1] percentage max retry count
retryCount100PercentInRetry int64 = 8

retryErrorWaitMillis = 100
)

type (
@@ -282,7 +284,7 @@ ProcessRetryLoop:
return errMaxAttemptReached
}

-		errMsg := p.process(msg)
+		errMsg := p.process(msg, isInRetry)
if errMsg != nil && p.isTransientRetryableError(errMsg) {
// Keep on retrying transient errors for ever
if !isInRetry {
@@ -318,7 +320,7 @@
}
}

-func (p *replicationTaskProcessor) process(msg kafka.Message) error {
+func (p *replicationTaskProcessor) process(msg kafka.Message, inRetry bool) error {
scope := metrics.ReplicatorScope
task, err := deserialize(msg.Value())
if err != nil {
@@ -342,7 +344,7 @@ func (p *replicationTaskProcessor) process(msg kafka.Message) error {
err = p.handleDomainReplicationTask(task)
case replicator.ReplicationTaskTypeHistory:
scope = metrics.HistoryReplicationTaskScope
-		err = p.handleHistoryReplicationTask(task)
+		err = p.handleHistoryReplicationTask(task, inRetry)
default:
err = ErrUnknownReplicationTask
}
@@ -363,7 +365,7 @@ func (p *replicationTaskProcessor) handleDomainReplicationTask(task *replicator.
return p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes)
}

-func (p *replicationTaskProcessor) handleHistoryReplicationTask(task *replicator.ReplicationTask) error {
+func (p *replicationTaskProcessor) handleHistoryReplicationTask(task *replicator.ReplicationTask, inRetry bool) error {
p.metricsClient.IncCounter(metrics.HistoryReplicationTaskScope, metrics.ReplicatorMessages)
sw := p.metricsClient.StartTimer(metrics.HistoryReplicationTaskScope, metrics.ReplicatorLatency)
defer sw.Stop()
@@ -383,20 +385,32 @@ Loop:
return nil
}

-	return p.historyClient.ReplicateEvents(context.Background(), &h.ReplicateEventsRequest{
+	var err error
+	req := &h.ReplicateEventsRequest{
		SourceCluster: common.StringPtr(p.sourceCluster),
		DomainUUID:    attr.DomainId,
		WorkflowExecution: &shared.WorkflowExecution{
			WorkflowId: attr.WorkflowId,
			RunId:      attr.RunId,
		},
-		FirstEventId:    attr.FirstEventId,
-		NextEventId:     attr.NextEventId,
-		Version:         attr.Version,
-		ReplicationInfo: attr.ReplicationInfo,
-		History:         attr.History,
-		NewRunHistory:   attr.NewRunHistory,
-	})
+		FirstEventId:      attr.FirstEventId,
+		NextEventId:       attr.NextEventId,
+		Version:           attr.Version,
+		ReplicationInfo:   attr.ReplicationInfo,
+		History:           attr.History,
+		NewRunHistory:     attr.NewRunHistory,
+		ForceBufferEvents: common.BoolPtr(inRetry),
+	}
+
+RetryLoop:
+	for i := 0; i < p.config.ReplicatorBufferRetryCount; i++ {
+		err = p.historyClient.ReplicateEvents(context.Background(), req)
+		if _, ok := err.(*shared.RetryTaskError); ok {
+			time.Sleep(retryErrorWaitMillis * time.Millisecond)
+			continue RetryLoop
+		}
+		// Exit on success or a non-retryable error.
+		break RetryLoop
+	}
+	return err
}

func (p *replicationTaskProcessor) updateFailureMetric(scope int, err error) {
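Taken together, these changes give the worker a two-level retry: the inner RetryLoop retries up to ReplicatorBufferRetryCount times with a 100ms pause and no force-buffering; if the task still fails, it goes back through ProcessRetryLoop, which re-processes it with isInRetry set, so the next ReplicateEvents request carries ForceBufferEvents=true. A runnable sketch of that interplay, with stand-in types (illustrative only, not the worker code):

```go
package main

import (
	"fmt"
	"time"
)

const (
	bufferRetryCount     = 8   // mirrors ReplicatorBufferRetryCount
	retryErrorWaitMillis = 100 // mirrors retryErrorWaitMillis
)

// retryTaskError stands in for shared.RetryTaskError.
type retryTaskError struct{ msg string }

func (e *retryTaskError) Error() string { return e.msg }

// replicate stands in for historyClient.ReplicateEvents: without force
// buffering, a persistent ordering gap keeps failing.
func replicate(forceBuffer bool) error {
	if !forceBuffer {
		return &retryTaskError{"retry on applying buffer events"}
	}
	return nil
}

// handleTask models handleHistoryReplicationTask's bounded inner loop.
func handleTask(inRetry bool) error {
	var err error
	for i := 0; i < bufferRetryCount; i++ {
		err = replicate(inRetry) // ForceBufferEvents is set from inRetry
		if _, ok := err.(*retryTaskError); ok {
			time.Sleep(retryErrorWaitMillis * time.Millisecond)
			continue
		}
		break
	}
	return err
}

func main() {
	// Level 1: first delivery, no force buffering; the gap never closes here.
	fmt.Println(handleTask(false)) // retry on applying buffer events
	// Level 2: the worker redelivers with isInRetry=true, forcing the
	// history service to buffer the out-of-order events.
	fmt.Println(handleTask(true)) // <nil>
}
```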
7 changes: 4 additions & 3 deletions service/worker/service.go
@@ -41,8 +41,8 @@ type (
// Config contains all the service config for worker
Config struct {
// Replicator settings
-		ReplicatorConcurrency   int
-		ReplicatorMaxRetryCount int
+		ReplicatorConcurrency      int
+		ReplicatorBufferRetryCount int
}
)

@@ -58,7 +58,8 @@ func NewService(params *service.BootstrapParams) common.Daemon {
// NewConfig builds the new Config for cadence-worker service
func NewConfig() *Config {
return &Config{
-		ReplicatorConcurrency: 1000,
+		ReplicatorConcurrency:      1000,
+		ReplicatorBufferRetryCount: 8,
}
}

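ReplicatorBufferRetryCount bounds the inner retry loop in processor.go; with the default of 8 and the 100ms wait, a task spends at most roughly 800ms in in-place retries before falling back to worker-level retry with force buffering. A hypothetical tuning sketch, mirroring only the fields this diff touches:

```go
package main

import "fmt"

// Config mirrors the two worker settings touched by this commit.
type Config struct {
	ReplicatorConcurrency      int
	ReplicatorBufferRetryCount int
}

func main() {
	// Defaults from NewConfig in this commit.
	cfg := &Config{ReplicatorConcurrency: 1000, ReplicatorBufferRetryCount: 8}
	// A deployment with larger replication lag might raise the inner retry
	// budget (hypothetical tuning, not part of the commit).
	cfg.ReplicatorBufferRetryCount = 16
	fmt.Printf("%+v\n", cfg)
}
```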
