Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experimental-Feature] DeliverySpec.RetryAfter #5813

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
91b3795
Retry-After Experimental-Feature
travis-minke-sap Oct 8, 2021
b942a39
update-codegen
travis-minke-sap Oct 8, 2021
fea4cc5
minor enhancements
travis-minke-sap Oct 13, 2021
a6d0d35
Updated Issue Number TODO
travis-minke-sap Oct 13, 2021
bfd444b
Merging main/
travis-minke-sap Oct 14, 2021
8e602dc
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Oct 21, 2021
3f7d714
Add e2e tests
travis-minke-sap Oct 21, 2021
d227062
build tag update
travis-minke-sap Oct 21, 2021
692765c
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Oct 26, 2021
88e75b6
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Nov 3, 2021
9fc7448
Refactor implementation for new design based on community feedback.
travis-minke-sap Nov 8, 2021
c28795b
Refactor implementation for new design based on community feedback.
travis-minke-sap Nov 8, 2021
6bb059a
Refactor implementation for new design based on community feedback.
travis-minke-sap Nov 9, 2021
6d5ec5f
PR feedback
travis-minke-sap Nov 12, 2021
3e0e3e2
Merge main
travis-minke-sap Nov 12, 2021
2e71d1d
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Nov 17, 2021
87be0cc
PR Feedback
travis-minke-sap Nov 17, 2021
094a102
fix parallel data race
travis-minke-sap Nov 17, 2021
1602941
PR Feedback
travis-minke-sap Nov 17, 2021
2eab560
Refactor RetryAfter message_sender test
travis-minke-sap Nov 19, 2021
5ce88dd
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Nov 19, 2021
1cab330
Merge branch 'main' into retry-after-experimental-feature
travis-minke-sap Nov 23, 2021
5be0e20
PR feedback
travis-minke-sap Nov 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ data:
# For more details: https://github.com/knative/eventing/issues/5086
kreference-group: "disabled"

# ALPHA feature: The delivery-retryafter allows you to use the RetryAfter field in DeliverySpec.
# For more details: https://github.com/knative/eventing/issues/5811
delivery-retryafter: "disabled"

# ALPHA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec.
# For more details: https://github.com/knative/eventing/issues/5148
delivery-timeout: "disabled"
Expand Down
67 changes: 67 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,22 @@ More information on Duration format:
For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.</p>
</td>
</tr>
<tr>
<td>
<code>retryAfter</code><br/>
<em>
<a href="#duck.knative.dev/v1.RetryAfter">
RetryAfter
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>RetryAfter controls how &ldquo;Retry-After&rdquo; header durations are handled for 429 and 503 response codes.
If not provided, the default behavior is to ignore &ldquo;Retry-After&rdquo; headers in responses.</p>
<p>Note: This API is EXPERIMENTAL and might break anytime. For more details: <a href="https://github.com/knative/eventing/issues/5811">https://github.com/knative/eventing/issues/5811</a></p>
</td>
</tr>
</tbody>
</table>
<h3 id="duck.knative.dev/v1.DeliveryStatus">DeliveryStatus
Expand Down Expand Up @@ -424,6 +440,57 @@ where failed events are sent to.</p>
</tr>
</tbody>
</table>
<h3 id="duck.knative.dev/v1.RetryAfter">RetryAfter
</h3>
<p>
(<em>Appears on:</em><a href="#duck.knative.dev/v1.DeliverySpec">DeliverySpec</a>)
</p>
<p>
<p>RetryAfter contains configuration related to the handling of &ldquo;Retry-After&rdquo; headers.</p>
</p>
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>enabled</code><br/>
<em>
bool
</em>
</td>
<td>
<p>Enabled is a flag indicating whether to respect the &ldquo;Retry-After&rdquo; header duration.
If enabled, the largest of the normal backoff duration and the &ldquo;Retry-After&rdquo;
header value will be used when calculating the next backoff duration. This will
only be considered when a 429 (Too Many Requests) or 503 (Service Unavailable)
response code is received and Retry is greater than 0.</p>
</td>
</tr>
<tr>
<td>
<code>maxDuration</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>MaxDuration is the maximum time to wait before retrying. It is intended as an
override to protect against excessively large &ldquo;Retry-After&rdquo; durations. If provided,
the value must be greater than 0. If not provided, the largest of the &ldquo;Retry-After&rdquo;
duration and the normal backoff duration will be used.
More information on Duration format:
- <a href="https://www.iso.org/iso-8601-date-and-time-format.html">https://www.iso.org/iso-8601-date-and-time-format.html</a>
- <a href="https://en.wikipedia.org/wiki/ISO_8601">https://en.wikipedia.org/wiki/ISO_8601</a></p>
</td>
</tr>
</tbody>
</table>
<h3 id="duck.knative.dev/v1.Subscribable">Subscribable
</h3>
<p>
Expand Down
44 changes: 43 additions & 1 deletion pkg/apis/duck/v1/delivery_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"context"

