diff --git a/service/history/workflow_task_handler_callbacks.go b/service/history/workflow_task_handler_callbacks.go index 59494e8fbf8..f50bdefeee3 100644 --- a/service/history/workflow_task_handler_callbacks.go +++ b/service/history/workflow_task_handler_callbacks.go @@ -441,8 +441,9 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, serviceerror.NewNotFound("Workflow task not found.") } - if assignedBuildId := ms.GetAssignedBuildId(); assignedBuildId != "" { - // worker versioning is used, make sure the task was completed by the right build ID + if assignedBuildId := ms.GetAssignedBuildId(); assignedBuildId != "" && !ms.IsStickyTaskQueueSet() { + // Worker versioning is used, make sure the task was completed by the right build ID, unless we're using a + // sticky queue in which case Matching will not send the build ID wftStartedBuildId := ms.GetExecutionInfo().GetWorkflowTaskBuildId() wftCompletedBuildId := request.GetWorkerVersionStamp().GetBuildId() if wftCompletedBuildId != wftStartedBuildId { diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index da4829073d2..ad9776ba373 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -399,7 +399,11 @@ func checkVersionForStickyPoll(data *persistencespb.VersioningData, caps *common // A poller is using a build ID, but we don't know about that build ID. See comments in // lookupVersionSetForPoll. If we consider it the default for its set, then we should // leave it on the sticky queue here. - return false, nil + // We set return true for all sticky tasks until old versioning is cleaned up. + // this value is used by matching_engine for deciding if it should pass the worker build ID + // to history in the recordStart call or not. We don't need to pass build ID for sticky + // tasks as no redirect happen in a sticky queue. + return true, nil } set := data.VersionSets[setIdx] lastIndex := len(set.BuildIds) - 1 diff --git a/tests/versioning.go b/tests/versioning.go index 3e12a37efc7..34a2a89d30c 100644 --- a/tests/versioning.go +++ b/tests/versioning.go @@ -29,16 +29,19 @@ import ( "context" "errors" "fmt" + "math/rand" + "strconv" "strings" "sync/atomic" "time" "github.com/dgryski/go-farm" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/tqid" "go.temporal.io/server/common/worker_versioning" - "google.golang.org/protobuf/types/known/durationpb" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" @@ -51,6 +54,7 @@ import ( "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -705,8 +709,14 @@ func (s *VersioningIntegSuite) workflowStaysInBuildId() { var out string s.NoError(run.Get(ctx, &out)) s.Equal("done!", out) - s.validateWorkflowBuildId(ctx, run.GetID(), run.GetRunID(), v1, true, v1, "", nil) - s.validateWorkflowEventsVersionStamps(ctx, run.GetID(), run.GetRunID(), []string{v1, v1, v1, v1, v1}, "") + s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), v1, true, v1, "", nil) + s.validateWorkflowEventsVersionStamps(ctx, run.GetID(), run.GetRunID(), []string{ + v1, // WFT + v1, // activity + // v1, skipped because it belongs to sticky queue + v1, // activity + // v1, skipped because it belongs to sticky queue + }, "") } func (s *VersioningIntegSuite) TestUnversionedWorkflowStaysUnversioned() { @@ -1157,17 +1167,23 @@ func (s *VersioningIntegSuite) independentActivityTaskAssignmentSpooled(versione s.NoError(run.Get(ctx, &out)) s.Equal("done!", out) - wfBuild := "" if versionedWf { - wfBuild = wfV1 + s.validateWorkflowEventsVersionStamps( + ctx, run.GetID(), run.GetRunID(), []string{ + wfV1, + v3, // succeeded activity + // wfV1, removed because it's on a sticky queue + }, "", + ) + } else { + s.validateWorkflowEventsVersionStamps( + ctx, run.GetID(), run.GetRunID(), []string{ + "", + v3, // succeeded activity + "", + }, "", + ) } - s.validateWorkflowEventsVersionStamps( - ctx, run.GetID(), run.GetRunID(), []string{ - wfBuild, - v3, // succeeded activity - wfBuild, - }, "", - ) } func (s *VersioningIntegSuite) TestIndependentActivityTaskAssignment_SyncMatch_VersionedWorkflow() { @@ -1313,18 +1329,25 @@ func (s *VersioningIntegSuite) independentActivityTaskAssignmentSyncMatch(versio s.NoError(run.Get(ctx, &out)) s.Equal("done!", out) - wfBuild := "" if versionedWf { - wfBuild = wfV1 + s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), wfV1, true, wfV1, "", nil) + s.validateWorkflowEventsVersionStamps( + ctx, run.GetID(), run.GetRunID(), []string{ + wfV1, + v3, // succeeded activity + // wfV1, skipping stamp because this is a sticky queue task + }, "", + ) + } else { + s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), "", true, wfV1, "", nil) + s.validateWorkflowEventsVersionStamps( + ctx, run.GetID(), run.GetRunID(), []string{ + "", + v3, // succeeded activity + "", + }, "", + ) } - s.validateWorkflowBuildId(ctx, run.GetID(), run.GetRunID(), wfBuild, true, wfV1, "", nil) - s.validateWorkflowEventsVersionStamps( - ctx, run.GetID(), run.GetRunID(), []string{ - wfBuild, - v3, // succeeded activity - wfBuild, - }, "", - ) } func (s *VersioningIntegSuite) TestWorkflowTaskRedirectInRetryFirstTask() { @@ -1474,9 +1497,9 @@ func (s *VersioningIntegSuite) testWorkflowTaskRedirectInRetry(firstTask bool) { } if !firstTask { expectedStamps = []string{ - v1, // first wf task - v1, // activity task - v1, // failed wf task on sticky queue + v1, // first wf task + v1, // activity task + // v1, // skipping stamp for failed wf task on sticky queue v1, // failed wf task on normal queue v11, // timed out wf task show up in history because they happened on a different build id v12, // succeeded wf task @@ -2063,7 +2086,235 @@ func (s *VersioningIntegSuite) TestDispatchActivityUpgrade() { s.NoError(run.Get(ctx, &out)) s.Equal("v1.1v1.2", out) - s.validateWorkflowBuildId(ctx, run.GetID(), run.GetRunID(), v12, true, v12, "", []string{v1, v11}) + s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), v12, true, v12, "", []string{v1, v11}) +} + +func (s *VersioningIntegSuite) TestRedirectWithConcurrentActivities() { + // Testing that wf never "goes back" to older build ID in presence of concurrent activities and random failures. + // + // SETUP: + // 1- Run workers with build IDs v1.0 to v1.9. + // 2- Workflows runs a nested loop of 10 * `activityRuns`. First loop is per build ID, second is to run parallel + // activities for each build ID. + // 3- Activities fail by `activityErrorRate` probability. Otherwise, they return success after a random wait. + // 4- The first activity seeing a build ID (i.e. being run by that build ID) removes the redirect rule targeting + // that build ID to test the workflow behaves correctly in absence of the applied redirect rule. + // 5- One activity in each batch is responsible to add redirect rule to the next build ID. since, there is random + // delay in activities, more activities of the same batch likely start both before and after this activity. + // 6- Workflow waits for completion of all activities in a batch before going to the next one. + // + // VERIFYING: + // 1- After each activity is completed, in workflow we verify that it is not completed by a build ID newer of that + // of the workflow worker. + // 2- Workflow finishes. I.e. no task is dropped without rescheduling due to build ID mismatch. + // 3- At the end of the test we verify that the wf and activity started event build IDs are compatible with + // workflows output (which contains the build ID of each completed activity) + // 4- Redirect counter of all started events is valid + // 5- Redirect rules were applied at least to one activity. + // 6- At least one activity was retried. + // 7- Some history events are unordered based on event timestamp (due to parallel activity random delay) + + // Reduce user data long poll time for faster propagation of the versioning data. This is needed because of the + // exponential minWaitTime logic in userDataManagerImpl that gets triggered because rules change very fast in + // this test. + dc := s.testCluster.host.dcClient + dc.OverrideValue(s.T(), dynamicconfig.MatchingGetUserDataLongPollTimeout, 2*time.Second) + + tq := s.randomizeStr(s.T().Name()) + v1 := s.prefixed("v1.0") + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + rule := s.addAssignmentRule(ctx, tq, v1) + s.waitForAssignmentRulePropagation(ctx, tq, rule) + + versions := []string{v1} + for v := 1; v <= 9; v++ { + versions = append(versions, s.prefixed("v1."+strconv.Itoa(v))) + } + + activityErrorRate := .2 // chance of each activity attempt fail + activityRuns := int32(10) // run the activity ~10 times on each version + + activityCounter := atomic.Int32{} + triggerRedirectAtActivityRun := atomic.Int32{} + triggerRedirectAtActivityRun.Store(1) + + lastRedirectTarget := atomic.Value{} + lastRedirectTarget.Store(versions[0] + " redirect cleaned") + var workers []worker.Worker + + act := func(version string, runId int32) (string, error) { + runs := activityCounter.Add(1) + s.T().Logf("Starting activity %d on %s at %d\n", runId, version, runs) + if lastRedirectTarget.CompareAndSwap(version, version+" observed") && version != versions[0] { + // The last redirect rule is applied and observed by an activity, now delete it to make sure wf keeps using + // the right build ID after applying the redirect rule, even when the rule is not present anymore. + index, err := strconv.Atoi(version[len(version)-1:]) // get the last char of version is the index in the versions array + s.NoError(err) + s.T().Logf("Removing redirect from %s to %s \n", versions[index-1], version) + s.removeRedirectRule(ctx, tq, versions[index-1]) + lastRedirectTarget.CompareAndSwap(version+" observed", version+" redirect cleaned") + } + if rand.Float64() < activityErrorRate { + return "", errors.New("intentionally failing activity") + } + if triggerRedirectAtActivityRun.Load() == runId { + // When enough activities are run using the current version, add redirect rule to the next version. + v := runId / activityRuns + if int(v+1) < len(versions) { + // wait for last redirect rule to be cleaned up + for !lastRedirectTarget.CompareAndSwap(versions[v]+" redirect cleaned", versions[v+1]) { + } + s.T().Logf("Adding redirect from %s to %s at %d\n", versions[v], versions[v+1], runs) + s.addRedirectRule(ctx, tq, versions[v], versions[v+1]) + // Intentionally do not wait for propagation of the rules to partitions. Waiting will linger this + // activity and allows all the other concurrent activities to finish, leaving only the WFT task to + // see the redirect rule for the first time. + triggerRedirectAtActivityRun.CompareAndSwap(runId, runId+activityRuns) + } + } + + // Add random sleep to simulate network delay + //nolint:forbidigo + time.Sleep(time.Duration(int64(rand.Intn(50)) * int64(time.Millisecond))) + s.T().Logf("Completing activity %d on %s at %d\n", runId, version, runs) + return version, nil + } + + wf := func(wfCtx workflow.Context, wfVersion string) (string, error) { + var res []string + // because of rule propagation delay we run for more than 10 cycles to make sure all versions are seen + for i := 0; i <= 12; i++ { + var futures []workflow.Future + for j := 0; j < int(activityRuns); j++ { + f := workflow.ExecuteActivity(workflow.WithActivityOptions( + wfCtx, workflow.ActivityOptions{ + DisableEagerExecution: true, + VersioningIntent: temporal.VersioningIntentCompatible, + StartToCloseTimeout: 200 * time.Millisecond, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 10 * time.Millisecond, + BackoffCoefficient: 1, + }, + }), "act", i*int(activityRuns)+j) + futures = append(futures, f) + } + + for _, f := range futures { + var activityVersion string + err := f.Get(wfCtx, &activityVersion) + s.NoError(err) + res = append(res, activityVersion) + // The output of a newer build ID should never be sent to a wf worker of an older build ID + s.Assert().GreaterOrEqual(wfVersion, activityVersion) + // TODO: uncomment this check once workflow.GetInfo(wfCtx).GetCurrentBuildID() returns correct value + // based on last started task build ID, not last completed task build ID. + // s.Assert().GreaterOrEqual(workflow.GetInfo(wfCtx).GetCurrentBuildID(), activityVersion) + } + } + + return strings.Join(res, " "), nil + } + + // run all workers + for i := 0; i <= 9; i++ { + v := versions[i] + w := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: v, + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + MaxConcurrentActivityTaskPollers: 2, + // Limit the number of concurrent activities so not all scheduled activities are immediately started. + MaxConcurrentActivityExecutionSize: 2, + }) + w.RegisterWorkflowWithOptions( + func(ctx workflow.Context) (string, error) { + return wf(ctx, v) + }, + workflow.RegisterOptions{Name: "wf"}) + w.RegisterActivityWithOptions( + func(runId int32) (string, error) { + return act(v, runId) + }, + activity.RegisterOptions{Name: "act"}) + s.NoError(w.Start()) + workers = append(workers, w) + } + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.NoError(err) + + // Workflow should finish, otherwise it may mean we dropped some task without rescheduling them in the new build ID + var out string + s.NoError(run.Get(ctx, &out)) + s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), versions[9], true, versions[9], "", versions[:9]) + + activityPerVersion := make(map[string]int) + for _, v := range strings.Split(out, " ") { + activityPerVersion[v]++ + } + + wh := s.sdkClient.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + redirectAppliedToActivityTask := false + activityRetried := false + sawUnorderedEvents := false + var maxBuildId string + var maxStartedTimestamp time.Time + for wh.HasNext() { + he, err := wh.Next() + s.Nil(err) + var taskStartedStamp *commonpb.WorkerVersionStamp + var taskRedirectCounter int64 + var buildId string + if activityStarted := he.GetActivityTaskStartedEventAttributes(); activityStarted != nil { + taskStartedStamp = activityStarted.GetWorkerVersion() + buildId = taskStartedStamp.GetBuildId() + if buildId > maxBuildId { + redirectAppliedToActivityTask = true + } + if activityStarted.Attempt > 1 { + activityRetried = true + } + s.True(taskStartedStamp.GetUseVersioning()) + taskRedirectCounter = activityStarted.GetBuildIdRedirectCounter() + activityPerVersion[buildId]-- + } else if wfStarted := he.GetWorkflowTaskStartedEventAttributes(); wfStarted != nil { + taskStartedStamp = wfStarted.GetWorkerVersion() + if taskStartedStamp != nil { + // taskStartedStamp is nil for sticky queues + s.True(taskStartedStamp.GetUseVersioning()) + buildId = taskStartedStamp.GetBuildId() + taskRedirectCounter = wfStarted.GetBuildIdRedirectCounter() + } + } + if he.EventTime.AsTime().Before(maxStartedTimestamp) { + sawUnorderedEvents = true + } else { + maxStartedTimestamp = he.EventTime.AsTime() + } + if buildId > maxBuildId { + maxBuildId = buildId + } + if taskStartedStamp != nil { + // the last char of version is the index in the versions array which is the expected redirect counter for + // a task started event + expectedRedirectCounter, err := strconv.Atoi(buildId[len(buildId)-1:]) + s.NoError(err) + s.Equal(expectedRedirectCounter, int(taskRedirectCounter)) + } + } + for v, c := range activityPerVersion { + s.Equal(0, c, "activity count mismatch for build ID "+v) + } + // Following validations are more to make sure the test stays correct, rather than testing server's functionality + s.True(activityRetried, "no activity retried") + s.True(sawUnorderedEvents) + s.True(redirectAppliedToActivityTask, "no redirect rule applied to an activity task, is this test broken?") + + for _, w := range workers { + w.Stop() + } } func (s *VersioningIntegSuite) TestDispatchActivityCompatible() { @@ -3636,8 +3887,13 @@ func (s *VersioningIntegSuite) validateBuildIdAfterReset(ctx context.Context, wf run2 := s.sdkClient.GetWorkflow(ctx, run.GetID(), wfr.GetRunId()) s.NoError(run2.Get(ctx, &out)) s.Equal("done!", out) - s.validateWorkflowBuildId(ctx, run2.GetID(), run2.GetRunID(), expectedBuildId, true, expectedBuildId, inheritedBuildId, nil) - s.validateWorkflowEventsVersionStamps(ctx, run2.GetID(), run2.GetRunID(), []string{expectedBuildId, expectedBuildId, expectedBuildId}, inheritedBuildId) + s.validateWorkflowBuildIds(ctx, run2.GetID(), run2.GetRunID(), expectedBuildId, true, expectedBuildId, inheritedBuildId, nil) + s.validateWorkflowEventsVersionStamps(ctx, run2.GetID(), run2.GetRunID(), []string{ + expectedBuildId, + expectedBuildId, + // expectedBuildId, skipped because it belongs to a sticky queue + }, + inheritedBuildId) // now reset the original wf to second wf task and make sure it remains in v1 wfr, err = s.sdkClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ @@ -4517,6 +4773,22 @@ func (s *VersioningIntegSuite) addRedirectRule(ctx context.Context, tq, source s return rule } +func (s *VersioningIntegSuite) removeRedirectRule(ctx context.Context, tq, source string) { + cT := s.getVersioningRules(ctx, tq).GetConflictToken() + res, err := s.engine.UpdateWorkerVersioningRules(ctx, &workflowservice.UpdateWorkerVersioningRulesRequest{ + Namespace: s.namespace, + TaskQueue: tq, + ConflictToken: cT, + Operation: &workflowservice.UpdateWorkerVersioningRulesRequest_DeleteCompatibleRedirectRule{ + DeleteCompatibleRedirectRule: &workflowservice.UpdateWorkerVersioningRulesRequest_DeleteCompatibleBuildIdRedirectRule{ + SourceBuildId: source, + }, + }, + }) + s.NoError(err) + s.NotNil(res) +} + // addCompatibleBuildId updates build id info on a task queue with a new compatible build id. func (s *VersioningIntegSuite) addCompatibleBuildId(ctx context.Context, tq, newBuildId, existing string, makeSetDefault bool) { res, err := s.engine.UpdateWorkerBuildIdCompatibility(ctx, &workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ @@ -4713,6 +4985,68 @@ func (s *VersioningIntegSuite) validateWorkflowBuildId( } } +func (s *VersioningIntegSuite) validateWorkflowBuildIds( + ctx context.Context, + wfId string, + runId string, + expectedBuildId string, + newVersioning bool, + expectedStampBuildId string, + expectedInheritedBuildId string, + extraSearchAttrBuildIds []string, +) { + dw, err := s.sdkClient.DescribeWorkflowExecution(ctx, wfId, runId) + s.NoError(err) + saPayload := dw.GetWorkflowExecutionInfo().GetSearchAttributes().GetIndexedFields()["BuildIds"] + searchAttrAny, err := searchattribute.DecodeValue(saPayload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, true) + var searchAttr []string + if searchAttrAny != nil { + searchAttr = searchAttrAny.([]string) + } + s.NoError(err) + if expectedBuildId == "" { + if expectedStampBuildId != "" { + s.NotNil(dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp().GetBuildId()) + s.False(dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp().GetUseVersioning()) + s.Equal(2+len(extraSearchAttrBuildIds), len(searchAttr)) + s.Equal(worker_versioning.UnversionedSearchAttribute, searchAttr[0]) + s.True(strings.HasPrefix(searchAttr[1], worker_versioning.UnversionedSearchAttribute)) + } else { + s.Nil(dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp()) + s.Equal(0, len(searchAttr)) + } + } else { + if expectedStampBuildId != "" { + s.True(dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp().GetUseVersioning()) + s.Equal(expectedStampBuildId, dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp().GetBuildId()) + } else { + s.Nil(dw.GetWorkflowExecutionInfo().GetMostRecentWorkerVersionStamp()) + } + if newVersioning { + s.Equal(expectedBuildId, dw.GetWorkflowExecutionInfo().GetAssignedBuildId()) + s.Equal(2+len(extraSearchAttrBuildIds), len(searchAttr)) + s.Equal(worker_versioning.AssignedBuildIdSearchAttribute(expectedBuildId), searchAttr[0]) + s.Contains(searchAttr, worker_versioning.VersionedBuildIdSearchAttribute(expectedBuildId)) + } else { + s.Equal("", dw.GetWorkflowExecutionInfo().GetAssignedBuildId()) + if expectedStampBuildId != "" { + s.Equal(1+len(extraSearchAttrBuildIds), len(searchAttr)) + s.Contains(searchAttr, worker_versioning.VersionedBuildIdSearchAttribute(expectedBuildId)) + } else { + s.Equal(0, len(searchAttr)) + } + } + } + s.Equal(expectedInheritedBuildId, dw.GetWorkflowExecutionInfo().GetInheritedBuildId()) + for _, b := range extraSearchAttrBuildIds { + if expectedBuildId == "" { + s.Contains(searchAttr, worker_versioning.UnversionedBuildIdSearchAttribute(b)) + } else { + s.Contains(searchAttr, worker_versioning.VersionedBuildIdSearchAttribute(b)) + } + } +} + func (s *VersioningIntegSuite) validateWorkflowEventsVersionStamps( ctx context.Context, wfId, runId string,