From 4ef8ec99aba465ebc21a2cb21639c50b8c9aa8b2 Mon Sep 17 00:00:00 2001 From: I867318 Date: Tue, 20 Apr 2021 11:22:25 -0600 Subject: [PATCH 1/2] Initial attempt at 429 Retry-After header support --- pkg/kncloudevents/message_sender.go | 96 ++++++++++++++-- pkg/kncloudevents/message_sender_test.go | 137 ++++++++++++++++++++++- 2 files changed, 217 insertions(+), 16 deletions(-) diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index a2be82b2082..ae9158fc50b 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -21,6 +21,7 @@ import ( "fmt" "math" nethttp "net/http" + "strconv" "time" "github.com/hashicorp/go-retryablehttp" @@ -29,6 +30,8 @@ import ( duckv1 "knative.dev/eventing/pkg/apis/duck/v1" ) +const RetryAfterHeader = "Retry-After" + var noRetries = RetryConfig{ RetryMax: 0, CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { @@ -126,6 +129,12 @@ func NoRetries() RetryConfig { } func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) { + return RetryConfigFromDeliverySpecWith429(spec, false) // Maintain current implementation until exposed in DeliverySpec +} + +// RetryConfigFromDeliverySpecWith429 extension is hoped to be temporary, and can be removed if +// the respect429RetryAfter could always be true, or is incorporated into the DeliverySpec. +func RetryConfigFromDeliverySpecWith429(spec duckv1.DeliverySpec, respect429RetryAfter bool) (RetryConfig, error) { retryConfig := NoRetries() @@ -137,27 +146,90 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) retryConfig.BackoffPolicy = spec.BackoffPolicy retryConfig.BackoffDelay = spec.BackoffDelay - if spec.BackoffPolicy != nil && spec.BackoffDelay != nil { + if (spec.BackoffPolicy != nil && spec.BackoffDelay != nil) || respect429RetryAfter { + backoffFn, err := generateBackoffFn(spec.BackoffPolicy, spec.BackoffDelay, respect429RetryAfter) + if err != nil { + return retryConfig, err + } + retryConfig.Backoff = backoffFn + } - delay, err := period.Parse(*spec.BackoffDelay) + return retryConfig, nil +} + +// generateBackoffFn returns a valid Backoff function based on the specified criteria. In order for the respect429RetryAfter +// value to have any effect the associated RetryConfig.CheckRetry function must have first allowed retry on 429 status codes. +func generateBackoffFn(backoffPolicy *duckv1.BackoffPolicyType, backoffDelay *string, respect429RetryAfter bool) (Backoff, error) { + + // Calculate Backoff Delay Duration + var backoffDelayDuration *time.Duration + if backoffDelay != nil { + delay, err := period.Parse(*backoffDelay) if err != nil { - return retryConfig, fmt.Errorf("failed to parse Spec.BackoffDelay: %w", err) + return nil, fmt.Errorf("failed to parse Spec.BackoffDelay: %w", err) + } + duration, _ := delay.Duration() + backoffDelayDuration = &duration + } + + // Create The Backoff Function + backoffFn := func(attemptNum int, resp *nethttp.Response) time.Duration { + + // Parse Any 429 Retry-After Header Durations (Ignore Errors - Assume No Backoff) + var retryAfterDuration time.Duration + if respect429RetryAfter { + retryAfterDuration, _ = parse429RetryAfterDuration(resp) } - delayDuration, _ := delay.Duration() - switch *spec.BackoffPolicy { - case duckv1.BackoffPolicyExponential: - retryConfig.Backoff = func(attemptNum int, resp *nethttp.Response) time.Duration { - return delayDuration * time.Duration(math.Exp2(float64(attemptNum))) + // Calculate The Appropriate Backoff Duration + var backoffDuration time.Duration + if backoffPolicy != nil && backoffDelayDuration != nil { + switch *backoffPolicy { + case duckv1.BackoffPolicyExponential: + backoffDuration = *backoffDelayDuration * time.Duration(math.Exp2(float64(attemptNum))) + case duckv1.BackoffPolicyLinear: + backoffDuration = *backoffDelayDuration * time.Duration(attemptNum) } - case duckv1.BackoffPolicyLinear: - retryConfig.Backoff = func(attemptNum int, resp *nethttp.Response) time.Duration { - return delayDuration * time.Duration(attemptNum) + } + + // Return The Larger Of The Two Backoff Durations + if retryAfterDuration > backoffDuration { + return retryAfterDuration + } else { + return backoffDuration + } + } + + // Return the Backoff Function + return backoffFn, nil +} + +// parseRetryAfterDuration returns a Duration expressing the amount of time +// requested to wait by a 429 Retry-After header, or none if not present or invalid. +// According to the spec (https://tools.ietf.org/html/rfc7231#section-7.1.3) +// the Retry-After Header's value can be one of an HTTP-date or delay-seconds, +// both of which are supported here. +func parse429RetryAfterDuration(resp *nethttp.Response) (time.Duration, error) { + + var retryAfterDuration time.Duration + + if resp != nil && resp.StatusCode == nethttp.StatusTooManyRequests && resp.Header != nil { + retryAfterString := resp.Header.Get(RetryAfterHeader) + if len(retryAfterString) > 0 { + if retryAfterInt, parseIntErr := strconv.ParseInt(retryAfterString, 10, 64); parseIntErr == nil { + retryAfterDuration = time.Duration(retryAfterInt) * time.Second + } else { + retryAfterTime, parseTimeErr := nethttp.ParseTime(retryAfterString) // Supports http.TimeFormat, time.RFC850 & time.ANSIC + if parseTimeErr != nil { + fmt.Printf("parseTimeError: %v", parseTimeErr) + return retryAfterDuration, fmt.Errorf("failed to parse Retry-After header: ParseInt Error = %v, ParseTime Error = %v", parseIntErr, parseTimeErr) + } + retryAfterDuration = time.Until(retryAfterTime) } } } - return retryConfig, nil + return retryAfterDuration, nil } // Simple default implementation diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index fc07bdddcd0..37e323262fb 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -39,14 +39,32 @@ import ( eventingduck "knative.dev/eventing/pkg/apis/duck/v1" ) -// Test The RetryConfigFromDeliverySpec() Functionality func TestRetryConfigFromDeliverySpec(t *testing.T) { + const retry = 5 + + // Inline Utility For Creating Http.Response + createResponse := func(statusCode int, retryAfter string) *http.Response { + var response *http.Response + if statusCode > 0 { + response = &http.Response{StatusCode: statusCode} + if len(retryAfter) > 0 { + response.Header = make(map[string][]string) + response.Header.Set(RetryAfterHeader, retryAfter) + } + } + return response + } + + // Define The Test Cases testcases := []struct { name string + respectRetryAfter bool backoffPolicy duckv1.BackoffPolicyType backoffDelay string + response *http.Response expectedBackoffDurations []time.Duration + roundExpectedDuration time.Duration wantErr bool }{{ name: "Successful Linear Backoff 2500ms, 5 retries", @@ -86,10 +104,117 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) { backoffPolicy: duckv1.BackoffPolicyLinear, backoffDelay: "FOO", wantErr: true, + }, { + name: "Nil Response", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: nil, + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, + }, { + name: "429 Response Without Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, ""), + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, + }, { + name: "429 Response With Larger Valid Seconds Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, "10"), + expectedBackoffDurations: []time.Duration{ + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + 10 * time.Second, + }, + }, { + name: "429 Response With Smaller Valid Seconds Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, "1"), + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, + }, { + name: "429 Response With Valid Future Time Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, time.Now().Add(30*time.Second).Format(time.RFC850)), + roundExpectedDuration: 5 * time.Second, // Rough rounding time for test to verify against time.Now() in implementation ; ) + expectedBackoffDurations: []time.Duration{ + 30 * time.Second, + 30 * time.Second, + 30 * time.Second, + 30 * time.Second, + 30 * time.Second, + }, + }, { + name: "429 Response With Valid Prior Time Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, time.Now().Add(-30*time.Second).Format(time.RFC850)), + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, + }, { + name: "429 Response With Invalid Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusTooManyRequests, "FOO"), + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, + }, { + name: "200 Response With Valid Retry-After", + respectRetryAfter: true, + backoffPolicy: duckv1.BackoffPolicyLinear, + backoffDelay: "PT2.5S", + response: createResponse(http.StatusOK, "10"), + expectedBackoffDurations: []time.Duration{ + 1 * 2500 * time.Millisecond, + 2 * 2500 * time.Millisecond, + 3 * 2500 * time.Millisecond, + 4 * 2500 * time.Millisecond, + 5 * 2500 * time.Millisecond, + }, }} + // Execute The Individual TestCases for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + // Create The DeliverySpec To Test deliverySpec := duckv1.DeliverySpec{ DeadLetterSink: nil, @@ -99,7 +224,7 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) { } // Create the RetryConfig from the deliverySpec - retryConfig, err := RetryConfigFromDeliverySpec(deliverySpec) + retryConfig, err := RetryConfigFromDeliverySpecWith429(deliverySpec, tc.respectRetryAfter) assert.Equal(t, tc.wantErr, err != nil) // If successful then validate the retryConfig (Max & Backoff calculations). @@ -107,8 +232,12 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) { assert.Equal(t, retry, retryConfig.RetryMax) for i := 1; i < retry; i++ { expectedBackoffDuration := tc.expectedBackoffDurations[i-1] - actualBackoffDuration := retryConfig.Backoff(i, nil) - assert.Equal(t, expectedBackoffDuration, actualBackoffDuration) + actualBackoffDuration := retryConfig.Backoff(i, tc.response) + if tc.roundExpectedDuration > 0 { + assert.Equal(t, expectedBackoffDuration, actualBackoffDuration.Round(tc.roundExpectedDuration)) // Round Time String Cases + } else { + assert.Equal(t, expectedBackoffDuration, actualBackoffDuration) + } } } }) From 81399c7c47f62dcfb5e3098bd3437cca84449f2b Mon Sep 17 00:00:00 2001 From: I867318 Date: Tue, 20 Apr 2021 13:41:16 -0600 Subject: [PATCH 2/2] PR Feedback --- pkg/kncloudevents/message_sender_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index 37e323262fb..9723ffd786c 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -233,11 +233,7 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) { for i := 1; i < retry; i++ { expectedBackoffDuration := tc.expectedBackoffDurations[i-1] actualBackoffDuration := retryConfig.Backoff(i, tc.response) - if tc.roundExpectedDuration > 0 { - assert.Equal(t, expectedBackoffDuration, actualBackoffDuration.Round(tc.roundExpectedDuration)) // Round Time String Cases - } else { - assert.Equal(t, expectedBackoffDuration, actualBackoffDuration) - } + assert.Equal(t, expectedBackoffDuration, actualBackoffDuration.Round(tc.roundExpectedDuration)) // Round Time String Cases } } })