Skip to content

Commit

Permalink
[Experimental-Feature] DeliverySpec.RetryAfter (#5813)
Browse files Browse the repository at this point in the history
* Retry-After Experimental-Feature

* update-codegen

* minor enhancements

* Updated Issue Number TODO

* Add e2e tests

* build tag update

* Refactor implementation for new design based on community feedback.

* Refactor implementation for new design based on community feedback.

* Refactor implementation for new design based on community feedback.

* PR feedback

* PR Feedback

* fix parallel data race

* PR Feedback

* Refactor RetryAfter message_sender test

* PR feedback
  • Loading branch information
travis-minke-sap authored Nov 23, 2021
1 parent 2cda8f4 commit 820db20
Show file tree
Hide file tree
Showing 15 changed files with 982 additions and 29 deletions.
4 changes: 4 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ data:
# For more details: https://github.com/knative/eventing/issues/5086
kreference-group: "disabled"

# ALPHA feature: The delivery-retryafter allows you to use the RetryAfter field in DeliverySpec.
# For more details: https://github.com/knative/eventing/issues/5811
delivery-retryafter: "disabled"

# ALPHA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec.
# For more details: https://github.com/knative/eventing/issues/5148
delivery-timeout: "disabled"
Expand Down
25 changes: 25 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,31 @@ More information on Duration format:
For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.</p>
</td>
</tr>
<tr>
<td>
<code>retryAfterMax</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>RetryAfterMax provides an optional upper bound on the duration specified in a &ldquo;Retry-After&rdquo; header
when calculating backoff times for retrying 429 and 503 response codes. Setting the value to
zero (&ldquo;PT0S&rdquo;) can be used to opt-out of respecting &ldquo;Retry-After&rdquo; header values altogether. This
value only takes effect if &ldquo;Retry&rdquo; is configured, and also depends on specific implementations
(Channels, Sources, etc.) choosing to provide this capability.</p>
<p>Note: This API is EXPERIMENTAL and might be changed at anytime. While this experimental
feature is in the Alpha/Beta stage, you must provide a valid value to opt-in for
supporting &ldquo;Retry-After&rdquo; headers. When the feature becomes Stable/GA &ldquo;Retry-After&rdquo;
headers will be respected by default, and you can choose to specify &ldquo;PT0S&rdquo; to
opt-out of supporting &ldquo;Retry-After&rdquo; headers.
For more details: <a href="https://github.com/knative/eventing/issues/5811">https://github.com/knative/eventing/issues/5811</a></p>
<p>More information on Duration format:
- <a href="https://www.iso.org/iso-8601-date-and-time-format.html">https://www.iso.org/iso-8601-date-and-time-format.html</a>
- <a href="https://en.wikipedia.org/wiki/ISO_8601">https://en.wikipedia.org/wiki/ISO_8601</a></p>
</td>
</tr>
</tbody>
</table>
<h3 id="duck.knative.dev/v1.DeliveryStatus">DeliveryStatus
Expand Down
35 changes: 34 additions & 1 deletion pkg/apis/duck/v1/delivery_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"context"

"github.com/rickb777/date/period"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/apis/feature"
)

// DeliverySpec contains the delivery options for event senders,
Expand Down Expand Up @@ -60,6 +61,26 @@ type DeliverySpec struct {
// For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.
// +optional
BackoffDelay *string `json:"backoffDelay,omitempty"`

// RetryAfterMax provides an optional upper bound on the duration specified in a "Retry-After" header
// when calculating backoff times for retrying 429 and 503 response codes. Setting the value to
// zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This
// value only takes effect if "Retry" is configured, and also depends on specific implementations
// (Channels, Sources, etc.) choosing to provide this capability.
//
// Note: This API is EXPERIMENTAL and might be changed at anytime. While this experimental
// feature is in the Alpha/Beta stage, you must provide a valid value to opt-in for
// supporting "Retry-After" headers. When the feature becomes Stable/GA "Retry-After"
// headers will be respected by default, and you can choose to specify "PT0S" to
// opt-out of supporting "Retry-After" headers.
// For more details: https://github.com/knative/eventing/issues/5811
//
// More information on Duration format:
// - https://www.iso.org/iso-8601-date-and-time-format.html
// - https://en.wikipedia.org/wiki/ISO_8601
//
// +optional
RetryAfterMax *string `json:"retryAfterMax,omitempty"`
}

