Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: clean up event unmarshalling, remove async API calls #38

Merged
merged 1 commit into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 59 additions & 51 deletions lib/event-handler.js
Original file line number Diff line number Diff line change
@@ -1,87 +1,95 @@
'use strict';

const v02 = require('cloudevents-sdk/v02');
const v03 = require('cloudevents-sdk/v03');
const v1 = require('cloudevents-sdk/v1');
const V03Binary = require('cloudevents-sdk/lib/bindings/http/receiver_binary_0_3');
const V03Structured = require('cloudevents-sdk/lib/bindings/http/receiver_structured_0_3.js');
const V1Binary = require('cloudevents-sdk/lib/bindings/http/receiver_binary_1.js');
const V1Structured = require('cloudevents-sdk/lib/bindings/http/receiver_structured_1.js');

const Spec = require('./ce-constants.js').Spec;

const v02Unmarshaller = new v02.HTTPUnmarshaller();
const v03Unmarshaller = new v03.HTTPUnmarshaller();
const v1BinaryReceiver = new v1.BinaryHTTPReceiver();
const v1StructuredReceiver = new v1.StructuredHTTPReceiver();
const receivers = {
v1: {
structured: new V1Structured(),
binary: new V1Binary()
},
v03: {
structured: new V03Structured(),
binary: new V03Binary()
}
};

function use(fastify, opts, done) {
fastify.addContentTypeParser('application/cloudevents+json',
{ parseAs: 'string' },
function(req, body, done) {
// unmarshallEvent() handles parsing
done(null, body);
});

fastify.decorateRequest('isCloudEvent', function() {
if (Spec.type in this.req.headers) {
return true;
} else {
const contentType = this.req.headers['content-type'];
if (contentType && contentType.match(/application\/cloudevents/)) {
return true;
}
}
return false;
const contentType = this.req.headers['content-type'];
return contentType && contentType.match(/application\/cloudevents/);
});

// Any incoming requests for cloud events will only be
// processed if it's a cloud event spec version we know about
fastify.addHook('preHandler', async(request, reply) => {
fastify.addHook('preHandler', function(request, reply, done) {
if (request.isCloudEvent()) {
const version = request.headers[Spec.version];
// if there is no version in the headers, it is a
// structured event
if (version && !acceptsVersion(version)) {
try {
request.context.cloudevent =
accept(request.headers, request.body).format();
} catch (err) {
reply.code(406);
const error = new Error(
`Unsupported cloud event version detected: ${version}`);
error.code = 406;
throw error;
} else {
try {
await unmarshallEvent(request);
} catch (err) {
throw new Error(err.message ||
`Failed to unmarshall cloud event: ${err}`);
}
done(err);
}
}
done();
});

done();
}

async function unmarshallEvent(request) {
const version = request.headers[Spec.version];
if (!version) {
// it's a structured event and the version is in the body
// currently only v1 structured events are supported
try {
const event = v1StructuredReceiver.parse(request.body, request.headers);
request.context.cloudevent = event.format();
} catch (err) {
return Promise.reject(err);
function accept(headers, body) {
const mode = getMode(headers);
const version = getVersion(mode, headers, body);
switch (version) {
case '1.0':
return receivers.v1[mode].parse(body, headers);
case '0.3':
return receivers.v03[mode].parse(body, headers);
default:
console.error(`Unknown spec version ${version}`);
throw new TypeError(
`Unsupported cloud event version detected: ${version}`);
}
}

function getMode(headers) {
let mode = 'binary';
if (headers['content-type']) {
if (headers['content-type'].startsWith('application/cloudevents')) {
mode = 'structured';
}
} else if (version === '0.2') {
return v02Unmarshaller.unmarshall(request.body, request.headers)
.then(cloudevent => (request.context.cloudevent = cloudevent.format()));
} else if (version === '0.3') {
return v03Unmarshaller.unmarshall(request.body, request.headers)
.then(cloudevent => (request.context.cloudevent = cloudevent.format()));
} else if (version === '1.0') {
const event = v1BinaryReceiver.parse(request.body, request.headers);
request.context.cloudevent = event.format();
}
return mode;
}

function acceptsVersion(version) {
return ['0.2', '0.3', '1.0'].find(elem => version === elem) !== undefined;
function getVersion(mode, headers, body) {
let version = '1.0'; // default to 1.0

if (mode === 'binary') {
// Check the headers for the version
if (headers['ce-specversion']) {
version = headers['ce-specversion'];
}
} else {
// structured mode - the version is in the body
version = typeof body === 'string'
? JSON.parse(body).specversion : body.specversion;
}
return version;
}

module.exports = exports = use;
Loading