"github.com/rickb777/date/period"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/apis/feature"
)

// DeliverySpec contains the delivery options for event senders,
Expand Down Expand Up @@ -60,6 +61,33 @@ type DeliverySpec struct {
// For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.
// +optional
BackoffDelay *string `json:"backoffDelay,omitempty"`

// RetryAfter controls how "Retry-After" header durations are handled for 429 and 503 response codes.
// If not provided, the default behavior is to ignore "Retry-After" headers in responses.
//
// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5811
// +optional
RetryAfter *RetryAfter `json:"retryAfter,omitempty"`
}

// RetryAfter contains configuration related to the handling of "Retry-After" headers.
type RetryAfter struct {
// Enabled is a flag indicating whether to respect the "Retry-After" header duration.
// If enabled, the largest of the normal backoff duration and the "Retry-After"
// header value will be used when calculating the next backoff duration. This will
// only be considered when a 429 (Too Many Requests) or 503 (Service Unavailable)
// response code is received and Retry is greater than 0.
Enabled bool `json:"enabled"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we always respect the Retry-After?

I am not really sure I like the toggling here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is stale - based on the original design which has evolved (is evolving in the associated Issue). Yes, everyone was against the "enabled" flag and in the latest design it is gone. I'm waiting to update this PR for the new design to settle - i need to summarize and push for agreement again...


// MaxDuration is the maximum time to wait before retrying. It is intended as an
// override to protect against excessively large "Retry-After" durations. If provided,
// the value must be greater than 0. If not provided, the largest of the "Retry-After"
// duration and the normal backoff duration will be used.
// More information on Duration format:
// - https://www.iso.org/iso-8601-date-and-time-format.html
// - https://en.wikipedia.org/wiki/ISO_8601
// +optional
MaxDuration *string `json:"maxDuration,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming we do not toggle (just thinking out loud) and have respecting Retry-after as default.

Setting MaxDuration: -1 could disable it.
If nothing (or a value greater than 0) is provided we do as state on the comments

🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the idea is that at least in GA it would be always respected and you could opt out by specifying "PT0S" or something similar.

}

func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
Expand Down Expand Up @@ -101,6 +129,20 @@ func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError {
errs = errs.Also(apis.ErrInvalidValue(*ds.BackoffDelay, "backoffDelay"))
}
}

if ds.RetryAfter != nil {
if feature.FromContext(ctx).IsEnabled(feature.DeliveryRetryAfter) {
if ds.RetryAfter.MaxDuration != nil && *ds.RetryAfter.MaxDuration != "" {
m, me := period.Parse(*ds.RetryAfter.MaxDuration)
if me != nil || m.IsZero() {
errs = errs.Also(apis.ErrInvalidValue(*ds.RetryAfter.MaxDuration, "retryAfter.maxDuration"))
}
}
} else {
errs = errs.Also(apis.ErrDisallowedFields("retryAfter"))
}
}

return errs
}

Expand Down
40 changes: 39 additions & 1 deletion pkg/apis/duck/v1/delivery_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ import (

"github.com/google/go-cmp/cmp"
"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/apis/feature"
)

// TODO - add test cases for RetryAfter

func TestDeliverySpecValidation(t *testing.T) {
deliveryTimeoutEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
feature.DeliveryTimeout: feature.Enabled,
})
deliveryRetryAfterEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{
feature.DeliveryRetryAfter: feature.Enabled,
})

invalidString := "invalid time"
bop := BackoffPolicyExponential
Expand Down Expand Up @@ -102,9 +108,41 @@ func TestDeliverySpecValidation(t *testing.T) {
}, {
name: "valid retry 0",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(0)},
want: nil,
}, {
name: "valid retry 1",
spec: &DeliverySpec{Retry: pointer.Int32Ptr(1)},
want: nil,
}, {
name: "valid complete retryAfter",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfter: &RetryAfter{Enabled: true, MaxDuration: &validDuration}},
want: nil,
}, {
name: "valid sparse retryAfter",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfter: &RetryAfter{Enabled: true}},
want: nil,
}, {
name: "zero retryAfter.MaxDuration",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfter: &RetryAfter{Enabled: true, MaxDuration: pointer.StringPtr("PT0S")}},
want: func() *apis.FieldError {
return apis.ErrInvalidValue("PT0S", "retryAfter.maxDuration")
}(),
}, {
name: "invalid retryAfter.MaxDuration",
ctx: deliveryRetryAfterEnabledCtx,
spec: &DeliverySpec{RetryAfter: &RetryAfter{Enabled: true, MaxDuration: &invalidDuration}},
want: func() *apis.FieldError {
return apis.ErrInvalidValue(invalidDuration, "retryAfter.maxDuration")
}(),
}, {
name: "disabled retryAfter",
spec: &DeliverySpec{RetryAfter: &RetryAfter{Enabled: true}},
want: func() *apis.FieldError {
return apis.ErrDisallowedFields("retryAfter")
}(),
}}

