Skip to content

Commit

Permalink
Knative enhance failed events extensions in mt channel broker (#6569)
Browse files Browse the repository at this point in the history
Fixes #6541  
Signed-off-by: Teresaliu
[changyan.liu@intel.com](https://github.com/knative/eventing/pull/changyan.liu@intel.com)

<!-- Please include the 'why' behind your changes if no issue exists -->

## Proposed Changes

<!-- Please categorize your changes:
- 🎁 Add new feature
- 🐛 Fix bug
- 🧹 Update or clean up current behavior
- 🗑️ Remove feature or internal logic
-->

- Add Reconciler Test of Channel to test failer extensions metadata
- Add failed events extensions `knativeerrordest,
knativeerrordata,knativeerrorcode` to MTChannelBroker and corresponding
Reconciler Test to Broker

### Pre-review Checklist

<!-- If these boxes are not checked, you will be asked to complete these
requirements or explain why they do not apply to your PR. -->

- [ ] **At least 80% unit test coverage**
- [ ] **E2E tests** for any new behavior
- [ ] **Docs PR** for any user-facing impact
- [ ] **Spec PR** for any new API feature
- [ ] **Conformance test** for any change to the spec

**Release Note**

<!--
📄 If this change has user-visible impact, write a release
note in the block
below. Include the string "action required" if additional action is
required of
users switching to the new release, for example in case of a breaking
change.

Write as if you are speaking to users, not other Knative contributors.
If this
change has no user-visible impact, no release note is needed.
-->

```release-note
Add contextual information on why the message landed in deadletter sink of the MTChannel-based Broker or its Triggers.
```


**Docs**

<!--
📖 If this change has user-visible impact, link to an issue or PR in
https://github.com/knative/docs.
-->
  • Loading branch information
liuchangyan authored Nov 16, 2022
1 parent e8fec2a commit ed5e0a7
Show file tree
Hide file tree
Showing 8 changed files with 452 additions and 20 deletions.
25 changes: 25 additions & 0 deletions pkg/broker/err_extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broker

import "net/url"

// ErrExtensionInfo struct store the broker-filter's destination and responsebody
type ErrExtensionInfo struct {
ErrDestination *url.URL `json:"errdestination"`
ErrResponseBody []byte `json:"errresponsebody"`
}
99 changes: 84 additions & 15 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package filter

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -31,6 +34,7 @@ import (
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"go.opencensus.io/trace"
"go.uber.org/zap"
channelAttributes "knative.dev/eventing/pkg/channel/attributes"
"knative.dev/pkg/logging"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand All @@ -57,6 +61,18 @@ const (
defaultMaxIdleConnectionsPerHost = 100
)

const (
// NoResponse signals the step that send event to trigger's subscriber hasn't started
NoResponse = -1
)

// ErrHandler handle the different errors of filter dispatch process
type ErrHandler struct {
ResponseCode int
ResponseBody []byte
err error
}

// HeaderProxyAllowList contains the headers that are proxied from the reply; other than the CloudEvents headers.
// Other headers are not proxied because of security concerns.
var HeaderProxyAllowList = map[string]struct{}{
Expand Down Expand Up @@ -206,34 +222,64 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

h.reportArrivalTime(event, reportArgs)

h.send(ctx, writer, request.Header, subscriberURI.String(), reportArgs, event, ttl)
h.send(ctx, writer, request.Header, subscriberURI.URL(), reportArgs, event, ttl)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target string, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) {
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target *url.URL, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) {
// send the event to trigger's subscriber
response, err := h.sendEvent(ctx, headers, target, event, reportArgs)
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))
writer.WriteHeader(http.StatusInternalServerError)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
response, responseErr := h.sendEvent(ctx, headers, target, event, reportArgs)

if responseErr.err != nil {
h.logger.Error("failed to send event", zap.Error(responseErr.err))
// If error is not because of the response, it should respond with http.StatusInternalServerError
if responseErr.ResponseCode == NoResponse {

writer.WriteHeader(http.StatusInternalServerError)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
return
}
// If error has a response propagate subscriber's headers back to channel
if response != nil {
proxyHeaders(response.Header, writer)
}
writer.WriteHeader(responseErr.ResponseCode)

// Read Response body to responseErr
errExtensionInfo := broker.ErrExtensionInfo{
ErrDestination: target,
ErrResponseBody: responseErr.ResponseBody,
}
errExtensionBytes, msErr := json.Marshal(errExtensionInfo)
if msErr != nil {
h.logger.Error("failed to marshal errExtensionInfo", zap.Error(msErr))
return
}
_, _ = writer.Write(errExtensionBytes)
_ = h.reporter.ReportEventCount(reportArgs, responseErr.ResponseCode)

return
}

h.logger.Debug("Successfully dispatched message", zap.Any("target", target))
h.logger.Debug("Successfully dispatched message", zap.Any("target", target.String()))

// If there is an event in the response write it to the response
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target)
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.String())
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) {
func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *url.URL, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) {
responseErr := ErrHandler{
ResponseCode: NoResponse,
}

// Send the event to the subscriber
req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target)
req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target.String())
if err != nil {
return nil, fmt.Errorf("failed to create the request: %w", err)
responseErr.err = fmt.Errorf("failed to create the request: %w", err)
return nil, responseErr
}

