Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix failover should flush buffered events #991

Merged
merged 1 commit into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context *workf
history := request.GetHistory()
lastEvent := history.Events[len(history.Events)-1]
now := time.Unix(0, lastEvent.GetTimestamp())
return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID, now)
return context.updateHelper(nil, nil, transactionID, now, false, nil, sourceCluster)
}

// Apply the replication task
Expand Down
71 changes: 48 additions & 23 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.Replica
nextEventID := lastEventID + 1
c.msBuilder.GetExecutionInfo().NextEventID = nextEventID

builder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(),
transactionID, now)
standbyHistoryBuilder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(transferTasks, timerTasks, transactionID, now, false, standbyHistoryBuilder, request.GetSourceCluster())
}

func (c *workflowExecutionContext) updateVersion() error {
Expand Down Expand Up @@ -188,12 +187,12 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi
}

now := time.Now()
return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID, now)
return c.updateHelper(transferTasks, timerTasks, transactionID, now, c.createReplicationTask, nil, "")
}

func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,
timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64,
transactionID int64, now time.Time) (errRet error) {
func (c *workflowExecutionContext) updateHelper(transferTasks []persistence.Task, timerTasks []persistence.Task,
transactionID int64, now time.Time,
createReplicationTask bool, standbyHistoryBuilder *historyBuilder, sourceCluster string) (errRet error) {

defer func() {
if errRet != nil {
Expand All @@ -208,40 +207,66 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
return err
}

// Replicator passes in a custom builder as it already has the events
if builder == nil {
// If no builder is passed in then use the one as part of the updates
builder = updates.newEventsBuilder
}
executionInfo := c.msBuilder.GetExecutionInfo()
hasNewHistoryEvents := len(builder.history) > 0

// the standby builder carries events passed in by the caller; the active builder has events generated locally
hasNewStandbyHistoryEvents := standbyHistoryBuilder != nil && len(standbyHistoryBuilder.history) > 0
activeHistoryBuilder := updates.newEventsBuilder
hasNewActiveHistoryEvents := len(activeHistoryBuilder.history) > 0

// Replication state should only be updated after the UpdateSession is closed. IDs for certain events are only
// generated on CloseSession as they could be buffered events. The value for NextEventID will be wrong on
// mutable state if read before flushing the buffered events.
crossDCEnabled := c.msBuilder.GetReplicationState() != nil
if crossDCEnabled && hasNewHistoryEvents {
lastEventID := c.msBuilder.GetNextEventID() - 1
c.msBuilder.UpdateReplicationStateLastEventID(sourceCluster, lastWriteVersion, lastEventID)
if crossDCEnabled {
// always standby history first
if hasNewStandbyHistoryEvents {
lastEvent := standbyHistoryBuilder.history[len(standbyHistoryBuilder.history)-1]
c.msBuilder.UpdateReplicationStateLastEventID(
sourceCluster,
lastEvent.GetVersion(),
lastEvent.GetEventId(),
)
}

if hasNewActiveHistoryEvents {
c.msBuilder.UpdateReplicationStateLastEventID(
"",
c.msBuilder.GetCurrentVersion(),
executionInfo.NextEventID,
)
}
}

// always standby history first
if hasNewStandbyHistoryEvents {
firstEvent := standbyHistoryBuilder.GetFirstEvent()
// Note: standby events have no transient decision events
err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID)
if err != nil {
return err
}

executionInfo.LastFirstEventID = firstEvent.GetEventId()
}

// Some operations only update the mutable state. For example RecordActivityTaskHeartbeat.
if hasNewHistoryEvents {
firstEvent := builder.GetFirstEvent()
if hasNewActiveHistoryEvents {
firstEvent := activeHistoryBuilder.GetFirstEvent()
// Transient decision events need to be written as a separate batch
if builder.HasTransientEvents() {
err = c.appendHistoryEvents(builder, builder.transientHistory, transactionID)
if activeHistoryBuilder.HasTransientEvents() {
err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID)
if err != nil {
return err
}
}

err = c.appendHistoryEvents(builder, builder.history, transactionID)
err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, transactionID)
if err != nil {
return err
}

executionInfo.LastFirstEventID = *firstEvent.EventId
executionInfo.LastFirstEventID = firstEvent.GetEventId()
}

continueAsNew := updates.continueAsNew
Expand All @@ -262,7 +287,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe

var replicationTasks []persistence.Task
// Check if the update resulted in new history events before generating replication task
if hasNewHistoryEvents && createReplicationTask {
if hasNewActiveHistoryEvents && createReplicationTask {
// Let's create a replication task as part of this update
replicationTasks = append(replicationTasks, c.msBuilder.CreateReplicationTask())
}
Expand Down