diff --git a/lib/bindings/http/receiver_binary.js b/lib/bindings/http/receiver_binary.js index 71a0c80c..a7acafeb 100644 --- a/lib/bindings/http/receiver_binary.js +++ b/lib/bindings/http/receiver_binary.js @@ -1,4 +1,5 @@ -const Constants = require("./constants.js"); +const { HEADER_CONTENT_TYPE, MIME_JSON, DEFAULT_SPEC_VERSION_HEADER } = + require("./constants.js"); const Commons = require("./commons.js"); const CloudEvent = require("../../cloudevent.js"); @@ -52,16 +53,13 @@ BinaryHTTPReceiver.prototype.check = function(payload, headers) { // Clone and low case all headers names const sanityHeaders = Commons.sanityAndClone(headers); - // If no content type is provided, default to application/json - if (!sanityHeaders[Constants.HEADER_CONTENT_TYPE]) { - sanityHeaders[Constants.HEADER_CONTENT_TYPE] = Constants.MIME_JSON; - } - - // Validation Level 1 - if (!this.allowedContentTypes - .includes(sanityHeaders[Constants.HEADER_CONTENT_TYPE])) { + // Validation Level 1 - if content-type exists, be sure it's + // an allowed type + const contentTypeHeader = sanityHeaders[HEADER_CONTENT_TYPE]; + const noContentType = !this.allowedContentTypes.includes(contentTypeHeader); + if (contentTypeHeader && noContentType) { const err = new TypeError("invalid content type"); - err.errors = [sanityHeaders[Constants.HEADER_CONTENT_TYPE]]; + err.errors = [sanityHeaders[HEADER_CONTENT_TYPE]]; throw err; } @@ -71,10 +69,10 @@ BinaryHTTPReceiver.prototype.check = function(payload, headers) { throw new TypeError(`header '${required}' not found`); }); - if (sanityHeaders[Constants.DEFAULT_SPEC_VERSION_HEADER] !== + if (sanityHeaders[DEFAULT_SPEC_VERSION_HEADER] !== this.specversion) { const err = new TypeError("invalid spec version"); - err.errors = [sanityHeaders[Constants.DEFAULT_SPEC_VERSION_HEADER]]; + err.errors = [sanityHeaders[DEFAULT_SPEC_VERSION_HEADER]]; throw err; } @@ -83,7 +81,7 @@ BinaryHTTPReceiver.prototype.check = function(payload, headers) { function parserFor(parsersByEncoding, cloudevent, headers) { const encoding = cloudevent.spec.payload.datacontentencoding; - return parsersByEncoding[encoding][headers[Constants.HEADER_CONTENT_TYPE]]; + return parsersByEncoding[encoding][headers[HEADER_CONTENT_TYPE]]; } BinaryHTTPReceiver.prototype.parse = function(payload, headers) { @@ -91,6 +89,9 @@ BinaryHTTPReceiver.prototype.parse = function(payload, headers) { // Clone and low case all headers names const sanityHeaders = Commons.sanityAndClone(headers); + if (!sanityHeaders[HEADER_CONTENT_TYPE]) { + sanityHeaders[HEADER_CONTENT_TYPE] = MIME_JSON; + } const processedHeaders = []; const cloudevent = new CloudEvent(this.Spec); diff --git a/lib/bindings/http/receiver_binary_1.js b/lib/bindings/http/receiver_binary_1.js index 6cbc8d59..0f0e6d22 100644 --- a/lib/bindings/http/receiver_binary_1.js +++ b/lib/bindings/http/receiver_binary_1.js @@ -101,9 +101,6 @@ Receiver.prototype.check = function(payload, headers) { }; Receiver.prototype.parse = function(payload, headers) { - // firstly specific local checks - this.check(payload, headers); - payload = isString(payload) && isBase64(payload) ? Buffer.from(payload, "base64").toString() : payload; diff --git a/test/bindings/http/promiscuous_receiver_test.js b/test/bindings/http/promiscuous_receiver_test.js index e5509166..e99aa3cb 100644 --- a/test/bindings/http/promiscuous_receiver_test.js +++ b/test/bindings/http/promiscuous_receiver_test.js @@ -1,6 +1,13 @@ const { expect } = require("chai"); const { CloudEvent, HTTPReceiver } = require("../../../index.js"); -const constants = require("../../../lib/bindings/http/constants.js"); +const { + HEADER_CONTENT_TYPE, + DEFAULT_CONTENT_TYPE, + MIME_CE_JSON, + DEFAULT_SPEC_VERSION_HEADER, + BINARY_HEADERS_03, + BINARY_HEADERS_1 +} = require("../../../lib/bindings/http/constants.js"); const receiver = new HTTPReceiver(); const id = "1234"; @@ -24,7 +31,7 @@ describe("HTTP Transport Binding Receiver for CloudEvents", () => { }; const headers = { - [constants.HEADER_CONTENT_TYPE]: constants.MIME_CE_JSON + [HEADER_CONTENT_TYPE]: MIME_CE_JSON }; const event = receiver.accept(headers, payload); @@ -33,11 +40,11 @@ describe("HTTP Transport Binding Receiver for CloudEvents", () => { it("Binary data returns a CloudEvent", () => { const headers = { - [constants.HEADER_CONTENT_TYPE]: constants.DEFAULT_CONTENT_TYPE, - [constants.DEFAULT_SPEC_VERSION_HEADER]: specversion, - [constants.BINARY_HEADERS_1.ID]: id, - [constants.BINARY_HEADERS_1.TYPE]: type, - [constants.BINARY_HEADERS_1.SOURCE]: source + [HEADER_CONTENT_TYPE]: DEFAULT_CONTENT_TYPE, + [DEFAULT_SPEC_VERSION_HEADER]: specversion, + [BINARY_HEADERS_1.ID]: id, + [BINARY_HEADERS_1.TYPE]: type, + [BINARY_HEADERS_1.SOURCE]: source }; const event = receiver.accept(headers, data); @@ -58,7 +65,7 @@ describe("HTTP Transport Binding Receiver for CloudEvents", () => { }; const headers = { - [constants.HEADER_CONTENT_TYPE]: constants.MIME_CE_JSON + [HEADER_CONTENT_TYPE]: MIME_CE_JSON }; const event = receiver.accept(headers, payload); @@ -67,17 +74,53 @@ describe("HTTP Transport Binding Receiver for CloudEvents", () => { it("Binary data returns a CloudEvent", () => { const headers = { - [constants.HEADER_CONTENT_TYPE]: constants.DEFAULT_CONTENT_TYPE, - [constants.DEFAULT_SPEC_VERSION_HEADER]: specversion, - [constants.BINARY_HEADERS_03.ID]: id, - [constants.BINARY_HEADERS_03.TYPE]: type, - [constants.BINARY_HEADERS_03.SOURCE]: source + [HEADER_CONTENT_TYPE]: DEFAULT_CONTENT_TYPE, + [DEFAULT_SPEC_VERSION_HEADER]: specversion, + [BINARY_HEADERS_03.ID]: id, + [BINARY_HEADERS_03.TYPE]: type, + [BINARY_HEADERS_03.SOURCE]: source }; const event = receiver.accept(headers, data); validateEvent(event, specversion); }); }); + + describe("Kafka-Knative event source", () => { + const specversion = "1.0"; + const id = "partition:1/offset:23"; + const type = "dev.knative.kafka.event"; + const source = + "/apis/v1/namespaces/kafka/kafkasources/kafka-source#knative-demo-topic"; + + it("Should be parsable", () => { + const headers = { + host: "event-display.kafka.svc.cluster.local", + "user-agent": "Go-http-client/1.1", + "content-length": "59", + "accept-encoding": "gzip", + "ce-id": id, + "ce-source": source, + "ce-specversion": "1.0", + "ce-subject": "partition:1#23", + "ce-time": "2020-05-07T14:16:30.245Z", + "ce-type": type, + forwarded: "for=10.131.0.72;proto=http", + "k-proxy-request": "activator", + "x-envoy-expected-rq-timeout-ms": "600000", + "x-forwarded-for": "10.131.0.72, 10.128.2.99", + "x-forwarded-proto": "http", + "x-request-id": "d3649c1b-a968-40bf-a9da-3e853abc0c8b" + }; + const event = receiver.accept(headers, data); + expect(event instanceof CloudEvent).to.equal(true); + expect(event.getId()).to.equal(id); + expect(event.getType()).to.equal(type); + expect(event.getSource()).to.equal(source); + expect(event.getData()).to.deep.equal(data); + expect(event.getSpecversion()).to.equal(specversion); + }); + }); }); function validateEvent(event, specversion) {