Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Reset Workflow Continue As New Case #853

Merged
merged 2 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 190 additions & 67 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,10 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r

func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string, workflowID string, incomingVersion int64, logger bark.Logger) error {
// we need to check the current workflow execution
currentContext, release, err := r.historyCache.getOrCreateWorkflowExecution(
domainID,
// only use the workflow ID, to get the current running one
shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)},
)
currentRunID, currentLastWriteVersion, _, err := r.getCurrentWorkflowInfo(domainID, workflowID)
if err != nil {
return err
}
currentMsBuilder, err := currentContext.loadWorkflowExecution()
if err != nil {
// no matter what error happen, we need to retry
release(err)
return err
}
currentLastWriteVersion := currentMsBuilder.GetLastWriteVersion()
currentRunID := currentMsBuilder.GetExecutionInfo().RunID
release(nil)

// we can also use the start version
if currentLastWriteVersion > incomingVersion {
Expand Down Expand Up @@ -240,45 +227,55 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(context *workflowExe
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter)
return nil, nil
}
if rState.LastWriteVersion < incomingVersion {
// Check if this is the first event after failover
logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, incomingVersion)
previousActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(rState.LastWriteVersion)
ri, ok := replicationInfo[previousActiveCluster]
if !ok {
logger.Errorf("No ReplicationInfo Found For Previous Active Cluster. Previous Active Cluster: %v, Request Source Cluster: %v, Request ReplicationInfo: %v.",
previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)
// TODO: Handle missing replication information, #840
// Returning BadRequestError to force the message to land into DLQ
return nil, ErrMissingReplicationInfo
}

// Detect conflict
if ri.GetLastEventId() > rState.LastWriteEventID {
// if there is any bug in the replication protocol or implementation, this case can happen
logger.Errorf("Conflict detected, but cannot resolve. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
// Returning BadRequestError to force the message to land into DLQ
return nil, ErrCorruptedReplicationInfo
if rState.LastWriteVersion == incomingVersion {
// for ri.GetLastEventId() == rState.LastWriteEventID, ideally we should not do anything
return msBuilder, nil
}

// we have rState.LastWriteVersion < incomingVersion

// Check if this is the first event after failover
logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, incomingVersion)
previousActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(rState.LastWriteVersion)
ri, ok := replicationInfo[previousActiveCluster]
if !ok {
logger.Errorf("No ReplicationInfo Found For Previous Active Cluster. Previous Active Cluster: %v, Request Source Cluster: %v, Request ReplicationInfo: %v.",
previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)
// TODO: Handle missing replication information, #840
// Returning BadRequestError to force the message to land into DLQ
return nil, ErrMissingReplicationInfo
}

// Detect conflict
if ri.GetLastEventId() > rState.LastWriteEventID {
// if there is any bug in the replication protocol or implementation, this case can happen
logger.Errorf("Conflict detected, but cannot resolve. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
// Returning BadRequestError to force the message to land into DLQ
return nil, ErrCorruptedReplicationInfo
}

if ri.GetLastEventId() < rState.LastWriteEventID {
logger.Infof("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.HistoryConflictsCounter)

// handling edge case when resetting a workflow, and this workflow has done continue
// we need to terminate the continue as new-ed workflow
err = r.conflictResolutionTerminateContinueAsNew(msBuilder)
if err != nil {
return nil, err
}

if ri.GetLastEventId() < rState.LastWriteEventID {
logger.Infof("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.HistoryConflictsCounter)

resolver := r.getNewConflictResolver(context, logger)
msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.GetExecutionInfo().StartTimestamp)
logger.Infof("Completed Resetting of workflow execution. NextEventID: %v. Err: %v", msBuilder.GetNextEventID(), err)
if err != nil {
return nil, err
}
resolver := r.getNewConflictResolver(context, logger)
msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.GetExecutionInfo().StartTimestamp)
logger.Infof("Completed Resetting of workflow execution. NextEventID: %v. Err: %v", msBuilder.GetNextEventID(), err)
if err != nil {
return nil, err
}

// for ri.GetLastEventId() == rState.LastWriteEventID, ideally we should not do anything
}
return msBuilder, nil
}
Expand Down Expand Up @@ -587,34 +584,116 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC
// whether the remote active cluster is aware of the current running workflow,
// the only thing we can do is to terminate the current workflow and
// start the new workflow from the request
domainEntry, err := r.domainCache.GetDomainByID(domainID)
if err != nil {
return err
}

// same workflow ID, same shard
err = r.historyEngine.TerminateWorkflowExecution(ctx.Background(), &h.TerminateWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
TerminateRequest: &shared.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(domainEntry.GetInfo().Name),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(executionInfo.WorkflowID),
RunId: common.StringPtr(currentRunID),
},
Reason: common.StringPtr("Terminate Workflow Due To Version Conflict."),
Details: nil,
Identity: common.StringPtr("worker-service"),
},
})
err = r.terminateWorkflow(domainID, executionInfo.WorkflowID, currentRunID)
if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return err
}
// if workflow is completed just when the call is made, will get EntityNotExistsError
// we are not sure whether the workflow to be terminated ends with continue as new or not
// so when encountering EntityNotExistsError, just continue to execute; if an error occurs,
// there will be retry on the worker level
}
isBrandNew = false
return createWorkflow(isBrandNew, currentRunID)
}

