Skip to content

Commit

Permalink
Handle updates immediately when registered (#1306)
Browse files Browse the repository at this point in the history
Handle updates immediately when registered.
Handle updates before the root coroutine.
  • Loading branch information
Quinn-With-Two-Ns authored Dec 4, 2023
1 parent 987379d commit 5fdbecc
Show file tree
Hide file tree
Showing 29 changed files with 2,796 additions and 61 deletions.
1 change: 1 addition & 0 deletions contrib/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/tinylib/msgp v1.1.2 // indirect
go.temporal.io/api v1.26.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions contrib/datadog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
1 change: 1 addition & 0 deletions contrib/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
go.temporal.io/api v1.26.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions contrib/opentelemetry/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
1 change: 1 addition & 0 deletions contrib/opentracing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
go.temporal.io/api v1.26.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions contrib/opentracing/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
1 change: 1 addition & 0 deletions contrib/tally/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/twmb/murmur3 v1.1.5 // indirect
go.temporal.io/api v1.26.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions contrib/tally/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/net v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
5 changes: 3 additions & 2 deletions internal/cmd/build/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ require (
github.com/stretchr/testify v1.8.4 // indirect
go.temporal.io/api v1.26.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect
Expand Down
12 changes: 7 additions & 5 deletions internal/cmd/build/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a h1:Jw5wfR+h9mnIYH+OtGT2im5wV1YGGDora5vTv/aa5bE=
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand All @@ -95,8 +97,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -117,7 +119,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -150,8 +152,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5 h1:Vk4mysSz+GqQK2eqgWbo4zEO89wkeAjJiFIr9bpqa8k=
golang.org/x/tools v0.12.1-0.20230825192346-2191a27a6dc5/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
58 changes: 55 additions & 3 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createRootTestContext() (interceptor *workflowEnvironmentInterceptor, ctx C

func createNewDispatcher(f func(ctx Context)) dispatcher {
interceptor, ctx := createRootTestContext()
result, _ := newDispatcher(ctx, interceptor, f)
result, _ := newDispatcher(ctx, interceptor, f, func() bool { return false })
result.interceptor = interceptor
return result
}
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestAwaitCancellation(t *testing.T) {
ctx, cancelHandler := WithCancel(ctx)
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
awaitError = Await(ctx, func() bool { return false })
})
}, func() bool { return false })
defer d.Close()
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.NoError(t, err)
Expand Down Expand Up @@ -943,14 +943,66 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) {
require.True(t, d.IsDone())
}

func TestRecursiveEagerCoroutine(t *testing.T) {
// Verify eager coroutines run before normal coroutines
// even if they are scheduled in other eager coroutines
var d dispatcher
var history []string
d = createNewDispatcher(func(ctx Context) {
history = append(history, "root")
Go(ctx, func(ctx Context) {
history = append(history, "coroutine 1")
})
d.NewCoroutine(ctx, "outer eager", true, func(ctx Context) {
history = append(history, "outer eager coroutine")
d.NewCoroutine(ctx, "inner eager", true, func(ctx Context) {
history = append(history, "inner eager coroutine")
})
})
Go(ctx, func(ctx Context) {
history = append(history, "coroutine 2")
})
// Yield to allow the eager coroutines to run
state := getState(ctx)
history = append(history, "root yield start")
state.yield("test")
history = append(history, "root yield finish")

})
defer d.Close()
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.NoError(t, err)
require.True(t, d.IsDone())
require.Equal(t, []string{"root", "root yield start", "outer eager coroutine", "inner eager coroutine", "coroutine 1", "coroutine 2", "root yield finish"}, history)
}

func TestEagerCoroutineWhileNotRunning(t *testing.T) {
var history []string
interceptor, ctx := createRootTestContext()
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
history = append(history, "root")
}, func() bool { return false })
d.interceptor = interceptor

defer d.Close()
d.NewCoroutine(ctx, "eager", true, func(ctx Context) {
history = append(history, "eager coroutine")
})

err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.NoError(t, err)
require.True(t, d.IsDone())
require.Equal(t, []string{"eager coroutine", "root"}, history)
}