func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
Expand Down Expand Up @@ -101,6 +122,18 @@ func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
errs = errs.Also(apis.ErrInvalidValue(*ds.BackoffDelay, "backoffDelay"))
}
}

if ds.RetryAfterMax != nil {
if feature.FromContext(ctx).IsEnabled(feature.DeliveryRetryAfter) {
p, me := period.Parse(*ds.RetryAfterMax)
if me != nil || p.IsNegative() {
errs = errs.Also(apis.ErrInvalidValue(*ds.RetryAfterMax, "retryAfterMax"))
}
} else {
errs = errs.Also(apis.ErrDisallowedFields("retryAfterMax"))
}
}

return errs
}

Expand Down
38 changes: 37 additions & 1 deletion pkg/apis/duck/v1/delivery_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import (

"github.com/google/go-cmp/cmp"
"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/apis/feature"
)

func TestDeliverySpecValidation(t *testing.T) {
deliveryTimeoutEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
feature.DeliveryTimeout: feature.Enabled,
})
deliveryRetryAfterEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
feature.DeliveryRetryAfter: feature.Enabled,
})

invalidString := "invalid time"
bop := BackoffPolicyExponential
Expand Down Expand Up @@ -102,9 +106,41 @@ func TestDeliverySpecValidation(t *testing.T) {
}, {
name: "valid retry 0",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(0)},
want: nil,
}, {
name: "valid retry 1",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(1)},
want: nil,
}, {
name: "valid retryAfterMax",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfterMax: &validDuration},
want: nil,
}, {
name: "zero retryAfterMax",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfterMax: pointer.StringPtr("PT0S")},
want: nil,
}, {
name: "empty retryAfterMax",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfterMax: pointer.StringPtr("")},
want: func() *apis.FieldError {
return apis.ErrInvalidValue("", "retryAfterMax")
}(),
}, {
name: "invalid retryAfterMax",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfterMax: &invalidDuration},
want: func() *apis.FieldError {
return apis.ErrInvalidValue(invalidDuration, "retryAfterMax")
}(),
}, {
name: "disabled feature with retryAfterMax",
spec: &DeliverySpec{RetryAfterMax: &validDuration},
want: func() *apis.FieldError {
return apis.ErrDisallowedFields("retryAfterMax")
}(),
}}

