From ed5e0a7416073c88d245ac071dbf1bb17b3f6178 Mon Sep 17 00:00:00 2001 From: Teresaliu Date: Wed, 16 Nov 2022 20:31:39 +0800 Subject: [PATCH] Knative enhance failed events extensions in mt channel broker (#6569) Fixes #6541 Signed-off-by: Teresaliu [changyan.liu@intel.com](https://github.com/knative/eventing/pull/changyan.liu@intel.com) ## Proposed Changes - Add Reconciler Test of Channel to test failer extensions metadata - Add failed events extensions `knativeerrordest, knativeerrordata,knativeerrorcode` to MTChannelBroker and corresponding Reconciler Test to Broker ### Pre-review Checklist - [ ] **At least 80% unit test coverage** - [ ] **E2E tests** for any new behavior - [ ] **Docs PR** for any user-facing impact - [ ] **Spec PR** for any new API feature - [ ] **Conformance test** for any change to the spec **Release Note** ```release-note Add contextual information on why the message landed in deadletter sink of the MTChannel-based Broker or its Triggers. ``` **Docs** --- pkg/broker/err_extension.go | 25 +++ pkg/broker/filter/filter_handler.go | 99 +++++++-- pkg/channel/message_dispatcher.go | 22 +- .../message_dispatcher_test.go | 1 + test/rekt/broker_test.go | 15 ++ test/rekt/channel_test.go | 4 +- test/rekt/features/broker/feature.go | 205 ++++++++++++++++++ test/rekt/features/channel/features.go | 101 ++++++++- 8 files changed, 452 insertions(+), 20 deletions(-) create mode 100644 pkg/broker/err_extension.go diff --git a/pkg/broker/err_extension.go b/pkg/broker/err_extension.go new file mode 100644 index 00000000000..583fbfc6f47 --- /dev/null +++ b/pkg/broker/err_extension.go @@ -0,0 +1,25 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broker + +import "net/url" + +// ErrExtensionInfo struct store the broker-filter's destination and responsebody +type ErrExtensionInfo struct { + ErrDestination *url.URL `json:"errdestination"` + ErrResponseBody []byte `json:"errresponsebody"` +} diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index ba98e90d3cb..e8be6f9095e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -18,9 +18,12 @@ package filter import ( "context" + "encoding/json" "errors" "fmt" + "io" "net/http" + "net/url" "strings" "time" @@ -31,6 +34,7 @@ import ( cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "go.opencensus.io/trace" "go.uber.org/zap" + channelAttributes "knative.dev/eventing/pkg/channel/attributes" "knative.dev/pkg/logging" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" @@ -57,6 +61,18 @@ const ( defaultMaxIdleConnectionsPerHost = 100 ) +const ( + // NoResponse signals the step that send event to trigger's subscriber hasn't started + NoResponse = -1 +) + +// ErrHandler handle the different errors of filter dispatch process +type ErrHandler struct { + ResponseCode int + ResponseBody []byte + err error +} + // HeaderProxyAllowList contains the headers that are proxied from the reply; other than the CloudEvents headers. // Other headers are not proxied because of security concerns. var HeaderProxyAllowList = map[string]struct{}{ @@ -206,34 +222,64 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { h.reportArrivalTime(event, reportArgs) - h.send(ctx, writer, request.Header, subscriberURI.String(), reportArgs, event, ttl) + h.send(ctx, writer, request.Header, subscriberURI.URL(), reportArgs, event, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target string, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target *url.URL, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) { // send the event to trigger's subscriber - response, err := h.sendEvent(ctx, headers, target, event, reportArgs) - if err != nil { - h.logger.Error("failed to send event", zap.Error(err)) - writer.WriteHeader(http.StatusInternalServerError) - _ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError) + response, responseErr := h.sendEvent(ctx, headers, target, event, reportArgs) + + if responseErr.err != nil { + h.logger.Error("failed to send event", zap.Error(responseErr.err)) + // If error is not because of the response, it should respond with http.StatusInternalServerError + if responseErr.ResponseCode == NoResponse { + + writer.WriteHeader(http.StatusInternalServerError) + _ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError) + return + } + // If error has a response propagate subscriber's headers back to channel + if response != nil { + proxyHeaders(response.Header, writer) + } + writer.WriteHeader(responseErr.ResponseCode) + + // Read Response body to responseErr + errExtensionInfo := broker.ErrExtensionInfo{ + ErrDestination: target, + ErrResponseBody: responseErr.ResponseBody, + } + errExtensionBytes, msErr := json.Marshal(errExtensionInfo) + if msErr != nil { + h.logger.Error("failed to marshal errExtensionInfo", zap.Error(msErr)) + return + } + _, _ = writer.Write(errExtensionBytes) + _ = h.reporter.ReportEventCount(reportArgs, responseErr.ResponseCode) + return } - h.logger.Debug("Successfully dispatched message", zap.Any("target", target)) + h.logger.Debug("Successfully dispatched message", zap.Any("target", target.String())) // If there is an event in the response write it to the response - statusCode, err := h.writeResponse(ctx, writer, response, ttl, target) + statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.String()) if err != nil { h.logger.Error("failed to write response", zap.Error(err)) } _ = h.reporter.ReportEventCount(reportArgs, statusCode) } -func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) { +func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *url.URL, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { + responseErr := ErrHandler{ + ResponseCode: NoResponse, + } + // Send the event to the subscriber - req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target) + req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target.String()) if err != nil { - return nil, fmt.Errorf("failed to create the request: %w", err) + responseErr.err = fmt.Errorf("failed to create the request: %w", err) + return nil, responseErr } message := binding.ToMessage(event) @@ -246,24 +292,47 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders) if err != nil { - return nil, fmt.Errorf("failed to write request: %w", err) + responseErr.err = fmt.Errorf("failed to write request: %w", err) + return nil, responseErr } start := time.Now() resp, err := h.sender.Send(req) dispatchTime := time.Since(start) if err != nil { - err = fmt.Errorf("failed to dispatch message: %w", err) + responseErr.ResponseCode = http.StatusInternalServerError + responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) + responseErr.err = fmt.Errorf("failed to dispatch message: %w", err) + return resp, responseErr } sc := 0 if resp != nil { sc = resp.StatusCode + responseErr.ResponseCode = sc } _ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime) - return resp, err + if resp.StatusCode < http.StatusOK || + resp.StatusCode >= http.StatusMultipleChoices { + // Read response body into errHandler for failures + body := make([]byte, channelAttributes.KnativeErrorDataExtensionMaxLength) + + readLen, readErr := resp.Body.Read(body) + if readErr != nil && readErr != io.EOF { + h.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(readErr)) + responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", readErr.Error())) + } else { + responseErr.ResponseBody = body[:readLen] + } + responseErr.err = fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", resp.StatusCode) + + // Reject non-successful responses. + return resp, responseErr + } + + return resp, responseErr } // The return values are the status diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index 54de7eaadee..47e014aaf54 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/base64" + "encoding/json" "fmt" "io" nethttp "net/http" @@ -33,10 +34,13 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/channel/attributes" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" + "knative.dev/pkg/network" + "knative.dev/pkg/system" ) const ( @@ -291,16 +295,30 @@ func (d *MessageDispatcherImpl) dispatchExecutionInfoTransformers(destination *u if destination == nil { destination = &url.URL{} } + + httpResponseBody := dispatchExecutionInfo.ResponseBody + if destination.Host == network.GetServiceHostname("broker-filter", system.Namespace()) { + + var errExtensionInfo broker.ErrExtensionInfo + + err := json.Unmarshal(dispatchExecutionInfo.ResponseBody, &errExtensionInfo) + if err != nil { + d.logger.Debug("Unmarshal dispatchExecutionInfo ResponseBody failed", zap.Error(err)) + return nil + } + destination = errExtensionInfo.ErrDestination + httpResponseBody = errExtensionInfo.ErrResponseBody + } + destination = d.sanitizeURL(destination) // Unprintable control characters are not allowed in header values // and cause HTTP requests to fail if not removed. // https://pkg.go.dev/golang.org/x/net/http/httpguts#ValidHeaderFieldValue - httpBody := sanitizeHTTPBody(dispatchExecutionInfo.ResponseBody) + httpBody := sanitizeHTTPBody(httpResponseBody) // Encodes response body as base64 for the resulting length. bodyLen := len(httpBody) encodedLen := base64.StdEncoding.EncodedLen(bodyLen) - if encodedLen > attributes.KnativeErrorDataExtensionMaxLength { encodedLen = attributes.KnativeErrorDataExtensionMaxLength } diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index cae1010497f..e245417afce 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -44,6 +44,7 @@ import ( "knative.dev/eventing/pkg/kncloudevents" logtesting "knative.dev/pkg/logging/testing" + _ "knative.dev/pkg/system/testing" ) func TestNewMessageDispatcher(t *testing.T) { diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index 8d37414bb67..03e18c2a541 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -151,3 +151,18 @@ func TestBrokerRedelivery(t *testing.T) { env.TestSet(ctx, t, broker.BrokerRedelivery()) } + +func TestBrokerDeadLetterSinkExtensions(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.TestSet(ctx, t, broker.BrokerDeadLetterSinkExtensions()) +} diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index 1ffe6f800a2..96ccfdb875d 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -301,7 +301,7 @@ func TestChannelPreferHeaderCheck(t *testing.T) { env.Test(ctx, t, channel.ChannelPreferHeaderCheck(createSubscriberFn)) } -func TestChannelSubscriptionReturnedErrorData(t *testing.T) { +func TestChannelDeadLetterSinkExtensions(t *testing.T) { t.Parallel() ctx, env := global.Environment( @@ -316,5 +316,5 @@ func TestChannelSubscriptionReturnedErrorData(t *testing.T) { return subscription.WithSubscriber(ref, uri) } - env.Test(ctx, t, channel.ChannelSubscriptionReturnedErrorData(createSubscriberFn)) + env.TestSet(ctx, t, channel.ChannelDeadLetterSinkExtensions(createSubscriberFn)) } diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 4e55cc4d97d..7c41aea7c3f 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -17,6 +17,9 @@ limitations under the License. package broker import ( + "context" + "encoding/base64" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" @@ -200,3 +203,205 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { return f } + +func BrokerDeadLetterSinkExtensions() *feature.FeatureSet { + fs := &feature.FeatureSet{ + Name: "Knative Broker - DeadLetterSink - with Extensions", + + Features: []*feature.Feature{ + brokerSubscriberUnreachable(), + brokerSubscriberErrorNodata(), + brokerSubscriberErrorWithdata(), + }, + } + return fs +} + +func brokerSubscriberUnreachable() *feature.Feature { + f := feature.NewFeatureNamed("Broker Subscriber Unreachable") + + source := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + triggerName := feature.MakeRandomK8sName("triggerName") + + eventSource := "source1" + eventType := "type1" + eventBody := `{"msg":"test msg"}` + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetType(eventType) + event.SetSource(eventSource) + event.SetData(cloudevents.ApplicationJSON, []byte(eventBody)) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + // Install the trigger and Point the Trigger subscriber to the sink svc. + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(nil, "http://fake.svc.cluster.local"), + trigger.WithDeadLetterSink(svc.AsKReference(sink), ""), + )) + f.Setup("trigger goes ready", trigger.IsReady(triggerName)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("Receives dls extensions when subscriber is unreachable", + eventasssert.OnStore(sink). + MatchEvent( + test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local"), + ). + AtLeast(1), + ) + return f +} + +func brokerSubscriberErrorNodata() *feature.Feature { + f := feature.NewFeatureNamed("Broker Subscriber Error Nodata") + + source := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + failer := feature.MakeRandomK8sName("failer") + triggerName := feature.MakeRandomK8sName("triggerName") + + eventSource := "source1" + eventType := "type1" + eventBody := `{"msg":"test msg"}` + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetType(eventType) + event.SetSource(eventSource) + event.SetData(cloudevents.ApplicationJSON, []byte(eventBody)) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + f.Setup("install failing receiver", eventshub.Install(failer, + eventshub.StartReceiver, + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(422), + )) + + // Install the trigger and Point the Trigger subscriber to the sink svc. + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(failer), ""), + trigger.WithDeadLetterSink(svc.AsKReference(sink), ""), + )) + f.Setup("trigger goes ready", trigger.IsReady(triggerName)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("Receives dls extensions without errordata", assertEnhancedWithKnativeErrorExtensions( + sink, + func(ctx context.Context) test.EventMatcher { + failerAddress, _ := svc.Address(ctx, failer) + return test.HasExtension("knativeerrordest", failerAddress.String()) + }, + func(ctx context.Context) test.EventMatcher { + return test.HasExtension("knativeerrorcode", "422") + }, + )) + + return f +} + +func brokerSubscriberErrorWithdata() *feature.Feature { + f := feature.NewFeatureNamed("Broker Subscriber Error With data encoded") + + source := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + failer := feature.MakeRandomK8sName("failer") + triggerName := feature.MakeRandomK8sName("triggerName") + + eventSource := "source1" + eventType := "type1" + eventBody := `{"msg":"test msg"}` + event := cloudevents.NewEvent() + event.SetID(uuid.New().String()) + event.SetType(eventType) + event.SetSource(eventSource) + event.SetData(cloudevents.ApplicationJSON, []byte(eventBody)) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + errorData := `{ "message": "catastrophic failure" }` + f.Setup("install failing receiver", eventshub.Install(failer, + eventshub.StartReceiver, + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(422), + eventshub.DropEventsResponseBody(errorData), + )) + + // Install the trigger and Point the Trigger subscriber to the sink svc. + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(failer), ""), + trigger.WithDeadLetterSink(svc.AsKReference(sink), ""), + )) + f.Setup("trigger goes ready", trigger.IsReady(triggerName)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("Receives dls extensions with errordata", assertEnhancedWithKnativeErrorExtensions( + sink, + func(ctx context.Context) test.EventMatcher { + failerAddress, _ := svc.Address(ctx, failer) + return test.HasExtension("knativeerrordest", failerAddress.String()) + }, + func(ctx context.Context) test.EventMatcher { + return test.HasExtension("knativeerrorcode", "422") + }, + func(ctx context.Context) test.EventMatcher { + return test.HasExtension("knativeerrordata", base64.StdEncoding.EncodeToString([]byte(errorData))) + }, + )) + + return f +} + +func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...func(ctx context.Context) test.EventMatcher) feature.StepFn { + return func(ctx context.Context, t feature.T) { + matchers := make([]test.EventMatcher, len(matcherfns)) + for i, fn := range matcherfns { + matchers[i] = fn(ctx) + } + _ = eventshub.StoreFromContext(ctx, sinkName).AssertExact( + t, + 1, + eventasssert.MatchKind(eventshub.EventReceived), + eventasssert.MatchEvent(matchers...), + ) + } +} diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index 6bd8d0af920..f393e2b2b06 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -323,7 +323,106 @@ func ChannelPreferHeaderCheck(createSubscriberFn func(ref *duckv1.KReference, ur return f } -func ChannelSubscriptionReturnedErrorData(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { +func ChannelDeadLetterSinkExtensions(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.FeatureSet { + fs := &feature.FeatureSet{ + Name: "Knative Channel - DeadLetterSink - with Extensions", + Features: []*feature.Feature{ + channelSubscriberUnreachable(createSubscriberFn), + channelSubscriberReturnedErrorNoData(createSubscriberFn), + channelSubscriberReturnedErrorWithData(createSubscriberFn), + }, + } + return fs +} + +func channelSubscriberUnreachable(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + + sourceName := feature.MakeRandomK8sName("source") + channelName := feature.MakeRandomK8sName("channel") + + ev := test.FullEvent() + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + f.Setup("install channel", channel_impl.Install(channelName, delivery.WithDeadLetterSink(svc.AsKReference(sink), ""))) + + f.Setup("install subscription", subscription.Install(feature.MakeRandomK8sName("subscription"), + subscription.WithChannel(channel_impl.AsRef(channelName)), + createSubscriberFn(nil, "http://fake.svc.cluster.local"), + )) + + f.Requirement("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channel_impl.GVR(), channelName), + eventshub.InputEvent(ev), + )) + + f.Setup("channel is ready", channel_impl.IsReady(channelName)) + f.Setup("channel is addressable", channel_impl.IsAddressable(channelName)) + + f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(channelName, channel_impl.GVR())) + + f.Assert("Receives dls extensions when subscriber is unreachable", eventasssert.OnStore(sink). + MatchEvent( + test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local")). + AtLeast(1), + ) + + return f +} + +func channelSubscriberReturnedErrorNoData(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + + sourceName := feature.MakeRandomK8sName("source") + failer := feature.MakeRandomK8sName("failerWitdata") + channelName := feature.MakeRandomK8sName("channel") + + ev := test.FullEvent() + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + f.Setup("install failing receiver", eventshub.Install(failer, + eventshub.StartReceiver, + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(422), + )) + f.Setup("install channel", channel_impl.Install(channelName, delivery.WithDeadLetterSink(svc.AsKReference(sink), ""))) + + f.Setup("install subscription", subscription.Install(feature.MakeRandomK8sName("subscription"), + subscription.WithChannel(channel_impl.AsRef(channelName)), + createSubscriberFn(svc.AsKReference(failer), ""), + )) + + f.Requirement("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channel_impl.GVR(), channelName), + eventshub.InputEvent(ev), + )) + + f.Setup("channel is ready", channel_impl.IsReady(channelName)) + f.Setup("channel is addressable", channel_impl.IsAddressable(channelName)) + + f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(channelName, channel_impl.GVR())) + + f.Assert("Receives dls extensions without errordata", assertEnhancedWithKnativeErrorExtensions( + sink, + func(ctx context.Context) test.EventMatcher { + failerAddress, _ := svc.Address(ctx, failer) + return test.HasExtension("knativeerrordest", failerAddress.String()) + }, + func(ctx context.Context) test.EventMatcher { + return test.HasExtension("knativeerrorcode", "422") + }, + )) + + return f +} + +func channelSubscriberReturnedErrorWithData(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { f := feature.NewFeature() sink := feature.MakeRandomK8sName("sink")