Nodejs Message queue rabbitMQ
To install
npm install raintank/raintank-queue
This module provides wrapper for the amqplib module to handle errors and disconnections from the rabbitMQ server. If an error occurs while trying to publish, the publish method will try up to 'retryCount' time before giving up. If an error occurs while consuming, the consumer will try and restart up to 'retryCount' times, or keep trying forever if retryCount is less then 0
Producer (send messages to the queue/exchange);
var Producer = require('raintank-queue').Producer;
var p = new Producer({
url: "amqp://localhost",
exchangeName: "ex1", //exchange name
exchangeType: "fanout", //exchange type.
retryCount: 5, // how many times to try sending the message before giving up.
retryDelay: 1000, // delay between each retry.
});
p.publish(JSON.stringify({id: 1, data: "text"}), function(err) {
if (err) {
console.log("error publishing message.", err);
}
//else mesage was sent successfully.
});
Consumer (recieve messages from the queue.)
var Consumer = require('raintank-queue').Consumer;
var c = new Consumer({
url: "amqp://localhost",
exchangeName: "ex1", //this should match the name of the exchange the producer is using.
exchangeType: "fanout", // this should match the exchangeType the producer is using.
queueName: '', //leave blank for an auto generated name. recommend when creating an exclusive queue.
exclusive: true, //make the queue exclusive.
durable: false,
autoDelete: true,
queuePattern: null, //fanout exchanges get all messages, so we dont need to bind with a pattern.
retryCount: -1, // keep trying to connect forever.
handler: processMessage
});
function processMessage(message) {
console.log(message.content.toString());
}
c.on('error', function(err) {
console.log("consumer emitted fatal error.")
console.log(err);
process.exit(1);
});