Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions messagehub.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ module.exports = function(RED) {
RED.nodes.createNode(this,config);

var node = this;
node.loop = true;
var interval = parseInt(config.interval) || 2000;
var service = RED.nodes.getNode(config.service);

var instance = new MessageHub(service.getService());
var instance
var timeout
var topic = config.topic;

function random() {
Expand All @@ -158,6 +158,7 @@ module.exports = function(RED) {

function getTopicAndSend(consumerInstance) {
var lastCheck = Date.now();

if (!node.loop)
return;

Expand All @@ -169,7 +170,9 @@ module.exports = function(RED) {
}
})
.fail(function(err) {
node.error("Error getting topic: '"+topic+"' has failed", {error: err});
node.error("Error getting topic: '"+topic+"' has failed. Retrying in 1 minute.", {error: err});
node.loop = false
timeout = setTimeout(connectConsumer,60000)
})
.then(function() {
var deferred = Q.defer();
Expand All @@ -182,31 +185,44 @@ module.exports = function(RED) {
});
}

node.log('Creating consumer for topic: \'' + topic + '\'');
function connectConsumer(){
node.log('Creating consumer for topic: \'' + topic + '\'');

try {
instance
.consume('nodered-' + topic + "-" + random(), 'nodered', { 'auto.offset.reset': 'largest' })
.then(function(response) {
return response[0];
})
.then(function(consumerInstance) {
node.log("Consumer created...");
node.status({fill:"green", shape:"ring", text:"Connected"});
return getTopicAndSend(consumerInstance)
})
.fail(function(error) {
node.status({fill:"red", shape:"ring", text: error.message});
node.error('Error creating consumer', {error: error});
});
} catch(e) {
node.error(e);
node.status({fill:"red", shape:"ring", text:"Error while consuming"});
instance = new MessageHub(service.getService());
node.consumerConnected = false
timeout = setTimeout(() => {if(!node.consumerConnected)connectConsumer()},300000)
try {
instance
.consume('nodered-' + topic + "-" + random(), 'nodered', { 'auto.offset.reset': 'largest' })
.then(function(response) {
return response[0];
})
.then(function(consumerInstance) {
node.log("Consumer created...");
node.status({fill:"green", shape:"ring", text:"Connected"});
node.loop = true
node.consumerConnected = true
return getTopicAndSend(consumerInstance)
})
.fail(function(error) {
node.status({fill:"red", shape:"ring", text: error.message});
node.error('Error creating consumer. Retrying in 1 minute.', {error: error});
timeout = setTimeout(connectConsumer,60000)
});
} catch(e) {
node.error(e);
node.status({fill:"red", shape:"ring", text:"Error while consuming. Retrying in 1 minute."});
timeout = setTimeout(connectConsumer,60000)
}
}

connectConsumer()

this.on('close', function() {
node.loop = false;
clearTimeout(timeout);
});

}

RED.nodes.registerType("messagehub in", MessageHubConsumer);
Expand Down