Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: exponential backoff in flaky kafka test #479

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions _integration-tests/tests/ibm_sarama/ibm_sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
package ibm_sarama

import (
"context"
"errors"
"fmt"
"testing"
"time"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/utils/backoff"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -46,30 +49,26 @@
func produceMessage(t *testing.T, addrs []string, cfg *sarama.Config) {
t.Helper()

createProducer := func() (_ sarama.SyncProducer, err error) {
defer func() {
if r := recover(); r != nil && err == nil {
var ok bool
if err, ok = r.(error); !ok {
err = fmt.Errorf("panic: %v", r)
var producer sarama.SyncProducer
err := backoff.Retry(
context.Background(),
backoff.NewConstantStrategy(50*time.Millisecond),
func() (err error) {
defer func() {
if r := recover(); r != nil && err == nil {
var ok bool
if err, ok = r.(error); !ok {
err = errors.Join(err, fmt.Errorf("panic: %v", r))

Check warning on line 61 in _integration-tests/tests/ibm_sarama/ibm_sarama.go

View check run for this annotation

Codecov / codecov/patch

_integration-tests/tests/ibm_sarama/ibm_sarama.go#L59-L61

Added lines #L59 - L61 were not covered by tests
}
}
}
}()
return sarama.NewSyncProducer(addrs, cfg)
}
}()

var (
producer sarama.SyncProducer
err error
producer, err = sarama.NewSyncProducer(addrs, cfg)
return err
},
&backoff.RetryOptions{MaxAttempts: 3},
)
for attemptsLeft := 3; attemptsLeft > 0; attemptsLeft-- {
producer, err = createProducer()
if err != nil {
time.Sleep(50 * time.Millisecond)
continue
}
break
}

require.NoError(t, err, "failed to create producer")
defer func() { assert.NoError(t, producer.Close(), "failed to close producer") }()

Expand Down
41 changes: 17 additions & 24 deletions _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
"testing"
"time"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/utils/backoff"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
)

const (
Expand Down Expand Up @@ -86,27 +85,21 @@
Value: []byte("Third message"),
},
}
const (
maxRetries = 10
retryDelay = 100 * time.Millisecond
)
var (
retryCount int
err error
err := backoff.Retry(
ctx,
backoff.NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second),
func() error { return tc.writer.WriteMessages(ctx, messages...) },
&backoff.RetryOptions{
MaxAttempts: 10,
ShouldRetry: func(err error, attempt int, delay time.Duration) bool {
if !errors.Is(err, kafka.UnknownTopicOrPartition) {
return false

Check warning on line 96 in _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go

View check run for this annotation

Codecov / codecov/patch

_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go#L94-L96

Added lines #L94 - L96 were not covered by tests
}
t.Logf("failed to produce kafka messages, will retry in %s (attempt left: %d)", delay, 10-attempt)
return true

Check warning on line 99 in _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go

View check run for this annotation

Codecov / codecov/patch

_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go#L98-L99

Added lines #L98 - L99 were not covered by tests
},
},
)
for retryCount < maxRetries {
err = tc.writer.WriteMessages(ctx, messages...)
if err == nil {
break
}
// This error happens sometimes with brand-new topics, as there is a delay between when the topic is created
// on the broker, and when the topic can actually be written to.
if errors.Is(err, kafka.UnknownTopicOrPartition) {
retryCount++
t.Logf("failed to produce kafka messages, will retry in %s (retryCount: %d)", retryDelay, retryCount)
time.Sleep(retryDelay)
}
}
require.NoError(t, err)
require.NoError(t, tc.writer.Close())
}
Expand Down
105 changes: 105 additions & 0 deletions _integration-tests/utils/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

// Package backoff provides utilities to retry operations when encoutering
// transient errors. It is used by integration tests to allow for testcontainers
// enough time to become fully ready, avoiding tests flaking because the CI
// resources are constrained enough to cause containers to not be "immediately"
// ready to serve traffic.
package backoff
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could use https://github.com/cenkalti/backoff instead of implementing this ourselves 🤔


import (
"context"
"errors"
"math"
"time"
)

type Strategy interface {
// Next returns the back-off delay to wait for before making the next attempt.
Next() time.Duration
}

const (
defaultMaxAttempts = 10
)

// RetryAllErrors is the default function used by [RetryOptions.ShouldRetry]. It
// returns [true] regardless of its arguments.
func RetryAllErrors(error, int, time.Duration) bool {
return true
}

type RetryOptions struct {
// MaxAttempts is the maximum number of attempts to make before giving up. If
// it is negative, there is no limit to the number of attempts (it will be set
// to [math.MaxInt]); if it is zero, the default value of 10 will be used. It
// is fine (although a little silly) to set [RetryOptions.MaxAttempts] to 1.
MaxAttempts int
// ShouldRetry is called with the error returned by the action, the attempt
// number, and the delay before the next attempt could be made. If it returns
// [true], the next attempt will be made; otherwise, the [Retry] function will
// immediately return. If [nil], the default [RetryAllErrors] function will be
// used.
ShouldRetry func(err error, attempt int, delay time.Duration) bool
// Sleep is the function used to wait in between attempts. It is intended to
// be used in testing. If [nil], the default [time.Sleep] function will be
// used.
Sleep func(time.Duration)
}

// Retry makes up to [RetryOptions.MaxAttempts] at calling the [action]
// function. It uses the [Strategy] to determine how much time to wait between
// attempts. The [RetryOptions.ShouldRetry] function is called with all
// non-[nil] errors returned by [action], the attempt number, and the delay
// before the next attempt. If it returns [true], the [RetryOptions.Sleep]
// function is called with the delay, and the next attempt is made. Otherwise,
// [Retry] returns immediately.
func Retry(
ctx context.Context,
strategy Strategy,
action func() error,
opts *RetryOptions,
) error {
var (
maxAttempts = defaultMaxAttempts
shouldRetry = RetryAllErrors
sleep = time.Sleep
)
if opts != nil {
if opts.MaxAttempts > 0 {
maxAttempts = opts.MaxAttempts
} else if opts.MaxAttempts < 0 {
maxAttempts = math.MaxInt
}
if opts.ShouldRetry != nil {
shouldRetry = opts.ShouldRetry
}
if opts.Sleep != nil {
sleep = opts.Sleep
}
}

var errs error
for attempt, delay := 0, time.Duration(0); attempt < maxAttempts && ctx.Err() == nil; attempt, delay = attempt+1, strategy.Next() {
if delay > 0 {
sleep(delay)
}

err := action()
if err == nil {
// Success!
return nil
}

// Accumulate this error on top of the others we have observed so far.
errs = errors.Join(errs, err)

if shouldRetry != nil && !shouldRetry(err, attempt, delay) {
break
}
}
return errors.Join(errs, ctx.Err())
}
160 changes: 160 additions & 0 deletions _integration-tests/utils/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

package backoff

import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRetry(t *testing.T) {
// The sequence of delays observed for 10 attempts using an exponential
// backoff strategy with initial delay of 100ms, factor of 2, max delay of 5s.
delaySequence := []time.Duration{
/* attempt 1 */ // Immediate
/* attempt 2 */ 100 * time.Millisecond,
/* attempt 3 */ 200 * time.Millisecond,
/* attempt 4 */ 400 * time.Millisecond,
/* attempt 5 */ 800 * time.Millisecond,
/* attempt 6 */ 1600 * time.Millisecond,
/* attempt 7 */ 3200 * time.Millisecond,
/* attempt 8 */ 5 * time.Second,
/* attempt 9 */ 5 * time.Second,
/* attempt 10 */ 5 * time.Second,
}

t.Run("no-success", func(t *testing.T) {
ctx := context.Background()
strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second)
maxAttempts := 10
expectedErrs := make([]error, 0, maxAttempts)
action := func() error {
err := fmt.Errorf("Error number %d", len(expectedErrs)+1)
expectedErrs = append(expectedErrs, err)
return err
}
delays := make([]time.Duration, 0, maxAttempts)
timeSleep := func(d time.Duration) {
delays = append(delays, d)
}

err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, Sleep: timeSleep})
require.Error(t, err)
assert.Equal(t, delaySequence, delays)
for _, expectedErr := range expectedErrs {
assert.ErrorIs(t, err, expectedErr)
}
})

