diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 43fd03b3190..c5bfb483a54 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -274,6 +274,7 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, writer.WriteHeader(http.StatusBadGateway) return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent") } + proxyHeaders(resp.Header, writer) // Proxy original Response Headers for downstream use h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target)) writer.WriteHeader(resp.StatusCode) return resp.StatusCode, nil @@ -297,6 +298,9 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, eventResponse := binding.ToMessage(event) defer eventResponse.Finish(nil) + // Proxy the original Response Headers for downstream use + proxyHeaders(resp.Header, writer) + if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil { return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err) } @@ -352,3 +356,12 @@ func triggerFilterAttribute(filter *eventingv1.TriggerFilter, attributeName stri } return attributeValue } + +// proxyHeaders adds the specified HTTP Headers to the ResponseWriter. +func proxyHeaders(httpHeader http.Header, writer http.ResponseWriter) { + for headerKey, headerValues := range httpHeader { + for _, headerValue := range headerValues { + writer.Header().Add(headerKey, headerValue) + } + } +} diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index db10abd1093..ac491320c44 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -79,6 +79,7 @@ func TestReceiver(t *testing.T) { expectedEventDispatchTime bool expectedEventProcessingTime bool response *http.Response + responseHeaders http.Header }{ "Not POST": { request: httptest.NewRequest(http.MethodGet, validPath, nil), @@ -263,6 +264,8 @@ func TestReceiver(t *testing.T) { request.Header.Set("Traceparent", "0") // Knative-Foo will pass as a prefix match. request.Header.Set("Knative-Foo", "baz") + // X-B3-Foo will pass as a prefix match. + request.Header.Set("X-B3-Foo", "bing") // X-Request-Id will pass as an exact header match. request.Header.Set("X-Request-Id", "123") // Content-Type will not pass filtering. @@ -275,6 +278,8 @@ func TestReceiver(t *testing.T) { "X-Request-Id": []string{"123"}, // Knative-Foo will pass as a prefix match. "Knative-Foo": []string{"baz"}, + // X-B3-Foo will pass as a prefix match. + "X-B3-Foo": []string{"bing"}, // Prefer: reply will be added for every request as defined in the spec. "Prefer": []string{"reply"}, }, @@ -379,17 +384,39 @@ func TestReceiver(t *testing.T) { expectedStatus: http.StatusAccepted, response: makeEmptyResponse(202), }, + "Proxy CloudEvent response headers": { + triggers: []*eventingv1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "")), + }, + expectedDispatch: true, + expectedEventCount: true, + expectedEventDispatchTime: true, + returnedEvent: makeDifferentEvent(), + responseHeaders: http.Header{"Test-Header": []string{"TestValue"}}, + }, + "Proxy empty non event response headers": { + triggers: []*eventingv1.Trigger{ + makeTrigger(makeTriggerFilterWithAttributes("", "")), + }, + expectedDispatch: true, + expectedEventCount: true, + expectedEventDispatchTime: true, + expectedStatus: http.StatusTooManyRequests, + response: makeEmptyResponse(http.StatusTooManyRequests), + responseHeaders: http.Header{"Retry-After": []string{"10"}}, + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { fh := fakeHandler{ - failRequest: tc.requestFails, - failStatus: tc.failureStatus, - returnedEvent: tc.returnedEvent, - headers: tc.expectedHeaders, - t: t, - response: tc.response, + failRequest: tc.requestFails, + failStatus: tc.failureStatus, + returnedEvent: tc.returnedEvent, + headers: tc.expectedHeaders, + t: t, + response: tc.response, + responseHeaders: tc.responseHeaders, } s := httptest.NewServer(&fh) defer s.Close() @@ -444,6 +471,14 @@ func TestReceiver(t *testing.T) { response := responseWriter.Result() + if tc.expectedStatus != http.StatusInternalServerError && tc.expectedStatus != http.StatusBadGateway { + for expectedHeaderKey, expectedHeaderValues := range tc.responseHeaders { + if response.Header[expectedHeaderKey] == nil || response.Header[expectedHeaderKey][0] != expectedHeaderValues[0] { + t.Errorf("Response header proxy failed for header '%v'. Expected %v, Actual %v", expectedHeaderKey, expectedHeaderValues[0], response.Header[expectedHeaderKey]) + } + } + } + if tc.expectedStatus != 0 && tc.expectedStatus != response.StatusCode { t.Errorf("Unexpected status. Expected %v. Actual %v.", tc.expectedStatus, response.StatusCode) } @@ -543,6 +578,7 @@ type fakeHandler struct { returnedEvent *cloudevents.Event t *testing.T response *http.Response + responseHeaders http.Header } func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { @@ -576,6 +612,9 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { if h.returnedEvent != nil { message := binding.ToMessage(h.returnedEvent) defer message.Finish(nil) + for k, v := range h.responseHeaders { + resp.Header().Set(k, v[0]) + } err := cehttp.WriteResponseWriter(context.Background(), message, http.StatusAccepted, resp) if err != nil { h.t.Fatalf("Unable to write body: %v", err) @@ -585,6 +624,9 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { for k, v := range h.response.Header { resp.Header().Set(k, v[0]) } + for k, v := range h.responseHeaders { + resp.Header().Add(k, v[0]) + } resp.WriteHeader(h.response.StatusCode) if h.response.Body != nil { defer h.response.Body.Close()