Skip to content

Commit

Permalink
Bugfix: when failover happens and workflow has a pending decision, new e…
Browse files Browse the repository at this point in the history
…vents on the active side (after the failover) will be buffered. Workflow execution context should first store the standby events (after the failover), then flush the active events. (#991)
  • Loading branch information
wxing1292 authored Jul 23, 2018
1 parent 65f9834 commit 09a1a9c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 24 deletions.
2 changes: 1 addition & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context *workf
history := request.GetHistory()
lastEvent := history.Events[len(history.Events)-1]
now := time.Unix(0, lastEvent.GetTimestamp())
return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID, now)
return context.updateHelper(nil, nil, transactionID, now, false, nil, sourceCluster)
}

// Apply the replication task
Expand Down
71 changes: 48 additions & 23 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.Replica
nextEventID := lastEventID + 1
c.msBuilder.GetExecutionInfo().NextEventID = nextEventID

builder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(),
transactionID, now)
standbyHistoryBuilder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(transferTasks, timerTasks, transactionID, now, false, standbyHistoryBuilder, request.GetSourceCluster())
}

func (c *workflowExecutionContext) updateVersion() error {
Expand Down Expand Up @@ -188,12 +187,12 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi
}

now := time.Now()
return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID, now)
return c.updateHelper(transferTasks, timerTasks, transactionID, now, c.createReplicationTask, nil, "")
}

func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,
timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64,
transactionID int64, now time.Time) (errRet error) {
func (c *workflowExecutionContext) updateHelper(transferTasks []persistence.Task, timerTasks []persistence.Task,
transactionID int64, now time.Time,
createReplicationTask bool, standbyHistoryBuilder *historyBuilder, sourceCluster string) (errRet error) {

defer func() {
if errRet != nil {
Expand All @@ -208,40 +207,66 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
return err
}

// Replicator passes in a custom builder as it already has the events
if builder == nil {
// If no builder is passed in then use the one as part of the updates
builder = updates.newEventsBuilder
}
executionInfo := c.msBuilder.GetExecutionInfo()
hasNewHistoryEvents := len(builder.history) > 0

// this builder has events generated locally
hasNewStandbyHistoryEvents := standbyHistoryBuilder != nil && len(standbyHistoryBuilder.history) > 0
activeHistoryBuilder := updates.newEventsBuilder
hasNewActiveHistoryEvents := len(activeHistoryBuilder.history) > 0

// Replication state should only be updated after the UpdateSession is closed. IDs for certain events are only
// generated on CloseSession as they could be buffered events. The value for NextEventID will be wrong on
// mutable state if read before flushing the buffered events.
crossDCEnabled := c.msBuilder.GetReplicationState() != nil
if crossDCEnabled && hasNewHistoryEvents {
lastEventID := c.msBuilder.GetNextEventID() - 1
c.msBuilder.UpdateReplicationStateLastEventID(sourceCluster, lastWriteVersion, lastEventID)
if crossDCEnabled {
// always standby history first
if hasNewStandbyHistoryEvents {
lastEvent := standbyHistoryBuilder.history[len(standbyHistoryBuilder.history)-1]
c.msBuilder.UpdateReplicationStateLastEventID(
sourceCluster,
lastEvent.GetVersion(),
lastEvent.GetEventId(),
)
}

if hasNewActiveHistoryEvents {
c.msBuilder.UpdateReplicationStateLastEventID(
"",
c.msBuilder.GetCurrentVersion(),
executionInfo.NextEventID,
)
}
}

// always standby history first
if hasNewStandbyHistoryEvents {
firstEvent := standbyHistoryBuilder.GetFirstEvent()
// Note: standby events has no transient decision events
err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID)
if err != nil {
return err
}

executionInfo.LastFirstEventID = firstEvent.GetEventId()
}

// Some operations only update the mutable state. For example RecordActivityTaskHeartbeat.
if hasNewHistoryEvents {
firstEvent := builder.GetFirstEvent()
if hasNewActiveHistoryEvents {
firstEvent := activeHistoryBuilder.GetFirstEvent()
// Transient decision events need to be written as a separate batch
if builder.HasTransientEvents() {
err = c.appendHistoryEvents(builder, builder.transientHistory, transactionID)
if activeHistoryBuilder.HasTransientEvents() {
err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID)
if err != nil {
return err
}
}

err = c.appendHistoryEvents(builder, builder.history, transactionID)
err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, transactionID)
if err != nil {
return err
}

executionInfo.LastFirstEventID = *firstEvent.EventId
executionInfo.LastFirstEventID = firstEvent.GetEventId()
}

continueAsNew := updates.continueAsNew
Expand All @@ -262,7 +287,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe

var replicationTasks []persistence.Task
// Check if the update resulted in new history events before generating replication task
if hasNewHistoryEvents && createReplicationTask {
if hasNewActiveHistoryEvents && createReplicationTask {
// Let's create a replication task as part of this update
replicationTasks = append(replicationTasks, c.msBuilder.CreateReplicationTask())
}
Expand Down

0 comments on commit 09a1a9c

Please sign in to comment.