
Bugfix replication protocol #979

Merged · merged 4 commits on Jul 19, 2018

Changes from all commits
8 changes: 8 additions & 0 deletions common/cluster/metadata.go
@@ -22,6 +22,7 @@ package cluster

import (
"fmt"

"github.com/uber/cadence/common/service/dynamicconfig"
)

@@ -35,6 +36,8 @@ type (
IsMasterCluster() bool
// GetNextFailoverVersion return the next failover version for domain failover
GetNextFailoverVersion(string, int64) int64
// IsVersionFromSameCluster returns true if the two versions originate from the same cluster
IsVersionFromSameCluster(version1 int64, version2 int64) bool
// GetMasterClusterName return the master cluster name
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
@@ -132,6 +135,11 @@ func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFail
return failoverVersion
}

// IsVersionFromSameCluster returns true if the two versions originate from the same cluster
func (metadata *metadataImpl) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
return (version1-version2)%metadata.failoverVersionIncrement == 0
}

func (metadata *metadataImpl) IsMasterCluster() bool {
return metadata.masterClusterName == metadata.currentClusterName
}
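For intuition, here is a toy, self-contained rendering of the arithmetic behind IsVersionFromSameCluster (the increment and cluster assignments below are assumed for illustration, not Cadence's actual configuration): each cluster owns a distinct initial version smaller than the failover version increment, and every domain failover raises the version by a whole multiple of the increment, so the remainder modulo the increment identifies the originating cluster.

package main

import "fmt"

// Toy failover-version scheme: cluster A starts at 0, cluster B at 1,
// and each failover adds a multiple of failoverVersionIncrement, so
// version % increment is a stable cluster fingerprint.
const failoverVersionIncrement int64 = 10

func isVersionFromSameCluster(v1, v2 int64) bool {
	return (v1-v2)%failoverVersionIncrement == 0
}

func main() {
	fmt.Println(isVersionFromSameCluster(20, 0)) // true: 0 and 20 both map to cluster A
	fmt.Println(isVersionFromSameCluster(21, 0)) // false: 21 maps to cluster B
	fmt.Println(isVersionFromSameCluster(0, 20)) // true: the check is symmetric
}

This is also why the missing-ReplicationInfo path added in historyReplicator.go below can treat a version gap divisible by the increment as "same source cluster, just a few failover rounds ahead" and continue instead of routing the task to the DLQ.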
3 changes: 3 additions & 0 deletions common/logging/tags.go
@@ -59,6 +59,9 @@ const (
TagNextEventID = "next-event-id"
TagTimeoutType = "timeout-type"
TagReplicationInfo = "replication-info"
TagAttemptCount = "attempt-count"
TagAttemptStart = "attempt-start"
TagAttemptEnd = "attempt-end"

// workflow logging tag values
// TagWorkflowComponent Values
16 changes: 15 additions & 1 deletion common/mocks/ClusterMetadata.go
@@ -85,7 +85,7 @@ func (_m *ClusterMetadata) GetMasterClusterName() string {
return r0
}

// GetNextFailoverVersion provides a mock function with given fields: _a0
// GetNextFailoverVersion provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
ret := _m.Called(_a0, _a1)

@@ -99,6 +99,20 @@ func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
return r0
}

// IsVersionFromSameCluster provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) IsVersionFromSameCluster(_a0 int64, _a1 int64) bool {
ret := _m.Called(_a0, _a1)

var r0 bool
if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}

return r0
}

// IsGlobalDomainEnabled provides a mock function with given fields:
func (_m *ClusterMetadata) IsGlobalDomainEnabled() bool {
ret := _m.Called()
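Outside the diff, a minimal sketch of how a test might drive this generated mock (standard mockery/testify usage; the test name and values are invented for illustration):

package mocks_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/uber/cadence/common/mocks"
)

func TestIsVersionFromSameClusterMock(t *testing.T) {
	meta := &mocks.ClusterMetadata{}
	// On registers the expectation that Called(_a0, _a1) resolves against;
	// Return supplies the value handed back through ret.Get(0).
	meta.On("IsVersionFromSameCluster", int64(20), int64(0)).Return(true)

	require.True(t, meta.IsVersionFromSameCluster(20, 0))
	meta.AssertExpectations(t)
}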
7 changes: 7 additions & 0 deletions service/history/historyReplicator.go
@@ -273,6 +273,13 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(ctx context.Context,
logger.Info("First Event after replication.")
ri, ok := replicationInfo[previousActiveCluster]
if !ok {
// it is possible for a workflow to generate no events across a few rounds of failover,
// meaning that incoming version > last write version and
// (incoming version - last write version) % failover version increment == 0
if r.clusterMetadata.IsVersionFromSameCluster(incomingVersion, rState.LastWriteVersion) {
return msBuilder, nil
}

r.logError(logger, "No ReplicationInfo Found For Previous Active Cluster.", ErrMissingReplicationInfo)
// TODO: Handle missing replication information, #840
// Returning BadRequestError to force the message to land into DLQ
64 changes: 64 additions & 0 deletions service/history/historyReplicator_test.go
@@ -374,6 +374,70 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
s.Nil(err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_SameCluster() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

currentLastWriteVersion := int64(10)
incomingVersion := currentLastWriteVersion + 10

prevActiveCluster := cluster.TestAlternativeClusterName
context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
msBuilderIn := &mockMutableState{}
context.msBuilder = msBuilderIn

request := &h.ReplicateEventsRequest{
Version: common.Int64Ptr(incomingVersion),
History: &shared.History{},
}
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
})
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(true)

msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
request, s.logger)
s.Equal(msBuilderIn, msBuilderOut)
s.Nil(err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_DiffCluster() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

currentLastWriteVersion := int64(10)
incomingVersion := currentLastWriteVersion + 10

prevActiveCluster := cluster.TestAlternativeClusterName
context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
msBuilderIn := &mockMutableState{}
context.msBuilder = msBuilderIn

request := &h.ReplicateEventsRequest{
Version: common.Int64Ptr(incomingVersion),
History: &shared.History{},
}
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
})
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(false)

msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
request, s.logger)
s.Nil(msBuilderOut)
s.Equal(ErrMissingReplicationInfo, err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_Err() {
domainID := validDomainID
workflowID := "some random workflow ID"
23 changes: 19 additions & 4 deletions service/worker/processor.go
@@ -199,7 +199,10 @@ func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID
forceBuffer := false
remainingRetryCount := p.config.ReplicationTaskMaxRetry

attempt := 0
Comment (Contributor): We already have an attempt variable as part of the for loop. Can we just have one instead of two?

Comment (Contributor): You probably also want to include the attempt count in the warn log emitted every 100 failures.

Reply (Author): All done.

startTime := time.Now()
op := func() error {
attempt++
processErr := p.process(msg, forceBuffer)
if processErr != nil && p.isRetryTaskError(processErr) {
// Enable buffering of replication tasks for next attempt
@@ -210,7 +213,7 @@ }
}

ProcessRetryLoop:
for attempt := 0; ; attempt++ {
for {
select {
case <-p.shutdownCh:
return
@@ -227,7 +230,10 @@
logging.TagErr: err,
logging.TagPartitionKey: msg.Partition(),
logging.TagOffset: msg.Offset(),
}).Warn("Error processing replication task.")
logging.TagAttemptCount: attempt,
logging.TagAttemptStart: startTime,
logging.TagAttemptEnd: time.Now(),
}).Warn("Error (transient) processing replication task.")
}

// Keep on retrying transient errors for ever
@@ -258,6 +264,9 @@
logging.TagErr: err,
logging.TagPartitionKey: msg.Partition(),
logging.TagOffset: msg.Offset(),
logging.TagAttemptCount: attempt,
logging.TagAttemptStart: startTime,
logging.TagAttemptEnd: time.Now(),
}).Error("Error processing replication task.")
msg.Nack()
}
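Taken together, the changes above give processWithRetry one attempt counter shared by the op closure and the retry loop, and carry it, along with the start time, into both the periodic warn log and the final error log. A condensed, self-contained sketch of that shape (no Kafka, no backoff policy object, and a simple max-retry cutoff; all names here are stand-ins):

package main

import (
	"errors"
	"fmt"
	"time"
)

const warnEvery = 2 // the PR warns every 100 failures; 2 keeps the demo short

func processWithRetry(process func() error, maxRetry int) {
	attempt := 0 // single counter, shared by the closure and the loop
	startTime := time.Now()

	op := func() error {
		attempt++
		return process()
	}

	for {
		err := op()
		if err == nil {
			return // success; the real worker Acks the Kafka message here
		}
		if attempt%warnEvery == 0 {
			fmt.Printf("transient error: attempt=%d start=%s err=%v\n",
				attempt, startTime.Format(time.RFC3339), err)
		}
		if attempt >= maxRetry {
			fmt.Printf("giving up: attempt=%d err=%v\n", attempt, err)
			return // the real worker Nacks here, routing the message to the DLQ
		}
		time.Sleep(10 * time.Millisecond) // stand-in for the retry policy
	}
}

func main() {
	calls := 0
	processWithRetry(func() error {
		calls++
		if calls < 4 {
			return errors.New("transient failure")
		}
		return nil
	}, 10)
	fmt.Println("processed after", calls, "attempts")
}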
@@ -346,8 +355,14 @@ Loop:
}
}
if !processTask {
p.logger.Debugf("Dropping non-targeted history task with domainID: %v, workflowID: %v, runID: %v, firstEventID: %v, nextEventID: %v.",
attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), attr.GetFirstEventId(), attr.GetNextEventId())
p.logger.WithFields(bark.Fields{
logging.TagDomainID: attr.GetDomainId(),
logging.TagWorkflowExecutionID: attr.GetWorkflowId(),
logging.TagWorkflowRunID: attr.GetRunId(),
logging.TagFirstEventID: attr.GetFirstEventId(),
logging.TagNextEventID: attr.GetNextEventId(),
logging.TagVersion: attr.GetVersion(),
}).Warn("Dropping non-targeted history task.")
return nil
}
