From 0a3ff555358776323296a08838ae70152c897f91 Mon Sep 17 00:00:00 2001
From: wxing1292
Date: Wed, 18 Jul 2018 18:02:32 -0700
Subject: [PATCH] Bugfix replication protocol (#979)

* Bugfix: when a workflow is idle during a failover, no events are generated,
  and thus no replication info is added; if a double failover is done for this
  workflow, the source cluster has not changed at all, so no replication info
  is needed.
* Add more metrics on worker
---
 common/cluster/metadata.go                |  8 +++
 common/logging/tags.go                    |  3 ++
 common/mocks/ClusterMetadata.go           | 16 +++++-
 service/history/historyReplicator.go      |  7 +++
 service/history/historyReplicator_test.go | 64 +++++++++++++++++++++++
 service/worker/processor.go               | 23 ++++++--
 6 files changed, 116 insertions(+), 5 deletions(-)

diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go
index 08a93225468..9bc7562e9ca 100644
--- a/common/cluster/metadata.go
+++ b/common/cluster/metadata.go
@@ -22,6 +22,7 @@ package cluster
 
 import (
 	"fmt"
+
 	"github.com/uber/cadence/common/service/dynamicconfig"
 )
 
@@ -35,6 +36,8 @@ type (
 		IsMasterCluster() bool
 		// GetNextFailoverVersion return the next failover version for domain failover
 		GetNextFailoverVersion(string, int64) int64
+		// IsVersionFromSameCluster returns true if the two versions are from the same cluster
+		IsVersionFromSameCluster(version1 int64, version2 int64) bool
 		// GetMasterClusterName return the master cluster name
 		GetMasterClusterName() string
 		// GetCurrentClusterName return the current cluster name
@@ -132,6 +135,11 @@ func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFail
 	return failoverVersion
 }
 
+// IsVersionFromSameCluster returns true if the two versions are from the same cluster
+func (metadata *metadataImpl) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
+	return (version1-version2)%metadata.failoverVersionIncrement == 0
+}
+
 func (metadata *metadataImpl) IsMasterCluster() bool {
 	return metadata.masterClusterName == metadata.currentClusterName
 }
diff --git a/common/logging/tags.go b/common/logging/tags.go
index 12445593f9d..c0833e0dd25 100644
--- a/common/logging/tags.go
+++ b/common/logging/tags.go
@@ -59,6 +59,9 @@ const (
 	TagNextEventID     = "next-event-id"
 	TagTimeoutType     = "timeout-type"
 	TagReplicationInfo = "replication-info"
+	TagAttemptCount    = "attempt-count"
+	TagAttemptStart    = "attempt-start"
+	TagAttemptEnd      = "attempt-end"
 
 	// workflow logging tag values
 	// TagWorkflowComponent Values
diff --git a/common/mocks/ClusterMetadata.go b/common/mocks/ClusterMetadata.go
index 5dd23938592..72f272be424 100644
--- a/common/mocks/ClusterMetadata.go
+++ b/common/mocks/ClusterMetadata.go
@@ -85,7 +85,7 @@ func (_m *ClusterMetadata) GetMasterClusterName() string {
 	return r0
 }
 
-// GetNextFailoverVersion provides a mock function with given fields: _a0
+// GetNextFailoverVersion provides a mock function with given fields: _a0, _a1
 func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
 	ret := _m.Called(_a0, _a1)
 
@@ -99,6 +99,20 @@ func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
 	return r0
 }
 
+// IsVersionFromSameCluster provides a mock function with given fields: _a0, _a1
+func (_m *ClusterMetadata) IsVersionFromSameCluster(_a0 int64, _a1 int64) bool {
+	ret := _m.Called(_a0, _a1)
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
 // IsGlobalDomainEnabled provides a mock function with given fields:
 func (_m *ClusterMetadata) IsGlobalDomainEnabled() bool {
 	ret := _m.Called()
diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go
index 9c52600653a..13622c88721 100644
--- a/service/history/historyReplicator.go
+++ b/service/history/historyReplicator.go
@@ -273,6 +273,13 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(ctx context.Context,
 		logger.Info("First Event after replication.")
 		ri, ok := replicationInfo[previousActiveCluster]
 		if !ok {
+			// it is possible that a workflow generates no events during a few rounds of failover,
+			// meaning that the incoming version > last write version and
+			// (incoming version - last write version) % failover version increment == 0
+			if r.clusterMetadata.IsVersionFromSameCluster(incomingVersion, rState.LastWriteVersion) {
+				return msBuilder, nil
+			}
+
 			r.logError(logger, "No ReplicationInfo Found For Previous Active Cluster.", ErrMissingReplicationInfo)
 			// TODO: Handle missing replication information, #840
 			// Returning BadRequestError to force the message to land into DLQ
diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go
index c56bbe5cf69..357f01b91e2 100644
--- a/service/history/historyReplicator_test.go
+++ b/service/history/historyReplicator_test.go
@@ -374,6 +374,70 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
 	s.Nil(err)
 }
 
+func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_SameCluster() {
+	domainID := validDomainID
+	workflowID := "some random workflow ID"
+	runID := uuid.New()
+
+	currentLastWriteVersion := int64(10)
+	incomingVersion := currentLastWriteVersion + 10
+
+	prevActiveCluster := cluster.TestAlternativeClusterName
+	context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
+		WorkflowId: common.StringPtr(workflowID),
+		RunId:      common.StringPtr(runID),
+	}, s.mockShard, s.mockExecutionMgr, s.logger)
+	msBuilderIn := &mockMutableState{}
+	context.msBuilder = msBuilderIn
+
+	request := &h.ReplicateEventsRequest{
+		Version: common.Int64Ptr(incomingVersion),
+		History: &shared.History{},
+	}
+	msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
+		LastWriteVersion: currentLastWriteVersion,
+	})
+	s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
+	s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(true)
+
+	msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
+		request, s.logger)
+	s.Equal(msBuilderIn, msBuilderOut)
+	s.Nil(err)
+}
+
+func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_MissingReplicationInfo_DiffCluster() {
+	domainID := validDomainID
+	workflowID := "some random workflow ID"
+	runID := uuid.New()
+
+	currentLastWriteVersion := int64(10)
+	incomingVersion := currentLastWriteVersion + 10
+
+	prevActiveCluster := cluster.TestAlternativeClusterName
+	context := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
+		WorkflowId: common.StringPtr(workflowID),
+		RunId:      common.StringPtr(runID),
+	}, s.mockShard, s.mockExecutionMgr, s.logger)
+	msBuilderIn := &mockMutableState{}
+	context.msBuilder = msBuilderIn
+
+	request := &h.ReplicateEventsRequest{
+		Version: common.Int64Ptr(incomingVersion),
+		History: &shared.History{},
+	}
+	msBuilderIn.On("GetReplicationState").Return(&persistence.ReplicationState{
+		LastWriteVersion: currentLastWriteVersion,
+	})
+	s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster)
+	s.mockClusterMetadata.On("IsVersionFromSameCluster", incomingVersion, currentLastWriteVersion).Return(false)
+
+	msBuilderOut, err := s.historyReplicator.ApplyOtherEventsVersionChecking(ctx.Background(), context, msBuilderIn,
+		request, s.logger)
+	s.Nil(msBuilderOut)
+	s.Equal(ErrMissingReplicationInfo, err)
+}
+
 func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_Err() {
 	domainID := validDomainID
 	workflowID := "some random workflow ID"
diff --git a/service/worker/processor.go b/service/worker/processor.go
index 46589c00a54..9eedd752f59 100644
--- a/service/worker/processor.go
+++ b/service/worker/processor.go
@@ -199,7 +199,10 @@ func (p *replicationTaskProcessor) processWithRetry(msg kafka.Message, workerID
 	forceBuffer := false
 	remainingRetryCount := p.config.ReplicationTaskMaxRetry
 
+	attempt := 0
+	startTime := time.Now()
 	op := func() error {
+		attempt++
 		processErr := p.process(msg, forceBuffer)
 		if processErr != nil && p.isRetryTaskError(processErr) {
 			// Enable buffering of replication tasks for next attempt
@@ -210,7 +213,7 @@
 	}
 
 ProcessRetryLoop:
-	for attempt := 0; ; attempt++ {
+	for {
 		select {
 		case <-p.shutdownCh:
 			return
@@ -227,7 +230,10 @@
 					logging.TagErr:          err,
 					logging.TagPartitionKey: msg.Partition(),
 					logging.TagOffset:       msg.Offset(),
-				}).Warn("Error processing replication task.")
+					logging.TagAttemptCount: attempt,
+					logging.TagAttemptStart: startTime,
+					logging.TagAttemptEnd:   time.Now(),
+				}).Warn("Error (transient) processing replication task.")
 			}
 
 			// Keep on retrying transient errors for ever
@@ -258,6 +264,9 @@
 				logging.TagErr:          err,
 				logging.TagPartitionKey: msg.Partition(),
 				logging.TagOffset:       msg.Offset(),
+				logging.TagAttemptCount: attempt,
+				logging.TagAttemptStart: startTime,
+				logging.TagAttemptEnd:   time.Now(),
 			}).Error("Error processing replication task.")
 			msg.Nack()
 		}
@@ -346,8 +355,14 @@ Loop:
 		}
 	}
 	if !processTask {
-		p.logger.Debugf("Dropping non-targeted history task with domainID: %v, workflowID: %v, runID: %v, firstEventID: %v, nextEventID: %v.",
-			attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), attr.GetFirstEventId(), attr.GetNextEventId())
+		p.logger.WithFields(bark.Fields{
+			logging.TagDomainID:            attr.GetDomainId(),
+			logging.TagWorkflowExecutionID: attr.GetWorkflowId(),
+			logging.TagWorkflowRunID:       attr.GetRunId(),
+			logging.TagFirstEventID:        attr.GetFirstEventId(),
+			logging.TagNextEventID:         attr.GetNextEventId(),
+			logging.TagVersion:             attr.GetVersion(),
+		}).Warn("Dropping non-targeted history task.")
 		return nil
 	}
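
Reviewer note (not part of the patch): the IsVersionFromSameCluster check works
because failover versions are dealt out so that every version a cluster ever
generates is congruent to that cluster's initial failover version modulo
failoverVersionIncrement; the difference of two versions from the same cluster
is therefore always a whole number of increments. A minimal standalone sketch,
assuming an increment of 10 and initial versions 0 and 1 (illustrative values,
not taken from any real cluster configuration):

package main

import "fmt"

// isVersionFromSameCluster mirrors the arithmetic added in
// common/cluster/metadata.go: two failover versions belong to the same
// cluster iff their difference is a whole number of increments.
func isVersionFromSameCluster(version1, version2, increment int64) bool {
	return (version1-version2)%increment == 0
}

func main() {
	const increment = 10
	// Cluster A (initial version 0) generates 0, 10, 20, ...
	// Cluster B (initial version 1) generates 1, 11, 21, ...
	fmt.Println(isVersionFromSameCluster(20, 10, increment)) // true: A vs A
	fmt.Println(isVersionFromSameCluster(11, 1, increment))  // true: B vs B
	fmt.Println(isVersionFromSameCluster(11, 10, increment)) // false: B vs A

	// The scenario fixed by this patch: a workflow idles across a double
	// failover (A -> B -> A), so its last write version is still 10 when
	// events arrive with version 20. Same source cluster, so the missing
	// replication info is expected and the events can be applied.
	fmt.Println(isVersionFromSameCluster(20, 10, increment)) // true
}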
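
A note on the generated mock in common/mocks/ClusterMetadata.go: the
ret.Get(0).(func(int64, int64) bool) assertion looks odd but is standard
mockery output; it lets a test register either a static return value or a
computed one. A cut-down, runnable illustration of both shapes (the type
ClusterMetadataMock and the sample arguments are invented for this sketch):

package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// ClusterMetadataMock is a reduced stand-in for the generated mock, keeping
// only the method this patch adds.
type ClusterMetadataMock struct {
	mock.Mock
}

func (m *ClusterMetadataMock) IsVersionFromSameCluster(v1 int64, v2 int64) bool {
	ret := m.Called(v1, v2)
	if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
		return rf(v1, v2) // the test registered a computed return value
	}
	return ret.Get(0).(bool) // the test registered a static return value
}

func main() {
	m := &ClusterMetadataMock{}
	// Static return, the shape the new historyReplicator tests use:
	m.On("IsVersionFromSameCluster", int64(20), int64(10)).Return(true)
	// Computed return, the other shape the generated body supports:
	m.On("IsVersionFromSameCluster", mock.Anything, mock.Anything).
		Return(func(v1, v2 int64) bool { return (v1-v2)%10 == 0 })

	fmt.Println(m.IsVersionFromSameCluster(20, 10)) // true, from the static expectation
	fmt.Println(m.IsVersionFromSameCluster(11, 1))  // true, from the computed expectation
}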
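
Finally, on the service/worker/processor.go change: hoisting attempt out of
the for-loop header into a variable captured by the op closure is what lets
the new attempt-count/attempt-start/attempt-end tags report totals across both
the transient-retry Warn path and the final Error path. A minimal sketch of
the pattern, with the Kafka message and backoff retry policy of the real code
replaced by a bare loop (all names here are illustrative):

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Captured by the closure below, so the count survives across retries.
	attempt := 0
	startTime := time.Now()

	op := func() error {
		attempt++
		if attempt < 3 {
			return errors.New("transient failure") // stands in for p.process failing
		}
		return nil
	}

	for {
		err := op()
		if err == nil {
			break
		}
		// Mirrors the TagAttemptCount / TagAttemptStart / TagAttemptEnd fields
		// added to the processor's log lines.
		fmt.Printf("retrying: err=%v attempt=%d start=%s end=%s\n",
			err, attempt,
			startTime.Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano))
	}
	fmt.Printf("succeeded after %d attempt(s)\n", attempt)
}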