t.Run("non-retryable error", func(t *testing.T) {
ctx := context.Background()
strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second)
maxAttempts := 10
shouldRetry := func(err error, _ int, _ time.Duration) bool {
return !strings.Contains(err.Error(), "3")
}
expectedErrs := make([]error, 0, maxAttempts)
action := func() error {
err := fmt.Errorf("Error number %d", len(expectedErrs)+1)
expectedErrs = append(expectedErrs, err)
return err
}
delays := make([]time.Duration, 0, maxAttempts)
timeSleep := func(d time.Duration) {
delays = append(delays, d)
}

err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, ShouldRetry: shouldRetry, Sleep: timeSleep})
require.Error(t, err)
// We hit the non-retryable error at the 3rd attempt.
assert.Equal(t, delaySequence[:2], delays)
for _, expectedErr := range expectedErrs {
assert.ErrorIs(t, err, expectedErr)
}
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second)
maxAttempts := 10
expectedErrs := make([]error, 0, maxAttempts)
action := func() error {
err := fmt.Errorf("Error number %d", len(expectedErrs)+1)
expectedErrs = append(expectedErrs, err)
return err
}
delays := make([]time.Duration, 0, maxAttempts)
timeSleep := func(d time.Duration) {
delays = append(delays, d)

// Simulate context deadline after 1 second.
var ttl time.Duration
for _, delay := range delays {
ttl += delay
}
if ttl >= time.Second {
cancel()
}
}

err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, Sleep: timeSleep})
require.Error(t, err)
// We reach the 1 second total waited during the 4th back-off.
assert.Equal(t, delaySequence[:4], delays)
for _, expectedErr := range expectedErrs {
require.ErrorIs(t, err, expectedErr)
}
require.ErrorIs(t, err, context.Canceled)
})

t.Run("unlimited retries", func(t *testing.T) {
ctx := context.Background()
strategy := NewConstantStrategy(100 * time.Millisecond)
var attempts int
action := func() error {
attempts++
// At least 20 errors, then flip a coin... but no more than 100 attempts.
if attempts < 20 || (attempts < 100 && rand.Int()%2 == 0) {
return fmt.Errorf("Error number %d", attempts)
}
return nil
}
var delayCount int
timeSleep := func(time.Duration) {
delayCount++
}

err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: -1, Sleep: timeSleep})
require.NoError(t, err)
// We should have waited as many times as we attempted, except for the initial attempt.
assert.Equal(t, delayCount, attempts-1)
})

t.Run("immediate success", func(t *testing.T) {
ctx := context.Background()
strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second)
maxAttempts := 10
shouldRetry := func(error, int, time.Duration) bool { return false }
action := func() error { return nil }
delays := make([]time.Duration, 0, maxAttempts)
timeSleep := func(d time.Duration) {
delays = append(delays, d)
}

err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, ShouldRetry: shouldRetry, Sleep: timeSleep})
require.NoError(t, err)
assert.Empty(t, delays)
})
}
Loading
Loading