Skip to content

Commit

Permalink
feat!: expose a version agnostic event emitter (#141)
Browse files Browse the repository at this point in the history
* feat!: expose a version agnostic event emitter

This is a breaking change.

This commit exposes an HTTP based event emitter that simplifes the API.
To use it, simply import the SDK and start emitting. The default spec
version is 1.0, but you can use 0.3 by supplying that to the constructor.

By default, CloudEvents are emitted in binary mode, but this can be changed
by providing the "structured" parameter to the `send()` function.

This commit also eliminates the version specific emitters and receivers
from the `v1` and `v03` exports, and eliminates the explicit usage of
versioned emitters from `lib/bindings/http`.

Finally, the CE headers can be retrieved from the emitter for a given
event by passing the event to the `headers()` function.

Fixes: #124
Fixes: #149

Signed-off-by: Lance Ball <lball@redhat.com>
  • Loading branch information
lance authored May 14, 2020
1 parent 7665969 commit 250a0a1
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 323 deletions.
59 changes: 47 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,60 @@ console.log(receivedEvent.format());

#### Emitting Events

Currently, to emit events, you'll need to decide whether the event is in
To emit events, you'll need to decide whether the event should be sent in
binary or structured format, and determine what version of the CloudEvents
specification you want to send the event as.

```js
const { CloudEvent } = require("cloudevents-sdk");
const { StructuredHTTPEmitter } = require("cloudevents-sdk/v1");
By default, the `HTTPEmitter` will emit events over HTTP POST using the
1.0 specification, in binary mode. You can emit 0.3 events by providing
the specication version in the constructor to `HTTPEmitter`. To send
structured events, add that string as a parameter to `emitter.sent()`.

const myevent = new CloudEvent()
.type("com.github.pull.create")
.source("urn:event:from:myapi/resource/123");
```js
const { CloudEvent, HTTPEmitter } = require("cloudevents-sdk");

const emitter = new StructuredHTTPEmitter({
method: "POST",
url : "https://myserver.com"
// With only an endpoint URL, this creates a v1 emitter
const v1Emitter = new HTTPEmitter({
url: "https://cloudevents.io/example"
});
const event = new CloudEvent()
.type(type)
.source(source)
.time(new Date())
.data(data)

// By default, the emitter will send binary events
v1Emitter.send(event).then((response) => {
// handle the response
}).catch(console.error);

// To send a structured event, just add that as an option
v1Emitter.send(event, { mode: "structured" })
.then((response) => {
// handle the response
}).catch(console.error);

// To send an event to an alternate URL, add that as an option
v1Emitter.send(event, { url: "https://alternate.com/api" })
.then((response) => {
// handle the response
}).catch(console.error);

// Sending a v0.3 event works the same, just let the emitter know when
// you create it that you are working with the 0.3 spec
const v03Emitter = new HTTPEmitter({
url: "https://cloudevents.io/example",
version: "0.3"
});

// Again, the default is to send binary events
// To send a structured event or to an alternate URL, provide those
// as parameters in a options object as above
v3Emitter.send(event)
.then((response) => {
// handle the response
}).catch(console.error);

// Emit the event
emitter.emit(myevent)
```

## Supported specification features
Expand Down
4 changes: 3 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const CloudEvent = require("./lib/cloudevent.js");
const HTTPReceiver = require("./lib/bindings/http/http_receiver.js");
const HTTPEmitter = require("./lib/bindings/http/http_emitter.js");

module.exports = {
CloudEvent,
HTTPReceiver
HTTPReceiver,
HTTPEmitter
};
122 changes: 79 additions & 43 deletions lib/bindings/http/emitter_binary.js
Original file line number Diff line number Diff line change
@@ -1,49 +1,85 @@
const axios = require("axios");

const Constants = require("./constants.js");
const defaults = {};
defaults[Constants.HEADERS] = {};
defaults[Constants.HEADERS][Constants.HEADER_CONTENT_TYPE] =
Constants.DEFAULT_CONTENT_TYPE;

function BinaryHTTPEmitter(config, headerByGetter, extensionPrefix) {
this.config = Object.assign({}, defaults, config);
this.headerByGetter = headerByGetter;
this.extensionPrefix = extensionPrefix;
}
const {
HEADERS,
BINARY_HEADERS_03,
BINARY_HEADERS_1,
HEADER_CONTENT_TYPE,
DEFAULT_CONTENT_TYPE,
DATA_ATTRIBUTE,
SPEC_V1,
SPEC_V03
} = require("./constants.js");

BinaryHTTPEmitter.prototype.emit = function(cloudevent) {
const config = Object.assign({}, this.config);
const headers = Object.assign({}, this.config[Constants.HEADERS]);

Object.keys(this.headerByGetter)
.filter((getter) => cloudevent[getter]())
.forEach((getter) => {
const header = this.headerByGetter[getter];
headers[header.name] =
header.parser(
cloudevent[getter]()
);
});

// Set the cloudevent payload
const formatted = cloudevent.format();
let data = formatted.data;
data = (formatted.data_base64 ? formatted.data_base64 : data);

// Have extensions?
const exts = cloudevent.getExtensions();
Object.keys(exts)
.filter((ext) => Object.hasOwnProperty.call(exts, ext))
.forEach((ext) => {
headers[this.extensionPrefix + ext] = exts[ext];
});

config[Constants.DATA_ATTRIBUTE] = data;
config.headers = headers;

// Return the Promise
return axios.request(config);
const defaults = {
[HEADERS]: {
[HEADER_CONTENT_TYPE]: DEFAULT_CONTENT_TYPE
},
method: "POST"
};

/**
* A class to emit binary CloudEvents over HTTP.
*/
class BinaryHTTPEmitter {
/**
* Create a new {BinaryHTTPEmitter} for the provided CloudEvent specification version.
* Once an instance is created for a given spec version, it may only be used to send
* events for that version.
* Default version is 1.0
* @param {string} version - the CloudEvent HTTP specification version.
* Default: 1.0
*/
constructor(version) {
if (version === SPEC_V1) {
this.headerByGetter = require("./emitter_binary_1.js");
this.extensionPrefix = BINARY_HEADERS_1.EXTENSIONS_PREFIX;
} else if (version === SPEC_V03) {
this.headerByGetter = require("./emitter_binary_0_3.js");
this.extensionPrefix = BINARY_HEADERS_03.EXTENSIONS_PREFIX;
}
}

/**
* Sends this cloud event to a receiver over HTTP.
*
* @param {Object} options The configuration options for this event. Options
* provided other than `url` will be passed along to Node.js `http.request`.
* https://nodejs.org/api/http.html#http_http_request_options_callback
* @param {URL} options.url The HTTP/S url that should receive this event
* @param {Object} cloudevent the CloudEvent to be sent
* @returns {Promise} Promise with an eventual response from the receiver
*/
async emit(options, cloudevent) {
const config = { ...options, ...defaults };
const headers = config[HEADERS];

Object.keys(this.headerByGetter)
.filter((getter) => cloudevent[getter]())
.forEach((getter) => {
const header = this.headerByGetter[getter];
headers[header.name] = header.parser(cloudevent[getter]());
});

// Set the cloudevent payload
const formatted = cloudevent.format();
let data = formatted.data;
data = (formatted.data_base64 ? formatted.data_base64 : data);

// Have extensions?
const exts = cloudevent.getExtensions();
Object.keys(exts)
.filter((ext) => Object.hasOwnProperty.call(exts, ext))
.forEach((ext) => {
headers[this.extensionPrefix + ext] = exts[ext];
});

config[DATA_ATTRIBUTE] = data;
config.headers = headers;

// Return the Promise
return axios.request(config);
}
}

module.exports = BinaryHTTPEmitter;
39 changes: 14 additions & 25 deletions lib/bindings/http/emitter_binary_0_3.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,53 @@
const BinaryHTTPEmitter = require("./emitter_binary.js");

const Constants = require("./constants.js");
const {
HEADER_CONTENT_TYPE,
BINARY_HEADERS_03
} = require("./constants.js");

const headerByGetter = {};

headerByGetter.getDataContentType = {
name: Constants.HEADER_CONTENT_TYPE,
name: HEADER_CONTENT_TYPE,
parser: (v) => v
};

headerByGetter.getDataContentEncoding = {
name: Constants.BINARY_HEADERS_03.CONTENT_ENCONDING,
name: BINARY_HEADERS_03.CONTENT_ENCONDING,
parser: (v) => v
};

headerByGetter.getSubject = {
name: Constants.BINARY_HEADERS_03.SUBJECT,
name: BINARY_HEADERS_03.SUBJECT,
parser: (v) => v
};

headerByGetter.getType = {
name: Constants.BINARY_HEADERS_03.TYPE,
name: BINARY_HEADERS_03.TYPE,
parser: (v) => v
};

headerByGetter.getSpecversion = {
name: Constants.BINARY_HEADERS_03.SPEC_VERSION,
name: BINARY_HEADERS_03.SPEC_VERSION,
parser: (v) => v
};

headerByGetter.getSource = {
name: Constants.BINARY_HEADERS_03.SOURCE,
name: BINARY_HEADERS_03.SOURCE,
parser: (v) => v
};

headerByGetter.getId = {
name: Constants.BINARY_HEADERS_03.ID,
name: BINARY_HEADERS_03.ID,
parser: (v) => v
};

headerByGetter.getTime = {
name: Constants.BINARY_HEADERS_03.TIME,
name: BINARY_HEADERS_03.TIME,
parser: (v) => v
};

headerByGetter.getSchemaurl = {
name: Constants.BINARY_HEADERS_03.SCHEMA_URL,
name: BINARY_HEADERS_03.SCHEMA_URL,
parser: (v) => v
};

function HTTPBinary(configuration) {
this.emitter = new BinaryHTTPEmitter(
configuration,
headerByGetter,
Constants.BINARY_HEADERS_03.EXTENSIONS_PREFIX
);
}

HTTPBinary.prototype.emit = function(cloudevent) {
return this.emitter.emit(cloudevent);
};

module.exports = HTTPBinary;
module.exports = headerByGetter;
37 changes: 13 additions & 24 deletions lib/bindings/http/emitter_binary_1.js
Original file line number Diff line number Diff line change
@@ -1,59 +1,48 @@
const BinaryHTTPEmitter = require("./emitter_binary.js");

const Constants = require("./constants.js");
const {
HEADER_CONTENT_TYPE,
BINARY_HEADERS_1
} = require("./constants.js");

const headerByGetter = {};

headerByGetter.getDataContentType = {
name: Constants.HEADER_CONTENT_TYPE,
name: HEADER_CONTENT_TYPE,
parser: (v) => v
};

headerByGetter.getSubject = {
name: Constants.BINARY_HEADERS_1.SUBJECT,
name: BINARY_HEADERS_1.SUBJECT,
parser: (v) => v
};

headerByGetter.getType = {
name: Constants.BINARY_HEADERS_1.TYPE,
name: BINARY_HEADERS_1.TYPE,
parser: (v) => v
};

headerByGetter.getSpecversion = {
name: Constants.BINARY_HEADERS_1.SPEC_VERSION,
name: BINARY_HEADERS_1.SPEC_VERSION,
parser: (v) => v
};

headerByGetter.getSource = {
name: Constants.BINARY_HEADERS_1.SOURCE,
name: BINARY_HEADERS_1.SOURCE,
parser: (v) => v
};

headerByGetter.getId = {
name: Constants.BINARY_HEADERS_1.ID,
name: BINARY_HEADERS_1.ID,
parser: (v) => v
};

headerByGetter.getTime = {
name: Constants.BINARY_HEADERS_1.TIME,
name: BINARY_HEADERS_1.TIME,
parser: (v) => v
};

headerByGetter.getDataschema = {
name: Constants.BINARY_HEADERS_1.DATA_SCHEMA,
name: BINARY_HEADERS_1.DATA_SCHEMA,
parser: (v) => v
};

function HTTPBinary(configuration) {
this.emitter = new BinaryHTTPEmitter(
configuration,
headerByGetter,
Constants.BINARY_HEADERS_1.EXTENSIONS_PREFIX
);
}

HTTPBinary.prototype.emit = function(cloudevent) {
return this.emitter.emit(cloudevent);
};

module.exports = HTTPBinary;
module.exports = headerByGetter;
Loading

0 comments on commit 250a0a1

Please sign in to comment.