// conflictResolutionTerminateContinueAsNew covers an edge case of conflict
// resolution: the workflow that is about to be reset has already closed with
// continue-as-new, so a newer generation of it may be the current run.
//
// When the current run (same workflow ID) is still open and can be traced back
// to the run held by msBuilder by following the continued-execution run ID
// stored in the first history event of each run, the current run is terminated.
//
// Nothing is done, and nil is returned, when:
//   - msBuilder is still running,
//   - msBuilder's close status is not continued-as-new,
//   - the current run has already closed, or
//   - the chain of previous run IDs ends without reaching msBuilder's run.
//
// Every error (loading the current run, reading or decoding history,
// terminating the current run) is returned unchanged; this includes an
// EntityNotExistsError raised when the current run completes just as the
// terminate call is made. The retry is expected to happen on the worker level.
func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder mutableState) (retError error) {
	switch {
	case msBuilder.IsWorkflowExecutionRunning():
		// still running: there cannot be a continued-as-new successor yet
		return nil
	case msBuilder.GetExecutionInfo().CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew:
		// closed, but not through continue-as-new
		return nil
	}

	// Because this workflow closed with continue-as-new, the current running
	// workflow (the one with the same workflow ID) cannot have the same run ID
	// as this workflow. Grabbing the current workflow below therefore cannot
	// dead lock against the workflow already held by the caller.
	domainID := msBuilder.GetExecutionInfo().DomainID
	workflowID := msBuilder.GetExecutionInfo().WorkflowID
	currentRunID, _, currentCloseStatus, err := r.getCurrentWorkflowInfo(domainID, workflowID)
	if err != nil {
		return err
	}
	if currentCloseStatus != persistence.WorkflowCloseStatusNone {
		// The current workflow already finished.
		// Note: a current workflow never has continue-as-new as its close status.
		return nil
	}

	// continuedFrom reads the first history batch of the given run and returns
	// the continued-execution run ID recorded on its first event.
	continuedFrom := func(runID string) (string, error) {
		request := &persistence.GetWorkflowExecutionHistoryRequest{
			DomainID: domainID,
			Execution: shared.WorkflowExecution{
				WorkflowId: common.StringPtr(workflowID),
				RunId:      common.StringPtr(runID),
			},
			FirstEventID:  common.FirstEventID,
			NextEventID:   common.FirstEventID + 1,
			PageSize:      defaultHistoryPageSize,
			NextPageToken: nil,
		}
		response, err := r.historyMgr.GetWorkflowExecutionHistory(request)
		if err != nil {
			return "", err
		}
		if len(response.Events) == 0 {
			return "", fmt.Errorf("no history found for domainID: %v, workflowID: %v, runID: %v",
				domainID, workflowID, runID)
		}

		firstBatch := response.Events[0]
		persistence.SetSerializedHistoryDefaults(&firstBatch)
		serializer, err := persistence.NewHistorySerializerFactory().Get(firstBatch.EncodingType)
		if err != nil {
			return "", err
		}
		history, err := serializer.Deserialize(&firstBatch)
		if err != nil {
			return "", err
		}
		if len(history.Events) == 0 {
			return "", fmt.Errorf("no history events found for domainID: %v, workflowID: %v, runID: %v",
				domainID, workflowID, runID)
		}

		startedAttributes := history.Events[0].WorkflowExecutionStartedEventAttributes
		return startedAttributes.GetContinuedExecutionRunId(), nil
	}

	// Walk backwards from the current run, one generation at a time (assuming
	// continue-as-new), until the run being reset is reached or the chain ends.
	targetRunID := msBuilder.GetExecutionInfo().RunID
	cursor := currentRunID
	for cursor != "" && cursor != targetRunID {
		previousRunID, err := continuedFrom(cursor)
		if err != nil {
			return err
		}
		cursor = previousRunID
	}
	if cursor == "" {
		// the current running workflow is unrelated to the workflow being reset
		return nil
	}

	// Here cursor == targetRunID: the current workflow is a continue-as-new
	// descendant of the workflow being reset, so it has to be terminated.
	// Same workflow ID, hence same shard.
	return r.terminateWorkflow(domainID, workflowID, currentRunID)
}