for _, test := range tests {
Expand Down
26 changes: 26 additions & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ limitations under the License.
package feature

const (
KReferenceGroup = "kreference-group"
DeliveryTimeout = "delivery-timeout"
KReferenceMapping = "kreference-mapping"
StrictSubscriber = "strict-subscriber"
KReferenceGroup = "kreference-group"
DeliveryRetryAfter = "delivery-retryafter"
DeliveryTimeout = "delivery-timeout"
KReferenceMapping = "kreference-mapping"
StrictSubscriber = "strict-subscriber"
)
60 changes: 57 additions & 3 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ package kncloudevents

import (
"context"
"fmt"
nethttp "net/http"
"strconv"
"time"

"github.com/hashicorp/go-retryablehttp"
)

const RetryAfterHeader = "Retry-After"

// HTTPMessageSender is a wrapper for an http client that can send cloudevents.Request with retries
type HTTPMessageSender struct {
Client *nethttp.Client
Expand Down Expand Up @@ -73,9 +77,7 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
RetryWaitMax: defaultRetryWaitMax,
RetryMax: config.RetryMax,
CheckRetry: retryablehttp.CheckRetry(config.CheckRetry),
Backoff: func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {
return config.Backoff(attemptNum, resp)
},
Backoff: generateBackoffFn(config),
ErrorHandler: func(resp *nethttp.Response, err error, numTries int) (*nethttp.Response, error) {
return resp, err
},
Expand All @@ -88,3 +90,55 @@ func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC

return retryableClient.Do(retryableReq)
}

// generateBackoffFunction returns a valid retryablehttp.Backoff implementation which
// wraps the provided RetryConfig.Backoff implementation with optional "Retry-After"
// header support.
func generateBackoffFn(config *RetryConfig) retryablehttp.Backoff {
return func(_, _ time.Duration, attemptNum int, resp *nethttp.Response) time.Duration {

// If RetryAfter Is Enabled And Response is 429 / 503, Then Parse Any Retry-After Header Durations & Enforce Optional MaxDuration
var retryAfterDuration time.Duration
if config.RetryAfterEnabled && resp != nil &&
(resp.StatusCode == nethttp.StatusTooManyRequests || resp.StatusCode == nethttp.StatusServiceUnavailable) {
retryAfterDuration, _ = parseRetryAfterDuration(resp)
if config.RetryAfterMaxDuration > 0 && config.RetryAfterMaxDuration < retryAfterDuration {
retryAfterDuration = config.RetryAfterMaxDuration
}
}

// Calculate The RetryConfig Backoff Duration
backoffDuration := config.Backoff(attemptNum, resp)

// Return The Larger Of The Two Backoff Durations
if retryAfterDuration > backoffDuration {
return retryAfterDuration
} else {
return backoffDuration
}
travis-minke-sap marked this conversation as resolved.
Show resolved Hide resolved
}
}

// parseRetryAfterDuration returns a Duration expressing the amount of time
// requested to wait by a Retry-After header, or none if not present or invalid.
// According to the spec (https://tools.ietf.org/html/rfc7231#section-7.1.3)
// the Retry-After Header's value can be one of an HTTP-date or delay-seconds,
// both of which are supported here.
func parseRetryAfterDuration(resp *nethttp.Response) (time.Duration, error) {
var retryAfterDuration time.Duration
if resp != nil && resp.Header != nil {
retryAfterString := resp.Header.Get(RetryAfterHeader)
if len(retryAfterString) > 0 {
if retryAfterInt, parseIntErr := strconv.ParseInt(retryAfterString, 10, 64); parseIntErr == nil {
retryAfterDuration = time.Duration(retryAfterInt) * time.Second
} else {
retryAfterTime, parseTimeErr := nethttp.ParseTime(retryAfterString) // Supports http.TimeFormat, time.RFC850 & time.ANSIC
if parseTimeErr != nil {
return retryAfterDuration, fmt.Errorf("failed to parse Retry-After header: ParseInt Error = %v, ParseTime Error = %v", parseIntErr, parseTimeErr)
}
retryAfterDuration = time.Until(retryAfterTime)
}
}
}
return retryAfterDuration, nil
}
Loading