From dcbba343b70111949354bc2cf23c40a5dbff9fb0 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 3 Sep 2024 13:56:28 -0700 Subject: [PATCH 01/17] switch internal timers to use options, expose StaticSummary and StaticDetails --- internal/internal_event_handlers.go | 2 +- internal/internal_schedule_client.go | 2 +- internal/internal_workflow.go | 6 +++--- internal/schedule_client.go | 11 ++++++++--- internal/workflow.go | 25 ++++++++++++++----------- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index b7897df1a..765b02e61 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes.InheritBuildId = determineInheritBuildIdFlagForCommand( params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName) - startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter) + startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter) if err != nil { callback(nil, err) return diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 5de1df4c3..e3cb43424 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -637,7 +637,7 @@ func convertToPBScheduleAction( return nil, err } - userMetadata, err := buildUserMetadata(action.staticSummary, action.staticDetails, dataConverter) + userMetadata, err := buildUserMetadata(action.StaticSummary, action.StaticDetails, dataConverter) if err != nil { return nil, err } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 7714472bf..4d5356877 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -229,9 +229,8 @@ type ( // runningUpdatesHandles is a map of update handlers that are currently running. runningUpdatesHandles map[string]UpdateInfo VersioningIntent VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary string - staticDetails string + StaticSummary string + StaticDetails string // currentDetails is the user-set string returned on metadata query as // WorkflowMetadata.current_details currentDetails string @@ -1166,6 +1165,7 @@ func (s *coroutineState) exit(timeout time.Duration) { return true } + // TODO? timer := time.NewTimer(timeout) defer timer.Stop() diff --git a/internal/schedule_client.go b/internal/schedule_client.go index c6ea706b2..da6c5fe47 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -282,9 +282,14 @@ type ( // Deprecated - This is only for update of older search attributes. This may be removed in a future version. UntypedSearchAttributes map[string]*commonpb.Payload - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary string - staticDetails string + // StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be + // in single-line Temporal markdown format. + StaticSummary string + + // StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in + // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be + // updated. For details that can be updated, use SetCurrentDetails within the workflow. + StaticDetails string } // ScheduleOptions configure the parameters for creating a schedule. diff --git a/internal/workflow.go b/internal/workflow.go index 0f3223f9e..fc74c1ae2 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -380,9 +380,14 @@ type ( // WARNING: Worker versioning is currently experimental VersioningIntent VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary string - staticDetails string + // StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be + // in single-line Temporal markdown format. + StaticSummary string + + // StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in + // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be + // updated. For details that can be updated, use SetCurrentDetails within the workflow. + StaticDetails string } // RegisterWorkflowOptions consists of options for registering a workflow @@ -494,7 +499,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { state := getState(ctx) defer state.unblocked() - timer := NewTimer(ctx, timeout) + timer := NewTimerWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}) for !condition() { doneCh := ctx.Done() // TODO: Consider always returning a channel @@ -1329,7 +1334,7 @@ func Sleep(ctx Context, d time.Duration) (err error) { } func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (err error) { - t := NewTimer(ctx, d) + t := NewTimerWithOptions(ctx, d, TimerOptions{Summary: "Sleep"}) err = t.Get(ctx, nil) return } @@ -1569,9 +1574,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes wfOptions.ParentClosePolicy = cwo.ParentClosePolicy wfOptions.VersioningIntent = cwo.VersioningIntent - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - wfOptions.staticSummary = cwo.staticSummary - wfOptions.staticDetails = cwo.staticDetails + wfOptions.StaticSummary = cwo.StaticSummary + wfOptions.StaticDetails = cwo.StaticDetails return ctx1 } @@ -1598,9 +1602,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions { TypedSearchAttributes: opts.TypedSearchAttributes, ParentClosePolicy: opts.ParentClosePolicy, VersioningIntent: opts.VersioningIntent, - // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed - staticSummary: opts.staticSummary, - staticDetails: opts.staticDetails, + StaticSummary: opts.StaticSummary, + StaticDetails: opts.StaticDetails, } } From 7e827d735d88593e93f53ff459bea93e3712a0db Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 3 Sep 2024 14:19:03 -0700 Subject: [PATCH 02/17] plumb options through AwaitWithTimeout --- internal/interceptor.go | 2 +- internal/interceptor_base.go | 4 ++-- internal/internal_coroutines_test.go | 4 ++-- internal/internal_workflow.go | 2 +- internal/internal_workflow_testsuite_test.go | 2 +- internal/workflow.go | 11 +++++++---- workflow/deterministic_wrappers.go | 6 +++--- 7 files changed, 17 insertions(+), 14 deletions(-) diff --git a/internal/interceptor.go b/internal/interceptor.go index db110074c..5220a5712 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -211,7 +211,7 @@ type WorkflowOutboundInterceptor interface { Await(ctx Context, condition func() bool) error // AwaitWithTimeout intercepts workflow.AwaitWithTimeout. - AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) + AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) // ExecuteActivity intercepts workflow.ExecuteActivity. // interceptor.WorkflowHeader will return a non-nil map for this context. diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index c4465faf9..1ca64136c 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -201,8 +201,8 @@ func (w *WorkflowOutboundInterceptorBase) Await(ctx Context, condition func() bo } // AwaitWithTimeout implements WorkflowOutboundInterceptor.AwaitWithTimeout. -func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) { - return w.Next.AwaitWithTimeout(ctx, timeout, condition) +func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) { + return w.Next.AwaitWithTimeout(ctx, timeout, options, condition) } // ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity. diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 4e3478961..075b504a9 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -1021,7 +1021,7 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) { flag := false var awaitOk bool d := createNewDispatcher(func(ctx Context) { - awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return flag }) + awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutNoTimeout"}, func() bool { return flag }) }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) @@ -1097,7 +1097,7 @@ func TestAwaitWithTimeoutCancellation(t *testing.T) { interceptor, ctx := createRootTestContext() ctx, cancelHandler := WithCancel(ctx) d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { - awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false }) + awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutCancellation"}, func() bool { return false }) }, func() bool { return false }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 4d5356877..9c4a440e9 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -847,7 +847,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { } func (c *channelImpl) ReceiveWithTimeout(ctx Context, timeout time.Duration, valuePtr interface{}) (ok, more bool) { - okAwait, err := AwaitWithTimeout(ctx, timeout, func() bool { return c.Len() > 0 }) + okAwait, err := AwaitWithTimeout(ctx, timeout, TimerOptions{"ReceiveWithTimeout"}, func() bool { return c.Len() > 0 }) if err != nil { // context canceled return false, true } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 472e8f1fa..62f5b691c 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4036,7 +4036,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityDeadlineExceeded() { func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() { workflowFn := func(ctx Context) (bool, error) { - return AwaitWithTimeout(ctx, time.Second, func() bool { return false }) + return AwaitWithTimeout(ctx, time.Second, TimerOptions{Summary: "Test_AwaitWithTimeoutTimeout"}, func() bool { return false }) } env := s.NewTestWorkflowEnvironment() diff --git a/internal/workflow.go b/internal/workflow.go index fc74c1ae2..e338fa690 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -490,16 +490,19 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo // AwaitWithTimeout blocks the calling thread until condition() returns true // Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. -func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { +func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { assertNotInReadOnlyState(ctx) state := getState(ctx) - return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition) + return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, options, condition) } -func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { +func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { state := getState(ctx) defer state.unblocked() - timer := NewTimerWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}) + if options.Summary == "" { + options.Summary = "AwaitWithTimeout" + } + timer := NewTimerWithOptions(ctx, timeout, options) for !condition() { doneCh := ctx.Done() // TODO: Consider always returning a channel diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 0a4c78d7e..2f5eb5298 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -102,11 +102,11 @@ func Await(ctx Context, condition func() bool) error { // The following code will block until the captured count // variable is set to 5, or one hour passes. // -// workflow.AwaitWithTimeout(ctx, time.Hour, func() bool { +// workflow.AwaitWithTimeout(ctx, time.Hour, options, func() bool { // return count == 5 // }) -func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { - return internal.AwaitWithTimeout(ctx, timeout, condition) +func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { + return internal.AwaitWithTimeout(ctx, timeout, options, condition) } // NewChannel creates a new Channel instance From d635af0c8f6bba1c0824834e9ca792b5f7ab842a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 09:51:55 -0700 Subject: [PATCH 03/17] create new API, don't break existing API --- internal/interceptor.go | 5 ++- internal/interceptor_base.go | 9 ++++- internal/internal_coroutines_test.go | 4 +- internal/internal_workflow.go | 3 +- internal/internal_workflow_testsuite_test.go | 2 +- internal/workflow.go | 40 +++++++++++++++++--- workflow/deterministic_wrappers.go | 17 ++++++++- 7 files changed, 64 insertions(+), 16 deletions(-) diff --git a/internal/interceptor.go b/internal/interceptor.go index 5220a5712..20bd64277 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -211,7 +211,10 @@ type WorkflowOutboundInterceptor interface { Await(ctx Context, condition func() bool) error // AwaitWithTimeout intercepts workflow.AwaitWithTimeout. - AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) + AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) + + // AwaitWithTimeoutWithOptions intercepts workflow.AwaitWithTimeoutWithOptions. + AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) // ExecuteActivity intercepts workflow.ExecuteActivity. // interceptor.WorkflowHeader will return a non-nil map for this context. diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 1ca64136c..da61711b8 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -201,8 +201,13 @@ func (w *WorkflowOutboundInterceptorBase) Await(ctx Context, condition func() bo } // AwaitWithTimeout implements WorkflowOutboundInterceptor.AwaitWithTimeout. -func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) { - return w.Next.AwaitWithTimeout(ctx, timeout, options, condition) +func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) { + return w.Next.AwaitWithTimeout(ctx, timeout, condition) +} + +// AwaitWithTimeoutAndOptions implements WorkflowOutboundInterceptor.AwaitWithTimeoutAndOptions. +func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) { + return w.Next.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) } // ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity. diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 075b504a9..4e3478961 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -1021,7 +1021,7 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) { flag := false var awaitOk bool d := createNewDispatcher(func(ctx Context) { - awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutNoTimeout"}, func() bool { return flag }) + awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return flag }) }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) @@ -1097,7 +1097,7 @@ func TestAwaitWithTimeoutCancellation(t *testing.T) { interceptor, ctx := createRootTestContext() ctx, cancelHandler := WithCancel(ctx) d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { - awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutCancellation"}, func() bool { return false }) + awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false }) }, func() bool { return false }) defer d.Close() err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 9c4a440e9..35b6dfdb0 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -847,7 +847,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { } func (c *channelImpl) ReceiveWithTimeout(ctx Context, timeout time.Duration, valuePtr interface{}) (ok, more bool) { - okAwait, err := AwaitWithTimeout(ctx, timeout, TimerOptions{"ReceiveWithTimeout"}, func() bool { return c.Len() > 0 }) + okAwait, err := AwaitWithTimeout(ctx, timeout, func() bool { return c.Len() > 0 }) if err != nil { // context canceled return false, true } @@ -1165,7 +1165,6 @@ func (s *coroutineState) exit(timeout time.Duration) { return true } - // TODO? timer := time.NewTimer(timeout) defer timer.Stop() diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 62f5b691c..472e8f1fa 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4036,7 +4036,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityDeadlineExceeded() { func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() { workflowFn := func(ctx Context) (bool, error) { - return AwaitWithTimeout(ctx, time.Second, TimerOptions{Summary: "Test_AwaitWithTimeoutTimeout"}, func() bool { return false }) + return AwaitWithTimeout(ctx, time.Second, func() bool { return false }) } env := s.NewTestWorkflowEnvironment() diff --git a/internal/workflow.go b/internal/workflow.go index e338fa690..ba9dea834 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -490,17 +490,45 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo // AwaitWithTimeout blocks the calling thread until condition() returns true // Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. -func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { +func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { assertNotInReadOnlyState(ctx) state := getState(ctx) - return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, options, condition) + return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition) } -func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { +func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { + state := getState(ctx) + defer state.unblocked() + timer := NewTimerWithOptions(ctx, timeout, TimerOptions{"AwaitWithTimeout"}) + for !condition() { + doneCh := ctx.Done() + // TODO: Consider always returning a channel + if doneCh != nil { + if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { + return false, NewCanceledError("AwaitWithTimeout context canceled") + } + } + if timer.IsReady() { + return false, nil + } + state.yield("AwaitWithTimeout") + } + return true, nil +} + +// AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true +// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. +func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { + assertNotInReadOnlyState(ctx) + state := getState(ctx) + return state.dispatcher.interceptor.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) +} + +func (wc *workflowEnvironmentInterceptor) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { state := getState(ctx) defer state.unblocked() if options.Summary == "" { - options.Summary = "AwaitWithTimeout" + options.Summary = "AwaitWithTimeoutAndOptions" } timer := NewTimerWithOptions(ctx, timeout, options) for !condition() { @@ -508,13 +536,13 @@ func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout // TODO: Consider always returning a channel if doneCh != nil { if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { - return false, NewCanceledError("AwaitWithTimeout context canceled") + return false, NewCanceledError("AwaitWithTimeoutAndOptions context canceled") } } if timer.IsReady() { return false, nil } - state.yield("AwaitWithTimeout") + state.yield("AwaitWithTimeoutAndOptions") } return true, nil } diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 2f5eb5298..887178e30 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -105,8 +105,21 @@ func Await(ctx Context, condition func() bool) error { // workflow.AwaitWithTimeout(ctx, time.Hour, options, func() bool { // return count == 5 // }) -func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { - return internal.AwaitWithTimeout(ctx, timeout, options, condition) +func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { + return internal.AwaitWithTimeout(ctx, timeout, condition) +} + +// AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true +// or blocking time exceeds the passed timeout value. +// Returns ok=false if timed out, and err CanceledError if the ctx is canceled. +// The following code will block until the captured count +// variable is set to 5, or one hour passes. +// +// workflow.AwaitWithTimeoutAndOptions(ctx, time.Hour, TimerOptions{"AwaitWithTimeoutAndOptions example"}, func() bool { +// return count == 5 +// }) +func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { + return internal.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) } // NewChannel creates a new Channel instance From 68f13944e56704a923efd9feb1b29be945f70867 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 09:57:06 -0700 Subject: [PATCH 04/17] missed a spot to remove API change --- workflow/deterministic_wrappers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 887178e30..aa212e2aa 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -102,7 +102,7 @@ func Await(ctx Context, condition func() bool) error { // The following code will block until the captured count // variable is set to 5, or one hour passes. // -// workflow.AwaitWithTimeout(ctx, time.Hour, options, func() bool { +// workflow.AwaitWithTimeout(ctx, time.Hour, func() bool { // return count == 5 // }) func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { From 259a845b35735ad5b9812fa6068815f25ddbe940 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 10:52:23 -0700 Subject: [PATCH 05/17] add experimental tag, fix test --- internal/interceptor.go | 2 ++ internal/interceptor_base.go | 2 ++ internal/workflow.go | 4 ++++ internal/workflow_test.go | 2 ++ workflow/deterministic_wrappers.go | 2 ++ 5 files changed, 12 insertions(+) diff --git a/internal/interceptor.go b/internal/interceptor.go index 4a7ac1bc9..0c92b7a98 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -214,6 +214,8 @@ type WorkflowOutboundInterceptor interface { AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) // AwaitWithTimeoutWithOptions intercepts workflow.AwaitWithTimeoutWithOptions. + // + // NOTE: Experimental AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) // ExecuteActivity intercepts workflow.ExecuteActivity. diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 196a689e1..7d1cbc80c 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -206,6 +206,8 @@ func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout } // AwaitWithTimeoutAndOptions implements WorkflowOutboundInterceptor.AwaitWithTimeoutAndOptions. +// +// NOTE: Experimental func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) { return w.Next.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) } diff --git a/internal/workflow.go b/internal/workflow.go index 1292294f6..031e51699 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -382,11 +382,15 @@ type ( // StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be // in single-line Temporal markdown format. + // + // NOTE: Experimental StaticSummary string // StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be // updated. For details that can be updated, use SetCurrentDetails within the workflow. + // + // NOTE: Experimental StaticDetails string } diff --git a/internal/workflow_test.go b/internal/workflow_test.go index 1abfa8209..94dfab6fd 100644 --- a/internal/workflow_test.go +++ b/internal/workflow_test.go @@ -61,6 +61,8 @@ func TestGetChildWorkflowOptions(t *testing.T) { }, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, VersioningIntent: VersioningIntentDefault, + StaticSummary: "foo", + StaticDetails: "bar", } // Require test options to have non-zero value for each field. This ensures that we update tests (and the diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 697c8fd3d..5bf0815ca 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -120,6 +120,8 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) // workflow.AwaitWithTimeoutAndOptions(ctx, time.Hour, TimerOptions{"AwaitWithTimeoutAndOptions example"}, func() bool { // return count == 5 // }) +// +// NOTE: Experimental func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { return internal.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) } From 25acfc6fafda72f2af2811da8685d9ceb9a70436 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 11:25:18 -0700 Subject: [PATCH 06/17] take out StaticSummary StaticDetails changes --- internal/internal_event_handlers.go | 2 +- internal/internal_schedule_client.go | 2 +- internal/internal_workflow.go | 5 +++-- internal/schedule_client.go | 11 +++-------- internal/workflow.go | 23 +++++++---------------- internal/workflow_test.go | 2 -- 6 files changed, 15 insertions(+), 30 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 765b02e61..b7897df1a 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes.InheritBuildId = determineInheritBuildIdFlagForCommand( params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName) - startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter) + startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter) if err != nil { callback(nil, err) return diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index e3cb43424..5de1df4c3 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -637,7 +637,7 @@ func convertToPBScheduleAction( return nil, err } - userMetadata, err := buildUserMetadata(action.StaticSummary, action.StaticDetails, dataConverter) + userMetadata, err := buildUserMetadata(action.staticSummary, action.staticDetails, dataConverter) if err != nil { return nil, err } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 35b6dfdb0..7714472bf 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -229,8 +229,9 @@ type ( // runningUpdatesHandles is a map of update handlers that are currently running. runningUpdatesHandles map[string]UpdateInfo VersioningIntent VersioningIntent - StaticSummary string - StaticDetails string + // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed + staticSummary string + staticDetails string // currentDetails is the user-set string returned on metadata query as // WorkflowMetadata.current_details currentDetails string diff --git a/internal/schedule_client.go b/internal/schedule_client.go index da6c5fe47..c6ea706b2 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -282,14 +282,9 @@ type ( // Deprecated - This is only for update of older search attributes. This may be removed in a future version. UntypedSearchAttributes map[string]*commonpb.Payload - // StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be - // in single-line Temporal markdown format. - StaticSummary string - - // StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in - // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be - // updated. For details that can be updated, use SetCurrentDetails within the workflow. - StaticDetails string + // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed + staticSummary string + staticDetails string } // ScheduleOptions configure the parameters for creating a schedule. diff --git a/internal/workflow.go b/internal/workflow.go index 031e51699..30b778173 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -380,18 +380,9 @@ type ( // WARNING: Worker versioning is currently experimental VersioningIntent VersioningIntent - // StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be - // in single-line Temporal markdown format. - // - // NOTE: Experimental - StaticSummary string - - // StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in - // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be - // updated. For details that can be updated, use SetCurrentDetails within the workflow. - // - // NOTE: Experimental - StaticDetails string + // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed + staticSummary string + staticDetails string } // RegisterWorkflowOptions consists of options for registering a workflow @@ -1611,8 +1602,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes wfOptions.ParentClosePolicy = cwo.ParentClosePolicy wfOptions.VersioningIntent = cwo.VersioningIntent - wfOptions.StaticSummary = cwo.StaticSummary - wfOptions.StaticDetails = cwo.StaticDetails + wfOptions.staticSummary = cwo.staticSummary + wfOptions.staticDetails = cwo.staticDetails return ctx1 } @@ -1639,8 +1630,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions { TypedSearchAttributes: opts.TypedSearchAttributes, ParentClosePolicy: opts.ParentClosePolicy, VersioningIntent: opts.VersioningIntent, - StaticSummary: opts.StaticSummary, - StaticDetails: opts.StaticDetails, + staticSummary: opts.staticSummary, + staticDetails: opts.staticDetails, } } diff --git a/internal/workflow_test.go b/internal/workflow_test.go index 94dfab6fd..1abfa8209 100644 --- a/internal/workflow_test.go +++ b/internal/workflow_test.go @@ -61,8 +61,6 @@ func TestGetChildWorkflowOptions(t *testing.T) { }, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, VersioningIntent: VersioningIntentDefault, - StaticSummary: "foo", - StaticDetails: "bar", } // Require test options to have non-zero value for each field. This ensures that we update tests (and the From e19bff2486b5e119b5cd685b9fd33db5530cc54e Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 11:29:55 -0700 Subject: [PATCH 07/17] missed a few spots --- internal/workflow.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/workflow.go b/internal/workflow.go index 30b778173..1b2601d8c 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -496,7 +496,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { state := getState(ctx) defer state.unblocked() - timer := NewTimerWithOptions(ctx, timeout, TimerOptions{"AwaitWithTimeout"}) + timer := NewTimerWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}) for !condition() { doneCh := ctx.Done() // TODO: Consider always returning a channel @@ -1602,6 +1602,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes wfOptions.ParentClosePolicy = cwo.ParentClosePolicy wfOptions.VersioningIntent = cwo.VersioningIntent + // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed wfOptions.staticSummary = cwo.staticSummary wfOptions.staticDetails = cwo.staticDetails @@ -1630,8 +1631,9 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions { TypedSearchAttributes: opts.TypedSearchAttributes, ParentClosePolicy: opts.ParentClosePolicy, VersioningIntent: opts.VersioningIntent, - staticSummary: opts.staticSummary, - staticDetails: opts.staticDetails, + // TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed + staticSummary: opts.staticSummary, + staticDetails: opts.staticDetails, } } From ef94234ca14de042940b810f820574e6abcd10ae Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 13:40:22 -0700 Subject: [PATCH 08/17] remove duplicate code --- internal/workflow.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/internal/workflow.go b/internal/workflow.go index 1b2601d8c..b8faf0199 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -494,23 +494,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) } func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { - state := getState(ctx) - defer state.unblocked() - timer := NewTimerWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}) - for !condition() { - doneCh := ctx.Done() - // TODO: Consider always returning a channel - if doneCh != nil { - if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { - return false, NewCanceledError("AwaitWithTimeout context canceled") - } - } - if timer.IsReady() { - return false, nil - } - state.yield("AwaitWithTimeout") - } - return true, nil + return wc.AwaitWithTimeoutAndOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}, condition) } // AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true From b95dbb31db2b89164246e8e366c602b9ac035cb1 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 4 Sep 2024 13:52:03 -0700 Subject: [PATCH 09/17] cleaner code share --- internal/workflow.go | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/internal/workflow.go b/internal/workflow.go index b8faf0199..9b25fc86c 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -485,6 +485,26 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo return nil } +func (wc *workflowEnvironmentInterceptor) awaitWithOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool, functionName string) (ok bool, err error) { + state := getState(ctx) + defer state.unblocked() + timer := NewTimerWithOptions(ctx, timeout, options) + for !condition() { + doneCh := ctx.Done() + // TODO: Consider always returning a channel + if doneCh != nil { + if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { + return false, NewCanceledError("%s context canceled", functionName) + } + } + if timer.IsReady() { + return false, nil + } + state.yield(functionName) + } + return true, nil +} + // AwaitWithTimeout blocks the calling thread until condition() returns true // Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { @@ -494,7 +514,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) } func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { - return wc.AwaitWithTimeoutAndOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}, condition) + return wc.awaitWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}, condition, "AwaitWithTimeout") } // AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true @@ -506,26 +526,7 @@ func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options Time } func (wc *workflowEnvironmentInterceptor) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { - state := getState(ctx) - defer state.unblocked() - if options.Summary == "" { - options.Summary = "AwaitWithTimeoutAndOptions" - } - timer := NewTimerWithOptions(ctx, timeout, options) - for !condition() { - doneCh := ctx.Done() - // TODO: Consider always returning a channel - if doneCh != nil { - if _, more := doneCh.ReceiveAsyncWithMoreFlag(nil); !more { - return false, NewCanceledError("AwaitWithTimeoutAndOptions context canceled") - } - } - if timer.IsReady() { - return false, nil - } - state.yield("AwaitWithTimeoutAndOptions") - } - return true, nil + return wc.awaitWithOptions(ctx, timeout, options, condition, "AwaitWithTimeoutAndOptions") } // NewChannel create new Channel instance From f2d7d057cc4319e70c6e55fc29c2edd99e1b0ec2 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Sep 2024 09:03:25 -0700 Subject: [PATCH 10/17] AwaitOptions --- internal/interceptor.go | 4 ++-- internal/interceptor_base.go | 6 +++--- internal/workflow.go | 31 ++++++++++++++++++++++-------- workflow/deterministic_wrappers.go | 8 ++++---- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/internal/interceptor.go b/internal/interceptor.go index 0c92b7a98..41d3fb0c2 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -213,10 +213,10 @@ type WorkflowOutboundInterceptor interface { // AwaitWithTimeout intercepts workflow.AwaitWithTimeout. AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) - // AwaitWithTimeoutWithOptions intercepts workflow.AwaitWithTimeoutWithOptions. + // AwaitWithOptions intercepts workflow.AwaitWithOptions. // // NOTE: Experimental - AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) + AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error) // ExecuteActivity intercepts workflow.ExecuteActivity. // interceptor.WorkflowHeader will return a non-nil map for this context. diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 7d1cbc80c..7ce4758ed 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -205,11 +205,11 @@ func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout return w.Next.AwaitWithTimeout(ctx, timeout, condition) } -// AwaitWithTimeoutAndOptions implements WorkflowOutboundInterceptor.AwaitWithTimeoutAndOptions. +// AwaitWithOptions implements WorkflowOutboundInterceptor.AwaitWithOptions. // // NOTE: Experimental -func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) { - return w.Next.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) +func (w *WorkflowOutboundInterceptorBase) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (bool, error) { + return w.Next.AwaitWithOptions(ctx, options, condition) } // ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity. diff --git a/internal/workflow.go b/internal/workflow.go index 9b25fc86c..0f9b3e542 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -458,6 +458,20 @@ type ( // NOTE: Experimental Summary string } + + // AwaitOptions are options set when creating an await. + // + // NOTE: Experimental + AwaitOptions struct { + // Timeout is the await timeout if the await condition is not met. + // + // NOTE: Experimental + Timeout time.Duration + // TimerOptions are options set for the underlying timer created. + // + // NOTE: Experimental + TimerOptions TimerOptions + } ) // Await blocks the calling thread until condition() returns true @@ -485,10 +499,10 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo return nil } -func (wc *workflowEnvironmentInterceptor) awaitWithOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool, functionName string) (ok bool, err error) { +func (wc *workflowEnvironmentInterceptor) awaitWithOptions(ctx Context, options AwaitOptions, condition func() bool, functionName string) (ok bool, err error) { state := getState(ctx) defer state.unblocked() - timer := NewTimerWithOptions(ctx, timeout, options) + timer := NewTimerWithOptions(ctx, options.Timeout, options.TimerOptions) for !condition() { doneCh := ctx.Done() // TODO: Consider always returning a channel @@ -514,19 +528,20 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) } func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) { - return wc.awaitWithOptions(ctx, timeout, TimerOptions{Summary: "AwaitWithTimeout"}, condition, "AwaitWithTimeout") + options := AwaitOptions{Timeout: timeout, TimerOptions: TimerOptions{Summary: "AwaitWithTimeout"}} + return wc.awaitWithOptions(ctx, options, condition, "AwaitWithTimeout") } -// AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true +// AwaitWithOptions blocks the calling thread until condition() returns true // Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled. -func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { +func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { assertNotInReadOnlyState(ctx) state := getState(ctx) - return state.dispatcher.interceptor.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) + return state.dispatcher.interceptor.AwaitWithOptions(ctx, options, condition) } -func (wc *workflowEnvironmentInterceptor) AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { - return wc.awaitWithOptions(ctx, timeout, options, condition, "AwaitWithTimeoutAndOptions") +func (wc *workflowEnvironmentInterceptor) AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { + return wc.awaitWithOptions(ctx, options, condition, "AwaitWithOptions") } // NewChannel create new Channel instance diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 5bf0815ca..52e5b453b 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -111,19 +111,19 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) return internal.AwaitWithTimeout(ctx, timeout, condition) } -// AwaitWithTimeoutAndOptions blocks the calling thread until condition() returns true +// AwaitWithOptions blocks the calling thread until condition() returns true // or blocking time exceeds the passed timeout value. // Returns ok=false if timed out, and err CanceledError if the ctx is canceled. // The following code will block until the captured count // variable is set to 5, or one hour passes. // -// workflow.AwaitWithTimeoutAndOptions(ctx, time.Hour, TimerOptions{"AwaitWithTimeoutAndOptions example"}, func() bool { +// workflow.AwaitWithOptions(ctx, options, func() bool { // return count == 5 // }) // // NOTE: Experimental -func AwaitWithTimeoutAndOptions(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) { - return internal.AwaitWithTimeoutAndOptions(ctx, timeout, options, condition) +func AwaitWithOptions(ctx Context, options internal.AwaitOptions, condition func() bool) (ok bool, err error) { + return internal.AwaitWithOptions(ctx, options, condition) } // NewChannel creates a new Channel instance From a626eafd38de1534d7d60846b7400ec259356955 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Sep 2024 13:17:51 -0700 Subject: [PATCH 11/17] alias AwaitOptions in public package --- workflow/deterministic_wrappers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 52e5b453b..b7a198f0f 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -72,6 +72,11 @@ type ( // // NOTE: Experimental TimerOptions = internal.TimerOptions + + // AwaitOptions are options for [AwaitWithOptions] + // + // NOTE: Experimental + AwaitOptions = internal.AwaitOptions ) // Await blocks the calling thread until condition() returns true. @@ -122,7 +127,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) // }) // // NOTE: Experimental -func AwaitWithOptions(ctx Context, options internal.AwaitOptions, condition func() bool) (ok bool, err error) { +func AwaitWithOptions(ctx Context, options AwaitOptions, condition func() bool) (ok bool, err error) { return internal.AwaitWithOptions(ctx, options, condition) } From c4c7e11c09cd22bf9cdd799950b43f2ee65fa0b5 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Sep 2024 13:45:45 -0700 Subject: [PATCH 12/17] add unit test --- internal/internal_workflow_testsuite_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 472e8f1fa..71160cced 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4048,6 +4048,21 @@ func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() { s.False(result) } +func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithOptionsTieout() { + options := AwaitOptions{Timeout: time.Second, TimerOptions: TimerOptions{Summary: "Test_AwaitWithOptionsTieout"}} + workflowFn := func(ctx Context) (bool, error) { + return AwaitWithOptions(ctx, options, func() bool { return false }) + } + + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(workflowFn) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + result := true + _ = env.GetWorkflowResult(&result) + s.False(result) +} + func (s *WorkflowTestSuiteUnitTest) Test_NoDetachedChildWait() { // One cron+abandon and one request-cancel childOptionSet := []ChildWorkflowOptions{ From dfcac68ee4284ecd300981281cd5c4a304e766b9 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 6 Sep 2024 16:51:14 -0700 Subject: [PATCH 13/17] wip --- test/integration_test.go | 32 ++++++++++++++++++++++++++++++++ test/workflow_test.go | 14 ++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/test/integration_test.go b/test/integration_test.go index a163b04f8..42c441ac0 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6266,6 +6266,38 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { ts.Equal("my-timer", str) } +func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start workflow + fmt.Println("start workflow") + opts := ts.startWorkflowOptions("test-await-options" + uuid.New()) + _, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.AwaitWithOptions, 2, 1) + ts.NoError(err) + + // TODO: Figure out how to query awaitwithoptions run + // resp, err := ts.client.DescribeWorkflowExecution(ctx, run.GetID(), "") + // ts.NoError(err) + + // AwaitWithOptions + // iter := ts.client.GetWorkflowHistory(ctx, opts.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + // var timerEvent *historypb.HistoryEvent + // for iter.HasNext() { + // event, err1 := iter.Next() + // ts.NoError(err1) + // fmt.Println(event.String()) + // if event.GetTimerStartedEventAttributes() != nil { + // ts.Nil(timerEvent) + // timerEvent = event + // } + // } + // ts.Equal("a", "b") + + // TODO: One that times out? + +} + // executeWorkflow executes a given workflow and waits for the result func (ts *IntegrationTestSuite) executeWorkflow( wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}, diff --git a/test/workflow_test.go b/test/workflow_test.go index 04153a834..4f92eb988 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3119,6 +3119,19 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error { ).Get(ctx, nil) } +func (w *Workflows) AwaitWithOptions(ctx workflow.Context, timeout time.Duration, sleepDuration time.Duration) (bool, error) { + options := workflow.AwaitOptions{ + Timeout: timeout, + TimerOptions: workflow.TimerOptions{Summary: "await-options-test"}, + } + + return workflow.AwaitWithOptions(ctx, options, func() bool { + time.Sleep(sleepDuration) + return true + }) + +} + func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, numOfEachActKind int, actFailTimes int) error { var activities *Activities futures := make([]workflow.Future, 0) @@ -3268,6 +3281,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateWithMutex) worker.RegisterWorkflow(w.UpdateWithSemaphore) worker.RegisterWorkflow(w.UserMetadata) + worker.RegisterWorkflow(w.AwaitWithOptions) worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate) worker.RegisterWorkflow(w.child) From 5319540f390537bfc95492bc7314edfc1a81b7b1 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Sep 2024 11:11:36 -0700 Subject: [PATCH 14/17] test works with prettifyString logging --- test/integration_test.go | 84 +++++++++++++++++++++++------- test/workflow_test.go | 7 ++- workflow/deterministic_wrappers.go | 2 +- 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index 42c441ac0..8ffddff2c 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6266,33 +6266,77 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { ts.Equal("my-timer", str) } +// prettifyString formats the given string to make it more readable +func prettifyString(s string) string { + var builder strings.Builder + + indent := 0 + inQuote := false + + for _, ch := range s { + switch ch { + case '{': + if !inQuote { + builder.WriteString("\n") + builder.WriteString(strings.Repeat("\t", indent)) + indent++ + } + builder.WriteRune(ch) + case '}': + if !inQuote { + indent-- + builder.WriteString("\n") + builder.WriteString(strings.Repeat("\t", indent)) + } + builder.WriteRune(ch) + case '"': + inQuote = !inQuote + builder.WriteRune(ch) + case ' ': + if !inQuote { + builder.WriteRune(ch) + } else { + builder.WriteRune(ch) + } + default: + builder.WriteRune(ch) + } + } + + return builder.String() +} + func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + var str string // Start workflow fmt.Println("start workflow") opts := ts.startWorkflowOptions("test-await-options" + uuid.New()) - _, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.AwaitWithOptions, 2, 1) - ts.NoError(err) - - // TODO: Figure out how to query awaitwithoptions run - // resp, err := ts.client.DescribeWorkflowExecution(ctx, run.GetID(), "") - // ts.NoError(err) - - // AwaitWithOptions - // iter := ts.client.GetWorkflowHistory(ctx, opts.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - // var timerEvent *historypb.HistoryEvent - // for iter.HasNext() { - // event, err1 := iter.Next() - // ts.NoError(err1) - // fmt.Println(event.String()) - // if event.GetTimerStartedEventAttributes() != nil { - // ts.Nil(timerEvent) - // timerEvent = event - // } - // } - // ts.Equal("a", "b") + run, err := ts.client.ExecuteWorkflow(ctx, opts, + ts.workflows.AwaitWithOptions) + ts.NoError(err) + + // Confirm workflow has completed + ts.NoError(run.Get(ctx, nil)) + + // Confirm AwaitWithOptions's underlying timer has fired properly + iter := ts.client.GetWorkflowHistory(ctx, opts.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + var timerEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err1 := iter.Next() + ts.NoError(err1) + fmt.Println("event.String():\n", prettifyString(event.String())) + if event.GetTimerStartedEventAttributes() != nil { + ts.Nil(timerEvent) + timerEvent = event + } + } + ts.NotNil(timerEvent) + ts.NoError(converter.GetDefaultDataConverter().FromPayload( + timerEvent.UserMetadata.Summary, &str)) + ts.Equal("await-timer", str) // TODO: One that times out? diff --git a/test/workflow_test.go b/test/workflow_test.go index 4f92eb988..aa042b05f 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3119,14 +3119,13 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error { ).Get(ctx, nil) } -func (w *Workflows) AwaitWithOptions(ctx workflow.Context, timeout time.Duration, sleepDuration time.Duration) (bool, error) { +func (w *Workflows) AwaitWithOptions(ctx workflow.Context) (bool, error) { options := workflow.AwaitOptions{ - Timeout: timeout, - TimerOptions: workflow.TimerOptions{Summary: "await-options-test"}, + Timeout: 1 * time.Millisecond, + TimerOptions: workflow.TimerOptions{Summary: "await-timer"}, } return workflow.AwaitWithOptions(ctx, options, func() bool { - time.Sleep(sleepDuration) return true }) diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index b7a198f0f..8e3f75026 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -122,7 +122,7 @@ func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) // The following code will block until the captured count // variable is set to 5, or one hour passes. // -// workflow.AwaitWithOptions(ctx, options, func() bool { +// workflow.AwaitWithOptions(ctx, AwaitOptions{Timeout: time.Hour, TimerOptions: TimerOptions{Summary:"Example"}}, func() bool { // return count == 5 // }) // From 41d8fd81d91710084c3f257056df951b5c427cf0 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Sep 2024 11:12:10 -0700 Subject: [PATCH 15/17] clean up --- test/integration_test.go | 44 ---------------------------------------- 1 file changed, 44 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index 8ffddff2c..2b04775a1 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6266,46 +6266,6 @@ func (ts *IntegrationTestSuite) TestUserMetadata() { ts.Equal("my-timer", str) } -// prettifyString formats the given string to make it more readable -func prettifyString(s string) string { - var builder strings.Builder - - indent := 0 - inQuote := false - - for _, ch := range s { - switch ch { - case '{': - if !inQuote { - builder.WriteString("\n") - builder.WriteString(strings.Repeat("\t", indent)) - indent++ - } - builder.WriteRune(ch) - case '}': - if !inQuote { - indent-- - builder.WriteString("\n") - builder.WriteString(strings.Repeat("\t", indent)) - } - builder.WriteRune(ch) - case '"': - inQuote = !inQuote - builder.WriteRune(ch) - case ' ': - if !inQuote { - builder.WriteRune(ch) - } else { - builder.WriteRune(ch) - } - default: - builder.WriteRune(ch) - } - } - - return builder.String() -} - func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -6327,7 +6287,6 @@ func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { for iter.HasNext() { event, err1 := iter.Next() ts.NoError(err1) - fmt.Println("event.String():\n", prettifyString(event.String())) if event.GetTimerStartedEventAttributes() != nil { ts.Nil(timerEvent) timerEvent = event @@ -6337,9 +6296,6 @@ func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { ts.NoError(converter.GetDefaultDataConverter().FromPayload( timerEvent.UserMetadata.Summary, &str)) ts.Equal("await-timer", str) - - // TODO: One that times out? - } // executeWorkflow executes a given workflow and waits for the result From 7cb65b12018df0345041e7ccf835da5e9890e97f Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Sep 2024 11:19:33 -0700 Subject: [PATCH 16/17] no need for unit test now that we have better E2E test --- internal/internal_workflow_testsuite_test.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 71160cced..472e8f1fa 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4048,21 +4048,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() { s.False(result) } -func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithOptionsTieout() { - options := AwaitOptions{Timeout: time.Second, TimerOptions: TimerOptions{Summary: "Test_AwaitWithOptionsTieout"}} - workflowFn := func(ctx Context) (bool, error) { - return AwaitWithOptions(ctx, options, func() bool { return false }) - } - - env := s.NewTestWorkflowEnvironment() - env.ExecuteWorkflow(workflowFn) - s.True(env.IsWorkflowCompleted()) - s.NoError(env.GetWorkflowError()) - result := true - _ = env.GetWorkflowResult(&result) - s.False(result) -} - func (s *WorkflowTestSuiteUnitTest) Test_NoDetachedChildWait() { // One cron+abandon and one request-cancel childOptionSet := []ChildWorkflowOptions{ From 87d8428ede40a4c84bb611c6b77d1cdc59460163 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Sep 2024 13:02:44 -0700 Subject: [PATCH 17/17] remove print --- test/integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration_test.go b/test/integration_test.go index 2b04775a1..fb64f2093 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6272,7 +6272,6 @@ func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() { var str string // Start workflow - fmt.Println("start workflow") opts := ts.startWorkflowOptions("test-await-options" + uuid.New()) run, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.AwaitWithOptions)