Skip to content

Commit

Permalink
Fix flaky build id issue (#1338)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jan 9, 2024
1 parent 6244097 commit 1fe10d5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
21 changes: 10 additions & 11 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ type (
binaryChecksum string
sdkVersion string
sdkName string
buildID string
// Is null if there was no task completed event to read the build ID from (but may be
// empty string if there was, and it was empty)
buildID *string
}

finishedTask struct {
Expand Down Expand Up @@ -350,7 +352,7 @@ func (eh *history) nextTask() (*preparedTask, error) {

var markers []*historypb.HistoryEvent
var msgs []*protocolpb.Message
var buildID string
var buildID *string
if len(result) > 0 {
nextTaskEvents, err := eh.prepareTask()
if err != nil {
Expand Down Expand Up @@ -463,8 +465,9 @@ OrderEvents:
// Skip
default:
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
taskEvents.buildID = event.GetWorkflowTaskCompletedEventAttributes().
bidStr := event.GetWorkflowTaskCompletedEventAttributes().
GetWorkerVersion().GetBuildId()
taskEvents.buildID = &bidStr
} else if isPreloadMarkerEvent(event) {
taskEvents.markers = append(taskEvents.markers, event)
} else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil {
Expand Down Expand Up @@ -983,7 +986,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
eventHandler.ResetLAWFTAttemptCounts()
eventHandler.sdkFlags.markSDKFlagsSent()

isQueryOnlyTask := workflowTask.task.StartedEventId == 0
w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID
ProcessEvents:
for {
nextTask, err := reorderedHistory.nextTask()
Expand All @@ -995,7 +998,7 @@ ProcessEvents:
historyMessages := nextTask.msgs
flags := nextTask.flags
binaryChecksum := nextTask.binaryChecksum
currentBuildID := nextTask.buildID
nextTaskBuildId := nextTask.buildID
// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1026,12 +1029,8 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
if isReplay {
w.workflowInfo.currentTaskBuildID = currentBuildID
} else if !isReplay && !isQueryOnlyTask {
// Query only tasks should use the build ID from the workflow task, not the worker's
// build id, since the user cares about what affected workflow state.
w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID
if isReplay && nextTaskBuildId != nil {
w.workflowInfo.currentTaskBuildID = *nextTaskBuildId
}
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
Expand Down
2 changes: 1 addition & 1 deletion test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
// Start workflow
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow)
// Query to see that the build ID is 1.0
var lastBuildID string
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
var lastBuildID string
ts.NoError(err)
ts.NoError(res.Get(&lastBuildID))
ts.Equal("1.0", lastBuildID)
Expand Down

0 comments on commit 1fe10d5

Please sign in to comment.