func (r *historyReplicator) Serialize(history *shared.History) (*persistence.SerializedHistoryEventBatch, error) {
eventBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), history.Events)
h, err := r.historySerializer.Serialize(eventBatch)
Expand All @@ -624,6 +703,50 @@ func (r *historyReplicator) Serialize(history *shared.History) (*persistence.Ser
return h, nil
}

// getCurrentWorkflowInfo loads the current workflow execution for the given
// domain ID and workflow ID and reports its run ID, last write version and
// close status.
//
// The execution is looked up through the history cache using only the
// workflow ID (no run ID), which is how the current run is addressed. Once the
// cache entry has been acquired, it is released when the function returns,
// with the function's resulting error passed to the release callback.
//
// On failure the returned values are "", common.EmptyVersion and
// persistence.WorkflowCloseStatusNone together with the error; callers are
// expected to retry regardless of which error occurred.
func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) {
	// no run ID on purpose: this resolves to the current workflow execution
	currentExecution := shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)}
	currentContext, release, err := r.historyCache.getOrCreateWorkflowExecution(domainID, currentExecution)
	if err != nil {
		return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err
	}
	// retError is the named result, so the release callback observes the final error
	defer func() { release(retError) }()

	currentMsBuilder, err := currentContext.loadWorkflowExecution()
	if err != nil {
		// whatever the error is, the caller needs to retry
		return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err
	}

	version := currentMsBuilder.GetLastWriteVersion()
	currentRunID := currentMsBuilder.GetExecutionInfo().RunID
	currentCloseStatus := currentMsBuilder.GetExecutionInfo().CloseStatus
	return currentRunID, version, currentCloseStatus, nil
}

// terminateWorkflow asks the history engine to terminate the workflow
// execution identified by workflowID and runID within the given domain.
//
// The domain name required by the terminate request is resolved from the
// domain cache by domainID; a lookup failure is returned as is. Otherwise the
// result of the history engine's TerminateWorkflowExecution call is returned
// unchanged. The target has the same workflow ID as the workflow being
// replicated, hence it lives on the same shard.
func (r *historyReplicator) terminateWorkflow(domainID string, workflowID string, runID string) error {
	domainEntry, err := r.domainCache.GetDomainByID(domainID)
	if err != nil {
		return err
	}

	target := &shared.WorkflowExecution{
		WorkflowId: common.StringPtr(workflowID),
		RunId:      common.StringPtr(runID),
	}
	terminateRequest := &shared.TerminateWorkflowExecutionRequest{
		Domain:            common.StringPtr(domainEntry.GetInfo().Name),
		WorkflowExecution: target,
		Reason:            common.StringPtr("Terminate Workflow Due To Version Conflict."),
		Details:           nil,
		Identity:          common.StringPtr("worker-service"),
	}

	// same workflow ID, same shard
	return r.historyEngine.TerminateWorkflowExecution(ctx.Background(), &h.TerminateWorkflowExecutionRequest{
		DomainUUID:       common.StringPtr(domainID),
		TerminateRequest: terminateRequest,
	})
}

func (r *historyReplicator) notify(clusterName string, now time.Time, transferTasks []persistence.Task,
timerTasks []persistence.Task) {
r.shard.SetCurrentTime(clusterName, now)
Expand Down
Loading