message := binding.ToMessage(event)
Expand All @@ -246,24 +292,47 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str

err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
if err != nil {
return nil, fmt.Errorf("failed to write request: %w", err)
responseErr.err = fmt.Errorf("failed to write request: %w", err)
return nil, responseErr
}

start := time.Now()
resp, err := h.sender.Send(req)
dispatchTime := time.Since(start)
if err != nil {
err = fmt.Errorf("failed to dispatch message: %w", err)
responseErr.ResponseCode = http.StatusInternalServerError
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error()))
responseErr.err = fmt.Errorf("failed to dispatch message: %w", err)
return resp, responseErr
}

sc := 0
if resp != nil {
sc = resp.StatusCode
responseErr.ResponseCode = sc
}

_ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime)

return resp, err
if resp.StatusCode < http.StatusOK ||
resp.StatusCode >= http.StatusMultipleChoices {
// Read response body into errHandler for failures
body := make([]byte, channelAttributes.KnativeErrorDataExtensionMaxLength)

readLen, readErr := resp.Body.Read(body)
if readErr != nil && readErr != io.EOF {
h.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(readErr))
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", readErr.Error()))
} else {
responseErr.ResponseBody = body[:readLen]
}
responseErr.err = fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", resp.StatusCode)

// Reject non-successful responses.
return resp, responseErr
}

return resp, responseErr
}

// The return values are the status
Expand Down
22 changes: 20 additions & 2 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
nethttp "net/http"
Expand All @@ -33,10 +34,13 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/channel/attributes"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/network"
"knative.dev/pkg/system"
)

const (
Expand Down Expand Up @@ -291,16 +295,30 @@ func (d *MessageDispatcherImpl) dispatchExecutionInfoTransformers(destination *u
if destination == nil {
destination = &url.URL{}
}

httpResponseBody := dispatchExecutionInfo.ResponseBody
if destination.Host == network.GetServiceHostname("broker-filter", system.Namespace()) {

var errExtensionInfo broker.ErrExtensionInfo

err := json.Unmarshal(dispatchExecutionInfo.ResponseBody, &errExtensionInfo)
if err != nil {
d.logger.Debug("Unmarshal dispatchExecutionInfo ResponseBody failed", zap.Error(err))
return nil
}
destination = errExtensionInfo.ErrDestination
httpResponseBody = errExtensionInfo.ErrResponseBody
}

destination = d.sanitizeURL(destination)
// Unprintable control characters are not allowed in header values
// and cause HTTP requests to fail if not removed.
// https://pkg.go.dev/golang.org/x/net/http/httpguts#ValidHeaderFieldValue
httpBody := sanitizeHTTPBody(dispatchExecutionInfo.ResponseBody)
httpBody := sanitizeHTTPBody(httpResponseBody)

// Encodes response body as base64 for the resulting length.
bodyLen := len(httpBody)
encodedLen := base64.StdEncoding.EncodedLen(bodyLen)

if encodedLen > attributes.KnativeErrorDataExtensionMaxLength {
encodedLen = attributes.KnativeErrorDataExtensionMaxLength
}
Expand Down
1 change: 1 addition & 0 deletions pkg/inmemorychannel/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"knative.dev/eventing/pkg/kncloudevents"

logtesting "knative.dev/pkg/logging/testing"
_ "knative.dev/pkg/system/testing"
)

func TestNewMessageDispatcher(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,18 @@ func TestBrokerRedelivery(t *testing.T) {

env.TestSet(ctx, t, broker.BrokerRedelivery())
}

func TestBrokerDeadLetterSinkExtensions(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.TestSet(ctx, t, broker.BrokerDeadLetterSinkExtensions())
}
4 changes: 2 additions & 2 deletions test/rekt/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestChannelPreferHeaderCheck(t *testing.T) {
env.Test(ctx, t, channel.ChannelPreferHeaderCheck(createSubscriberFn))
}

func TestChannelSubscriptionReturnedErrorData(t *testing.T) {
func TestChannelDeadLetterSinkExtensions(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
Expand All @@ -316,5 +316,5 @@ func TestChannelSubscriptionReturnedErrorData(t *testing.T) {
return subscription.WithSubscriber(ref, uri)
}

env.Test(ctx, t, channel.ChannelSubscriptionReturnedErrorData(createSubscriberFn))
env.TestSet(ctx, t, channel.ChannelDeadLetterSinkExtensions(createSubscriberFn))
}
Loading

0 comments on commit ed5e0a7

Please sign in to comment.