diff --git a/README.md b/README.md index 49610005..debc7857 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,7 @@ There you will find Express.js, TypeScript and Websocket examples. | AMQP Protocol Binding | :x: | :x: | | HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: | | Kafka Protocol Binding | :x: | :heavy_check_mark: | -| MQTT Protocol Binding | :x: | :x: | +| MQTT Protocol Binding | :heavy_check_mark: | :x: | | NATS Protocol Binding | :x: | :x: | --- @@ -176,6 +176,9 @@ There you will find Express.js, TypeScript and Websocket examples. | Kafka Binary | :heavy_check_mark: | :heavy_check_mark: | | Kafka Structured | :heavy_check_mark: | :heavy_check_mark: | | Kafka Batch | :heavy_check_mark: | :heavy_check_mark: +| MQTT Binary | :heavy_check_mark: | :heavy_check_mark: | +| MQTT Structured | :heavy_check_mark: | :heavy_check_mark: | + ## Community - There are bi-weekly calls immediately following the [Serverless/CloudEvents diff --git a/src/index.ts b/src/index.ts index 915acf5b..eea449ad 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,7 @@ import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces"; import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter"; import { - Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, + Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, MQTT, MQTTMessage, MQTTMessageFactory, Serializer, Deserializer } from "./message"; import CONSTANTS from "./constants"; @@ -32,6 +32,9 @@ export { Kafka, KafkaEvent, KafkaMessage, + MQTT, + MQTTMessage, + MQTTMessageFactory, // From transport TransportFunction, EmitterFunction, diff --git a/src/message/index.ts b/src/message/index.ts index 783320ec..bf1dd5c9 100644 --- a/src/message/index.ts +++ b/src/message/index.ts @@ -9,6 +9,7 @@ import { CloudEventV1 } from ".."; // reexport the protocol bindings export * from "./http"; export * from "./kafka"; +export * from "./mqtt"; /** * Binding is an interface for transport protocols to implement, diff --git a/src/message/mqtt/index.ts b/src/message/mqtt/index.ts new file mode 100644 index 00000000..3943834f --- /dev/null +++ b/src/message/mqtt/index.ts @@ -0,0 +1,148 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +import { Binding, Deserializer, CloudEvent, CloudEventV1, CONSTANTS, Message, ValidationError, Headers } from "../.."; + +export { + MQTT, + MQTTMessage, + MQTTMessageFactory +}; + +/** + * Extends the base {@linkcode Message} interface to include MQTT attributes, some of which + * are aliases of the {Message} attributes. + */ +interface MQTTMessage extends Message { + /** + * Identifies this message as a PUBLISH packet. MQTTMessages created with + * the `binary` and `structured` Serializers will contain a "Content Type" + * property in the PUBLISH record. + * @see https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#3-mqtt-publish-message-mapping + */ + PUBLISH: Record | undefined + /** + * Alias of {Message#body} + */ + payload: T | undefined, + /** + * Alias of {Message#headers} + */ + "User Properties": Headers | undefined +} + +/** + * Binding for MQTT transport support + * @implements @linkcode Binding + */ +const MQTT: Binding = { + binary, + structured, + toEvent: toEvent as Deserializer, + isEvent +}; + +/** + * Converts a CloudEvent into an MQTTMessage with the event's data as the message payload + * @param {CloudEventV1} event a CloudEvent + * @returns {MQTTMessage} the event serialized as an MQTTMessage with binary encoding + * @implements {Serializer} + */ +function binary(event: CloudEventV1): MQTTMessage { + let properties; + if (event instanceof CloudEvent) { + properties = event.toJSON(); + } else { + properties = event; + } + const body = properties.data as T; + delete properties.data; + + return MQTTMessageFactory(event.datacontenttype as string, properties, body); +} + +/** + * Converts a CloudEvent into an MQTTMessage with the event as the message payload + * @param {CloudEventV1} event a CloudEvent + * @returns {MQTTMessage} the event serialized as an MQTTMessage with structured encoding + * @implements {Serializer} + */ +function structured(event: CloudEventV1): MQTTMessage { + let body; + if (event instanceof CloudEvent) { + body = event.toJSON(); + } else { + body = event; + } + return MQTTMessageFactory(CONSTANTS.DEFAULT_CE_CONTENT_TYPE, {}, body) as MQTTMessage; +} + +/** + * A helper function to create an MQTTMessage object, with "User Properties" as an alias + * for "headers" and "payload" an alias for body, and a "PUBLISH" record with a "Content Type" + * property. + * @param {string} contentType the "Content Type" attribute on PUBLISH + * @param {Record} headers the headers and "User Properties" + * @param {T} body the message body/payload + * @returns {MQTTMessage} a message initialized with the provided attributes + */ +function MQTTMessageFactory(contentType: string, headers: Record, body: T): MQTTMessage { + return { + PUBLISH: { + "Content Type": contentType + }, + body, + get payload() { + return this.body as T; + }, + headers: headers as Headers, + get "User Properties"() { + return this.headers as any; + } + }; +} + +/** + * Converts an MQTTMessage into a CloudEvent + * @param {Message} message the message to deserialize + * @param {boolean} strict determines if a ValidationError will be thrown on bad input - defaults to false + * @returns {CloudEventV1} an event + * @implements {Deserializer} + */ +function toEvent(message: Message, strict = false): CloudEventV1 | CloudEventV1[] { + if (strict && !isEvent(message)) { + throw new ValidationError("No CloudEvent detected"); + } + if (isStructuredMessage(message as MQTTMessage)) { + const evt = (typeof message.body === "string") ? JSON.parse(message.body): message.body; + return new CloudEvent({ + ...evt as CloudEventV1 + }, false); + } else { + return new CloudEvent({ + ...message.headers, + data: message.body as T, + }, false); + } +} + +/** + * Determine if the message is a CloudEvent + * @param {Message} message an MQTTMessage + * @returns {boolean} true if the message contains an event + */ +function isEvent(message: Message): boolean { + return isBinaryMessage(message) || isStructuredMessage(message as MQTTMessage); +} + +function isBinaryMessage(message: Message): boolean { + return (!!message.headers.id && !!message.headers.source + && !! message.headers.type && !!message.headers.specversion); +} + +function isStructuredMessage(message: MQTTMessage): boolean { + if (!message) { return false; } + return (message.PUBLISH && message?.PUBLISH["Content Type"]?.startsWith(CONSTANTS.MIME_CE_JSON)) || false; +} diff --git a/test/integration/mqtt_tests.ts b/test/integration/mqtt_tests.ts new file mode 100644 index 00000000..411a9cd7 --- /dev/null +++ b/test/integration/mqtt_tests.ts @@ -0,0 +1,309 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +import path from "path"; +import fs from "fs"; + +import { expect } from "chai"; +import { CloudEvent, CONSTANTS, Version, Headers } from "../../src"; +import { asBase64 } from "../../src/event/validation"; +import { Message, MQTT, MQTTMessage } from "../../src/message"; + +const type = "org.cncf.cloudevents.example"; +const source = "urn:event:from:myapi/resource/123"; +const time = new Date().toISOString(); +const subject = "subject.ext"; +const dataschema = "http://cloudevents.io/schema.json"; +const datacontenttype = "application/json"; +const id = "b46cf653-d48a-4b90-8dfa-355c01061361"; + +interface Idata { + foo: string +} +const data: Idata = { + foo: "bar", +}; + +const ext1Name = "extension1"; +const ext1Value = "foobar"; +const ext2Name = "extension2"; +const ext2Value = "acme"; + +// Binary data as base64 +const dataBinary = Uint32Array.from(JSON.stringify(data), (c) => c.codePointAt(0) as number); +const data_base64 = asBase64(dataBinary); + +// Since the above is a special case (string as binary), let's test +// with a real binary file one is likely to encounter in the wild +const imageData = new Uint32Array(fs.readFileSync(path.join(process.cwd(), "test", "integration", "ce.png"))); +const image_base64 = asBase64(imageData); + +const PUBLISH = {"Content Type": "application/json; charset=utf-8"}; + +const fixture = new CloudEvent({ + specversion: Version.V1, + id, + type, + source, + datacontenttype, + subject, + time, + dataschema, + data, + [ext1Name]: ext1Value, + [ext2Name]: ext2Value, +}); + +describe("MQTT transport", () => { + it("Handles events with no content-type and no datacontenttype", () => { + const payload = "{Something[Not:valid}JSON"; + const userProperties = fixture.toJSON() as Headers; + const message: MQTTMessage = { + PUBLISH: undefined, // no Content Type applied + payload, + "User Properties": userProperties, + headers: userProperties, + body: payload, + }; + const event = MQTT.toEvent(message) as CloudEvent; + expect(event.data).to.equal(payload); + expect(event.datacontentype).to.equal(undefined); + }); + + it("Can detect invalid CloudEvent Messages", () => { + // Create a message that is not an actual event + const message: MQTTMessage = { + payload: "Hello world!", + PUBLISH: { + "Content type": "text/plain", + }, + "User Properties": {}, + headers: {}, + body: undefined + }; + expect(MQTT.isEvent(message)).to.be.false; + }); + + it("Can detect valid CloudEvent Messages", () => { + // Now create a message that is an event + const message = MQTT.binary( + new CloudEvent({ + source: "/message-test", + type: "example", + data, + }), + ); + expect(MQTT.isEvent(message)).to.be.true; + }); + + it("Handles CloudEvents with datacontenttype of text/plain", () => { + const message: Message = MQTT.binary( + new CloudEvent({ + source: "/test", + type: "example", + datacontenttype: "text/plain", + data: "Hello, friends!", + }), + ); + const event = MQTT.toEvent(message) as CloudEvent; + expect(event.data).to.equal(message.body); + expect(event.validate()).to.be.true; + }); + + it("Respects extension attribute casing (even if against spec)", () => { + // Create a message that is an event + const body = `{ "greeting": "hello" }`; + const headers = { + id: "1234", + source: "test", + type: "test.event", + specversion: "1.0", + LUNCH: "tacos", + }; + const message: MQTTMessage = { + body, + payload: body, + PUBLISH, + "User Properties": headers, + headers + }; + expect(MQTT.isEvent(message)).to.be.true; + const event = MQTT.toEvent(message) as CloudEvent; + expect(event.LUNCH).to.equal("tacos"); + expect(function () { + event.validate(); + }).to.throw("invalid attribute name: LUNCH"); + }); + + it("Can detect CloudEvent binary Messages with weird versions", () => { + // Now create a message that is an event + const body = `{ "greeting": "hello" }`; + const headers = { + id: "1234", + source: "test", + type: "test.event", + specversion: "11.8", + }; + const message: MQTTMessage = { + body, + payload: body, + PUBLISH, + headers, + "User Properties": headers, + }; + expect(MQTT.isEvent(message)).to.be.true; + const event = MQTT.toEvent(message) as CloudEvent; + expect(event.specversion).to.equal("11.8"); + expect(event.validate()).to.be.false; + }); + + it("Can detect CloudEvent structured Messages with weird versions", () => { + // Now create a message that is an event + const body = `{ "id": "123", "source": "test", "type": "test.event", "specversion": "11.8"}`; + const message: MQTTMessage = { + body, + payload: body, + headers: {}, + PUBLISH: {"Content Type": CONSTANTS.MIME_CE_JSON}, + "User Properties": {} + }; + expect(MQTT.isEvent(message)).to.be.true; + expect(MQTT.toEvent(message)).not.to.throw; + }); + + // Allow for external systems to send bad events - do what we can + // to accept them + it("Does not throw an exception when converting an invalid Message to a CloudEvent", () => { + const body = `"hello world"`; + const headers = { + id: "1234", + type: "example.bad.event", + // no required source, thus an invalid event + }; + const message: MQTTMessage = { + body, + payload: body, + PUBLISH, + headers, + "User Properties": headers, + }; + const event = MQTT.toEvent(message) as CloudEvent; + expect(event).to.be.instanceOf(CloudEvent); + // ensure that we actually now have an invalid event + expect(event.validate).to.throw; + }); + + it("Does not allow an invalid CloudEvent to be converted to a Message", () => { + const badEvent = new CloudEvent( + { + source: "/example.source", + type: "", // type is required, empty string will throw with strict validation + }, + false, // turn off strict validation + ); + expect(() => { + MQTT.binary(badEvent); + }).to.throw; + expect(() => { + MQTT.structured(badEvent); + }).to.throw; + }); + + it("Binary Messages can be created from a CloudEvent", () => { + const message: Message = MQTT.binary(fixture); + expect(message.body).to.equal(data); + // validate all headers + expect(message.headers.datacontenttype).to.equal(datacontenttype); + expect(message.headers.specversion).to.equal(Version.V1); + expect(message.headers.id).to.equal(id); + expect(message.headers.type).to.equal(type); + expect(message.headers.source).to.equal(source); + expect(message.headers.subject).to.equal(subject); + expect(message.headers.time).to.equal(fixture.time); + expect(message.headers.dataschema).to.equal(dataschema); + expect(message.headers[ext1Name]).to.equal(ext1Value); + expect(message.headers[ext2Name]).to.equal(ext2Value); + }); + + it("Sets User Properties on binary messages", () => { + const message: MQTTMessage = MQTT.binary(fixture) as MQTTMessage; + expect(message.body).to.equal(data); + // validate all headers + expect(message["User Properties"]?.datacontenttype).to.equal(datacontenttype); + expect(message["User Properties"]?.specversion).to.equal(Version.V1); + expect(message["User Properties"]?.id).to.equal(id); + expect(message["User Properties"]?.type).to.equal(type); + expect(message["User Properties"]?.source).to.equal(source); + expect(message["User Properties"]?.subject).to.equal(subject); + expect(message["User Properties"]?.time).to.equal(fixture.time); + expect(message["User Properties"]?.dataschema).to.equal(dataschema); + expect(message["User Properties"]?.[ext1Name]).to.equal(ext1Value); + expect(message["User Properties"]?.[ext2Name]).to.equal(ext2Value); + }); + + it("Structured Messages can be created from a CloudEvent", () => { + const message = MQTT.structured(fixture) as MQTTMessage; + expect(message.PUBLISH?.["Content Type"]).to.equal(CONSTANTS.DEFAULT_CE_CONTENT_TYPE); + expect(message.body).to.deep.equal(message.payload); + expect(message.payload).to.deep.equal(fixture.toJSON()); + const body = message.body as Record; + expect(body[CONSTANTS.CE_ATTRIBUTES.SPEC_VERSION]).to.equal(Version.V1); + expect(body[CONSTANTS.CE_ATTRIBUTES.ID]).to.equal(id); + expect(body[CONSTANTS.CE_ATTRIBUTES.TYPE]).to.equal(type); + expect(body[CONSTANTS.CE_ATTRIBUTES.SOURCE]).to.equal(source); + expect(body[CONSTANTS.CE_ATTRIBUTES.SUBJECT]).to.equal(subject); + expect(body[CONSTANTS.CE_ATTRIBUTES.TIME]).to.equal(fixture.time); + expect(body[CONSTANTS.STRUCTURED_ATTRS_1.DATA_SCHEMA]).to.equal(dataschema); + expect(body[ext1Name]).to.equal(ext1Value); + expect(body[ext2Name]).to.equal(ext2Value); + }); + + it("A CloudEvent can be converted from a binary Message", () => { + const message = MQTT.binary(fixture); + const event = MQTT.toEvent(message); + expect(event).to.deep.equal(fixture); + }); + + it("A CloudEvent can be converted from a structured Message", () => { + const message = MQTT.structured(fixture); + const event = MQTT.toEvent(message); + expect(event).to.deep.equal(fixture); + }); + + it("Converts binary data to base64 when serializing structured messages", () => { + const event = fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }); + expect(event.data).to.equal(imageData); + const message = MQTT.structured(event); + expect((message.body as CloudEvent).data_base64).to.equal(image_base64); + }); + + it("Converts base64 encoded data to binary when deserializing structured messages", () => { + const message = MQTT.structured(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" })); + const eventDeserialized = MQTT.toEvent(message) as CloudEvent; + expect(eventDeserialized.data).to.deep.equal(imageData); + expect(eventDeserialized.data_base64).to.equal(image_base64); + }); + + it("Converts base64 encoded data to binary when deserializing binary messages", () => { + const message = MQTT.binary(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" })); + const eventDeserialized = MQTT.toEvent(message) as CloudEvent; + expect(eventDeserialized.data).to.deep.equal(imageData); + expect(eventDeserialized.data_base64).to.equal(image_base64); + }); + + it("Keeps binary data binary when serializing binary messages", () => { + const event = fixture.cloneWith({ data: dataBinary }); + expect(event.data).to.equal(dataBinary); + const message = MQTT.binary(event); + expect(message.body).to.equal(dataBinary); + }); + + it("Does not parse binary data from binary messages with content type application/json", () => { + const message = MQTT.binary(fixture.cloneWith({ data: dataBinary })); + const eventDeserialized = MQTT.toEvent(message) as CloudEvent; + expect(eventDeserialized.data).to.deep.equal(dataBinary); + expect(eventDeserialized.data_base64).to.equal(data_base64); + }); +});