Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial attempt at 429 Retry-After header support #5285

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 84 additions & 12 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
nethttp "net/http"
"strconv"
"time"

"github.com/hashicorp/go-retryablehttp"
Expand All @@ -29,6 +30,8 @@ import (
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

const RetryAfterHeader = "Retry-After"

var noRetries = RetryConfig{
RetryMax: 0,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
Expand Down Expand Up @@ -126,6 +129,12 @@ func NoRetries() RetryConfig {
}

func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) {
return RetryConfigFromDeliverySpecWith429(spec, false) // Maintain current implementation until exposed in DeliverySpec
}

// RetryConfigFromDeliverySpecWith429 extension is hoped to be temporary, and can be removed if
// the respect429RetryAfter could always be true, or is incorporated into the DeliverySpec.
func RetryConfigFromDeliverySpecWith429(spec duckv1.DeliverySpec, respect429RetryAfter bool) (RetryConfig, error) {

retryConfig := NoRetries()

Expand All @@ -137,27 +146,90 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
retryConfig.BackoffPolicy = spec.BackoffPolicy
retryConfig.BackoffDelay = spec.BackoffDelay

if spec.BackoffPolicy != nil && spec.BackoffDelay != nil {
if (spec.BackoffPolicy != nil && spec.BackoffDelay != nil) || respect429RetryAfter {
backoffFn, err := generateBackoffFn(spec.BackoffPolicy, spec.BackoffDelay, respect429RetryAfter)
if err != nil {
return retryConfig, err
}
retryConfig.Backoff = backoffFn
}

delay, err := period.Parse(*spec.BackoffDelay)
return retryConfig, nil
}

// generateBackoffFn returns a valid Backoff function based on the specified criteria. In order for the respect429RetryAfter
// value to have any effect the associated RetryConfig.CheckRetry function must have first allowed retry on 429 status codes.
func generateBackoffFn(backoffPolicy *duckv1.BackoffPolicyType, backoffDelay *string, respect429RetryAfter bool) (Backoff, error) {

// Calculate Backoff Delay Duration
var backoffDelayDuration *time.Duration
if backoffDelay != nil {
delay, err := period.Parse(*backoffDelay)
if err != nil {
return retryConfig, fmt.Errorf("failed to parse Spec.BackoffDelay: %w", err)
return nil, fmt.Errorf("failed to parse Spec.BackoffDelay: %w", err)
}
duration, _ := delay.Duration()
backoffDelayDuration = &duration
}

// Create The Backoff Function
backoffFn := func(attemptNum int, resp *nethttp.Response) time.Duration {

// Parse Any 429 Retry-After Header Durations (Ignore Errors - Assume No Backoff)
var retryAfterDuration time.Duration
if respect429RetryAfter {
retryAfterDuration, _ = parse429RetryAfterDuration(resp)
}

delayDuration, _ := delay.Duration()
switch *spec.BackoffPolicy {
case duckv1.BackoffPolicyExponential:
retryConfig.Backoff = func(attemptNum int, resp *nethttp.Response) time.Duration {
return delayDuration * time.Duration(math.Exp2(float64(attemptNum)))
// Calculate The Appropriate Backoff Duration
var backoffDuration time.Duration
if backoffPolicy != nil && backoffDelayDuration != nil {
switch *backoffPolicy {
case duckv1.BackoffPolicyExponential:
backoffDuration = *backoffDelayDuration * time.Duration(math.Exp2(float64(attemptNum)))
case duckv1.BackoffPolicyLinear:
backoffDuration = *backoffDelayDuration * time.Duration(attemptNum)
}
case duckv1.BackoffPolicyLinear:
retryConfig.Backoff = func(attemptNum int, resp *nethttp.Response) time.Duration {
return delayDuration * time.Duration(attemptNum)
}

// Return The Larger Of The Two Backoff Durations
if retryAfterDuration > backoffDuration {
return retryAfterDuration
} else {
return backoffDuration
}
}

// Return the Backoff Function
return backoffFn, nil
}

// parseRetryAfterDuration returns a Duration expressing the amount of time
// requested to wait by a 429 Retry-After header, or none if not present or invalid.
// According to the spec (https://tools.ietf.org/html/rfc7231#section-7.1.3)
// the Retry-After Header's value can be one of an HTTP-date or delay-seconds,
// both of which are supported here.
func parse429RetryAfterDuration(resp *nethttp.Response) (time.Duration, error) {

var retryAfterDuration time.Duration

if resp != nil && resp.StatusCode == nethttp.StatusTooManyRequests && resp.Header != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whatever we decide, make sure to add a comment to the spec as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also honor the 503? https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503

Yeah I agree, good catch - thanks!

retryAfterString := resp.Header.Get(RetryAfterHeader)
if len(retryAfterString) > 0 {
if retryAfterInt, parseIntErr := strconv.ParseInt(retryAfterString, 10, 64); parseIntErr == nil {
retryAfterDuration = time.Duration(retryAfterInt) * time.Second
} else {
retryAfterTime, parseTimeErr := nethttp.ParseTime(retryAfterString) // Supports http.TimeFormat, time.RFC850 & time.ANSIC
if parseTimeErr != nil {
fmt.Printf("parseTimeError: %v", parseTimeErr)
return retryAfterDuration, fmt.Errorf("failed to parse Retry-After header: ParseInt Error = %v, ParseTime Error = %v", parseIntErr, parseTimeErr)
}
retryAfterDuration = time.Until(retryAfterTime)
}
}
}

return retryConfig, nil
return retryAfterDuration, nil
}

// Simple default implementation
Expand Down
137 changes: 133 additions & 4 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,32 @@ import (
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
)

// Test The RetryConfigFromDeliverySpec() Functionality
func TestRetryConfigFromDeliverySpec(t *testing.T) {

const retry = 5

// Inline Utility For Creating Http.Response
createResponse := func(statusCode int, retryAfter string) *http.Response {
var response *http.Response
if statusCode > 0 {
response = &http.Response{StatusCode: statusCode}
if len(retryAfter) > 0 {
response.Header = make(map[string][]string)
response.Header.Set(RetryAfterHeader, retryAfter)
}
}
return response
}

// Define The Test Cases
testcases := []struct {
name string
respectRetryAfter bool
backoffPolicy duckv1.BackoffPolicyType
backoffDelay string
response *http.Response
expectedBackoffDurations []time.Duration
roundExpectedDuration time.Duration
wantErr bool
}{{
name: "Successful Linear Backoff 2500ms, 5 retries",
Expand Down Expand Up @@ -86,10 +104,117 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) {
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "FOO",
wantErr: true,
}, {
name: "Nil Response",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: nil,
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}, {
name: "429 Response Without Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, ""),
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}, {
name: "429 Response With Larger Valid Seconds Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, "10"),
expectedBackoffDurations: []time.Duration{
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
10 * time.Second,
},
}, {
name: "429 Response With Smaller Valid Seconds Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, "1"),
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}, {
name: "429 Response With Valid Future Time Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, time.Now().Add(30*time.Second).Format(time.RFC850)),
roundExpectedDuration: 5 * time.Second, // Rough rounding time for test to verify against time.Now() in implementation ; )
expectedBackoffDurations: []time.Duration{
30 * time.Second,
30 * time.Second,
30 * time.Second,
30 * time.Second,
30 * time.Second,
},
}, {
name: "429 Response With Valid Prior Time Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, time.Now().Add(-30*time.Second).Format(time.RFC850)),
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}, {
name: "429 Response With Invalid Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusTooManyRequests, "FOO"),
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}, {
name: "200 Response With Valid Retry-After",
respectRetryAfter: true,
backoffPolicy: duckv1.BackoffPolicyLinear,
backoffDelay: "PT2.5S",
response: createResponse(http.StatusOK, "10"),
expectedBackoffDurations: []time.Duration{
1 * 2500 * time.Millisecond,
2 * 2500 * time.Millisecond,
3 * 2500 * time.Millisecond,
4 * 2500 * time.Millisecond,
5 * 2500 * time.Millisecond,
},
}}

// Execute The Individual TestCases
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

// Create The DeliverySpec To Test
deliverySpec := duckv1.DeliverySpec{
DeadLetterSink: nil,
Expand All @@ -99,16 +224,20 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) {
}

// Create the RetryConfig from the deliverySpec
retryConfig, err := RetryConfigFromDeliverySpec(deliverySpec)
retryConfig, err := RetryConfigFromDeliverySpecWith429(deliverySpec, tc.respectRetryAfter)
assert.Equal(t, tc.wantErr, err != nil)

// If successful then validate the retryConfig (Max & Backoff calculations).
if err == nil {
assert.Equal(t, retry, retryConfig.RetryMax)
for i := 1; i < retry; i++ {
expectedBackoffDuration := tc.expectedBackoffDurations[i-1]
actualBackoffDuration := retryConfig.Backoff(i, nil)
assert.Equal(t, expectedBackoffDuration, actualBackoffDuration)
actualBackoffDuration := retryConfig.Backoff(i, tc.response)
if tc.roundExpectedDuration > 0 {
assert.Equal(t, expectedBackoffDuration, actualBackoffDuration.Round(tc.roundExpectedDuration)) // Round Time String Cases
} else {
assert.Equal(t, expectedBackoffDuration, actualBackoffDuration)
}
travis-minke-sap marked this conversation as resolved.
Show resolved Hide resolved
}
}
})
Expand Down