diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go
index b249f0c7aa7..39bb625c27f 100644
--- a/service/history/historyReplicator.go
+++ b/service/history/historyReplicator.go
@@ -194,23 +194,10 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r
 
 func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string, workflowID string, incomingVersion int64, logger bark.Logger) error {
     // we need to check the current workflow execution
-    currentContext, release, err := r.historyCache.getOrCreateWorkflowExecution(
-        domainID,
-        // only use the workflow ID, to get the current running one
-        shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)},
-    )
+    currentRunID, currentLastWriteVersion, _, err := r.getCurrentWorkflowInfo(domainID, workflowID)
     if err != nil {
         return err
     }
-    currentMsBuilder, err := currentContext.loadWorkflowExecution()
-    if err != nil {
-        // no matter what error happen, we need to retry
-        release(err)
-        return err
-    }
-    currentLastWriteVersion := currentMsBuilder.GetLastWriteVersion()
-    currentRunID := currentMsBuilder.GetExecutionInfo().RunID
-    release(nil)
 
     // we can also use the start version
     if currentLastWriteVersion > incomingVersion {
@@ -240,45 +227,55 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(context *workflowExe
         r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter)
         return nil, nil
     }
-    if rState.LastWriteVersion < incomingVersion {
-        // Check if this is the first event after failover
-        logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.",
-            rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, incomingVersion)
-        previousActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(rState.LastWriteVersion)
-        ri, ok := replicationInfo[previousActiveCluster]
-        if !ok {
-            logger.Errorf("No ReplicationInfo Found For Previous Active Cluster. Previous Active Cluster: %v, Request Source Cluster: %v, Request ReplicationInfo: %v.",
-                previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)
-            // TODO: Handle missing replication information, #840
-            // Returning BadRequestError to force the message to land into DLQ
-            return nil, ErrMissingReplicationInfo
-        }
-        // Detect conflict
-        if ri.GetLastEventId() > rState.LastWriteEventID {
-            // if there is any bug in the replication protocol or implementation, this case can happen
-            logger.Errorf("Conflict detected, but cannot resolve. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
-                rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
-                previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
-            // Returning BadRequestError to force the message to land into DLQ
-            return nil, ErrCorruptedReplicationInfo
+    if rState.LastWriteVersion == incomingVersion {
+        // last write version matches the incoming version: no failover in between, continue applying
+        return msBuilder, nil
+    }
+
+    // we have rState.LastWriteVersion < incomingVersion
+
+    // Check if this is the first event after failover
+    logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.",
+        rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, incomingVersion)
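+    // map the last write version back to the cluster that was active when those events were written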
+    previousActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(rState.LastWriteVersion)
+    ri, ok := replicationInfo[previousActiveCluster]
+    if !ok {
+        logger.Errorf("No ReplicationInfo Found For Previous Active Cluster. Previous Active Cluster: %v, Request Source Cluster: %v, Request ReplicationInfo: %v.",
+            previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)
+        // TODO: Handle missing replication information, #840
+        // Returning BadRequestError to force the message to land into DLQ
+        return nil, ErrMissingReplicationInfo
+    }
+
+    // Detect conflict
+    if ri.GetLastEventId() > rState.LastWriteEventID {
+        // if there is any bug in the replication protocol or implementation, this case can happen
+        logger.Errorf("Conflict detected, but cannot resolve. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
+            rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
+            previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
+        // Returning BadRequestError to force the message to land into DLQ
+        return nil, ErrCorruptedReplicationInfo
+    }
+
+    if ri.GetLastEventId() < rState.LastWriteEventID {
+        logger.Infof("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
+            rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
+            previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
+        r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.HistoryConflictsCounter)
+
+        // handle the edge case when resetting a workflow that has already continued as new:
+        // we need to terminate the continued-as-new workflow first
+        err = r.conflictResolutionTerminateContinueAsNew(msBuilder)
+        if err != nil {
+            return nil, err
         }
-
-        if ri.GetLastEventId() < rState.LastWriteEventID {
-            logger.Infof("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
-                rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
-                previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
-            r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.HistoryConflictsCounter)
-
-            resolver := r.getNewConflictResolver(context, logger)
-            msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.GetExecutionInfo().StartTimestamp)
-            logger.Infof("Completed Resetting of workflow execution. NextEventID: %v. Err: %v", msBuilder.GetNextEventID(), err)
-            if err != nil {
-                return nil, err
-            }
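+        // reset this workflow's mutable state back to ri.GetLastEventId(), the last event known to both clusters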
Err: %v", msBuilder.GetNextEventID(), err) + if err != nil { + return nil, err } - - // for ri.GetLastEventId() == rState.LastWriteEventID, ideally we should not do anything } return msBuilder, nil } @@ -587,34 +584,116 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC // whether the remote active cluster is aware of the current running workflow, // the only thing we can do is to terminate the current workflow and // start the new workflow from the request - domainEntry, err := r.domainCache.GetDomainByID(domainID) - if err != nil { - return err - } + // same workflow ID, same shard - err = r.historyEngine.TerminateWorkflowExecution(ctx.Background(), &h.TerminateWorkflowExecutionRequest{ - DomainUUID: common.StringPtr(domainID), - TerminateRequest: &shared.TerminateWorkflowExecutionRequest{ - Domain: common.StringPtr(domainEntry.GetInfo().Name), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(executionInfo.WorkflowID), - RunId: common.StringPtr(currentRunID), - }, - Reason: common.StringPtr("Terminate Workflow Due To Version Conflict."), - Details: nil, - Identity: common.StringPtr("worker-service"), - }, - }) + err = r.terminateWorkflow(domainID, executionInfo.WorkflowID, currentRunID) if err != nil { if _, ok := err.(*shared.EntityNotExistsError); !ok { return err } // if workflow is completed just when the call is made, will get EntityNotExistsError + // we are not sure whether the workflow to be terminated ends with continue as new or not + // so when encounter EntityNotExistsError, just contiue to execute, if err occurs, + // there will be retry on the worker level } isBrandNew = false return createWorkflow(isBrandNew, currentRunID) } +func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder mutableState) (retError error) { + // this function aims to solve the edge case when this workflow, when going through + // reset, has already started a next generation (continue as new-ed workflow) + + if msBuilder.IsWorkflowExecutionRunning() { + // workflow still running, no continued as new edge case to solve + return nil + } + + if msBuilder.GetExecutionInfo().CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew { + // workflow close status not being continue as new + return nil + } + + // the close status is continue as new + // so it is impossible that the current running workflow (one with the same workflow ID) + // has the same run ID as "this" workflow + // meaning there is no chance that when we grab the current running workflow (same workflow ID) + // and enounter a dead lock + domainID := msBuilder.GetExecutionInfo().DomainID + workflowID := msBuilder.GetExecutionInfo().WorkflowID + currentRunID, _, closeStatus, err := r.getCurrentWorkflowInfo(domainID, workflowID) + if err != nil { + return err + } + if closeStatus != persistence.WorkflowCloseStatusNone { + // current workflow finished + // note, it is impassoble that a current workflow ends with continue as new as close status + return nil + } + + getPrevRunID := func(domainID string, workflowID string, runID string) (string, error) { + response, err := r.historyMgr.GetWorkflowExecutionHistory(&persistence.GetWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), + }, + FirstEventID: common.FirstEventID, + NextEventID: common.FirstEventID + 1, + PageSize: defaultHistoryPageSize, + NextPageToken: nil, + }) + if err != nil { + 
return "", err + } + if len(response.Events) == 0 { + return "", fmt.Errorf("no history found for domainID: %v, workflowID: %v, runID: %v", + domainID, workflowID, runID) + } + serializedHistoryEventBatch := response.Events[0] + persistence.SetSerializedHistoryDefaults(&serializedHistoryEventBatch) + serializer, err := persistence.NewHistorySerializerFactory().Get(serializedHistoryEventBatch.EncodingType) + if err != nil { + return "", err + } + history, err := serializer.Deserialize(&serializedHistoryEventBatch) + if err != nil { + return "", err + } + if len(history.Events) == 0 { + return "", fmt.Errorf("no history events found for domainID: %v, workflowID: %v, runID: %v", + domainID, workflowID, runID) + } + + return history.Events[0].WorkflowExecutionStartedEventAttributes.GetContinuedExecutionRunId(), nil + } + + targetRunID := msBuilder.GetExecutionInfo().RunID + runID := currentRunID + for err == nil && runID != "" && runID != targetRunID { + // using the current running workflow to trace back (assuming continue as new) + runID, err = getPrevRunID(domainID, workflowID, runID) + } + if err != nil { + return err + } + if runID == "" { + // cannot relate the current running workflow to the workflow which events are being resetted. + return nil + } + + // we have runID == targetRunID + // meaning the current workflow is a result of continue as new of the workflow to be resetted + + // if workflow is completed just when the call is made, will get EntityNotExistsError + // we are not sure whether the workflow to be terminated ends with continue as new or not + // so when encounter EntityNotExistsError, as well as other error, just return the err + // we will retry on the worker level + + // same workflow ID, same shard + return r.terminateWorkflow(domainID, workflowID, currentRunID) +} + func (r *historyReplicator) Serialize(history *shared.History) (*persistence.SerializedHistoryEventBatch, error) { eventBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), history.Events) h, err := r.historySerializer.Serialize(eventBatch) @@ -624,6 +703,50 @@ func (r *historyReplicator) Serialize(history *shared.History) (*persistence.Ser return h, nil } +func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) { + // we need to check the current workflow execution + context, release, err := r.historyCache.getOrCreateWorkflowExecution( + domainID, + // only use the workflow ID, to get the current running one + shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)}, + ) + if err != nil { + return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err + } + defer func() { release(retError) }() + + msBuilder, err := context.loadWorkflowExecution() + if err != nil { + // no matter what error happen, we need to retry + return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err + } + lastWriteVersion = msBuilder.GetLastWriteVersion() + runID = msBuilder.GetExecutionInfo().RunID + closeStatus = msBuilder.GetExecutionInfo().CloseStatus + return +} + +func (r *historyReplicator) terminateWorkflow(domainID string, workflowID string, runID string) error { + domainEntry, err := r.domainCache.GetDomainByID(domainID) + if err != nil { + return err + } + // same workflow ID, same shard + return r.historyEngine.TerminateWorkflowExecution(ctx.Background(), &h.TerminateWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + TerminateRequest: 
&shared.TerminateWorkflowExecutionRequest{ + Domain: common.StringPtr(domainEntry.GetInfo().Name), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), + }, + Reason: common.StringPtr("Terminate Workflow Due To Version Conflict."), + Details: nil, + Identity: common.StringPtr("worker-service"), + }, + }) +} + func (r *historyReplicator) notify(clusterName string, now time.Time, transferTasks []persistence.Task, timerTasks []persistence.Task) { r.shard.SetCurrentTime(clusterName, now) diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index fafcc5c29c2..6d13c21a63d 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -309,6 +309,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre LastWriteEventID: currentLastEventID, }) msBuilderIn.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{StartTimestamp: startTimeStamp}) + msBuilderIn.On("IsWorkflowExecutionRunning").Return(true) s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentLastWriteVersion).Return(prevActiveCluster) mockConflictResolver := &mockConflictResolver{} @@ -323,6 +324,10 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre s.Nil(err) } +func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_ResolveConflict_OtherCase() { + // other cases will be tested in TestConflictResolutionTerminateContinueAsNew +} + func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGreaterThanCurrent_NoOp() { domainID := validDomainID workflowID := "some random workflow ID" @@ -1489,3 +1494,150 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc s.Equal(1, len(timerTasks)) s.Equal(version, timerTasks[0].GetVersion()) } + +func (s *historyReplicatorSuite) TestConflictResolutionTerminateContinueAsNew_TargetRunning() { + msBuilderTarget := &mockMutableState{} + msBuilderTarget.On("IsWorkflowExecutionRunning").Return(true) + err := s.historyReplicator.conflictResolutionTerminateContinueAsNew(msBuilderTarget) + s.Nil(err) +} + +func (s *historyReplicatorSuite) TestConflictResolutionTerminateContinueAsNew_TargetClosed_NotContinueAsNew() { + msBuilderTarget := &mockMutableState{} + msBuilderTarget.On("IsWorkflowExecutionRunning").Return(false) + msBuilderTarget.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{CloseStatus: persistence.WorkflowCloseStatusCompleted}) + + err := s.historyReplicator.conflictResolutionTerminateContinueAsNew(msBuilderTarget) + s.Nil(err) +} + +func (s *historyReplicatorSuite) TestConflictResolutionTerminateContinueAsNew_TargetClosed_ContinueAsNew_CurrentClosed() { + domainID := validDomainID + workflowID := "some random target workflow ID" + targetRunID := uuid.New() + + msBuilderTarget := &mockMutableState{} + msBuilderTarget.On("IsWorkflowExecutionRunning").Return(false) + msBuilderTarget.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: workflowID, + RunID: targetRunID, + CloseStatus: persistence.WorkflowCloseStatusContinuedAsNew, + }) + + currentRunID := uuid.New() + contextCurrent, release, err := s.historyReplicator.historyCache.getOrCreateWorkflowExecution(domainID, shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(currentRunID), + }) + s.Nil(err) + 
msBuilderCurrent := &mockMutableState{} + msBuilderCurrent.On("GetLastWriteVersion").Return(int64(999)) // this is not actually used, but will be called + msBuilderCurrent.On("GetReplicationState").Return(&persistence.ReplicationState{}) // this is used to update the version on mutable state + msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(false) // this is used to update the version on mutable state + msBuilderCurrent.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{RunID: currentRunID, CloseStatus: persistence.WorkflowCloseStatusTerminated}) + contextCurrent.msBuilder = msBuilderCurrent + release(nil) + + s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ + DomainID: domainID, + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{ + RunID: currentRunID, + // other attributes are not used + }, nil) + + err = s.historyReplicator.conflictResolutionTerminateContinueAsNew(msBuilderTarget) + s.Nil(err) +} + +func (s *historyReplicatorSuite) TestConflictResolutionTerminateContinueAsNew_TargetClosed_ContinueAsNew_CurrentRunning() { + version := int64(4801) // this does nothing in this test + domainName := "some random domain name" + domainID := validDomainID + workflowID := "some random target workflow ID" + targetRunID := uuid.New() + + msBuilderTarget := &mockMutableState{} + msBuilderTarget.On("IsWorkflowExecutionRunning").Return(false) + msBuilderTarget.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: workflowID, + RunID: targetRunID, + CloseStatus: persistence.WorkflowCloseStatusContinuedAsNew, + }) + + // this mocks are for the terminate current workflow operation + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return( + &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID, Name: domainName}, + Config: &persistence.DomainConfig{Retention: 1}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + }, + }, + FailoverVersion: version, + IsGlobalDomain: true, + TableVersion: persistence.DomainTableVersionV1, + }, nil, + ).Once() + + currentRunID := uuid.New() + contextCurrent, release, err := s.historyReplicator.historyCache.getOrCreateWorkflowExecution(domainID, shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(currentRunID), + }) + s.Nil(err) + msBuilderCurrent := &mockMutableState{} + msBuilderCurrent.On("GetLastWriteVersion").Return(int64(999)) // this is not actually used, but will be called + msBuilderCurrent.On("GetReplicationState").Return(&persistence.ReplicationState{}) // this is used to update the version on mutable state + msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) // this is used to update the version on mutable state + msBuilderCurrent.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{RunID: currentRunID, CloseStatus: persistence.WorkflowCloseStatusNone}) + msBuilderCurrent.On("UpdateReplicationStateVersion", version) + contextCurrent.msBuilder = msBuilderCurrent + release(nil) + s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ + DomainID: domainID, + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{ + RunID: currentRunID, + // other attributes are 
+    currentStartEvent := &shared.HistoryEvent{
+        EventId:   common.Int64Ptr(common.FirstEventID),
+        EventType: shared.EventTypeWorkflowExecutionStarted.Ptr(),
+        WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{
+            ContinuedExecutionRunId: common.StringPtr(targetRunID),
+            // other attributes are not used
+        },
+    }
+    currentStartEventBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), []*shared.HistoryEvent{currentStartEvent})
+    serializedStartEventBatch, err := persistence.NewJSONHistorySerializer().Serialize(currentStartEventBatch)
+    s.Nil(err)
+    s.mockHistoryMgr.On("GetWorkflowExecutionHistory", &persistence.GetWorkflowExecutionHistoryRequest{
+        DomainID: domainID,
+        Execution: shared.WorkflowExecution{
+            WorkflowId: common.StringPtr(workflowID),
+            RunId:      common.StringPtr(currentRunID),
+        },
+        FirstEventID:  common.FirstEventID,
+        NextEventID:   common.FirstEventID + 1,
+        PageSize:      defaultHistoryPageSize,
+        NextPageToken: nil,
+    }).Return(&persistence.GetWorkflowExecutionHistoryResponse{
+        Events:        []persistence.SerializedHistoryEventBatch{*serializedStartEventBatch},
+        NextPageToken: nil,
+    }, nil)
+
+    // return nil to trigger the history engine to return an error, so we can assert on it;
+    // this saves a lot of meaningless mocking, since we are not testing the history engine here
+    msBuilderCurrent.On("AddWorkflowExecutionTerminatedEvent", mock.Anything).Return(nil)
+
+    err = s.historyReplicator.conflictResolutionTerminateContinueAsNew(msBuilderTarget)
+    s.NotNil(err)
+    _, ok := err.(*shared.InternalServiceError)
+    s.True(ok)
+}