Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 128 additions & 77 deletions messagehub.js
Original file line number Diff line number Diff line change
@@ -1,115 +1,166 @@
var async = require('async');

var INSTANCE_RETRY_MS = 3000,
POLL_INTERVAL_MS = 2000;

/**
* Created by fwang1 on 3/25/15.
*/
module.exports = function(RED) {

var cgGroupName;

function random() {
return "" + Math.floor((Math.random() * 100) + 1) + "-" + Date.now();
}
try {
// If we're on Bluemix, use the unique app ID as the consumer group name
var vcapApplication = JSON.parse(process.env.VCAP_APPLICATION);
cgGroupName = vcapApplication.application_id;
} catch (e) {
// Otherwise use a random string
cgGroupName = "nodered-" + random();
}
console.log("Using " + cgGroupName + " as consumer group name");

/*
* MessageHub Producer
*/
function MessageHubProducer(config) {
RED.nodes.createNode(this, config);

var node = this;
var MessageHub = require('message-hub-rest');

var apikey = config.apikey;
var kafka_rest_url = config.kafkaresturl;
var services = {
"messagehub": [
{
var node = this,
MessageHub = require('message-hub-rest'),
apikey = config.apikey,
kafka_rest_url = config.kafkaresturl,
services = {
"messagehub": [{
"credentials": {
"api_key": apikey,
"kafka_rest_url": kafka_rest_url
}
}
]
}
}]
},
topic = config.topic;

var instance = new MessageHub(services);
var topic = config.topic;
RED.nodes.createNode(node, config);

try {
this.on("input", function(msg) {
var payloads = [];
node.on("input", function(msg) {
var instance = new MessageHub(services),
payloads = [];

node.log(msg.payload);
payloads.push(msg.payload);
payloads.push(msg.payload);

var list = new MessageHub.MessageList(payloads);
var list = new MessageHub.MessageList(payloads);

instance.produce(topic, list.messages)
instance.produce(topic, list.messages)
.then(function(data) {
node.log("Message sent");
node.log(data);
node.log("Message successfully published on messsage hub");
console.log(data);
})
.fail(function(error) {
node.error(error);
node.error("Unable to publish message on message hub (" + error.message + ")");
});
});
}
catch(e) {
node.error(e);
}
});
}

RED.nodes.registerType("messagehub out", MessageHubProducer);

/*
* Message Hub Consumer
*/
function createConsumerInstance(messageHub, topic, cb) {
if (!messageHub) {
cb(new Error("Message Hub client is null"));
return;
}
messageHub.consume('nodered-' + cgGroupName, random(), {
'auto.offset.reset': 'largest'
})
.then(function(response) {
cb(null, response[0]);
return;
})
.fail(function(err) {
cb(err);
return;
});
}

function poll(consumerInstance, topic, cb) {
if (!consumerInstance) {
cb(new Error("Consumer instance is null"));
return;
}
consumerInstance.get(topic)
.then(function(data) {
cb(null, data);
})
.fail(function(err) {
cb(err);
});
}

function MessageHubConsumer(config) {
RED.nodes.createNode(this,config);

var node = this;
var MessageHub = require('message-hub-rest');
var apikey = config.apikey;
var kafka_rest_url = config.kafkaresturl;
var services = {
"messagehub": [
{
RED.nodes.createNode(this, config);

var node = this,
MessageHub = require('message-hub-rest'),
apikey = config.apikey,
kafka_rest_url = config.kafkaresturl,
services = {
"messagehub": [{
"credentials": {
"api_key": apikey,
"kafka_rest_url": kafka_rest_url
}
}
]
}

var instance = new MessageHub(services);
var topic = config.topic;
var consumerInstance;

function random() {
return Math.floor((Math.random() * 100) + 1);
}
}]
},
messageHub = new MessageHub(services),
topic = config.topic;

node.log(topic);
instance.consume('nodered-' + topic + "-" + random(), 'nodered', { 'auto.offset.reset': 'largest' })
.then(function(response) {
consumerInstance = response[0];
})
.fail(function(error) {
node.error(error);
});
var currentErr;

try {
this.log("Consumer created...");
setInterval(function() {
consumerInstance.get(topic)
.then(function(data) {
for(var i=0; i<data.length; i++) {
node.send({payload: data[i]});
async.forever(function(next) {
createConsumerInstance(messageHub, topic, function(err, consumerInstance) {
if (err) { // We keep on retrying in case of error - it could be just that Message Hub is not responding
if (!currentErr) {
node.error("Message hub is in error: " + err.message);
}
})
.fail(function(err) {
node.error(err);
});
}, 2000);
}
catch(e){
node.error(e);
return;
}
currentErr = err;
setTimeout(next, INSTANCE_RETRY_MS);
} else {
if (currentErr) {
node.log("Message hub is back to work");
currentErr = null;
}
async.forever(function(_next) {
poll(consumerInstance, topic, function(err, data) {
if (err) {
_next(err);
} else {
if (currentErr) {
node.log("Message hub is back to work");
} else if (currentErr === undefined) {
node.log("Message hub is connected for topic " + topic);
}
currentErr = null;
for (var i = 0; i < data.length; i++) {
node.send({
payload: data[i]
});
}
setTimeout(_next, POLL_INTERVAL_MS);
}
});
}, function(err) {
if (!currentErr) {
node.error("Message hub is in error: " + err.message);
}
currentErr = err;
next();
});
}
});
});
}

RED.nodes.registerType("messagehub in", MessageHubConsumer);
};
};
34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
{
"name": "node-red-contrib-messagehub-node",
"version": "0.0.2",
"version": "0.1.0",
"description": "Node-RED nodes of HighLevel MessageHub Producer and Consumer",
"dependencies": {
"async": "^2.1.5",
"message-hub-rest": "^1.1.2"
},
"repository": {
"type": "git",
"url": "https://github.com/luiselizondo/node-red-contrib-messagehub-node.git"
"url": "https://github.com/bluemixgarage/node-red-contrib-messagehub-node"
},
"license": "Apache-2.0",
"keywords": [
Expand All @@ -32,20 +33,19 @@
}
},
"author": {
"name": "IBM",
"name": "IBM"
},
"maintainers": [
{
"name": "luiselizondo",
"email": "luis.elizondo@mx1.ibm.com"
},
{
"name": "niall-weedon",
"email": "nweedo3@uk.ibm.com"
},
{
"name": "kfarmer3",
"email": "kfarme3@uk.ibm.com"
}
]
"maintainers": [{
"name": "luiselizondo",
"email": "luis.elizondo@mx1.ibm.com"
}, {
"name": "niall-weedon",
"email": "nweedo3@uk.ibm.com"
}, {
"name": "kfarmer3",
"email": "kfarme3@uk.ibm.com"
}, {
"name": "BluemixGarage",
"email": "garage@fr.ibm.com"
}]
}