diff --git a/messagehub.js b/messagehub.js index b386f89..8262dfe 100644 --- a/messagehub.js +++ b/messagehub.js @@ -1,115 +1,166 @@ +var async = require('async'); + +var INSTANCE_RETRY_MS = 3000, + POLL_INTERVAL_MS = 2000; + /** * Created by fwang1 on 3/25/15. */ module.exports = function(RED) { + + var cgGroupName; + + function random() { + return "" + Math.floor((Math.random() * 100) + 1) + "-" + Date.now(); + } + try { + // If we're on Bluemix, use the unique app ID as the consumer group name + var vcapApplication = JSON.parse(process.env.VCAP_APPLICATION); + cgGroupName = vcapApplication.application_id; + } catch (e) { + // Otherwise use a random string + cgGroupName = "nodered-" + random(); + } + console.log("Using " + cgGroupName + " as consumer group name"); + /* * MessageHub Producer */ function MessageHubProducer(config) { - RED.nodes.createNode(this, config); - - var node = this; - var MessageHub = require('message-hub-rest'); - - var apikey = config.apikey; - var kafka_rest_url = config.kafkaresturl; - var services = { - "messagehub": [ - { + var node = this, + MessageHub = require('message-hub-rest'), + apikey = config.apikey, + kafka_rest_url = config.kafkaresturl, + services = { + "messagehub": [{ "credentials": { "api_key": apikey, "kafka_rest_url": kafka_rest_url } - } - ] - } + }] + }, + topic = config.topic; - var instance = new MessageHub(services); - var topic = config.topic; + RED.nodes.createNode(node, config); - try { - this.on("input", function(msg) { - var payloads = []; + node.on("input", function(msg) { + var instance = new MessageHub(services), + payloads = []; - node.log(msg.payload); - payloads.push(msg.payload); + payloads.push(msg.payload); - var list = new MessageHub.MessageList(payloads); + var list = new MessageHub.MessageList(payloads); - instance.produce(topic, list.messages) + instance.produce(topic, list.messages) .then(function(data) { - node.log("Message sent"); - node.log(data); + node.log("Message successfully published on messsage hub"); + console.log(data); }) .fail(function(error) { - node.error(error); + node.error("Unable to publish message on message hub (" + error.message + ")"); }); - }); - } - catch(e) { - node.error(e); - } + }); } - RED.nodes.registerType("messagehub out", MessageHubProducer); /* * Message Hub Consumer */ + function createConsumerInstance(messageHub, topic, cb) { + if (!messageHub) { + cb(new Error("Message Hub client is null")); + return; + } + messageHub.consume('nodered-' + cgGroupName, random(), { + 'auto.offset.reset': 'largest' + }) + .then(function(response) { + cb(null, response[0]); + return; + }) + .fail(function(err) { + cb(err); + return; + }); + } + + function poll(consumerInstance, topic, cb) { + if (!consumerInstance) { + cb(new Error("Consumer instance is null")); + return; + } + consumerInstance.get(topic) + .then(function(data) { + cb(null, data); + }) + .fail(function(err) { + cb(err); + }); + } + function MessageHubConsumer(config) { - RED.nodes.createNode(this,config); - - var node = this; - var MessageHub = require('message-hub-rest'); - var apikey = config.apikey; - var kafka_rest_url = config.kafkaresturl; - var services = { - "messagehub": [ - { + RED.nodes.createNode(this, config); + + var node = this, + MessageHub = require('message-hub-rest'), + apikey = config.apikey, + kafka_rest_url = config.kafkaresturl, + services = { + "messagehub": [{ "credentials": { "api_key": apikey, "kafka_rest_url": kafka_rest_url } - } - ] - } - - var instance = new MessageHub(services); - var topic = config.topic; - var consumerInstance; - - function random() { - return Math.floor((Math.random() * 100) + 1); - } + }] + }, + messageHub = new MessageHub(services), + topic = config.topic; - node.log(topic); - instance.consume('nodered-' + topic + "-" + random(), 'nodered', { 'auto.offset.reset': 'largest' }) - .then(function(response) { - consumerInstance = response[0]; - }) - .fail(function(error) { - node.error(error); - }); + var currentErr; - try { - this.log("Consumer created..."); - setInterval(function() { - consumerInstance.get(topic) - .then(function(data) { - for(var i=0; i