Skip to content

Commit

Permalink
feat: add MQTT transport messaging (#459)
Browse files Browse the repository at this point in the history
Add MQTT as a `Message` format.

This commit adds `MQTT` to the supported transport protocols by adding a `Binding` and the `MQTTMessage<T>` type, extending the base `Message` type, adding the MQTT fields for `payload`, `PUBLISH` and `User Properties`. The `payload` field directly maps to `Message#body`, while `User Properties` roughly maps to `Message#headers`, even though the properties here are not formatted with a `ce-` prefix like other transport protocols. This is per the spec. See: https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md. 

Signed-off-by: Lance Ball <lball@redhat.com>
  • Loading branch information
lance authored Jan 14, 2022
1 parent 5d1f744 commit 591d133
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 2 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ There you will find Express.js, TypeScript and Websocket examples.
| AMQP Protocol Binding | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :heavy_check_mark: | :x: |
| NATS Protocol Binding | :x: | :x: |

---
Expand All @@ -176,6 +176,9 @@ There you will find Express.js, TypeScript and Websocket examples.
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark:
| MQTT Binary | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Structured | :heavy_check_mark: | :heavy_check_mark: |

## Community

- There are bi-weekly calls immediately following the [Serverless/CloudEvents
Expand Down
5 changes: 4 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";

import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
import {
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, MQTT, MQTTMessage, MQTTMessageFactory,
Serializer, Deserializer } from "./message";

import CONSTANTS from "./constants";
Expand All @@ -32,6 +32,9 @@ export {
Kafka,
KafkaEvent,
KafkaMessage,
MQTT,
MQTTMessage,
MQTTMessageFactory,
// From transport
TransportFunction,
EmitterFunction,
Expand Down
1 change: 1 addition & 0 deletions src/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { CloudEventV1 } from "..";
// reexport the protocol bindings
export * from "./http";
export * from "./kafka";
export * from "./mqtt";

/**
* Binding is an interface for transport protocols to implement,
Expand Down
148 changes: 148 additions & 0 deletions src/message/mqtt/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

import { Binding, Deserializer, CloudEvent, CloudEventV1, CONSTANTS, Message, ValidationError, Headers } from "../..";

export {
MQTT,
MQTTMessage,
MQTTMessageFactory
};

/**
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
* are aliases of the {Message} attributes.
*/
interface MQTTMessage<T> extends Message<T> {
/**
* Identifies this message as a PUBLISH packet. MQTTMessages created with
* the `binary` and `structured` Serializers will contain a "Content Type"
* property in the PUBLISH record.
* @see https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#3-mqtt-publish-message-mapping
*/
PUBLISH: Record<string, string | undefined> | undefined
/**
* Alias of {Message#body}
*/
payload: T | undefined,
/**
* Alias of {Message#headers}
*/
"User Properties": Headers | undefined
}

/**
* Binding for MQTT transport support
* @implements @linkcode Binding
*/
const MQTT: Binding = {
binary,
structured,
toEvent: toEvent as Deserializer,
isEvent
};

/**
* Converts a CloudEvent into an MQTTMessage<T> with the event's data as the message payload
* @param {CloudEventV1} event a CloudEvent
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with binary encoding
* @implements {Serializer}
*/
function binary<T>(event: CloudEventV1<T>): MQTTMessage<T> {
let properties;
if (event instanceof CloudEvent) {
properties = event.toJSON();
} else {
properties = event;
}
const body = properties.data as T;
delete properties.data;

return MQTTMessageFactory(event.datacontenttype as string, properties, body);
}

/**
* Converts a CloudEvent into an MQTTMessage<T> with the event as the message payload
* @param {CloudEventV1} event a CloudEvent
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with structured encoding
* @implements {Serializer}
*/
function structured<T>(event: CloudEventV1<T>): MQTTMessage<T> {
let body;
if (event instanceof CloudEvent) {
body = event.toJSON();
} else {
body = event;
}
return MQTTMessageFactory(CONSTANTS.DEFAULT_CE_CONTENT_TYPE, {}, body) as MQTTMessage<T>;
}

/**
* A helper function to create an MQTTMessage<T> object, with "User Properties" as an alias
* for "headers" and "payload" an alias for body, and a "PUBLISH" record with a "Content Type"
* property.
* @param {string} contentType the "Content Type" attribute on PUBLISH
* @param {Record<string, unknown>} headers the headers and "User Properties"
* @param {T} body the message body/payload
* @returns {MQTTMessage<T>} a message initialized with the provided attributes
*/
function MQTTMessageFactory<T>(contentType: string, headers: Record<string, unknown>, body: T): MQTTMessage<T> {
return {
PUBLISH: {
"Content Type": contentType
},
body,
get payload() {
return this.body as T;
},
headers: headers as Headers,
get "User Properties"() {
return this.headers as any;
}
};
}

/**
* Converts an MQTTMessage<T> into a CloudEvent
* @param {Message<T>} message the message to deserialize
* @param {boolean} strict determines if a ValidationError will be thrown on bad input - defaults to false
* @returns {CloudEventV1<T>} an event
* @implements {Deserializer}
*/
function toEvent<T>(message: Message<T>, strict = false): CloudEventV1<T> | CloudEventV1<T>[] {
if (strict && !isEvent(message)) {
throw new ValidationError("No CloudEvent detected");
}
if (isStructuredMessage(message as MQTTMessage<T>)) {
const evt = (typeof message.body === "string") ? JSON.parse(message.body): message.body;
return new CloudEvent({
...evt as CloudEventV1<T>
}, false);
} else {
return new CloudEvent<T>({
...message.headers,
data: message.body as T,
}, false);
}
}

/**
* Determine if the message is a CloudEvent
* @param {Message<T>} message an MQTTMessage
* @returns {boolean} true if the message contains an event
*/
function isEvent<T>(message: Message<T>): boolean {
return isBinaryMessage(message) || isStructuredMessage(message as MQTTMessage<T>);
}

function isBinaryMessage<T>(message: Message<T>): boolean {
return (!!message.headers.id && !!message.headers.source
&& !! message.headers.type && !!message.headers.specversion);
}

function isStructuredMessage<T>(message: MQTTMessage<T>): boolean {
if (!message) { return false; }
return (message.PUBLISH && message?.PUBLISH["Content Type"]?.startsWith(CONSTANTS.MIME_CE_JSON)) || false;
}
Loading

0 comments on commit 591d133

Please sign in to comment.