Skip to content

Commit

Permalink
remove TestRedirectWithConcurrentActivities
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Jun 14, 2024
1 parent d6c858d commit a173c81
Showing 1 changed file with 0 additions and 230 deletions.
230 changes: 0 additions & 230 deletions tests/versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -2089,234 +2087,6 @@ func (s *VersioningIntegSuite) TestDispatchActivityUpgrade() {
s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), v12, true, v12, "", []string{v1, v11})
}

// TestRedirectWithConcurrentActivities checks that a workflow never "goes back" to an older build ID in the
// presence of concurrent activities, random activity failures, and redirect rules that are added and then
// removed while the workflow is running.
func (s *VersioningIntegSuite) TestRedirectWithConcurrentActivities() {
	// SETUP:
	// 1- Run workers with build IDs v1.0 to v1.9.
	// 2- The workflow runs a nested loop: the outer loop is (roughly) per build ID, the inner loop starts
	//    `activityRuns` parallel activities.
	// 3- Activities fail with probability `activityErrorRate`. Otherwise, they return success after a random wait.
	// 4- The first activity seeing a build ID (i.e. being run by that build ID) removes the redirect rule targeting
	//    that build ID, to test that the workflow behaves correctly in the absence of the applied redirect rule.
	// 5- One activity in each batch is responsible for adding the redirect rule to the next build ID. Since there
	//    is a random delay in activities, other activities of the same batch likely start both before and after
	//    this activity.
	// 6- The workflow waits for completion of all activities in a batch before going to the next one.
	//
	// VERIFYING:
	// 1- After each activity is completed, the workflow verifies that it was not completed by a build ID newer
	//    than that of the workflow worker.
	// 2- The workflow finishes, i.e. no task is dropped without rescheduling due to a build ID mismatch.
	// 3- At the end of the test we verify that the wf and activity started event build IDs are compatible with
	//    the workflow's output (which contains the build ID of each completed activity).
	// 4- The redirect counter of all started events is valid.
	// 5- Redirect rules were applied to at least one activity.
	// 6- At least one activity was retried.
	// 7- Some history events are unordered based on event timestamp (due to parallel activity random delay).

	// Reduce user data long poll time for faster propagation of the versioning data. This is needed because of the
	// exponential minWaitTime logic in userDataManagerImpl that gets triggered because rules change very fast in
	// this test.
	dc := s.testCluster.host.dcClient
	dc.OverrideValue(s.T(), dynamicconfig.MatchingGetUserDataLongPollTimeout, 2*time.Second)

	tq := s.randomizeStr(s.T().Name())
	v1 := s.prefixed("v1.0")

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	rule := s.addAssignmentRule(ctx, tq, v1)
	s.waitForAssignmentRulePropagation(ctx, tq, rule)

	// versions[i] is "v1.<i>"; the last character of a version doubles as its index in this slice.
	versions := []string{v1}
	for v := 1; v <= 9; v++ {
		versions = append(versions, s.prefixed("v1."+strconv.Itoa(v)))
	}

	activityErrorRate := .2   // chance of each activity attempt failing
	activityRuns := int32(10) // run the activity ~10 times on each version

	activityCounter := atomic.Int32{}
	triggerRedirectAtActivityRun := atomic.Int32{}
	triggerRedirectAtActivityRun.Store(1)

	// lastRedirectTarget is a small state machine shared by all activity executions. It holds one of:
	//   "<version>"                  - a redirect rule targeting <version> was (or is about to be) added
	//   "<version> observed"         - some activity ran on <version> and is removing that redirect rule
	//   "<version> redirect cleaned" - the redirect rule targeting <version> has been removed
	lastRedirectTarget := atomic.Value{}
	lastRedirectTarget.Store(versions[0] + " redirect cleaned")

	act := func(version string, runID int32) (string, error) {
		runs := activityCounter.Add(1)
		s.T().Logf("Starting activity %d on %s at %d\n", runID, version, runs)
		if lastRedirectTarget.CompareAndSwap(version, version+" observed") && version != versions[0] {
			// The last redirect rule is applied and observed by an activity, now delete it to make sure wf keeps using
			// the right build ID after applying the redirect rule, even when the rule is not present anymore.
			index, err := strconv.Atoi(version[len(version)-1:]) // the last char of version is its index in versions
			s.NoError(err)
			s.T().Logf("Removing redirect from %s to %s \n", versions[index-1], version)
			s.removeRedirectRule(ctx, tq, versions[index-1])
			lastRedirectTarget.CompareAndSwap(version+" observed", version+" redirect cleaned")
		}
		if rand.Float64() < activityErrorRate {
			return "", errors.New("intentionally failing activity")
		}
		if triggerRedirectAtActivityRun.Load() == runID {
			// When enough activities are run using the current version, add redirect rule to the next version.
			v := runID / activityRuns
			if int(v+1) < len(versions) {
				// Wait for the last redirect rule to be cleaned up. Poll with a short pause instead of spinning
				// so the wait does not burn a CPU core, and give up once the test context is done so this
				// goroutine cannot outlive the test.
				for !lastRedirectTarget.CompareAndSwap(versions[v]+" redirect cleaned", versions[v+1]) {
					select {
					case <-ctx.Done():
						return "", ctx.Err()
					case <-time.After(time.Millisecond):
					}
				}
				s.T().Logf("Adding redirect from %s to %s at %d\n", versions[v], versions[v+1], runs)
				s.addRedirectRule(ctx, tq, versions[v], versions[v+1])
				// Intentionally do not wait for propagation of the rules to partitions. Waiting will linger this
				// activity and allows all the other concurrent activities to finish, leaving only the WFT task to
				// see the redirect rule for the first time.
				triggerRedirectAtActivityRun.CompareAndSwap(runID, runID+activityRuns)
			}
		}

		// Add random sleep to simulate network delay
		//nolint:forbidigo
		time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
		s.T().Logf("Completing activity %d on %s at %d\n", runID, version, runs)
		return version, nil
	}

	wf := func(wfCtx workflow.Context, wfVersion string) (string, error) {
		var res []string
		// because of rule propagation delay we run for more than 10 cycles to make sure all versions are seen
		for i := 0; i <= 12; i++ {
			var futures []workflow.Future
			for j := 0; j < int(activityRuns); j++ {
				f := workflow.ExecuteActivity(workflow.WithActivityOptions(
					wfCtx, workflow.ActivityOptions{
						DisableEagerExecution: true,
						VersioningIntent:      temporal.VersioningIntentCompatible,
						StartToCloseTimeout:   200 * time.Millisecond,
						RetryPolicy: &temporal.RetryPolicy{
							InitialInterval:    10 * time.Millisecond,
							BackoffCoefficient: 1,
						},
					}), "act", i*int(activityRuns)+j)
				futures = append(futures, f)
			}

			// Wait for the whole batch before scheduling the next one.
			for _, f := range futures {
				var activityVersion string
				err := f.Get(wfCtx, &activityVersion)
				s.NoError(err)
				res = append(res, activityVersion)
				// The output of a newer build ID should never be sent to a wf worker of an older build ID
				s.Assert().GreaterOrEqual(wfVersion, activityVersion)
				// TODO: uncomment this check once workflow.GetInfo(wfCtx).GetCurrentBuildID() returns correct value
				// based on last started task build ID, not last completed task build ID.
				// s.Assert().GreaterOrEqual(workflow.GetInfo(wfCtx).GetCurrentBuildID(), activityVersion)
			}
		}

		return strings.Join(res, " "), nil
	}

	// run all workers
	for _, version := range versions {
		v := version // per-iteration copy captured by the closures registered below
		w := worker.New(s.sdkClient, tq, worker.Options{
			BuildID:                          v,
			UseBuildIDForVersioning:          true,
			MaxConcurrentWorkflowTaskPollers: numPollers,
			MaxConcurrentActivityTaskPollers: 2,
			// Limit the number of concurrent activities so not all scheduled activities are immediately started.
			MaxConcurrentActivityExecutionSize: 2,
		})
		w.RegisterWorkflowWithOptions(
			func(ctx workflow.Context) (string, error) {
				return wf(ctx, v)
			},
			workflow.RegisterOptions{Name: "wf"})
		w.RegisterActivityWithOptions(
			func(runID int32) (string, error) {
				return act(v, runID)
			},
			activity.RegisterOptions{Name: "act"})
		s.NoError(w.Start())
		// Deferred on purpose inside the loop: every worker must keep running until the test function returns,
		// and must be stopped even if the test exits early.
		defer w.Stop()
	}

	run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf")
	s.NoError(err)

	// Workflow should finish, otherwise it may mean we dropped some task without rescheduling them in the new build ID
	var out string
	s.NoError(run.Get(ctx, &out))
	s.validateWorkflowBuildIds(ctx, run.GetID(), run.GetRunID(), versions[9], true, versions[9], "", versions[:9])

	// Count completed activities per build ID as reported by the workflow output; each activity started event
	// seen in history below decrements the matching counter.
	activityPerVersion := make(map[string]int)
	for _, v := range strings.Split(out, " ") {
		activityPerVersion[v]++
	}

	wh := s.sdkClient.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	redirectAppliedToActivityTask := false
	activityRetried := false
	sawUnorderedEvents := false
	var maxBuildID string
	var maxStartedTimestamp time.Time
	for wh.HasNext() {
		he, err := wh.Next()
		s.NoError(err)
		var taskStartedStamp *commonpb.WorkerVersionStamp
		var taskRedirectCounter int64
		var buildID string
		if activityStarted := he.GetActivityTaskStartedEventAttributes(); activityStarted != nil {
			taskStartedStamp = activityStarted.GetWorkerVersion()
			buildID = taskStartedStamp.GetBuildId()
			if buildID > maxBuildID {
				// An activity started on a build ID newer than anything seen so far in history.
				redirectAppliedToActivityTask = true
			}
			if activityStarted.Attempt > 1 {
				activityRetried = true
			}
			s.True(taskStartedStamp.GetUseVersioning())
			taskRedirectCounter = activityStarted.GetBuildIdRedirectCounter()
			activityPerVersion[buildID]--
		} else if wfStarted := he.GetWorkflowTaskStartedEventAttributes(); wfStarted != nil {
			taskStartedStamp = wfStarted.GetWorkerVersion()
			if taskStartedStamp != nil {
				// taskStartedStamp is nil for sticky queues
				s.True(taskStartedStamp.GetUseVersioning())
				buildID = taskStartedStamp.GetBuildId()
				taskRedirectCounter = wfStarted.GetBuildIdRedirectCounter()
			}
		}
		if he.EventTime.AsTime().Before(maxStartedTimestamp) {
			sawUnorderedEvents = true
		} else {
			maxStartedTimestamp = he.EventTime.AsTime()
		}
		if buildID > maxBuildID {
			maxBuildID = buildID
		}
		if taskStartedStamp != nil {
			// the last char of version is the index in the versions array which is the expected redirect counter for
			// a task started event
			expectedRedirectCounter, err := strconv.Atoi(buildID[len(buildID)-1:])
			s.NoError(err)
			s.Equal(expectedRedirectCounter, int(taskRedirectCounter))
		}
	}
	for v, c := range activityPerVersion {
		s.Equal(0, c, "activity count mismatch for build ID "+v)
	}
	// Following validations are more to make sure the test stays correct, rather than testing server's functionality
	s.True(activityRetried, "no activity retried")
	s.True(sawUnorderedEvents)
	s.True(redirectAppliedToActivityTask, "no redirect rule applied to an activity task, is this test broken?")
}

// TestDispatchActivityCompatible hands the suite's dispatchActivityCompatible scenario to
// testWithMatchingBehavior. Both are defined elsewhere in the suite; presumably the helper runs the scenario
// under several matching-service configurations (confirm against its definition).
func (s *VersioningIntegSuite) TestDispatchActivityCompatible() {
	scenario := s.dispatchActivityCompatible
	s.testWithMatchingBehavior(scenario)
}
Expand Down

0 comments on commit a173c81

Please sign in to comment.