diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 444dd83df..078dd9d90 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -37,6 +37,7 @@ require ( github.com/tinylib/msgp v1.1.2 // indirect go.temporal.io/api v1.26.1 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index 754c6f06e..e8b6babc2 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -121,6 +121,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index 083d3ef6b..9c9e1f647 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -25,6 +25,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect go.temporal.io/api v1.26.1 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index 0d2c6bec9..9a901f468 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -88,6 +88,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index d0ad63565..32c729835 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -23,6 +23,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect go.temporal.io/api v1.26.1 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 45d0a330d..0281a6c55 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -82,6 +82,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index 2e08b8858..5f9d36a29 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -24,6 +24,7 @@ require ( github.com/twmb/murmur3 v1.1.5 // indirect go.temporal.io/api v1.26.1 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index d7e653421..f02ca2d71 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -150,6 +150,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/go.mod b/go.mod index ecf0a39c3..f2c1be8e0 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e golang.org/x/net v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect diff --git a/go.sum b/go.sum index ee6eb1092..09b42eb21 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index 9956a497f..6ddb2681b 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -25,13 +25,14 @@ require ( github.com/stretchr/testify v1.8.4 // indirect go.temporal.io/api v1.26.1 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect - golang.org/x/mod v0.12.0 // indirect + golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5 // indirect + golang.org/x/tools v0.16.0 // indirect google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index 127994c5a..ad94324b7 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -86,6 +86,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a h1:Jw5wfR+h9mnIYH+OtGT2im5wV1YGGDora5vTv/aa5bE= golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -95,8 +97,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -117,7 +119,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -150,8 +152,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= -golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5 h1:Vk4mysSz+GqQK2eqgWbo4zEO89wkeAjJiFIr9bpqa8k= -golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= +golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 8a21722fe..1854c4dee 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -49,7 +49,7 @@ func createRootTestContext() (interceptor *workflowEnvironmentInterceptor, ctx C func createNewDispatcher(f func(ctx Context)) dispatcher { interceptor, ctx := createRootTestContext() - result, _ := newDispatcher(ctx, interceptor, f) + result, _ := newDispatcher(ctx, interceptor, f, func() bool { return false }) result.interceptor = interceptor return result } @@ -906,7 +906,7 @@ func TestAwaitCancellation(t *testing.T) { ctx, cancelHandler := WithCancel(ctx) d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { awaitError = Await(ctx, func() bool { return false }) - }) + }, func() bool { return false }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) require.NoError(t, err) @@ -943,6 +943,58 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) { require.True(t, d.IsDone()) } +func TestRecursiveEagerCoroutine(t *testing.T) { + // Verify eager coroutines run before normal coroutines + // even if they are scheduled in other eager coroutines + var d dispatcher + var history []string + d = createNewDispatcher(func(ctx Context) { + history = append(history, "root") + Go(ctx, func(ctx Context) { + history = append(history, "coroutine 1") + }) + d.NewCoroutine(ctx, "outer eager", true, func(ctx Context) { + history = append(history, "outer eager coroutine") + d.NewCoroutine(ctx, "inner eager", true, func(ctx Context) { + history = append(history, "inner eager coroutine") + }) + }) + Go(ctx, func(ctx Context) { + history = append(history, "coroutine 2") + }) + // Yield to allow the eager coroutines to run + state := getState(ctx) + history = append(history, "root yield start") + state.yield("test") + history = append(history, "root yield finish") + + }) + defer d.Close() + err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) + require.NoError(t, err) + require.True(t, d.IsDone()) + require.Equal(t, []string{"root", "root yield start", "outer eager coroutine", "inner eager coroutine", "coroutine 1", "coroutine 2", "root yield finish"}, history) +} + +func TestEagerCoroutineWhileNotRunning(t *testing.T) { + var history []string + interceptor, ctx := createRootTestContext() + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + history = append(history, "root") + }, func() bool { return false }) + d.interceptor = interceptor + + defer d.Close() + d.NewCoroutine(ctx, "eager", true, func(ctx Context) { + history = append(history, "eager coroutine") + }) + + err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) + require.NoError(t, err) + require.True(t, d.IsDone()) + require.Equal(t, []string{"eager coroutine", "root"}, history) +} + func TestAwaitWithTimeoutCancellation(t *testing.T) { var awaitWithTimeoutError error var awaitOk bool @@ -950,7 +1002,7 @@ func TestAwaitWithTimeoutCancellation(t *testing.T) { ctx, cancelHandler := WithCancel(ctx) d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false }) - }) + }, func() bool { return false }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) require.NoError(t, err) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 7e402b35a..d201601f3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -170,6 +170,10 @@ type ( sdkVersion string sdkNameUpdated bool sdkName string + // Any update requests received in a workflow task before we have registered + // any handlers are not scheduled and are queued here until either their + // handler is registered or the event loop runs out of work and they are rejected. + bufferedUpdateRequests map[string][]func() protocols *protocol.Registry } @@ -242,6 +246,7 @@ func newWorkflowExecutionEventHandler( protocols: protocol.NewRegistry(), mutableSideEffectCallCounter: make(map[string]int), sdkFlags: newSDKFlags(capabilities), + bufferedUpdateRequests: make(map[string][]func()), } // Attempt to skip 1 log level to remove the ReplayLogger from the stack. context.logger = log.Skip(ilog.NewReplayLogger( @@ -881,6 +886,44 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro wc.logger.Debug("SideEffect Marker added", tagSideEffectID, sideEffectID) } +func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool { + return wc.sdkFlags.tryUse(flag, !wc.isReplay) +} + +func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) { + wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f) +} + +func (wc *workflowEnvironmentImpl) HandleUpdates(name string) bool { + if !wc.sdkFlags.tryUse(SDKPriorityUpdateHandling, !wc.isReplay) { + return false + } + updatesHandled := false + if bufferedUpdateRequests, ok := wc.bufferedUpdateRequests[name]; ok { + for _, request := range bufferedUpdateRequests { + request() + updatesHandled = true + } + delete(wc.bufferedUpdateRequests, name) + } + return updatesHandled +} + +func (wc *workflowEnvironmentImpl) DrainUnhandledUpdates() bool { + anyExecuted := false + // Check if any buffered update requests remain when we have no more coroutines to run and let them schedule so they are rejected. + // Generally iterating a map in workflow code is bad because it is non deterministic + // this case is fine since all these update handles will be rejected and not recorded in history. + for name, requests := range wc.bufferedUpdateRequests { + for _, request := range requests { + request() + anyExecuted = true + } + delete(wc.bufferedUpdateRequests, name) + } + return anyExecuted +} + // lookupMutableSideEffect gets the current value of the MutableSideEffect for id for the // current call count of id. func (wc *workflowEnvironmentImpl) lookupMutableSideEffect(id string) *commonpb.Payloads { @@ -1293,6 +1336,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted( // replay sees the _final_ value of applied flags, not intermediate values // as the value varies by WFT) weh.sdkFlags.tryUse(SDKFlagProtocolMessageCommand, !weh.isReplay) + weh.sdkFlags.tryUse(SDKPriorityUpdateHandling, !weh.isReplay) // Invoke the workflow. weh.workflowDefinition.Execute(weh, attributes.Header, attributes.Input) diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 9ecd9e7ba..503c650a5 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -44,7 +44,10 @@ const ( // a workflow task response's command set to order messages with respect to // commands. SDKFlagProtocolMessageCommand = 3 - SDKFlagUnknown = math.MaxUint32 + // SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method. + // It will also cause the SDK to immediately handle updates when a handler is registered. + SDKPriorityUpdateHandling = 4 + SDKFlagUnknown = math.MaxUint32 ) func sdkFlagFromUint(value uint32) sdkFlag { @@ -57,6 +60,8 @@ func sdkFlagFromUint(value uint32) sdkFlag { return SDKFlagChildWorkflowErrorExecution case uint32(SDKFlagProtocolMessageCommand): return SDKFlagProtocolMessageCommand + case uint32(SDKPriorityUpdateHandling): + return SDKPriorityUpdateHandling default: return SDKFlagUnknown } diff --git a/internal/internal_update.go b/internal/internal_update.go index bcab87f34..9976f6f7a 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -69,7 +69,7 @@ type ( // yield itself as necessary. UpdateScheduler interface { // Spawn starts a new named coroutine, executing the given function f. - Spawn(ctx Context, name string, f func(ctx Context)) Context + Spawn(ctx Context, name string, highPriority bool, f func(ctx Context)) Context // Yield returns control to the scheduler. Yield(ctx Context, status string) @@ -252,20 +252,16 @@ func defaultUpdateHandler( callbacks.Reject(err) return } - scheduler.Spawn(ctx, name, func(ctx Context) { + eo := getWorkflowEnvOptions(ctx) + priorityUpdateHandling := env.TryUse(SDKPriorityUpdateHandling) + + updateRunner := func(ctx Context) { ctx = WithValue(ctx, updateInfoContextKey, &UpdateInfo{ ID: id, }) eo := getWorkflowEnvOptions(ctx) - - // If we suspect that handler registration has not occurred (e.g. - // because this update is part of the first workflow task and is being - // delivered before the workflow function itself has run and had a - // chance to register update handlers) then we yield control back to the - // scheduler to allow handler registration to occur. The scheduler will - // resume this coroutine after others have run to a blocking point. - if len(eo.updateHandlers) == 0 { + if len(eo.updateHandlers) == 0 && !priorityUpdateHandling { scheduler.Yield(ctx, "yielding for initial handler registration") } handler, ok := eo.updateHandlers[name] @@ -306,7 +302,22 @@ func defaultUpdateHandler( callbacks.Accept() success, err := envInterceptor.inboundInterceptor.ExecuteUpdate(ctx, &input) callbacks.Complete(success, err) - }) + } + + // If we suspect that handler registration has not occurred (e.g. + // because this update is part of the first workflow task and is being + // delivered before the workflow function itself has run and had a + // chance to register update handlers) then we queue updates + // to allow handler registration to occur. When a handler is registered the + // updates will be scheduled and ran. + if len(eo.updateHandlers) == 0 && priorityUpdateHandling { + env.QueueUpdate(name, func() { + scheduler.Spawn(ctx, name, priorityUpdateHandling, updateRunner) + }) + } else { + scheduler.Spawn(ctx, name, priorityUpdateHandling, updateRunner) + } + } // newUpdateHandler instantiates a new updateHandler if the supplied handler and diff --git a/internal/internal_update_test.go b/internal/internal_update_test.go index 6dcb35330..bc2d77e5c 100644 --- a/internal/internal_update_test.go +++ b/internal/internal_update_test.go @@ -28,6 +28,7 @@ import ( "errors" "reflect" "testing" + "time" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" @@ -51,7 +52,7 @@ func mustSetUpdateHandler( } type testUpdateScheduler struct { - SpawnImpl func(Context, string, func(Context)) Context + SpawnImpl func(Context, string, bool, func(Context)) Context YieldImpl func(Context, string) } @@ -67,8 +68,8 @@ func (tuc *testUpdateCallbacks) Complete(success interface{}, err error) { tuc.CompleteImpl(success, err) } -func (tus *testUpdateScheduler) Spawn(ctx Context, name string, f func(Context)) Context { - return tus.SpawnImpl(ctx, name, f) +func (tus *testUpdateScheduler) Spawn(ctx Context, name string, p bool, f func(Context)) Context { + return tus.SpawnImpl(ctx, name, p, f) } func (tus *testUpdateScheduler) Yield(ctx Context, status string) { @@ -76,7 +77,7 @@ func (tus *testUpdateScheduler) Yield(ctx Context, status string) { } var runOnCallingThread = &testUpdateScheduler{ - SpawnImpl: func(ctx Context, _ string, f func(Context)) Context { + SpawnImpl: func(ctx Context, _ string, _ bool, f func(Context)) Context { f(ctx) return ctx }, @@ -104,7 +105,8 @@ func TestUpdateHandlerPanicHandling(t *testing.T) { dispatcher, ctx := newDispatcher( ctx, interceptor, - func(ctx Context) {}) + func(ctx Context) {}, + func() bool { return false }) dispatcher.executing = true panicFunc := func() error { panic("intentional") } @@ -169,7 +171,6 @@ func TestUpdateValidatorFnValidation(t *testing.T) { } func TestDefaultUpdateHandler(t *testing.T) { - t.Parallel() dc := converter.GetDefaultDataConverter() env := &workflowEnvironmentImpl{ sdkFlags: testSDKFlags, @@ -179,13 +180,15 @@ func TestDefaultUpdateHandler(t *testing.T) { Namespace: "namespace:" + t.Name(), TaskQueueName: "taskqueue:" + t.Name(), }, + bufferedUpdateRequests: make(map[string][]func()), } interceptor, ctx, err := newWorkflowContext(env, nil) require.NoError(t, err) dispatcher, ctx := newDispatcher( ctx, interceptor, - func(ctx Context) {}) + func(ctx Context) {}, + env.DrainUnhandledUpdates) dispatcher.executing = true hdr := &commonpb.Header{Fields: map[string]*commonpb.Payload{}} @@ -299,13 +302,15 @@ func TestDefaultUpdateHandler(t *testing.T) { // registration at workflow start time has already occurred interceptor, ctx, err := newWorkflowContext(env, nil) require.NoError(t, err) + updateFunc := func(ctx Context, s string) (string, error) { return s + " success!", nil } dispatcher, ctx := newDispatcher( ctx, interceptor, - func(ctx Context) {}) - dispatcher.executing = true + func(ctx Context) { + mustSetUpdateHandler(t, ctx, t.Name(), updateFunc, UpdateHandlerOptions{}) + }, + func() bool { return false }) - updateFunc := func(ctx Context, s string) (string, error) { return s + " success!", nil } var ( resultErr error rejectErr error @@ -313,7 +318,7 @@ func TestDefaultUpdateHandler(t *testing.T) { result interface{} ) sched := &testUpdateScheduler{ - SpawnImpl: func(ctx Context, _ string, f func(Context)) Context { + SpawnImpl: func(ctx Context, _ string, _ bool, f func(Context)) Context { f(ctx) return ctx }, @@ -330,6 +335,7 @@ func TestDefaultUpdateHandler(t *testing.T) { result = success }, }, sched) + require.NoError(t, dispatcher.ExecuteUntilAllBlocked(time.Second)) require.True(t, accepted) require.Nil(t, resultErr) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index efdb1d060..f102ce94f 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -124,6 +124,17 @@ type ( UpsertSearchAttributes(attributes map[string]interface{}) error UpsertMemo(memoMap map[string]interface{}) error GetRegistry() *registry + // QueueUpdate request of type name + QueueUpdate(name string, f func()) + // HandleUpdates unblock all updates of type name + // returns true if any update was unblocked + HandleUpdates(name string) bool + // DrainUnhandledUpdates unblocks all updates, meant to be used to drain + // all unhandled updates at the end of a workflow task + // returns true if any update was unblocked + DrainUnhandledUpdates() bool + // TryUse returns true if this flag may currently be used. + TryUse(flag sdkFlag) bool } // WorkflowDefinitionFactory factory for creating WorkflowDefinition instances. diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 171a81cbf..c89461ed9 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -36,6 +36,8 @@ import ( "time" "unicode" + "golang.org/x/exp/slices" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.uber.org/atomic" @@ -92,7 +94,7 @@ type ( // Create coroutine. To be called from within other coroutine. // Used by the interceptors - NewCoroutine(ctx Context, name string, f func(ctx Context)) Context + NewCoroutine(ctx Context, name string, highPriority bool, f func(ctx Context)) Context } // Workflow is an interface that any workflow should implement. @@ -171,6 +173,10 @@ type ( interceptor WorkflowOutboundInterceptor deadlockDetector *deadlockDetector readOnly bool + // allBlockedCallback is called when all coroutines are blocked, + // returns true if the callback updated any coroutines state and there may be more work + allBlockedCallback func() bool + newEagerCoroutines []*coroutineState } // WorkflowOptions options passed to the workflow function @@ -246,8 +252,8 @@ type ( dataConverter converter.DataConverter } - // coroScheduler adapts the coro dispatcher to the UpdateScheduler interface - coroScheduler struct { + // updateSchedulerImpl adapts the coro dispatcher to the UpdateScheduler interface + updateSchedulerImpl struct { dispatcher dispatcher } ) @@ -305,7 +311,7 @@ type workflowEnvironmentInterceptor struct { } func (wc *workflowEnvironmentInterceptor) Go(ctx Context, name string, f func(ctx Context)) Context { - return wc.dispatcher.NewCoroutine(ctx, name, f) + return wc.dispatcher.NewCoroutine(ctx, name, false, f) } func getWorkflowOutboundInterceptor(ctx Context) WorkflowOutboundInterceptor { @@ -512,7 +518,7 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common r.workflowResult, r.error = d.workflow.Execute(d.rootCtx, input) rpp := getWorkflowResultPointerPointer(ctx) *rpp = r - }) + }, getWorkflowEnvironment(rootCtx).DrainUnhandledUpdates) // set the information from the headers that is to be propagated in the workflow context rootCtx, err = workflowContextWithHeaderPropagated(rootCtx, header, env.GetContextPropagators()) @@ -543,7 +549,7 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common getWorkflowEnvironment(d.rootCtx).RegisterUpdateHandler( func(name string, id string, serializedArgs *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks) { - defaultUpdateHandler(d.rootCtx, name, id, serializedArgs, header, callbacks, coroScheduler{d.dispatcher}) + defaultUpdateHandler(d.rootCtx, name, id, serializedArgs, header, callbacks, updateSchedulerImpl{d.dispatcher}) }) getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler( @@ -605,10 +611,11 @@ func (d *syncWorkflowDefinition) Close() { // NewDispatcher creates a new Dispatcher instance with a root coroutine function. // Context passed to the root function is child of the passed rootCtx. // This way rootCtx can be used to pass values to the coroutine code. -func newDispatcher(rootCtx Context, interceptor *workflowEnvironmentInterceptor, root func(ctx Context)) (*dispatcherImpl, Context) { +func newDispatcher(rootCtx Context, interceptor *workflowEnvironmentInterceptor, root func(ctx Context), allBlockedCallback func() bool) (*dispatcherImpl, Context) { result := &dispatcherImpl{ - interceptor: interceptor.outboundInterceptor, - deadlockDetector: newDeadlockDetector(), + interceptor: interceptor.outboundInterceptor, + deadlockDetector: newDeadlockDetector(), + allBlockedCallback: allBlockedCallback, } interceptor.dispatcher = result ctxWithState := result.interceptor.Go(rootCtx, "root", root) @@ -1039,11 +1046,11 @@ func (s *coroutineState) stackTrace() string { return <-stackCh } -func (d *dispatcherImpl) NewCoroutine(ctx Context, name string, f func(ctx Context)) Context { +func (d *dispatcherImpl) NewCoroutine(ctx Context, name string, highPriority bool, f func(ctx Context)) Context { if name == "" { name = fmt.Sprintf("%v", d.sequence+1) } - state := d.newState(name) + state := d.newState(name, highPriority) spawned := WithValue(ctx, coroutinesContextKey, state) go func(crt *coroutineState) { defer crt.close() @@ -1059,7 +1066,7 @@ func (d *dispatcherImpl) NewCoroutine(ctx Context, name string, f func(ctx Conte return spawned } -func (d *dispatcherImpl) newState(name string) *coroutineState { +func (d *dispatcherImpl) newState(name string, highPriority bool) *coroutineState { c := &coroutineState{ name: name, dispatcher: d, @@ -1067,7 +1074,13 @@ func (d *dispatcherImpl) newState(name string) *coroutineState { unblock: make(chan unblockFunc), } d.sequence++ - d.coroutines = append(d.coroutines, c) + if highPriority { + // Update requests need to be added to the front of the dispatchers coroutine list so they + // are handled before the root coroutine. + d.newEagerCoroutines = append(d.newEagerCoroutines, c) + } else { + d.coroutines = append(d.coroutines, c) + } return c } @@ -1096,7 +1109,9 @@ func (d *dispatcherImpl) ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Du }() allBlocked := false // Keep executing until at least one goroutine made some progress - for !allBlocked { + for !allBlocked || d.allBlockedCallback() { + d.coroutines = append(d.newEagerCoroutines, d.coroutines...) + d.newEagerCoroutines = nil // Give every coroutine chance to execute removing closed ones allBlocked = true lastSequence := d.sequence @@ -1121,12 +1136,16 @@ func (d *dispatcherImpl) ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Du } else { allBlocked = allBlocked && (c.keptBlocked || c.closed.Load()) } + // If any eager coroutines were created by the last coroutine we + // need to schedule them now. + if len(d.newEagerCoroutines) > 0 { + d.coroutines = slices.Insert(d.coroutines, i+1, d.newEagerCoroutines...) + d.newEagerCoroutines = nil + allBlocked = false + } } // Set allBlocked to false if new coroutines where created allBlocked = allBlocked && lastSequence == d.sequence - if len(d.coroutines) == 0 { - break - } } return nil } @@ -1497,6 +1516,9 @@ func setUpdateHandler(ctx Context, updateName string, handler interface{}, opts return err } getWorkflowEnvOptions(ctx).updateHandlers[updateName] = uh + if getWorkflowEnvironment(ctx).HandleUpdates(updateName) { + getState(ctx).yield("letting any updates waiting on a handler run") + } return nil } @@ -1645,12 +1667,12 @@ func (wg *waitGroupImpl) Wait(ctx Context) { } // Spawn starts a new coroutine with Dispatcher.NewCoroutine -func (cs coroScheduler) Spawn(ctx Context, name string, f func(Context)) Context { - return cs.dispatcher.NewCoroutine(ctx, name, f) +func (us updateSchedulerImpl) Spawn(ctx Context, name string, highPriority bool, f func(Context)) Context { + return us.dispatcher.NewCoroutine(ctx, name, highPriority, f) } // Yield calls the yield function on the coroutineState associated with the // supplied workflow context. -func (cs coroScheduler) Yield(ctx Context, reason string) { +func (us updateSchedulerImpl) Yield(ctx Context, reason string) { getState(ctx).yield(reason) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 64b0d2e72..3aa9d1c7e 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -206,6 +206,9 @@ type ( // True if this was created only for testing activities not workflows. activityEnvOnly bool + + workflowFunctionExecuting bool + bufferedUpdateRequests map[string][]func() } testSessionEnvironmentImpl struct { @@ -258,11 +261,12 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist changeVersions: make(map[string]Version), openSessions: make(map[string]*SessionInfo), - doneChannel: make(chan struct{}), - workerStopChannel: make(chan struct{}), - dataConverter: converter.GetDefaultDataConverter(), - failureConverter: GetDefaultFailureConverter(), - runTimeout: maxWorkflowTimeout, + doneChannel: make(chan struct{}), + workerStopChannel: make(chan struct{}), + dataConverter: converter.GetDefaultDataConverter(), + failureConverter: GetDefaultFailureConverter(), + runTimeout: maxWorkflowTimeout, + bufferedUpdateRequests: make(map[string][]func()), } if debugMode { @@ -542,6 +546,47 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) ( return newSyncWorkflowDefinition(wd), nil } +func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { + return true +} + +func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { + env.bufferedUpdateRequests[name] = append(env.bufferedUpdateRequests[name], f) +} + +func (env *testWorkflowEnvironmentImpl) HandleUpdates(name string) bool { + updatesHandled := false + if bufferedUpdateRequests, ok := env.bufferedUpdateRequests[name]; ok { + for _, requests := range bufferedUpdateRequests { + requests() + updatesHandled = true + } + delete(env.bufferedUpdateRequests, name) + } + return updatesHandled +} + +func (env *testWorkflowEnvironmentImpl) DrainUnhandledUpdates() bool { + // Due to mock registration the test environment cannot run the workflow function + // in the first "workflow task". We need to delay the draining until the main function has + // had a chance to run. + if !env.workflowFunctionExecuting { + return false + } + anyExecuted := false + // Check if any buffered update requests remain when we have no more coroutines to run and let them schedule so they are rejected. + // Generally iterating a map in workflow code is bad because it is non deterministic + // this case is fine since all these update handles will be rejected and not recorded in history. + for name, bufferedUpdateRequests := range env.bufferedUpdateRequests { + for _, request := range bufferedUpdateRequests { + request() + anyExecuted = true + } + delete(env.bufferedUpdateRequests, name) + } + return anyExecuted +} + func (env *testWorkflowEnvironmentImpl) executeActivity( activityFn interface{}, args ...interface{}, @@ -1721,6 +1766,7 @@ func (w *workflowExecutorWrapper) Execute(ctx Context, input *commonpb.Payloads) // reduce runningCount to allow auto-forwarding mock clock after current workflow dispatcher run is blocked (aka // ExecuteUntilAllBlocked() returns). env.runningCount-- + w.env.workflowFunctionExecuting = true childWE := env.workflowInfo.WorkflowExecution var startedErr error @@ -2389,7 +2435,10 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflow(name string, id string, u if err != nil { panic(err) } - env.updateHandler(name, id, data, nil, uc) + env.postCallback(func() { + // Do not send any headers on test invocations + env.updateHandler(name, id, data, nil, uc) + }, true) } func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id string, uc UpdateCallbacks, args ...interface{}) error { @@ -2401,7 +2450,9 @@ func (env *testWorkflowEnvironmentImpl) updateWorkflowByID(workflowID, name, id if err != nil { panic(err) } - env.updateHandler(name, id, data, nil, uc) + env.postCallback(func() { + env.updateHandler(name, id, data, nil, uc) + }, true) return nil } diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 203db5461..377ed5323 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -283,6 +283,133 @@ func TestWorkflowIDUpdateWorkflowByID(t *testing.T) { require.Equal(t, "input", str) } +func TestWorkflowUpdateOrder(t *testing.T) { + var suite WorkflowTestSuite + // Test UpdateWorkflowByID works with custom ID + env := suite.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.ExecuteWorkflow(func(ctx Context) (int, error) { + var inflightUpdates int + var ranUpdates int + err := SetUpdateHandler(ctx, "update", func(ctx Context) error { + inflightUpdates++ + ranUpdates++ + defer func() { + inflightUpdates-- + }() + return Sleep(ctx, time.Hour) + }, UpdateHandlerOptions{}) + if err != nil { + return 0, err + } + err = Await(ctx, func() bool { return inflightUpdates == 0 }) + return ranUpdates, err + }) + require.NoError(t, env.GetWorkflowError()) + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + require.Equal(t, 1, result) +} + +func TestWorkflowNotRegisteredRejected(t *testing.T) { + var suite WorkflowTestSuite + // Test UpdateWorkflowByID works with custom ID + env := suite.NewTestWorkflowEnvironment() + var updateRejectionErr error + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id", &updateCallback{ + reject: func(err error) { + updateRejectionErr = err + }, + accept: func() { + require.Fail(t, "update should not be accepted") + }, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.ExecuteWorkflow(func(ctx Context) error { + return Sleep(ctx, time.Hour) + }) + require.NoError(t, env.GetWorkflowError()) + require.NoError(t, env.GetWorkflowResult(nil)) + require.Error(t, updateRejectionErr) + require.Equal(t, "unknown update update. KnownUpdates=[]", updateRejectionErr.Error()) +} + +func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { + var suite WorkflowTestSuite + // Test UpdateWorkflowByID works with custom ID + env := suite.NewTestWorkflowEnvironment() + // Send 3 updates, with one bad update + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "1", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + var updateRejectionErr error + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("bad update", "2", &updateCallback{ + reject: func(err error) { + updateRejectionErr = err + }, + accept: func() { + require.Fail(t, "update should not be rejected") + }, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "3", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.ExecuteWorkflow(func(ctx Context) (int, error) { + var inflightUpdates int + var ranUpdates int + err := SetUpdateHandler(ctx, "update", func(ctx Context) error { + inflightUpdates++ + ranUpdates++ + defer func() { + inflightUpdates-- + }() + return Sleep(ctx, time.Hour) + }, UpdateHandlerOptions{}) + if err != nil { + return 0, err + } + err = Await(ctx, func() bool { return inflightUpdates == 0 }) + return ranUpdates, err + }) + require.NoError(t, env.GetWorkflowError()) + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + require.Equal(t, 2, result) + + require.Error(t, updateRejectionErr) + require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error()) +} + func TestWorkflowStartTimeInsideTestWorkflow(t *testing.T) { var suite WorkflowTestSuite env := suite.NewTestWorkflowEnvironment() diff --git a/test/go.mod b/test/go.mod index ebc2f938f..1bffbae22 100644 --- a/test/go.mod +++ b/test/go.mod @@ -34,6 +34,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/twmb/murmur3 v1.1.5 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/test/go.sum b/test/go.sum index 40fdd2e3f..a33478fdd 100644 --- a/test/go.sum +++ b/test/go.sum @@ -162,6 +162,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -232,7 +234,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/test/integration_test.go b/test/integration_test.go index 93d287c1a..56a89ab15 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "math/rand" "os" "strings" "sync" @@ -2424,6 +2425,157 @@ func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeWithRecreat ts.NoError(run2.Get(ctx, nil)) } +func (ts *IntegrationTestSuite) TestUpdateWithNoHandlerRejected() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-with-no-handle-rejected") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.Basic) + ts.NoError(err) + // Send an update that we know has no handle + handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "bad handle") + ts.NoError(err) + ts.Error(handle.Get(ctx, nil)) + // The workflow should still complete + var result []string + ts.NoError(run.Get(ctx, &result)) +} + +func (ts *IntegrationTestSuite) TestUpdateWithWrongHandleRejected() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-with-wrong-handle-rejected") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.WaitOnUpdate) + ts.NoError(err) + // Send an update before the first workflow task + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "bad update") + ts.NoError(err) + ts.Error(updateHandle.Get(ctx, nil)) + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(0, result) +} + +func (ts *IntegrationTestSuite) TestWaitOnUpdate() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-wait-on-update") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.WaitOnUpdate) + ts.NoError(err) + // Send an update before the first workflow task + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "echo") + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(1, result) +} + +func (ts *IntegrationTestSuite) TestUpdateOrdering() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-ordering") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.UpdateOrdering) + ts.NoError(err) + // Send an update before the first workflow task + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + // Send an update after the first workflow task + updateHandle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(2, result) +} + +func (ts *IntegrationTestSuite) TestMultipleUpdateOrderingCancel() { + ts.testUpdateOrderingCancel(true) +} + +func (ts *IntegrationTestSuite) TestMultipleUpdateOrdering() { + ts.testUpdateOrderingCancel(false) +} + +func (ts *IntegrationTestSuite) testUpdateOrderingCancel(cancelWf bool) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + // Kill the worker so we can send multiple update requests and possibly a cancel in the same WFT + ts.worker.Stop() + // Start the workflow + options := ts.startWorkflowOptions("test-multiple-update-ordering") + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.WaitOnUpdate) + ts.NoError(err) + + if cancelWf { + ts.NoError(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())) + } + var wf sync.WaitGroup + updateHandles := []string{"echo", "sleep", "empty"} + for i := 0; i < 10; i++ { + wf.Add(1) + go func() { + defer wf.Done() + handle := updateHandles[rand.Intn(3)] + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), handle) + ts.NoError(err) + updateErr := updateHandle.Get(ctx, nil) + if cancelWf { + var cancelErr *temporal.CanceledError + ts.ErrorAs(updateErr, &cancelErr) + } else { + ts.NoError(updateErr) + } + }() + } + + // Server does not support admitted so we have to send the update in a seperate goroutine + time.Sleep(5 * time.Second) + // Now create a new worker on that same task queue to resume the work of the + // workflow + nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{}) + ts.registerWorkflowsAndActivities(nextWorker) + ts.NoError(nextWorker.Start()) + defer nextWorker.Stop() + wf.Wait() + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(10, result) +} + +func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-always-handled") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSetHandlerOnly) + ts.NoError(err) + // Send an update before the first workflow task + _, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(1, result) +} + func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/test/replaytests/multiple-updates-canceled.json b/test/replaytests/multiple-updates-canceled.json new file mode 100644 index 000000000..e95bd1af4 --- /dev/null +++ b/test/replaytests/multiple-updates-canceled.json @@ -0,0 +1,1013 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-11-28T16:56:19.803025336Z", + "eventType": "WorkflowExecutionStarted", + "version": "0", + "taskId": "1446787", + "workerMayIgnore": false, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "MultipleUpdateWorkflow" + }, + "parentWorkflowNamespace": "", + "parentWorkflowNamespaceId": "", + "parentWorkflowExecution": null, + "parentInitiatedEventId": "0", + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "input": null, + "workflowExecutionTimeout": "15s", + "workflowRunTimeout": "15s", + "workflowTaskTimeout": "1s", + "continuedExecutionRunId": "", + "initiator": "Unspecified", + "continuedFailure": null, + "lastCompletionResult": null, + "originalExecutionRunId": "bf3a8dfb-61f9-4ea2-a5f6-ce6a9103fbab", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "bf3a8dfb-61f9-4ea2-a5f6-ce6a9103fbab", + "retryPolicy": null, + "attempt": 1, + "workflowExecutionExpirationTime": "2023-11-28T16:56:34.802Z", + "cronSchedule": "", + "firstWorkflowTaskBackoff": "0s", + "memo": null, + "searchAttributes": null, + "prevAutoResetPoints": null, + "header": { + "fields": {} + }, + "parentInitiatedEventVersion": "0", + "workflowId": "test-multiple-update-ordering", + "sourceVersionStamp": null + } + }, + { + "eventId": "2", + "eventTime": "2023-11-28T16:56:19.803105378Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "1446788", + "workerMayIgnore": false, + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "startToCloseTimeout": "1s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-11-28T16:56:19.817677670Z", + "eventType": "WorkflowExecutionCancelRequested", + "version": "0", + "taskId": "1446797", + "workerMayIgnore": false, + "workflowExecutionCancelRequestedEventAttributes": { + "cause": "", + "externalInitiatedEventId": "0", + "externalWorkflowExecution": null, + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "4", + "eventTime": "2023-11-28T16:56:24.846537880Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "1446799", + "workerMayIgnore": false, + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "7e88b502-e3d3-4553-9fbe-18411cef7a5e", + "suggestContinueAsNew": false, + "historySizeBytes": "990" + } + }, + { + "eventId": "5", + "eventTime": "2023-11-28T16:56:24.876563214Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "1446803", + "workerMayIgnore": false, + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "4", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@", + "binaryChecksum": "", + "workerVersion": { + "buildId": "62d192ee9495d08baad51e9cb4060dcc", + "bundleId": "", + "useVersioning": false + }, + "sdkMetadata": { + "coreUsedFlags": [], + "langUsedFlags": [ + 3, + 4 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.25.1" + }, + "meteringMetadata": { + "nonfirstLocalActivityExecutionAttempts": 0 + } + } + }, + { + "eventId": "6", + "eventTime": "2023-11-28T16:56:24.877446339Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446804", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "67b35fc6-482c-4c07-a735-170845a63f0c", + "acceptedRequestMessageId": "67b35fc6-482c-4c07-a735-170845a63f0c/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "67b35fc6-482c-4c07-a735-170845a63f0c", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "7", + "eventTime": "2023-11-28T16:56:24.877501922Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446805", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "7", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "8", + "eventTime": "2023-11-28T16:56:24.877519172Z", + "eventType": "ActivityTaskCancelRequested", + "version": "0", + "taskId": "1446806", + "workerMayIgnore": false, + "activityTaskCancelRequestedEventAttributes": { + "scheduledEventId": "7", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "9", + "eventTime": "2023-11-28T16:56:24.877554922Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446807", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "67b35fc6-482c-4c07-a735-170845a63f0c", + "identity": "" + }, + "acceptedEventId": "6", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "10", + "eventTime": "2023-11-28T16:56:24.877575422Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446808", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "ca99de81-de56-4b35-8cc5-6aaf4921c01d", + "acceptedRequestMessageId": "ca99de81-de56-4b35-8cc5-6aaf4921c01d/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "ca99de81-de56-4b35-8cc5-6aaf4921c01d", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "11", + "eventTime": "2023-11-28T16:56:24.877591589Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446809", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "11", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "12", + "eventTime": "2023-11-28T16:56:24.877598755Z", + "eventType": "ActivityTaskCancelRequested", + "version": "0", + "taskId": "1446810", + "workerMayIgnore": false, + "activityTaskCancelRequestedEventAttributes": { + "scheduledEventId": "11", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "13", + "eventTime": "2023-11-28T16:56:24.877605630Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446811", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "ca99de81-de56-4b35-8cc5-6aaf4921c01d", + "identity": "" + }, + "acceptedEventId": "10", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "14", + "eventTime": "2023-11-28T16:56:24.877613672Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446812", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "91c39ecb-99d6-43e1-a074-b81193870e78", + "acceptedRequestMessageId": "91c39ecb-99d6-43e1-a074-b81193870e78/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "91c39ecb-99d6-43e1-a074-b81193870e78", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "15", + "eventTime": "2023-11-28T16:56:24.877620589Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446813", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "15", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "16", + "eventTime": "2023-11-28T16:56:24.877624672Z", + "eventType": "ActivityTaskCancelRequested", + "version": "0", + "taskId": "1446814", + "workerMayIgnore": false, + "activityTaskCancelRequestedEventAttributes": { + "scheduledEventId": "15", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "17", + "eventTime": "2023-11-28T16:56:24.877630172Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446815", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "91c39ecb-99d6-43e1-a074-b81193870e78", + "identity": "" + }, + "acceptedEventId": "14", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "18", + "eventTime": "2023-11-28T16:56:24.877637505Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446816", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "822ed276-adc7-467e-9057-8b3549fc81d5", + "acceptedRequestMessageId": "822ed276-adc7-467e-9057-8b3549fc81d5/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "822ed276-adc7-467e-9057-8b3549fc81d5", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "19", + "eventTime": "2023-11-28T16:56:24.877647255Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446817", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "19", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-733cff68-44a2-41b8-ad85-1d4ee6291579-TestIntegrationSuite/TestMultipleUpdateOrderingCancel", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "20", + "eventTime": "2023-11-28T16:56:24.877650380Z", + "eventType": "ActivityTaskCancelRequested", + "version": "0", + "taskId": "1446818", + "workerMayIgnore": false, + "activityTaskCancelRequestedEventAttributes": { + "scheduledEventId": "19", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "21", + "eventTime": "2023-11-28T16:56:24.877656547Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446819", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "822ed276-adc7-467e-9057-8b3549fc81d5", + "identity": "" + }, + "acceptedEventId": "18", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "22", + "eventTime": "2023-11-28T16:56:24.877662547Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446820", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "ec448fe1-b60b-4268-9316-4c8a8aa08bd6", + "acceptedRequestMessageId": "ec448fe1-b60b-4268-9316-4c8a8aa08bd6/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "ec448fe1-b60b-4268-9316-4c8a8aa08bd6", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "23", + "eventTime": "2023-11-28T16:56:24.877672130Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446821", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "23", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "24", + "eventTime": "2023-11-28T16:56:24.877680422Z", + "eventType": "TimerCanceled", + "version": "0", + "taskId": "1446822", + "workerMayIgnore": false, + "timerCanceledEventAttributes": { + "timerId": "23", + "startedEventId": "23", + "workflowTaskCompletedEventId": "5", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "25", + "eventTime": "2023-11-28T16:56:24.877685839Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446823", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "ec448fe1-b60b-4268-9316-4c8a8aa08bd6", + "identity": "" + }, + "acceptedEventId": "22", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "26", + "eventTime": "2023-11-28T16:56:24.877695547Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446824", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "2b2860a0-bb75-4f9e-bfb1-ae4fced0345e", + "acceptedRequestMessageId": "2b2860a0-bb75-4f9e-bfb1-ae4fced0345e/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "2b2860a0-bb75-4f9e-bfb1-ae4fced0345e", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "27", + "eventTime": "2023-11-28T16:56:24.877701380Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446825", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "27", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "28", + "eventTime": "2023-11-28T16:56:24.877702505Z", + "eventType": "TimerCanceled", + "version": "0", + "taskId": "1446826", + "workerMayIgnore": false, + "timerCanceledEventAttributes": { + "timerId": "27", + "startedEventId": "27", + "workflowTaskCompletedEventId": "5", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "29", + "eventTime": "2023-11-28T16:56:24.877704630Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446827", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "2b2860a0-bb75-4f9e-bfb1-ae4fced0345e", + "identity": "" + }, + "acceptedEventId": "26", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "30", + "eventTime": "2023-11-28T16:56:24.877715589Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446828", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "7b208937-82cd-4538-8029-506244aa66a8", + "acceptedRequestMessageId": "7b208937-82cd-4538-8029-506244aa66a8/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "7b208937-82cd-4538-8029-506244aa66a8", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "31", + "eventTime": "2023-11-28T16:56:24.877721380Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446829", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "31", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "32", + "eventTime": "2023-11-28T16:56:24.877722089Z", + "eventType": "TimerCanceled", + "version": "0", + "taskId": "1446830", + "workerMayIgnore": false, + "timerCanceledEventAttributes": { + "timerId": "31", + "startedEventId": "31", + "workflowTaskCompletedEventId": "5", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "33", + "eventTime": "2023-11-28T16:56:24.877724505Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446831", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "7b208937-82cd-4538-8029-506244aa66a8", + "identity": "" + }, + "acceptedEventId": "30", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "34", + "eventTime": "2023-11-28T16:56:24.877729922Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446832", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "0fc68c09-7cf4-47c5-a7fe-a54b87690075", + "acceptedRequestMessageId": "0fc68c09-7cf4-47c5-a7fe-a54b87690075/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "0fc68c09-7cf4-47c5-a7fe-a54b87690075", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "35", + "eventTime": "2023-11-28T16:56:24.877734130Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446833", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "0fc68c09-7cf4-47c5-a7fe-a54b87690075", + "identity": "" + }, + "acceptedEventId": "34", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "36", + "eventTime": "2023-11-28T16:56:24.877738172Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446834", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "6f56376a-c2e9-4392-88d7-acab5a88ef5c", + "acceptedRequestMessageId": "6f56376a-c2e9-4392-88d7-acab5a88ef5c/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "6f56376a-c2e9-4392-88d7-acab5a88ef5c", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "37", + "eventTime": "2023-11-28T16:56:24.877747047Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446835", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "6f56376a-c2e9-4392-88d7-acab5a88ef5c", + "identity": "" + }, + "acceptedEventId": "36", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "38", + "eventTime": "2023-11-28T16:56:24.877753547Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446836", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "2dca0463-2edb-41fd-99ae-771b6cf0ea5a", + "acceptedRequestMessageId": "2dca0463-2edb-41fd-99ae-771b6cf0ea5a/request", + "acceptedRequestSequencingEventId": "3", + "acceptedRequest": { + "meta": { + "updateId": "2dca0463-2edb-41fd-99ae-771b6cf0ea5a", + "identity": "13318@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "39", + "eventTime": "2023-11-28T16:56:24.877757255Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446837", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "2dca0463-2edb-41fd-99ae-771b6cf0ea5a", + "identity": "" + }, + "acceptedEventId": "38", + "outcome": { + "failure": { + "message": "canceled", + "source": "GoSDK", + "stackTrace": "", + "encodedAttributes": null, + "cause": null, + "canceledFailureInfo": { + "details": null + } + } + } + } + }, + { + "eventId": "40", + "eventTime": "2023-11-28T16:56:24.877764089Z", + "eventType": "WorkflowExecutionCompleted", + "version": "0", + "taskId": "1446838", + "workerMayIgnore": false, + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MTA=" + } + ] + }, + "workflowTaskCompletedEventId": "5", + "newExecutionRunId": "" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/multiple-updates.json b/test/replaytests/multiple-updates.json new file mode 100644 index 000000000..14e89ca93 --- /dev/null +++ b/test/replaytests/multiple-updates.json @@ -0,0 +1,1096 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-11-28T16:57:59.949373841Z", + "eventType": "WorkflowExecutionStarted", + "version": "0", + "taskId": "1446847", + "workerMayIgnore": false, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "MultipleUpdateWorkflow" + }, + "parentWorkflowNamespace": "", + "parentWorkflowNamespaceId": "", + "parentWorkflowExecution": null, + "parentInitiatedEventId": "0", + "taskQueue": { + "name": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering", + "kind": "Normal", + "normalName": "" + }, + "input": null, + "workflowExecutionTimeout": "15s", + "workflowRunTimeout": "15s", + "workflowTaskTimeout": "1s", + "continuedExecutionRunId": "", + "initiator": "Unspecified", + "continuedFailure": null, + "lastCompletionResult": null, + "originalExecutionRunId": "d722448d-7be3-47e1-bc51-758a6b959501", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "d722448d-7be3-47e1-bc51-758a6b959501", + "retryPolicy": null, + "attempt": 1, + "workflowExecutionExpirationTime": "2023-11-28T16:58:14.949Z", + "cronSchedule": "", + "firstWorkflowTaskBackoff": "0s", + "memo": null, + "searchAttributes": null, + "prevAutoResetPoints": null, + "header": { + "fields": {} + }, + "parentInitiatedEventVersion": "0", + "workflowId": "test-multiple-update-ordering", + "sourceVersionStamp": null + } + }, + { + "eventId": "2", + "eventTime": "2023-11-28T16:57:59.949401174Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "1446848", + "workerMayIgnore": false, + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering", + "kind": "Normal", + "normalName": "" + }, + "startToCloseTimeout": "1s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-11-28T16:58:05.973759969Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "1446857", + "workerMayIgnore": false, + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "47927834-2c4a-46f2-8d69-ae9a1b9a4105", + "suggestContinueAsNew": false, + "historySizeBytes": "896" + } + }, + { + "eventId": "4", + "eventTime": "2023-11-28T16:58:06.004982136Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "1446864", + "workerMayIgnore": false, + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "binaryChecksum": "", + "workerVersion": { + "buildId": "62d192ee9495d08baad51e9cb4060dcc", + "bundleId": "", + "useVersioning": false + }, + "sdkMetadata": { + "coreUsedFlags": [], + "langUsedFlags": [ + 3, + 4 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.25.1" + }, + "meteringMetadata": { + "nonfirstLocalActivityExecutionAttempts": 0 + } + } + }, + { + "eventId": "5", + "eventTime": "2023-11-28T16:58:06.005081302Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446865", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "30ea422c-1f4f-4410-b829-f1530e66499e", + "acceptedRequestMessageId": "30ea422c-1f4f-4410-b829-f1530e66499e/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "30ea422c-1f4f-4410-b829-f1530e66499e", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "6", + "eventTime": "2023-11-28T16:58:06.005148386Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446866", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "6", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "7", + "eventTime": "2023-11-28T16:58:06.005166094Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446867", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "b1db82b7-891f-45b0-b1ff-2a145665aaaf", + "acceptedRequestMessageId": "b1db82b7-891f-45b0-b1ff-2a145665aaaf/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "b1db82b7-891f-45b0-b1ff-2a145665aaaf", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "8", + "eventTime": "2023-11-28T16:58:06.005211219Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446868", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "8", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "9", + "eventTime": "2023-11-28T16:58:06.005219094Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446869", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "5c579628-c76d-4bb4-b9fb-be82d0ac1ca2", + "acceptedRequestMessageId": "5c579628-c76d-4bb4-b9fb-be82d0ac1ca2/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "5c579628-c76d-4bb4-b9fb-be82d0ac1ca2", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "echo", + "args": null + } + } + } + }, + { + "eventId": "10", + "eventTime": "2023-11-28T16:58:06.005228969Z", + "eventType": "ActivityTaskScheduled", + "version": "0", + "taskId": "1446870", + "workerMayIgnore": false, + "activityTaskScheduledEventAttributes": { + "activityId": "10", + "activityType": { + "name": "Echo" + }, + "taskQueue": { + "name": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering", + "kind": "Normal", + "normalName": "" + }, + "header": { + "fields": {} + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + }, + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s", + "maximumAttempts": 0, + "nonRetryableErrorTypes": [] + }, + "useCompatibleVersion": true + } + }, + { + "eventId": "11", + "eventTime": "2023-11-28T16:58:06.005250636Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446871", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "be3af148-fa9e-48d5-8d6a-5040bb1ef3c5", + "acceptedRequestMessageId": "be3af148-fa9e-48d5-8d6a-5040bb1ef3c5/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "be3af148-fa9e-48d5-8d6a-5040bb1ef3c5", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "12", + "eventTime": "2023-11-28T16:58:06.005258469Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446872", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "12", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "13", + "eventTime": "2023-11-28T16:58:06.005271886Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446873", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "6526309d-5d3e-489c-a0dd-5c832991a9be", + "acceptedRequestMessageId": "6526309d-5d3e-489c-a0dd-5c832991a9be/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "6526309d-5d3e-489c-a0dd-5c832991a9be", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "14", + "eventTime": "2023-11-28T16:58:06.005277469Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446874", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "14", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "15", + "eventTime": "2023-11-28T16:58:06.005280052Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446875", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "bfaaf99c-dd02-49c2-b658-ffe30447f8c4", + "acceptedRequestMessageId": "bfaaf99c-dd02-49c2-b658-ffe30447f8c4/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "bfaaf99c-dd02-49c2-b658-ffe30447f8c4", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "16", + "eventTime": "2023-11-28T16:58:06.005283344Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446876", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "16", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "17", + "eventTime": "2023-11-28T16:58:06.005285636Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446877", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "54cd05be-76fb-4bdd-8312-ae89db4bd6f8", + "acceptedRequestMessageId": "54cd05be-76fb-4bdd-8312-ae89db4bd6f8/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "54cd05be-76fb-4bdd-8312-ae89db4bd6f8", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "sleep", + "args": null + } + } + } + }, + { + "eventId": "18", + "eventTime": "2023-11-28T16:58:06.005293886Z", + "eventType": "TimerStarted", + "version": "0", + "taskId": "1446878", + "workerMayIgnore": false, + "timerStartedEventAttributes": { + "timerId": "18", + "startToFireTimeout": "1s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "19", + "eventTime": "2023-11-28T16:58:06.005296052Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446879", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "836d871d-0cd2-4407-9d31-25289f9a2d4a", + "acceptedRequestMessageId": "836d871d-0cd2-4407-9d31-25289f9a2d4a/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "836d871d-0cd2-4407-9d31-25289f9a2d4a", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "20", + "eventTime": "2023-11-28T16:58:06.005308761Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446880", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "836d871d-0cd2-4407-9d31-25289f9a2d4a", + "identity": "" + }, + "acceptedEventId": "19", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "21", + "eventTime": "2023-11-28T16:58:06.005317261Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446881", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "983ed085-f136-4c79-8296-d39236fcbc1a", + "acceptedRequestMessageId": "983ed085-f136-4c79-8296-d39236fcbc1a/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "983ed085-f136-4c79-8296-d39236fcbc1a", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "22", + "eventTime": "2023-11-28T16:58:06.005324802Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446882", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "983ed085-f136-4c79-8296-d39236fcbc1a", + "identity": "" + }, + "acceptedEventId": "21", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "23", + "eventTime": "2023-11-28T16:58:06.005329761Z", + "eventType": "WorkflowExecutionUpdateAccepted", + "version": "0", + "taskId": "1446883", + "workerMayIgnore": false, + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "cdc28308-7c06-4360-867d-1cc77718e575", + "acceptedRequestMessageId": "cdc28308-7c06-4360-867d-1cc77718e575/request", + "acceptedRequestSequencingEventId": "2", + "acceptedRequest": { + "meta": { + "updateId": "cdc28308-7c06-4360-867d-1cc77718e575", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@" + }, + "input": { + "header": { + "fields": {} + }, + "name": "empty", + "args": null + } + } + } + }, + { + "eventId": "24", + "eventTime": "2023-11-28T16:58:06.005335552Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446884", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "cdc28308-7c06-4360-867d-1cc77718e575", + "identity": "" + }, + "acceptedEventId": "23", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "25", + "eventTime": "2023-11-28T16:58:07.007910928Z", + "eventType": "TimerFired", + "version": "0", + "taskId": "1446889", + "workerMayIgnore": false, + "timerFiredEventAttributes": { + "timerId": "12", + "startedEventId": "12" + } + }, + { + "eventId": "26", + "eventTime": "2023-11-28T16:58:07.007923386Z", + "eventType": "TimerFired", + "version": "0", + "taskId": "1446890", + "workerMayIgnore": false, + "timerFiredEventAttributes": { + "timerId": "14", + "startedEventId": "14" + } + }, + { + "eventId": "27", + "eventTime": "2023-11-28T16:58:07.007926470Z", + "eventType": "TimerFired", + "version": "0", + "taskId": "1446891", + "workerMayIgnore": false, + "timerFiredEventAttributes": { + "timerId": "16", + "startedEventId": "16" + } + }, + { + "eventId": "28", + "eventTime": "2023-11-28T16:58:07.007928678Z", + "eventType": "TimerFired", + "version": "0", + "taskId": "1446892", + "workerMayIgnore": false, + "timerFiredEventAttributes": { + "timerId": "18", + "startedEventId": "18" + } + }, + { + "eventId": "29", + "eventTime": "2023-11-28T16:58:07.007943678Z", + "eventType": "WorkflowTaskScheduled", + "version": "0", + "taskId": "1446893", + "workerMayIgnore": false, + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:f5992dd6-a6c3-4d9d-bf95-5a4e66789caf", + "kind": "Sticky", + "normalName": "tq-ca4512b3-5a8f-483f-aab7-915860ef7685-TestIntegrationSuite/TestMultipleUpdateOrdering" + }, + "startToCloseTimeout": "1s", + "attempt": 1 + } + }, + { + "eventId": "30", + "eventTime": "2023-11-28T16:58:06.005347011Z", + "eventType": "ActivityTaskStarted", + "version": "0", + "taskId": "1446897", + "workerMayIgnore": false, + "activityTaskStartedEventAttributes": { + "scheduledEventId": "6", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "21074c9e-4716-4154-a6fc-df44e810f45c", + "attempt": 1, + "lastFailure": null + } + }, + { + "eventId": "31", + "eventTime": "2023-11-28T16:58:07.030794011Z", + "eventType": "ActivityTaskCompleted", + "version": "0", + "taskId": "1446898", + "workerMayIgnore": false, + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduledEventId": "6", + "startedEventId": "30", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "workerVersion": null + } + }, + { + "eventId": "32", + "eventTime": "2023-11-28T16:58:06.005363177Z", + "eventType": "ActivityTaskStarted", + "version": "0", + "taskId": "1446901", + "workerMayIgnore": false, + "activityTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "0e3d2d42-ab66-4e85-a61b-c29227bbf290", + "attempt": 1, + "lastFailure": null + } + }, + { + "eventId": "33", + "eventTime": "2023-11-28T16:58:07.040422845Z", + "eventType": "ActivityTaskCompleted", + "version": "0", + "taskId": "1446902", + "workerMayIgnore": false, + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduledEventId": "8", + "startedEventId": "32", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "workerVersion": null + } + }, + { + "eventId": "34", + "eventTime": "2023-11-28T16:58:06.005368427Z", + "eventType": "ActivityTaskStarted", + "version": "0", + "taskId": "1446905", + "workerMayIgnore": false, + "activityTaskStartedEventAttributes": { + "scheduledEventId": "10", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "c7bcb6d9-43b9-4593-8979-928c369e7f1d", + "attempt": 1, + "lastFailure": null + } + }, + { + "eventId": "35", + "eventTime": "2023-11-28T16:58:07.048957136Z", + "eventType": "ActivityTaskCompleted", + "version": "0", + "taskId": "1446906", + "workerMayIgnore": false, + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MQ==" + } + ] + }, + "scheduledEventId": "10", + "startedEventId": "34", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "workerVersion": null + } + }, + { + "eventId": "36", + "eventTime": "2023-11-28T16:58:07.057198178Z", + "eventType": "WorkflowTaskStarted", + "version": "0", + "taskId": "1446908", + "workerMayIgnore": false, + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "29", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "aaee70fc-2ee9-4afd-aa0d-3fd88169b70f", + "suggestContinueAsNew": false, + "historySizeBytes": "5310" + } + }, + { + "eventId": "37", + "eventTime": "2023-11-28T16:58:07.069501053Z", + "eventType": "WorkflowTaskCompleted", + "version": "0", + "taskId": "1446912", + "workerMayIgnore": false, + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "29", + "startedEventId": "36", + "identity": "13528@Quinn-Klassens-MacBook-Pro.local@", + "binaryChecksum": "", + "workerVersion": { + "buildId": "62d192ee9495d08baad51e9cb4060dcc", + "bundleId": "", + "useVersioning": false + }, + "sdkMetadata": { + "coreUsedFlags": [], + "langUsedFlags": [], + "sdkName": "", + "sdkVersion": "" + }, + "meteringMetadata": { + "nonfirstLocalActivityExecutionAttempts": 0 + } + } + }, + { + "eventId": "38", + "eventTime": "2023-11-28T16:58:07.069539970Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446913", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "be3af148-fa9e-48d5-8d6a-5040bb1ef3c5", + "identity": "" + }, + "acceptedEventId": "11", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "39", + "eventTime": "2023-11-28T16:58:07.069554178Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446914", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "6526309d-5d3e-489c-a0dd-5c832991a9be", + "identity": "" + }, + "acceptedEventId": "13", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "40", + "eventTime": "2023-11-28T16:58:07.069559011Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446915", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "bfaaf99c-dd02-49c2-b658-ffe30447f8c4", + "identity": "" + }, + "acceptedEventId": "15", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "41", + "eventTime": "2023-11-28T16:58:07.069563511Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446916", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "54cd05be-76fb-4bdd-8312-ae89db4bd6f8", + "identity": "" + }, + "acceptedEventId": "17", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "42", + "eventTime": "2023-11-28T16:58:07.069567178Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446917", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "30ea422c-1f4f-4410-b829-f1530e66499e", + "identity": "" + }, + "acceptedEventId": "5", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "43", + "eventTime": "2023-11-28T16:58:07.069571595Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446918", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "b1db82b7-891f-45b0-b1ff-2a145665aaaf", + "identity": "" + }, + "acceptedEventId": "7", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "44", + "eventTime": "2023-11-28T16:58:07.069574761Z", + "eventType": "WorkflowExecutionUpdateCompleted", + "version": "0", + "taskId": "1446919", + "workerMayIgnore": false, + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "5c579628-c76d-4bb4-b9fb-be82d0ac1ca2", + "identity": "" + }, + "acceptedEventId": "9", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + }, + "data": null + } + ] + } + } + } + }, + { + "eventId": "45", + "eventTime": "2023-11-28T16:58:07.069578928Z", + "eventType": "WorkflowExecutionCompleted", + "version": "0", + "taskId": "1446920", + "workerMayIgnore": false, + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "MTA=" + } + ] + }, + "workflowTaskCompletedEventId": "37", + "newExecutionRunId": "" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index d9487344a..1ecf91e9b 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -406,6 +406,15 @@ func (s *replayTestSuite) TestChildWorkflowCancelWithUpdate() { s.NoError(err) } +func (s *replayTestSuite) TestMultipleUpdates() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(MultipleUpdateWorkflow) + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "multiple-updates.json") + s.NoError(err) + err = replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "multiple-updates-canceled.json") + s.NoError(err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 6d7467b50..72c88c509 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -501,3 +501,48 @@ func ChildWorkflowCancelWithUpdate(ctx workflow.Context) error { workflow.GetSignalChannel(ctx, "shutdown").Receive(ctx, nil) return nil } + +func MultipleUpdateWorkflow(ctx workflow.Context) (int, error) { + inflightUpdates := 0 + updatesRan := 0 + sleepHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + defer func() { + inflightUpdates-- + }() + return workflow.Sleep(ctx, time.Second) + } + echoHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + defer func() { + inflightUpdates-- + }() + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 5 * time.Second, + ScheduleToCloseTimeout: 5 * time.Second, + StartToCloseTimeout: 9 * time.Second, + }) + return workflow.ExecuteActivity(ctx, "Echo", 1, 1).Get(ctx, nil) + } + emptyHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + defer func() { + inflightUpdates-- + }() + return ctx.Err() + } + // Register multiple update handles in the first workflow task to make sure we process an + // update only when its handle is registered, not when any handle is registered + workflow.SetUpdateHandler(ctx, "echo", echoHandle) + workflow.SetUpdateHandler(ctx, "sleep", sleepHandle) + workflow.SetUpdateHandler(ctx, "empty", emptyHandle) + err := workflow.Await(ctx, func() bool { return inflightUpdates == 0 }) + if err != nil { + return 0, err + } + return updatesRan, nil +} diff --git a/test/workflow_test.go b/test/workflow_test.go index b57109088..e9378b3a6 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2130,6 +2130,73 @@ func (w *Workflows) PanicOnSignal(ctx workflow.Context) error { panic("intentional panic") } +func (w *Workflows) WaitOnUpdate(ctx workflow.Context) (int, error) { + inflightUpdates := 0 + updatesRan := 0 + sleepHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + err := workflow.Sleep(ctx, time.Second) + inflightUpdates-- + return err + } + echoHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + var a Activities + err := workflow.ExecuteActivity(ctx, a.Echo, 1, 1).Get(ctx, nil) + inflightUpdates-- + return err + } + emptyHandle := func(ctx workflow.Context) error { + inflightUpdates++ + updatesRan++ + defer func() { + inflightUpdates-- + }() + return ctx.Err() + } + // Register multiple update handles in the first workflow task to make sure we process an + // update only when its handle is registered, not when any handle is registered + workflow.SetUpdateHandler(ctx, "echo", echoHandle) + workflow.SetUpdateHandler(ctx, "sleep", sleepHandle) + workflow.SetUpdateHandler(ctx, "empty", emptyHandle) + err := workflow.Await(ctx, func() bool { return inflightUpdates == 0 }) + if err != nil { + return 0, err + } + return updatesRan, nil +} + +func (w *Workflows) UpdateSetHandlerOnly(ctx workflow.Context) (int, error) { + updatesRan := 0 + updateHandle := func(ctx workflow.Context) error { + updatesRan++ + return nil + } + workflow.SetUpdateHandler(ctx, "update", updateHandle) + return updatesRan, nil +} + +func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) { + updatesRan := 0 + updateHandle := func(ctx workflow.Context) error { + updatesRan++ + return nil + } + // Register multiple update handles in the first workflow task to make sure we process an + // update only when its handle is registered, not when any handle is registered + workflow.SetUpdateHandler(ctx, "update", updateHandle) + currentTime := workflow.Now(ctx) + // Wait a workflow task + workflow.Await(ctx, func() bool { + return workflow.Now(ctx).After(currentTime) + }) + return updatesRan, nil +} + var forcedNonDeterminismCounter int func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDiffName bool) (err error) { @@ -2471,6 +2538,9 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ChildWorkflowAndParentCancel) worker.RegisterWorkflow(w.sleep) worker.RegisterWorkflow(w.timer) + worker.RegisterWorkflow(w.WaitOnUpdate) + worker.RegisterWorkflow(w.UpdateOrdering) + worker.RegisterWorkflow(w.UpdateSetHandlerOnly) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {