Skip to content

Commit

Permalink
Enhance broker filter to proxy response headers.
Browse files Browse the repository at this point in the history
  • Loading branch information
travis-minke-sap committed Dec 1, 2021
1 parent 24e133d commit 1e1623a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
13 changes: 13 additions & 0 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
writer.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent")
}
proxyHeaders(resp.Header, writer) // Proxy original Response Headers for downstream use
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
Expand All @@ -297,6 +298,9 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
eventResponse := binding.ToMessage(event)
defer eventResponse.Finish(nil)

// Proxy the original Response Headers for downstream use
proxyHeaders(resp.Header, writer)

if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err)
}
Expand Down Expand Up @@ -352,3 +356,12 @@ func triggerFilterAttribute(filter *eventingv1.TriggerFilter, attributeName stri
}
return attributeValue
}

// proxyHeaders adds the specified HTTP Headers to the ResponseWriter.
func proxyHeaders(httpHeader http.Header, writer http.ResponseWriter) {
for headerKey, headerValues := range httpHeader {
for _, headerValue := range headerValues {
writer.Header().Add(headerKey, headerValue)
}
}
}
54 changes: 48 additions & 6 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestReceiver(t *testing.T) {
expectedEventDispatchTime bool
expectedEventProcessingTime bool
response *http.Response
responseHeaders http.Header
}{
"Not POST": {
request: httptest.NewRequest(http.MethodGet, validPath, nil),
Expand Down Expand Up @@ -263,6 +264,8 @@ func TestReceiver(t *testing.T) {
request.Header.Set("Traceparent", "0")
// Knative-Foo will pass as a prefix match.
request.Header.Set("Knative-Foo", "baz")
// X-B3-Foo will pass as a prefix match.
request.Header.Set("X-B3-Foo", "bing")
// X-Request-Id will pass as an exact header match.
request.Header.Set("X-Request-Id", "123")
// Content-Type will not pass filtering.
Expand All @@ -275,6 +278,8 @@ func TestReceiver(t *testing.T) {
"X-Request-Id": []string{"123"},
// Knative-Foo will pass as a prefix match.
"Knative-Foo": []string{"baz"},
// X-B3-Foo will pass as a prefix match.
"X-B3-Foo": []string{"bing"},
// Prefer: reply will be added for every request as defined in the spec.
"Prefer": []string{"reply"},
},
Expand Down Expand Up @@ -379,17 +384,39 @@ func TestReceiver(t *testing.T) {
expectedStatus: http.StatusAccepted,
response: makeEmptyResponse(202),
},
"Proxy CloudEvent response headers": {
triggers: []*eventingv1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
returnedEvent: makeDifferentEvent(),
responseHeaders: http.Header{"Test-Header": []string{"TestValue"}},
},
"Proxy empty non event response headers": {
triggers: []*eventingv1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusTooManyRequests,
response: makeEmptyResponse(http.StatusTooManyRequests),
responseHeaders: http.Header{"Retry-After": []string{"10"}},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {

fh := fakeHandler{
failRequest: tc.requestFails,
failStatus: tc.failureStatus,
returnedEvent: tc.returnedEvent,
headers: tc.expectedHeaders,
t: t,
response: tc.response,
failRequest: tc.requestFails,
failStatus: tc.failureStatus,
returnedEvent: tc.returnedEvent,
headers: tc.expectedHeaders,
t: t,
response: tc.response,
responseHeaders: tc.responseHeaders,
}
s := httptest.NewServer(&fh)
defer s.Close()
Expand Down Expand Up @@ -444,6 +471,14 @@ func TestReceiver(t *testing.T) {

response := responseWriter.Result()

if tc.expectedStatus != http.StatusInternalServerError && tc.expectedStatus != http.StatusBadGateway {
for expectedHeaderKey, expectedHeaderValues := range tc.responseHeaders {
if response.Header[expectedHeaderKey] == nil || response.Header[expectedHeaderKey][0] != expectedHeaderValues[0] {
t.Errorf("Response header proxy failed for header '%v'. Expected %v, Actual %v", expectedHeaderKey, expectedHeaderValues[0], response.Header[expectedHeaderKey])
}
}
}

if tc.expectedStatus != 0 && tc.expectedStatus != response.StatusCode {
t.Errorf("Unexpected status. Expected %v. Actual %v.", tc.expectedStatus, response.StatusCode)
}
Expand Down Expand Up @@ -543,6 +578,7 @@ type fakeHandler struct {
returnedEvent *cloudevents.Event
t *testing.T
response *http.Response
responseHeaders http.Header
}

func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -576,6 +612,9 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if h.returnedEvent != nil {
message := binding.ToMessage(h.returnedEvent)
defer message.Finish(nil)
for k, v := range h.responseHeaders {
resp.Header().Set(k, v[0])
}
err := cehttp.WriteResponseWriter(context.Background(), message, http.StatusAccepted, resp)
if err != nil {
h.t.Fatalf("Unable to write body: %v", err)
Expand All @@ -585,6 +624,9 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
for k, v := range h.response.Header {
resp.Header().Set(k, v[0])
}
for k, v := range h.responseHeaders {
resp.Header().Add(k, v[0])
}
resp.WriteHeader(h.response.StatusCode)
if h.response.Body != nil {
defer h.response.Body.Close()
Expand Down

0 comments on commit 1e1623a

Please sign in to comment.