diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index c2eed0a1907..6b0720a0d2f 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -359,7 +359,7 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context *workf history := request.GetHistory() lastEvent := history.Events[len(history.Events)-1] now := time.Unix(0, lastEvent.GetTimestamp()) - return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID, now) + return context.updateHelper(nil, nil, transactionID, now, false, nil, sourceCluster) } // Apply the replication task diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index 4b1f99ddaf3..ca6b3b4865d 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -145,9 +145,8 @@ func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.Replica nextEventID := lastEventID + 1 c.msBuilder.GetExecutionInfo().NextEventID = nextEventID - builder := newHistoryBuilderFromEvents(request.History.Events, c.logger) - return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(), - transactionID, now) + standbyHistoryBuilder := newHistoryBuilderFromEvents(request.History.Events, c.logger) + return c.updateHelper(transferTasks, timerTasks, transactionID, now, false, standbyHistoryBuilder, request.GetSourceCluster()) } func (c *workflowExecutionContext) updateVersion() error { @@ -188,12 +187,12 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi } now := time.Now() - return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID, now) + return c.updateHelper(transferTasks, timerTasks, transactionID, now, c.createReplicationTask, nil, "") } -func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task, - timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64, - transactionID int64, now time.Time) (errRet error) { +func (c *workflowExecutionContext) updateHelper(transferTasks []persistence.Task, timerTasks []persistence.Task, + transactionID int64, now time.Time, + createReplicationTask bool, standbyHistoryBuilder *historyBuilder, sourceCluster string) (errRet error) { defer func() { if errRet != nil { @@ -208,40 +207,66 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe return err } - // Replicator passes in a custom builder as it already has the events - if builder == nil { - // If no builder is passed in then use the one as part of the updates - builder = updates.newEventsBuilder - } executionInfo := c.msBuilder.GetExecutionInfo() - hasNewHistoryEvents := len(builder.history) > 0 + + // this builder has events generated locally + hasNewStandbyHistoryEvents := standbyHistoryBuilder != nil && len(standbyHistoryBuilder.history) > 0 + activeHistoryBuilder := updates.newEventsBuilder + hasNewActiveHistoryEvents := len(activeHistoryBuilder.history) > 0 // Replication state should only be updated after the UpdateSession is closed. IDs for certain events are only // generated on CloseSession as they could be buffered events. The value for NextEventID will be wrong on // mutable state if read before flushing the buffered events. crossDCEnabled := c.msBuilder.GetReplicationState() != nil - if crossDCEnabled && hasNewHistoryEvents { - lastEventID := c.msBuilder.GetNextEventID() - 1 - c.msBuilder.UpdateReplicationStateLastEventID(sourceCluster, lastWriteVersion, lastEventID) + if crossDCEnabled { + // always standby history first + if hasNewStandbyHistoryEvents { + lastEvent := standbyHistoryBuilder.history[len(standbyHistoryBuilder.history)-1] + c.msBuilder.UpdateReplicationStateLastEventID( + sourceCluster, + lastEvent.GetVersion(), + lastEvent.GetEventId(), + ) + } + + if hasNewActiveHistoryEvents { + c.msBuilder.UpdateReplicationStateLastEventID( + "", + c.msBuilder.GetCurrentVersion(), + executionInfo.NextEventID, + ) + } + } + + // always standby history first + if hasNewStandbyHistoryEvents { + firstEvent := standbyHistoryBuilder.GetFirstEvent() + // Note: standby events has no transient decision events + err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID) + if err != nil { + return err + } + + executionInfo.LastFirstEventID = firstEvent.GetEventId() } // Some operations only update the mutable state. For example RecordActivityTaskHeartbeat. - if hasNewHistoryEvents { - firstEvent := builder.GetFirstEvent() + if hasNewActiveHistoryEvents { + firstEvent := activeHistoryBuilder.GetFirstEvent() // Transient decision events need to be written as a separate batch - if builder.HasTransientEvents() { - err = c.appendHistoryEvents(builder, builder.transientHistory, transactionID) + if activeHistoryBuilder.HasTransientEvents() { + err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID) if err != nil { return err } } - err = c.appendHistoryEvents(builder, builder.history, transactionID) + err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, transactionID) if err != nil { return err } - executionInfo.LastFirstEventID = *firstEvent.EventId + executionInfo.LastFirstEventID = firstEvent.GetEventId() } continueAsNew := updates.continueAsNew @@ -262,7 +287,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe var replicationTasks []persistence.Task // Check if the update resulted in new history events before generating replication task - if hasNewHistoryEvents && createReplicationTask { + if hasNewActiveHistoryEvents && createReplicationTask { // Let's create a replication task as part of this update replicationTasks = append(replicationTasks, c.msBuilder.CreateReplicationTask()) }