for _, test := range tests {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package feature

const (
KReferenceGroup = "kreference-group"
DeliveryTimeout = "delivery-timeout"
KReferenceMapping = "kreference-mapping"
StrictSubscriber = "strict-subscriber"
NewTriggerFilters = "new-trigger-filters"
KReferenceGroup = "kreference-group"
DeliveryRetryAfter = "delivery-retryafter"
DeliveryTimeout = "delivery-timeout"
KReferenceMapping = "kreference-mapping"
StrictSubscriber = "strict-subscriber"
NewTriggerFilters = "new-trigger-filters"
)
96 changes: 93 additions & 3 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ package kncloudevents

import (
"context"
"fmt"
nethttp "net/http"
"strconv"
"time"

"github.com/hashicorp/go-retryablehttp"
)

const RetryAfterHeader = "Retry-After"

// HTTPMessageSender is a wrapper for an http client that can send cloudevents.Request with retries
type HTTPMessageSender struct {
Client *nethttp.Client
Expand Down Expand Up @@ -73,9 +77,7 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
RetryWaitMax: defaultRetryWaitMax,
RetryMax: config.RetryMax,
CheckRetry: retryablehttp.CheckRetry(config.CheckRetry),
Backoff: func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {
return config.Backoff(attemptNum, resp)
},
Backoff: generateBackoffFn(config),
ErrorHandler: func(resp *nethttp.Response, err error, numTries int) (*nethttp.Response, error) {
return resp, err
},
Expand All @@ -88,3 +90,91 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC

return retryableClient.Do(retryableReq)
}

// generateBackoffFunction returns a valid retryablehttp.Backoff implementation which
// wraps the provided RetryConfig.Backoff implementation with optional "Retry-After"
// header support.
func generateBackoffFn(config *RetryConfig) retryablehttp.Backoff {
return func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {

//
// NOTE - The following logic will need to be altered slightly once the "delivery-retryafter"
// experimental-feature graduates from Alpha/Beta to Stable/GA. This is according to
// plan as described in https://github.com/knative/eventing/issues/5811.
//
// During the Alpha/Beta stages the ability to respect Retry-After headers is "opt-in"
// requiring the DeliverySpec.RetryAfterMax to be populated. The Stable/GA behavior
// will be "opt-out" where Retry-After headers are always respected (in the context of
// calculating backoff durations for 429 / 503 responses) unless the
// DeliverySpec.RetryAfterMax is set to "PT0S".
//
// While this might seem unnecessarily complex, it achieves the following design goals...
// - Does not require an explicit "enabled" flag in the DeliverySpec.
// - Does not require implementations calling the message_sender to be aware of experimental-features.
// - Does not modify existing Knative CRs with arbitrary default "max" values.
//
// The intended behavior of RetryConfig.RetryAfterMaxDuration is as follows...
//
// RetryAfterMaxDuration Alpha/Beta Stable/GA
// --------------------- ---------- ---------
// nil Do NOT respect Retry-After headers Respect Retry-After headers without Max
// 0 Do NOT respect Retry-After headers Do NOT respect Retry-After headers
// >0 Respect Retry-After headers with Max Respect Retry-After headers with Max
//

// If Response is 429 / 503, Then Parse Any Retry-After Header Durations & Enforce Optional MaxDuration
var retryAfterDuration time.Duration
// TODO - Remove this check when experimental-feature moves to Stable/GA to convert behavior from opt-in to opt-out
if config.RetryAfterMaxDuration != nil {
// TODO - Keep this logic as is (no change required) when experimental-feature is Stable/GA
if resp != nil && (resp.StatusCode == nethttp.StatusTooManyRequests || resp.StatusCode == nethttp.StatusServiceUnavailable) {
retryAfterDuration = parseRetryAfterDuration(resp)
if config.RetryAfterMaxDuration != nil && *config.RetryAfterMaxDuration < retryAfterDuration {
retryAfterDuration = *config.RetryAfterMaxDuration
}
}
}

// Calculate The RetryConfig Backoff Duration
backoffDuration := config.Backoff(attemptNum, resp)

// Return The Larger Of The Two Backoff Durations
if retryAfterDuration > backoffDuration {
return retryAfterDuration
}
return backoffDuration
}
}

// parseRetryAfterDuration returns a Duration expressing the amount of time
// requested to wait by a Retry-After header, or 0 if not present or invalid.
// According to the spec (https://tools.ietf.org/html/rfc7231#section-7.1.3)
// the Retry-After Header's value can be one of an HTTP-date or delay-seconds,
// both of which are supported here.
func parseRetryAfterDuration(resp *nethttp.Response) (retryAfterDuration time.Duration) {

// Return 0 Duration If No Response / Headers
if resp == nil || resp.Header == nil {
return
}

// Return 0 Duration If No Retry-After Header
retryAfterString := resp.Header.Get(RetryAfterHeader)
if len(retryAfterString) <= 0 {
return
}

// Attempt To Parse Retry-After Header As Seconds - Return If Successful
retryAfterInt, parseIntErr := strconv.ParseInt(retryAfterString, 10, 64)
if parseIntErr == nil {
return time.Duration(retryAfterInt) * time.Second
}

// Attempt To Parse Retry-After Header As Timestamp (RFC850 & ANSIC) - Return If Successful
retryAfterTime, parseTimeErr := nethttp.ParseTime(retryAfterString)
if parseTimeErr != nil {
fmt.Printf("failed to parse Retry-After header: ParseInt Error = %v, ParseTime Error = %v\n", parseIntErr, parseTimeErr)
return
}
return time.Until(retryAfterTime)
}
Loading

0 comments on commit 820db20

Please sign in to comment.