Skip to content

Commit

Permalink
Add support for NextRetryDelay for local activities
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed May 8, 2024
1 parent 8361067 commit 6a37830
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 16 deletions.
11 changes: 7 additions & 4 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,11 @@ func NewApplicationError(msg string, errType string, nonRetryable bool, cause er

func NewApplicationErrorWithOptions(msg string, errType string, options ApplicationErrorOptions) error {
applicationErr := &ApplicationError{
msg: msg,
errType: errType,
cause: options.Cause,
nonRetryable: options.NonRetryable,
msg: msg,
errType: errType,
cause: options.Cause,
nonRetryable: options.NonRetryable,
nextRetryDelay: options.NextRetryDelay,
}
// When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get
details := options.Details
Expand Down Expand Up @@ -582,6 +583,8 @@ func (e *ApplicationError) Unwrap() error {
return e.cause
}

// NextRetryDelay returns the delay to wait before retrying the activity.
// a zero value means to use the activities retry policy.
func (e *ApplicationError) NextRetryDelay() time.Duration { return e.nextRetryDelay }

// Error from error interface
Expand Down
33 changes: 21 additions & 12 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
return false
}

func getRetryBackoff(lar *localActivityResult, now time.Time, dataConverter converter.DataConverter) time.Duration {
func getRetryBackoff(lar *localActivityResult, now time.Time, _ converter.DataConverter) time.Duration {
return getRetryBackoffWithNowTime(lar.task.retryPolicy, lar.task.attempt, lar.err, now, lar.task.expireTime)
}

Expand All @@ -1291,22 +1291,31 @@ func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, err error, now, e
if p.MaximumAttempts > 0 && attempt >= p.MaximumAttempts {
return noRetryBackoff // max attempt reached
}

var backoffInterval time.Duration
// Extract backoff interval from error if it is a retryable error.
// Not using errors.As() since we don't want to explore the whole error chain.
if applicationErr, ok := err.(*ApplicationError); ok {
backoffInterval = applicationErr.nextRetryDelay
}
// Calculate next backoff interval if the error did not contain the next backoff interval.
// attempt starts from 1
backoffInterval := time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1)))
if backoffInterval <= 0 {
// math.Pow() could overflow
if p.MaximumInterval > 0 {
if backoffInterval == time.Duration(0) {
backoffInterval = time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1)))
if backoffInterval <= 0 {
// math.Pow() could overflow
if p.MaximumInterval > 0 {
backoffInterval = p.MaximumInterval
} else {
return noRetryBackoff
}
}
if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval {
// cap next interval to MaxInterval
backoffInterval = p.MaximumInterval
} else {
return noRetryBackoff
}
}

if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval {
// cap next interval to MaxInterval
backoffInterval = p.MaximumInterval
}

nextScheduleTime := now.Add(backoffInterval)
if !expireTime.IsZero() && nextScheduleTime.After(expireTime) {
return noRetryBackoff
Expand Down
33 changes: 33 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2412,6 +2412,39 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivity() {
s.Equal("hello local_activity", result)
}

func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivityNextDelay() {
localActivityFn := func(ctx context.Context, delay time.Duration) error {
return NewApplicationErrorWithOptions("test delay", "DelayError", ApplicationErrorOptions{
NextRetryDelay: delay,
})
}

workflowFn := func(ctx Context) (time.Duration, error) {
lao := LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute,
RetryPolicy: &RetryPolicy{
MaximumAttempts: 5,
},
}
ctx = WithLocalActivityOptions(ctx, lao)
var result string
t1 := Now(ctx)
f := ExecuteLocalActivity(ctx, localActivityFn, time.Second)
_ = f.Get(ctx, &result)
t2 := Now(ctx)
return t2.Sub(t1), nil
}

env := s.NewTestWorkflowEnvironment()
env.ExecuteWorkflow(workflowFn)
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
var result time.Duration
err := env.GetWorkflowResult(&result)
s.NoError(err)
s.Equal(4*time.Second, result)
}

func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() {
localActivityFn := func(ctx context.Context, name string) (string, error) {
return "hello " + name, nil
Expand Down
6 changes: 6 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func LocalSleep(_ context.Context, delay time.Duration) error {
return nil
}

func ErrorWithNextDelay(_ context.Context, delay time.Duration) error {
return temporal.NewApplicationErrorWithOptions("error with next delay", "NextDelay", temporal.ApplicationErrorOptions{
NextRetryDelay: delay,
})
}

func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
a.append("ActivityToBeCanceled")
for {
Expand Down
9 changes: 9 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ func (ts *IntegrationTestSuite) TestDeadlockDetectionViaLocalActivity() {
ts.True(strings.Contains(applicationErr.Error(), "Potential deadlock detected"))
}

func (ts *IntegrationTestSuite) TestLocalActivityNextRetryDelay() {
var expected time.Duration
wfOpts := ts.startWorkflowOptions("test-local-activity-next-retry-delay")
wfOpts.WorkflowTaskTimeout = 5 * time.Second
err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.LocalActivityNextRetryDelay, &expected)
ts.NoError(err)
fmt.Println(expected)
}

func (ts *IntegrationTestSuite) TestActivityRetryOnError() {
var expected []string
err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected)
Expand Down
17 changes: 17 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ func (w *Workflows) DeadlockedWithLocalActivity(ctx workflow.Context) ([]string,
return []string{}, nil
}

func (w *Workflows) LocalActivityNextRetryDelay(ctx workflow.Context) (time.Duration, error) {
laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 8,
},
})

t1 := workflow.Now(ctx)
workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity")
_ = workflow.ExecuteLocalActivity(laCtx, ErrorWithNextDelay, time.Second).Get(laCtx, nil)
workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity done")
t2 := workflow.Now(ctx)
return t2.Sub(t1), nil
}

func (w *Workflows) Panicked(ctx workflow.Context) ([]string, error) {
panic("simulated")
}
Expand Down Expand Up @@ -3024,6 +3040,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.UpdateSettingHandlerInHandler)
worker.RegisterWorkflow(w.UpdateCancelableWorkflow)
worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate)
worker.RegisterWorkflow(w.LocalActivityNextRetryDelay)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childWithRetryPolicy)
Expand Down

0 comments on commit 6a37830

Please sign in to comment.