Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update internal timers to use NewTimerWithOptions #1618

Merged
merged 21 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type WorkflowOutboundInterceptor interface {
Await(ctx Context, condition func() bool) error

// AwaitWithTimeout intercepts workflow.AwaitWithTimeout.
AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error)
AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error)
yuandrew marked this conversation as resolved.
Show resolved Hide resolved

// ExecuteActivity intercepts workflow.ExecuteActivity.
// interceptor.WorkflowHeader will return a non-nil map for this context.
Expand Down
4 changes: 2 additions & 2 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (w *WorkflowOutboundInterceptorBase) Await(ctx Context, condition func() bo
}

// AwaitWithTimeout implements WorkflowOutboundInterceptor.AwaitWithTimeout.
func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (bool, error) {
return w.Next.AwaitWithTimeout(ctx, timeout, condition)
func (w *WorkflowOutboundInterceptorBase) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (bool, error) {
return w.Next.AwaitWithTimeout(ctx, timeout, options, condition)
}

// ExecuteLocalActivity implements WorkflowOutboundInterceptor.ExecuteLocalActivity.
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) {
flag := false
var awaitOk bool
d := createNewDispatcher(func(ctx Context) {
awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return flag })
awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutNoTimeout"}, func() bool { return flag })
})
defer d.Close()
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func TestAwaitWithTimeoutCancellation(t *testing.T) {
interceptor, ctx := createRootTestContext()
ctx, cancelHandler := WithCancel(ctx)
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false })
awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, TimerOptions{"TestAwaitWithTimeoutCancellation"}, func() bool { return false })
}, func() bool { return false })
defer d.Close()
err := d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.InheritBuildId = determineInheritBuildIdFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter)
startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter)
if err != nil {
callback(nil, err)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func convertToPBScheduleAction(
return nil, err
}

userMetadata, err := buildUserMetadata(action.staticSummary, action.staticDetails, dataConverter)
userMetadata, err := buildUserMetadata(action.StaticSummary, action.StaticDetails, dataConverter)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,8 @@ type (
// runningUpdatesHandles is a map of update handlers that are currently running.
runningUpdatesHandles map[string]UpdateInfo
VersioningIntent VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
StaticSummary string
StaticDetails string
// currentDetails is the user-set string returned on metadata query as
// WorkflowMetadata.current_details
currentDetails string
Expand Down Expand Up @@ -848,7 +847,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
}

func (c *channelImpl) ReceiveWithTimeout(ctx Context, timeout time.Duration, valuePtr interface{}) (ok, more bool) {
okAwait, err := AwaitWithTimeout(ctx, timeout, func() bool { return c.Len() > 0 })
okAwait, err := AwaitWithTimeout(ctx, timeout, TimerOptions{"ReceiveWithTimeout"}, func() bool { return c.Len() > 0 })
if err != nil { // context canceled
return false, true
}
Expand Down Expand Up @@ -1166,6 +1165,7 @@ func (s *coroutineState) exit(timeout time.Duration) {
return true
}

// TODO?
timer := time.NewTimer(timeout)
defer timer.Stop()

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4036,7 +4036,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityDeadlineExceeded() {

func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() {
workflowFn := func(ctx Context) (bool, error) {
return AwaitWithTimeout(ctx, time.Second, func() bool { return false })
return AwaitWithTimeout(ctx, time.Second, TimerOptions{Summary: "Test_AwaitWithTimeoutTimeout"}, func() bool { return false })
}

env := s.NewTestWorkflowEnvironment()
Expand Down
11 changes: 8 additions & 3 deletions internal/schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,14 @@ type (
// Deprecated - This is only for update of older search attributes. This may be removed in a future version.
UntypedSearchAttributes map[string]*commonpb.Payload

// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
// StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be
// in single-line Temporal markdown format.
StaticSummary string

// StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in
// Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be
// updated. For details that can be updated, use SetCurrentDetails within the workflow.
StaticDetails string
}

// ScheduleOptions configure the parameters for creating a schedule.
Expand Down
34 changes: 20 additions & 14 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,14 @@ type (
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
staticSummary string
staticDetails string
// StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be
// in single-line Temporal markdown format.
StaticSummary string

// StaticDetails - General fixed details for this workflow execution that will appear in UI/CLI. This can be in
// Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be
// updated. For details that can be updated, use SetCurrentDetails within the workflow.
StaticDetails string
}

// RegisterWorkflowOptions consists of options for registering a workflow
Expand Down Expand Up @@ -485,16 +490,19 @@ func (wc *workflowEnvironmentInterceptor) Await(ctx Context, condition func() bo

// AwaitWithTimeout blocks the calling thread until condition() returns true
// Returns ok equals to false if timed out and err equals to CanceledError if the ctx is canceled.
func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) {
assertNotInReadOnlyState(ctx)
state := getState(ctx)
return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, condition)
return state.dispatcher.interceptor.AwaitWithTimeout(ctx, timeout, options, condition)
}

func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
func (wc *workflowEnvironmentInterceptor) AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) {
state := getState(ctx)
defer state.unblocked()
timer := NewTimer(ctx, timeout)
if options.Summary == "" {
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
options.Summary = "AwaitWithTimeout"
}
timer := NewTimerWithOptions(ctx, timeout, options)
for !condition() {
doneCh := ctx.Done()
// TODO: Consider always returning a channel
Expand Down Expand Up @@ -1329,7 +1337,7 @@ func Sleep(ctx Context, d time.Duration) (err error) {
}

func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (err error) {
t := NewTimer(ctx, d)
t := NewTimerWithOptions(ctx, d, TimerOptions{Summary: "Sleep"})
err = t.Get(ctx, nil)
return
}
Expand Down Expand Up @@ -1569,9 +1577,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes
wfOptions.ParentClosePolicy = cwo.ParentClosePolicy
wfOptions.VersioningIntent = cwo.VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
wfOptions.staticSummary = cwo.staticSummary
wfOptions.staticDetails = cwo.staticDetails
wfOptions.StaticSummary = cwo.StaticSummary
wfOptions.StaticDetails = cwo.StaticDetails

return ctx1
}
Expand All @@ -1598,9 +1605,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions {
TypedSearchAttributes: opts.TypedSearchAttributes,
ParentClosePolicy: opts.ParentClosePolicy,
VersioningIntent: opts.VersioningIntent,
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary: opts.staticSummary,
staticDetails: opts.staticDetails,
StaticSummary: opts.StaticSummary,
StaticDetails: opts.StaticDetails,
}
}

Expand Down
6 changes: 3 additions & 3 deletions workflow/deterministic_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func Await(ctx Context, condition func() bool) error {
// The following code will block until the captured count
// variable is set to 5, or one hour passes.
//
// workflow.AwaitWithTimeout(ctx, time.Hour, func() bool {
// workflow.AwaitWithTimeout(ctx, time.Hour, options, func() bool {
// return count == 5
// })
func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error) {
return internal.AwaitWithTimeout(ctx, timeout, condition)
func AwaitWithTimeout(ctx Context, timeout time.Duration, options TimerOptions, condition func() bool) (ok bool, err error) {
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
return internal.AwaitWithTimeout(ctx, timeout, options, condition)
}

// NewChannel creates a new Channel instance
Expand Down
Loading