func TestAwaitWithTimeoutCancellation(t *testing.T) {
var awaitWithTimeoutError error
var awaitOk bool
interceptor, ctx := createRootTestContext()
ctx, cancelHandler := WithCancel(ctx)
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false })
})
}, func() bool { return false })
defer d.Close()
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
require.NoError(t, err)
Expand Down
44 changes: 44 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ type (
sdkVersion string
sdkNameUpdated bool
sdkName string
// Any update requests received in a workflow task before we have registered
// any handlers are not scheduled and are queued here until either their
// handler is registered or the event loop runs out of work and they are rejected.
bufferedUpdateRequests map[string][]func()

protocols *protocol.Registry
}
Expand Down Expand Up @@ -242,6 +246,7 @@ func newWorkflowExecutionEventHandler(
protocols: protocol.NewRegistry(),
mutableSideEffectCallCounter: make(map[string]int),
sdkFlags: newSDKFlags(capabilities),
bufferedUpdateRequests: make(map[string][]func()),
}
// Attempt to skip 1 log level to remove the ReplayLogger from the stack.
context.logger = log.Skip(ilog.NewReplayLogger(
Expand Down Expand Up @@ -881,6 +886,44 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
wc.logger.Debug("SideEffect Marker added", tagSideEffectID, sideEffectID)
}

func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return wc.sdkFlags.tryUse(flag, !wc.isReplay)
}

func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) {
wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f)
}

func (wc *workflowEnvironmentImpl) HandleUpdates(name string) bool {
if !wc.sdkFlags.tryUse(SDKPriorityUpdateHandling, !wc.isReplay) {
return false
}
updatesHandled := false
if bufferedUpdateRequests, ok := wc.bufferedUpdateRequests[name]; ok {
for _, request := range bufferedUpdateRequests {
request()
updatesHandled = true
}
delete(wc.bufferedUpdateRequests, name)
}
return updatesHandled
}

func (wc *workflowEnvironmentImpl) DrainUnhandledUpdates() bool {
anyExecuted := false
// Check if any buffered update requests remain when we have no more coroutines to run and let them schedule so they are rejected.
// Generally iterating a map in workflow code is bad because it is non deterministic
// this case is fine since all these update handles will be rejected and not recorded in history.
for name, requests := range wc.bufferedUpdateRequests {
for _, request := range requests {
request()
anyExecuted = true
}
delete(wc.bufferedUpdateRequests, name)
}
return anyExecuted
}

// lookupMutableSideEffect gets the current value of the MutableSideEffect for id for the
// current call count of id.
func (wc *workflowEnvironmentImpl) lookupMutableSideEffect(id string) *commonpb.Payloads {
Expand Down Expand Up @@ -1293,6 +1336,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted(
// replay sees the _final_ value of applied flags, not intermediate values
// as the value varies by WFT)
weh.sdkFlags.tryUse(SDKFlagProtocolMessageCommand, !weh.isReplay)
weh.sdkFlags.tryUse(SDKPriorityUpdateHandling, !weh.isReplay)

// Invoke the workflow.
weh.workflowDefinition.Execute(weh, attributes.Header, attributes.Input)
Expand Down
7 changes: 6 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ const (
// a workflow task response's command set to order messages with respect to
// commands.
SDKFlagProtocolMessageCommand = 3
SDKFlagUnknown = math.MaxUint32
// SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method.
// It will also cause the SDK to immediately handle updates when a handler is registered.
SDKPriorityUpdateHandling = 4
SDKFlagUnknown = math.MaxUint32
)

func sdkFlagFromUint(value uint32) sdkFlag {
Expand All @@ -57,6 +60,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKFlagChildWorkflowErrorExecution
case uint32(SDKFlagProtocolMessageCommand):
return SDKFlagProtocolMessageCommand
case uint32(SDKPriorityUpdateHandling):
return SDKPriorityUpdateHandling
default:
return SDKFlagUnknown
}
Expand Down
Loading

0 comments on commit 5fdbecc

Please sign in to comment.