Skip to content

Commit

Permalink
Enhance DLQ CloudEvents with destination (#5727)
Browse files Browse the repository at this point in the history
* Enhance DLQ CloudEvents with destination

* PR feedback - updated comments
  • Loading branch information
travis-minke-sap authored Sep 15, 2021
1 parent deafb9b commit 21155e9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
10 changes: 7 additions & 3 deletions pkg/channel/attributes/knative_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@ limitations under the License.
package attributes

import (
"net/url"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/transformer"
)

const (
KnativeErrorDestExtensionKey = "knativeerrordest"
KnativeErrorCodeExtensionKey = "knativeerrorcode"
KnativeErrorDataExtensionKey = "knativeerrordata"
KnativeErrorDataExtensionMaxLength = 1024
)

// KnativeErrorTransformers returns Transformers which add the specified error code and data extensions.
func KnativeErrorTransformers(code int, data string) binding.Transformers {
// KnativeErrorTransformers returns Transformers which add the specified destination and error code/data extensions.
func KnativeErrorTransformers(destination url.URL, code int, data string) binding.Transformers {
destTransformer := transformer.AddExtension(KnativeErrorDestExtensionKey, destination)
codeTransformer := transformer.AddExtension(KnativeErrorCodeExtensionKey, code)
if len(data) > KnativeErrorDataExtensionMaxLength {
data = data[:KnativeErrorDataExtensionMaxLength] // Truncate data to max length
}
dataTransformer := transformer.AddExtension(KnativeErrorDataExtensionKey, data)
return binding.Transformers{codeTransformer, dataTransformer}
return binding.Transformers{destTransformer, codeTransformer, dataTransformer}
}
18 changes: 17 additions & 1 deletion pkg/channel/attributes/knative_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package attributes
import (
"context"
"math/rand"
"net/url"
"testing"

"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -30,29 +31,43 @@ import (
// Test the KnativeErrorTransformers() functionality
func TestKnativeErrorTransformers(t *testing.T) {

// Test Data
destinationURL, _ := url.Parse("http://foo.bar.svc.cluster.local")

// Define the test cases
testCases := []struct {
name string
dest url.URL
code int
data string
}{
{
name: "Destination Empty",
dest: url.URL{},
code: 500,
data: "",
},
{
name: "Data Empty",
dest: *destinationURL,
code: 500,
data: "",
},
{
name: "Data Less Than Max Length",
dest: *destinationURL,
code: 500,
data: randomString(t, KnativeErrorDataExtensionMaxLength-1),
},
{
name: "Data Exactly Max Length",
dest: *destinationURL,
code: 500,
data: randomString(t, KnativeErrorDataExtensionMaxLength),
},
{
name: "Data More Than Max Length",
dest: *destinationURL,
code: 500,
data: randomString(t, KnativeErrorDataExtensionMaxLength+1),
},
Expand All @@ -65,12 +80,13 @@ func TestKnativeErrorTransformers(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {

// Get the KnativeErrorTransformers for the current testCase
knativeErrorDataTransformer := KnativeErrorTransformers(testCase.code, testCase.data)
knativeErrorDataTransformer := KnativeErrorTransformers(testCase.dest, testCase.code, testCase.data)

// Create the Transformer Input/Want Events
inputEvent := cetest.MinEvent()
inputMessage := binding.ToMessage(&inputEvent)
wantEvent := inputEvent.Clone()
wantEvent.SetExtension(KnativeErrorDestExtensionKey, testCase.dest)
wantEvent.SetExtension(KnativeErrorCodeExtensionKey, testCase.code)
data := testCase.data
if len(data) > KnativeErrorDataExtensionMaxLength {
Expand Down
19 changes: 12 additions & 7 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing/pkg/channel/attributes"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
Expand Down Expand Up @@ -71,7 +72,7 @@ type DispatchExecutionInfo struct {
ResponseBody []byte
}

// NewMessageDispatcherFromConfig creates a new Message dispatcher based on config.
// NewMessageDispatcher creates a new Message dispatcher based on config.
func NewMessageDispatcher(logger *zap.Logger) *MessageDispatcherImpl {
sender, err := kncloudevents.NewHTTPMessageSenderWithTarget("")
if err != nil {
Expand All @@ -80,7 +81,7 @@ func NewMessageDispatcher(logger *zap.Logger) *MessageDispatcherImpl {
return NewMessageDispatcherFromSender(logger, sender)
}

// NewMessageDispatcherFromConfig creates a new event dispatcher.
// NewMessageDispatcherFromSender creates a new Message dispatcher with specified sender.
func NewMessageDispatcherFromSender(logger *zap.Logger, sender *kncloudevents.HTTPMessageSender) *MessageDispatcherImpl {
return &MessageDispatcherImpl{
sender: sender,
Expand Down Expand Up @@ -122,7 +123,7 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context,
if err != nil {
// If DeadLetter is configured, then send original message with knative error extensions
if deadLetter != nil {
dispatchTransformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
dispatchTransformers := d.dispatchExecutionInfoTransformers(destination, dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, additionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...)
if deadLetterErr != nil {
return dispatchExecutionInfo, fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination, err, deadLetter, deadLetterErr)
Expand Down Expand Up @@ -158,7 +159,7 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context,
if err != nil {
// If DeadLetter is configured, then send original message with knative error extensions
if deadLetter != nil {
dispatchTransformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
dispatchTransformers := d.dispatchExecutionInfoTransformers(reply, dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, responseAdditionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...)
if deadLetterErr != nil {
return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", reply, err, deadLetter, deadLetterErr)
Expand Down Expand Up @@ -262,13 +263,17 @@ func (d *MessageDispatcherImpl) sanitizeURL(u *url.URL) *url.URL {
}
}

// dispatchExecutionTransformer returns Transformers based on the specified DispatchExecutionInfo
func (d *MessageDispatcherImpl) dispatchExecutionInfoTransformers(dispatchExecutionInfo *DispatchExecutionInfo) binding.Transformers {
// dispatchExecutionTransformer returns Transformers based on the specified destination and DispatchExecutionInfo
func (d *MessageDispatcherImpl) dispatchExecutionInfoTransformers(destination *url.URL, dispatchExecutionInfo *DispatchExecutionInfo) binding.Transformers {
if destination == nil {
destination = &url.URL{}
}
destination = d.sanitizeURL(destination)
// Unprintable control characters are not allowed in header values
// and cause HTTP requests to fail if not removed.
// https://pkg.go.dev/golang.org/x/net/http/httpguts#ValidHeaderFieldValue
httpBody := sanitizeHTTPBody(dispatchExecutionInfo.ResponseBody)
return attributes.KnativeErrorTransformers(dispatchExecutionInfo.ResponseCode, httpBody)
return attributes.KnativeErrorTransformers(*destination, dispatchExecutionInfo.ResponseCode, httpBody)
}

func sanitizeHTTPBody(body []byte) string {
Expand Down
5 changes: 5 additions & 0 deletions pkg/channel/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,11 @@ func TestDispatchMessage(t *testing.T) {
assertEquality(t, replyServer.URL, *tc.expectedReplyRequest, rv)
}
if tc.expectedDeadLetterRequest != nil {
if tc.sendToReply {
tc.expectedDeadLetterRequest.Headers.Set("ce-knativeerrordest", replyServer.URL+"/")
} else if tc.sendToDestination {
tc.expectedDeadLetterRequest.Headers.Set("ce-knativeerrordest", destServer.URL+"/")
}
rv := deadLetterSinkHandler.popRequest(t)
assertEquality(t, deadLetterSinkServer.URL, *tc.expectedDeadLetterRequest, rv)
}
Expand Down

0 comments on commit 21155e9

Please sign in to comment.