diff --git a/internal/error.go b/internal/error.go index 78a870ebf..69ca659da 100644 --- a/internal/error.go +++ b/internal/error.go @@ -328,10 +328,11 @@ func NewApplicationError(msg string, errType string, nonRetryable bool, cause er func NewApplicationErrorWithOptions(msg string, errType string, options ApplicationErrorOptions) error { applicationErr := &ApplicationError{ - msg: msg, - errType: errType, - cause: options.Cause, - nonRetryable: options.NonRetryable, + msg: msg, + errType: errType, + cause: options.Cause, + nonRetryable: options.NonRetryable, + nextRetryDelay: options.NextRetryDelay, } // When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get details := options.Details @@ -582,6 +583,8 @@ func (e *ApplicationError) Unwrap() error { return e.cause } +// NextRetryDelay returns the delay to wait before retrying the activity. +// a zero value means to use the activities retry policy. func (e *ApplicationError) NextRetryDelay() time.Duration { return e.nextRetryDelay } // Error from error interface diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 873a85088..4a66246cf 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1279,7 +1279,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu return false } -func getRetryBackoff(lar *localActivityResult, now time.Time, dataConverter converter.DataConverter) time.Duration { +func getRetryBackoff(lar *localActivityResult, now time.Time, _ converter.DataConverter) time.Duration { return getRetryBackoffWithNowTime(lar.task.retryPolicy, lar.task.attempt, lar.err, now, lar.task.expireTime) } @@ -1291,22 +1291,31 @@ func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, err error, now, e if p.MaximumAttempts > 0 && attempt >= p.MaximumAttempts { return noRetryBackoff // max attempt reached } + + var backoffInterval time.Duration + // Extract backoff interval from error if it is a retryable error. + // Not using errors.As() since we don't want to explore the whole error chain. + if applicationErr, ok := err.(*ApplicationError); ok { + backoffInterval = applicationErr.nextRetryDelay + } + // Calculate next backoff interval if the error did not contain the next backoff interval. // attempt starts from 1 - backoffInterval := time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1))) - if backoffInterval <= 0 { - // math.Pow() could overflow - if p.MaximumInterval > 0 { + if backoffInterval == time.Duration(0) { + backoffInterval = time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1))) + if backoffInterval <= 0 { + // math.Pow() could overflow + if p.MaximumInterval > 0 { + backoffInterval = p.MaximumInterval + } else { + return noRetryBackoff + } + } + if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval { + // cap next interval to MaxInterval backoffInterval = p.MaximumInterval - } else { - return noRetryBackoff } } - if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval { - // cap next interval to MaxInterval - backoffInterval = p.MaximumInterval - } - nextScheduleTime := now.Add(backoffInterval) if !expireTime.IsZero() && nextScheduleTime.After(expireTime) { return noRetryBackoff diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 15a09a2b1..472e8f1fa 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -2412,6 +2412,39 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivity() { s.Equal("hello local_activity", result) } +func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivityNextDelay() { + localActivityFn := func(ctx context.Context, delay time.Duration) error { + return NewApplicationErrorWithOptions("test delay", "DelayError", ApplicationErrorOptions{ + NextRetryDelay: delay, + }) + } + + workflowFn := func(ctx Context) (time.Duration, error) { + lao := LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + RetryPolicy: &RetryPolicy{ + MaximumAttempts: 5, + }, + } + ctx = WithLocalActivityOptions(ctx, lao) + var result string + t1 := Now(ctx) + f := ExecuteLocalActivity(ctx, localActivityFn, time.Second) + _ = f.Get(ctx, &result) + t2 := Now(ctx) + return t2.Sub(t1), nil + } + + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(workflowFn) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result time.Duration + err := env.GetWorkflowResult(&result) + s.NoError(err) + s.Equal(4*time.Second, result) +} + func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() { localActivityFn := func(ctx context.Context, name string) (string, error) { return "hello " + name, nil diff --git a/test/activity_test.go b/test/activity_test.go index 9e4ade2d9..37f92c81f 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -86,6 +86,12 @@ func LocalSleep(_ context.Context, delay time.Duration) error { return nil } +func ErrorWithNextDelay(_ context.Context, delay time.Duration) error { + return temporal.NewApplicationErrorWithOptions("error with next delay", "NextDelay", temporal.ApplicationErrorOptions{ + NextRetryDelay: delay, + }) +} + func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) { a.append("ActivityToBeCanceled") for { diff --git a/test/integration_test.go b/test/integration_test.go index feba04baa..18798ee47 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -465,6 +465,15 @@ func (ts *IntegrationTestSuite) TestDeadlockDetectionViaLocalActivity() { ts.True(strings.Contains(applicationErr.Error(), "Potential deadlock detected")) } +func (ts *IntegrationTestSuite) TestLocalActivityNextRetryDelay() { + var expected time.Duration + wfOpts := ts.startWorkflowOptions("test-local-activity-next-retry-delay") + wfOpts.WorkflowTaskTimeout = 5 * time.Second + err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.LocalActivityNextRetryDelay, &expected) + ts.NoError(err) + fmt.Println(expected) +} + func (ts *IntegrationTestSuite) TestActivityRetryOnError() { var expected []string err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected) diff --git a/test/workflow_test.go b/test/workflow_test.go index 5a85446a6..dfdd4aec9 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -101,6 +101,22 @@ func (w *Workflows) DeadlockedWithLocalActivity(ctx workflow.Context) ([]string, return []string{}, nil } +func (w *Workflows) LocalActivityNextRetryDelay(ctx workflow.Context) (time.Duration, error) { + laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 8, + }, + }) + + t1 := workflow.Now(ctx) + workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity") + _ = workflow.ExecuteLocalActivity(laCtx, ErrorWithNextDelay, time.Second).Get(laCtx, nil) + workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity done") + t2 := workflow.Now(ctx) + return t2.Sub(t1), nil +} + func (w *Workflows) Panicked(ctx workflow.Context) ([]string, error) { panic("simulated") } @@ -3024,6 +3040,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateSettingHandlerInHandler) worker.RegisterWorkflow(w.UpdateCancelableWorkflow) worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate) + worker.RegisterWorkflow(w.LocalActivityNextRetryDelay) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childWithRetryPolicy)