Bugfix replication protocol (#979)
* Bugfix: when a workflow is idle during a failover, no event is generated, so no replication info is added; if a double failover is then done for this workflow, the source cluster has not actually changed, so no replication info is needed (see the worked sketch after the change summary below).

* Add more metrics on worker
wxing1292 authored Jul 19, 2018
1 parent aa5e689 commit 0a3ff55
Showing 6 changed files with 116 additions and 5 deletions.
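
The fix relies on how failover versions map to clusters: each cluster is assigned a distinct initial failover version, and every domain failover advances the version by a fixed failover version increment, so two versions belong to the same cluster exactly when their difference is a multiple of that increment. The sketch below walks through the idle-workflow double-failover scenario from the commit message; it is an illustration only, and the increment of 10 with initial versions 0 and 1 are assumed example values rather than real cluster configuration.

```go
package main

import "fmt"

// Assumed example values; in a real deployment these come from the cluster
// metadata configuration, not from constants.
const (
	failoverVersionIncrement = 10
	clusterAInitialVersion   = 0 // versions issued by cluster A: 0, 10, 20, ...
	clusterBInitialVersion   = 1 // versions issued by cluster B: 1, 11, 21, ...
)

// isVersionFromSameCluster mirrors the check added to common/cluster/metadata.go:
// versions issued by the same cluster differ by a multiple of the increment.
func isVersionFromSameCluster(version1, version2 int64) bool {
	return (version1-version2)%failoverVersionIncrement == 0
}

func main() {
	// The workflow was last written by cluster A at failover version 10.
	lastWriteVersion := int64(10)

	// The domain fails over to cluster B (which would issue version 11), the
	// workflow stays idle and produces no events, then the domain fails back
	// to cluster A, which issues version 20 for the next event.
	incomingVersion := int64(20)

	// The replicated event arrives with version 20 but carries no replication
	// info for the previous active cluster, because nothing was written while
	// cluster B was active. Since (20-10)%10 == 0, both versions come from
	// cluster A and the missing replication info is expected, not an error.
	fmt.Println(isVersionFromSameCluster(incomingVersion, lastWriteVersion)) // true
}
```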
8 changes: 8 additions & 0 deletions common/cluster/metadata.go
@@ -22,6 +22,7 @@ package cluster

import (
"fmt"

"github.com/uber/cadence/common/service/dynamicconfig"
)

@@ -35,6 +36,8 @@ type (
IsMasterCluster() bool
// GetNextFailoverVersion return the next failover version for domain failover
GetNextFailoverVersion(string, int64) int64
// IsVersionFromSameCluster return true if the two versions are used by the same cluster
IsVersionFromSameCluster(version1 int64, version2 int64) bool
// GetMasterClusterName return the master cluster name
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
@@ -132,6 +135,11 @@ func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFail
return failoverVersion
}

// IsVersionFromSameCluster return true if the two versions are used by the same cluster
func (metadata *metadataImpl) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
return (version1-version2)%metadata.failoverVersionIncrement == 0
}

func (metadata *metadataImpl) IsMasterCluster() bool {
return metadata.masterClusterName == metadata.currentClusterName
}
3 changes: 3 additions & 0 deletions common/logging/tags.go
@@ -59,6 +59,9 @@ const (
TagNextEventID = "next-event-id"
TagTimeoutType = "timeout-type"
TagReplicationInfo = "replication-info"
TagAttemptCount = "attempt-count"
TagAttemptStart = "attempt-start"
TagAttemptEnd = "attempt-end"

// workflow logging tag values
// TagWorkflowComponent Values
16 changes: 15 additions & 1 deletion common/mocks/ClusterMetadata.go
@@ -85,7 +85,7 @@ func (_m *ClusterMetadata) GetMasterClusterName() string {
return r0
}

// GetNextFailoverVersion provides a mock function with given fields: _a0
// GetNextFailoverVersion provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
ret := _m.Called(_a0, _a1)

@@ -99,6 +99,20 @@ func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
return r0
}

// IsVersionFromSameCluster provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) IsVersionFromSameCluster(_a0 int64, _a1 int64) bool {
ret := _m.Called(_a0, _a1)

var r0 bool
if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}

return r0
}

// IsGlobalDomainEnabled provides a mock function with given fields:
func (_m *ClusterMetadata) IsGlobalDomainEnabled() bool {
ret := _m.Called()
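
The generated mock follows the usual testify/mockery pattern: Called records the invocation, and the stubbed return is either a static value or, if the test registers a function with the method's signature, the result of calling that function with the arguments. The tests added below use the static form; a minimal sketch of the function-based form follows, as an assumed illustrative test that is not part of this commit.

```go
package cluster_test

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/uber/cadence/common/mocks"
)

// Assumed illustrative test: it only shows how the new mock method can be
// stubbed with a function instead of a fixed return value.
func TestIsVersionFromSameClusterMockStub(t *testing.T) {
	meta := &mocks.ClusterMetadata{}

	// The generated mock sees that the first return value is a
	// func(int64, int64) bool and invokes it with the call's arguments.
	meta.On("IsVersionFromSameCluster", mock.Anything, mock.Anything).Return(
		func(version1 int64, version2 int64) bool { return (version1-version2)%10 == 0 },
	)

	require.True(t, meta.IsVersionFromSameCluster(20, 10))
	require.False(t, meta.IsVersionFromSameCluster(21, 10))
}
```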
7 changes: 7 additions & 0 deletions service/history/historyReplicator.go
@@ -273,6 +273,13 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(ctx context.Context,
logger.Info("First Event after replication.")
ri, ok := replicationInfo[previousActiveCluster]
if !ok {
// it is possible that a workflow will not generate any event in a few rounds of failover
// meaning that the incoming version > last write version and
// (incoming version - last write version) % failover version increment == 0
if r.clusterMetadata.IsVersionFromSameCluster(incomingVersion, rState.LastWriteVersion) {
return msBuilder, nil
}

r.logError(logger, "No ReplicationInfo Found For Previous Active Cluster.", ErrMissingReplicationInfo)
// TODO: Handle missing replication information, #840
// Returning BadRequestError to force the message to land into DLQ
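
The added branch only changes the case where the first replicated event after a failover carries a higher version but no replication info entry for the previous active cluster: if both versions belong to the same cluster, the existing mutable state is returned and the event is applied normally; otherwise the replicator still returns ErrMissingReplicationInfo so the task is pushed to the DLQ. A compact sketch of that decision, assuming the surrounding service/history package (ErrMissingReplicationInfo is the existing error there; this is not the actual ApplyOtherEventsVersionChecking signature):

```go
// Simplified sketch of the new branch. sameCluster is the result of
// clusterMetadata.IsVersionFromSameCluster(incomingVersion, lastWriteVersion);
// see the arithmetic sketch near the top of this commit.
func decideOnMissingReplicationInfo(sameCluster bool) (applyEvents bool, err error) {
	if sameCluster {
		// Idle-workflow double failover: the source cluster never actually
		// changed, so missing replication info is expected and the event is applied.
		return true, nil
	}
	// Genuinely missing replication info: reject so the task lands in the DLQ (#840).
	return false, ErrMissingReplicationInfo
}
```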
64 changes: 64 additions & 0 deletions service/history/historyReplicator_test.go
@@ -374,6 +374,70 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
s.Nil(err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_SameCluster() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

currentLastWriteVersion := int64(10)
incomingVersion := currentLastWriteVersion + 10

prevActiveCluster := cluster.TestAlternativeClusterName
context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
msBuilderIn := &mockMutableState{}
context.msBuilder = msBuilderIn

request := &h.ReplicateEventsRequest{
Version: common.Int64Ptr(incomingVersion),
History: &shared.History{},
}
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
})
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(true)

msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
request, s.logger)
s.Equal(msBuilderIn, msBuilderOut)
s.Nil(err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_DiffCluster() {
domainID := validDomainID
workflowID := "some random workflow ID"
runID := uuid.New()

currentLastWriteVersion := int64(10)
incomingVersion := currentLastWriteVersion + 10

prevActiveCluster := cluster.TestAlternativeClusterName
context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
msBuilderIn := &mockMutableState{}
context.msBuilder = msBuilderIn

request := &h.ReplicateEventsRequest{
Version: common.Int64Ptr(incomingVersion),
History: &shared.History{},
}
msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
LastWriteVersion: currentLastWriteVersion,
})
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(false)

msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
request, s.logger)
s.Nil(msBuilderOut)
s.Equal(ErrMissingReplicationInfo, err)
}

func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_Err() {
domainID := validDomainID
workflowID := "some random workflow ID"
23 changes: 19 additions & 4 deletions service/worker/processor.go
@@ -199,7 +199,10 @@ func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID
forceBuffer := false
remainingRetryCount := p.config.ReplicationTaskMaxRetry

attempt := 0
startTime := time.Now()
op := func() error {
attempt++
processErr := p.process(msg, forceBuffer)
if processErr != nil && p.isRetryTaskError(processErr) {
// Enable buffering of replication tasks for next attempt
@@ -210,7 +213,7 @@ }
}

ProcessRetryLoop:
for attempt := 0; ; attempt++ {
for {
select {
case <-p.shutdownCh:
return
@@ -227,7 +230,10 @@ ProcessRetryLoop:
logging.TagErr: err,
logging.TagPartitionKey: msg.Partition(),
logging.TagOffset: msg.Offset(),
}).Warn("Error processing replication task.")
logging.TagAttemptCount: attempt,
logging.TagAttemptStart: startTime,
logging.TagAttemptEnd: time.Now(),
}).Warn("Error (transient) processing replication task.")
}

// Keep on retrying transient errors forever
@@ -258,6 +264,9 @@ ProcessRetryLoop:
logging.TagErr: err,
logging.TagPartitionKey: msg.Partition(),
logging.TagOffset: msg.Offset(),
logging.TagAttemptCount: attempt,
logging.TagAttemptStart: startTime,
logging.TagAttemptEnd: time.Now(),
}).Error("Error processing replication task.")
msg.Nack()
}
@@ -346,8 +355,14 @@ Loop:
}
}
if !processTask {
p.logger.Debugf("Dropping non-targeted history task with domainID: %v, workflowID: %v, runID: %v, firstEventID: %v, nextEventID: %v.",
attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), attr.GetFirstEventId(), attr.GetNextEventId())
p.logger.WithFields(bark.Fields{
logging.TagDomainID: attr.GetDomainId(),
logging.TagWorkflowExecutionID: attr.GetWorkflowId(),
logging.TagWorkflowRunID: attr.GetRunId(),
logging.TagFirstEventID: attr.GetFirstEventId(),
logging.TagNextEventID: attr.GetNextEventId(),
logging.TagVersion: attr.GetVersion(),
}).Warn("Dropping non-targeted history task.")
return nil
}

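
The processor change is about observability of the retry loop: the attempt counter and the start time are hoisted out of the loop so both the transient warning and the final error log can report how many attempts were made and over what window, and the dropped-task log becomes structured fields instead of a Debugf string. A minimal sketch of that pattern follows; processOnce, errTransient, and the printed fields are illustrative stand-ins for the real process method, retryable-error check, backoff policy, and bark logger.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient stands in for an error the real processor treats as retryable.
var errTransient = errors.New("transient error")

// processOnce stands in for replicationTaskProcessor.process.
func processOnce(attempt int) error {
	if attempt < 3 {
		return errTransient
	}
	return nil
}

func main() {
	// Hoisted out of the loop, as in the commit, so every log line can report
	// the total attempt count and how long the task has been retried.
	attempt := 0
	startTime := time.Now()

	for {
		attempt++
		err := processOnce(attempt)
		if err == nil {
			fmt.Printf("processed after %d attempt(s) in %v\n", attempt, time.Since(startTime))
			return
		}

		// On failure, log the attempt count and the attempt window alongside
		// the usual partition/offset tags (bark fields in the real code).
		fmt.Printf("attempt-count=%d attempt-start=%v attempt-end=%v err=%v\n",
			attempt, startTime, time.Now(), err)

		if err != errTransient {
			// Non-transient error: the real processor nacks the message here.
			return
		}
		time.Sleep(10 * time.Millisecond) // placeholder for the real backoff policy
	}
}
```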
