rab-q

🐇 💨 💨 💨 💨 🐢
Bunnies love turtles
A tiny (opinionated) wrapper over amqplib for the RabbitMQ publish-subscribe pattern

Features

  • minimal dependencies: only amqplib and uuid are needed
  • promises-based: async/await support makes it easy to use
  • small api: 4 methods only, there's not much to learn 😉
  • instantiable: the library itself is a class; create as many independent instances as needed
  • events logging: choose your own log transport
  • disconnections-proof: auto reconnect/resend when errors happen

Our use case at @RadioFrance

This wrapper was tailor-made for our microservices architecture.

We use RabbitMQ as the bus in the publish-subscribe pattern. A message is never sent directly to a queue; it is always sent to a configured exchange.

Every message should be acknowledged by returning one of three values: ACK, NACK or REJECT (available as message.ACK, message.NACK and message.REJECT on the message object).

A non-acknowledged (NACK) message is redelivered once to the exchange. A rejected message is deleted immediately, or redirected to the dead letter exchange if one is configured.
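As an illustration of these three outcomes, a subscriber could be sketched as follows. The handler name `handleOrder`, the payload fields (`id`, `retryLater`) and the hand-built `fakeMessage` are hypothetical and only stand in for what rab-q would pass at runtime.

```javascript
// Hypothetical subscriber: the payload shape (id, retryLater) is an assumption.
function handleOrder(message) {
  const order = message.content;

  // Malformed payload: retrying will not help, so reject without redelivery.
  if (!order || typeof order.id !== 'string') {
    return message.REJECT;
  }

  // Transient problem (flag invented for this example): ask for one redelivery.
  if (order.retryLater === true) {
    return message.NACK;
  }

  // Processed successfully.
  return message.ACK;
}

// Stand-in for the object rab-q passes to a subscriber (see action(message)).
const fakeMessage = content => ({ACK: 'ACK', NACK: 'NACK', REJECT: 'REJECT', content});

console.log(handleOrder(fakeMessage({id: 'o-1'})));
console.log(handleOrder(fakeMessage({id: 'o-2', retryLater: true})));
console.log(handleOrder(fakeMessage({})));
```

With a real instance, the same function would be registered through `rabQ.subscribesTo(/^order\./, handleOrder)`; the routing key pattern here is also just an example.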

Our queues and exchanges are defined by an external engine. rab-q does not provide an API to create, assert or delete them.

Installation

$ npm install rab-q

Usage

const RabQ = require('rab-q');

const rabQ = new RabQ({
  exchange: 'your_topic_exchange',
  queues: 'queue_bind_to_exchange'
});

rabQ.on('error', err => {
  console.error(err);
});
rabQ.on('log', log => {
  console[log.level](log.msg, log.err || '');
});

rabQ.subscribesTo(/.*/, message => {
  // Do stuff!

  // Then acknowledge message by returning a constant
  return message.ACK;
});

rabQ.start()
  .then(() => {
    rabQ.publish('yourRoutingKeyHere', {your: 'message'});
  });

API

new RabQ(options)

options

username

Type: string
Default: guest

User name used to connect to RabbitMQ.

password

Type: string
Default: guest

Password associated with username.

protocol

Type: amqp|amqps
Default: amqp

Connection protocol.

hostname

Type: string
Default: localhost

Hostname of the RabbitMQ server to connect to.

port

Type: number
Default: 5672

Port used for the connection.

socketOptions

Type: object
Default: null

Socket options passed to amqplib when connecting. See https://amqp-node.github.io/amqplib/channel_api.html#connect

vhost

Type: string
Default: /

Selected virtual host.

exchange

Required
Type: string

Exchange to publish messages.

queues

Type: Array<string> | string

Consumes messages from these queues.

If this option is omitted or has a falsy value, the server will create random queue names.

maxMessages

Type: number
Default: 10

Defines the number of messages prefetched by channel. Once the max number of messages is reached, RabbitMQ will wait for some messages to be acknowledged before proceeding with new messages.

nackDelay

Type: number
Default: 0

Defines a delay in milliseconds before a message is rejected with NACK.

reconnectInterval

Type: number (milliseconds)
Default: 0

Time in milliseconds before trying to reconnect when connection is lost.

autoAck

Type: boolean
Default: false

Enables auto acknowledgment.

autoReconnect

Type: boolean
Default: true

Enables auto reconnection if an error happens while connecting to the server.
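The connection and tuning options above can be assembled in one place. The sketch below is only an example: the environment variable names (`RABBIT_HOST`, `RABBIT_PORT`, `RABBIT_EXCHANGE`, `RABBIT_QUEUES`) and the chosen values are assumptions, while the option names are the ones documented in this section.

```javascript
// Builds a rab-q options object from environment-style settings.
// The env variable names are hypothetical; the values are arbitrary examples.
function buildOptions(env) {
  return {
    hostname: env.RABBIT_HOST || 'localhost',
    port: Number(env.RABBIT_PORT) || 5672,
    exchange: env.RABBIT_EXCHANGE, // required by rab-q
    // Left undefined when not set, so the server picks random queue names.
    queues: env.RABBIT_QUEUES ? env.RABBIT_QUEUES.split(',') : undefined,
    maxMessages: 5, // prefetch at most 5 unacknowledged messages
    nackDelay: 1000, // wait 1 s before a NACK is sent
    reconnectInterval: 2000, // wait 2 s before trying to reconnect
    autoReconnect: true,
    autoAck: false
  };
}

console.log(buildOptions({RABBIT_EXCHANGE: 'events', RABBIT_QUEUES: 'orders,users'}));
```

The result would then be passed to the constructor, as in `new RabQ(buildOptions(process.env))`.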

validators

Type: Object of functions

validators.consumer

Type: boolean
Default: return true

Function run before each message is processed. If it returns a falsy value, the message is rejected.

beforeHook

Type: function
Default: () => {}
Parameters: message (see below)

Function run before each message is processed. It can modify the message.

afterHook

Type: function
Default: () => {}
Parameters: message (see below), subscriberResult (ACK/NACK/REJECT)

Function run after each message is processed.
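As a sketch of how the two hooks can work together, the pair below measures how long each message takes to process. The `startedAt` field and the `timings` array are names invented for this example; only the hook parameters come from the descriptions above.

```javascript
// Collected measurements; in a real service this might feed a metrics client.
const timings = [];

// beforeHook may modify the message, so it stamps the start time on it.
// `startedAt` is a field name invented for this example.
function beforeHook(message) {
  message.startedAt = Date.now();
}

// afterHook receives the same message plus the subscriber's result.
function afterHook(message, subscriberResult) {
  timings.push({
    rk: message.rk,
    result: subscriberResult,
    ms: Date.now() - message.startedAt
  });
}

// Simulated lifecycle of one message, without a broker.
const sample = {rk: 'user.created', content: {}};
beforeHook(sample);
afterHook(sample, 'ACK');
console.log(timings);
```

Both functions would be passed as options, for example `new RabQ({exchange: 'your_topic_exchange', beforeHook, afterHook})`.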

prePublish

Type: function
Default: null
Parameters: routingKey, content, properties (see publish method)
Must return an object with {routingKey, content, properties}

Function run before each publish call.
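A minimal sketch of such a function, assuming a service that wants to namespace its routing keys and timestamp its payloads. The `billing.` prefix and the `sentAt` field are hypothetical; `properties` is returned untouched.

```javascript
// Hypothetical prePublish: the 'billing.' prefix and `sentAt` field are
// invented for this example. It returns the three keys rab-q expects.
function prePublish(routingKey, content, properties) {
  return {
    routingKey: `billing.${routingKey}`,
    content: {...content, sentAt: new Date().toISOString()},
    properties
  };
}

console.log(prePublish('invoice.created', {id: 42}, {}));
```

Returning a new `content` object rather than mutating the argument keeps the caller's data unchanged, which is a design choice of this sketch and not a requirement stated by rab-q.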

rabQ.start()

Starts a connection.

Returns a promise resolved with true for a successful connection or false if a connection already exists.

rabQ.stop()

Stops and closes a connection.

rabQ.publish(routingKey, content, properties)

Publishes a message on exchange rabQ.exchange.

routingKey

Required
Type: string

The routing key with which the message is published to the exchange.

content

Required
Type: Object

A message object to send to the exchange.

properties

Type: Object
Default: {}

Adds RabbitMQ properties to the message. See http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish (options)

If properties does not have a headers key, properties is considered to be headers.

You can provide an x-query-token property in headers to trace the lifecycle of a request. If it is not provided, a new UUID will be generated.
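The two rules above can be pictured with a small stand-alone function. This is an illustration of the documented behaviour, not rab-q's actual code: the name `normalizeProperties` is hypothetical, and Node's built-in `crypto.randomUUID` is used here in place of whatever UUID generator the library uses.

```javascript
const {randomUUID} = require('crypto');

// Illustration only: mimics the documented rules, not rab-q's internals.
function normalizeProperties(properties = {}) {
  // Without a `headers` key, the whole object is treated as headers.
  const normalized = 'headers' in properties
    ? {...properties, headers: {...properties.headers}}
    : {headers: {...properties}};

  // A missing x-query-token is replaced by a fresh UUID.
  if (!normalized.headers['x-query-token']) {
    normalized.headers['x-query-token'] = randomUUID();
  }
  return normalized;
}

console.log(normalizeProperties({'x-query-token': 'abc'}));
console.log(normalizeProperties({persistent: true, headers: {source: 'api'}}));
```

In practice this means `rabQ.publish('rk', content, {'x-query-token': 'abc'})` and `rabQ.publish('rk', content, {headers: {'x-query-token': 'abc'}})` should carry the same token.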

rabQ.subscribesTo(patternMatch, action)

Adds a subscriber on consumed messages, filtered by routing key.

patternMatch

Required
Type: RegExp

A regular expression to match on the routing keys of consumed messages.

action(message)

Required
Type: Function

A function that acknowledges (or not) each message by returning one of 'ACK', 'NACK' or 'REJECT', either directly or as the value of a resolved promise.

message is an object with the following properties:

  • ACK string: the constant returned for a positive acknowledgment
  • NACK string: the constant returned for a negative acknowledgment. NACK requeues the message once.
  • REJECT string: the constant returned for a rejection without redelivery.
  • content Object: content of the received message
  • rk string: routing key of the message
  • queue string: queue name where the message is consumed
  • token string: a UUID to identify the message
  • consumeAt number: timestamp when the message is consumed
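Since an action may also return a promise, an asynchronous subscriber can be sketched as below. The factory `makeUserSubscriber`, the injected `saveUser` function and the `transient` flag on errors are hypothetical; the message fields are the ones listed above.

```javascript
// Wraps a persistence function into a rab-q style subscriber.
// `saveUser` and the `transient` error flag are assumptions of this sketch.
function makeUserSubscriber(saveUser) {
  return async function onUserCreated(message) {
    try {
      await saveUser(message.content);
      return message.ACK;
    } catch (err) {
      console.log(`[${message.token}] ${message.rk} failed on ${message.queue}: ${err.message}`);
      // Requeue once for transient failures, drop everything else.
      return err.transient ? message.NACK : message.REJECT;
    }
  };
}

// Registration would look like:
// rabQ.subscribesTo(/^user\.created$/, makeUserSubscriber(saveUser));
```

Injecting `saveUser` keeps the subscriber testable without a broker: it can be called with a plain object that carries the same fields as a real message.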

Events

'log'

Emits a log object with the following properties:

  • level string: can be info, warn, error
  • token string: the UUID to identify the message
  • msg string: the message
  • err Object: the original error if log level is error
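A log object with these properties can be routed to any transport. The handler below is a small sketch that writes to the console; the output format and the helper names `formatLog` and `onLog` are choices made for this example.

```javascript
// Formats a rab-q log object as a single line (format chosen for this example).
function formatLog(log) {
  const token = log.token ? ` [${log.token}]` : '';
  return `${log.level.toUpperCase()}${token} ${log.msg}`;
}

// Picks the console method matching the level, falling back to console.log.
function onLog(log) {
  const method = typeof console[log.level] === 'function' ? log.level : 'log';
  console[method](formatLog(log), log.err || '');
}

onLog({level: 'info', token: 'abc-123', msg: 'connected'});
```

It would be attached with `rabQ.on('log', onLog)`.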

'error'

Emits an Error if something goes wrong with the connection. If you don't catch this event, the process will stop.
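This is how Node's standard EventEmitter treats an 'error' event without a listener. The sketch below uses a plain `EventEmitter` as a stand-in, on the assumption (suggested by the `rabQ.on(...)` calls in Usage) that a rab-q instance behaves the same way.

```javascript
const {EventEmitter} = require('events');

// Plain EventEmitter used as a stand-in for a rab-q instance.
const emitter = new EventEmitter();

// Without an 'error' listener, emit() throws the error itself.
let threw = false;
try {
  emitter.emit('error', new Error('connection lost'));
} catch (err) {
  threw = true;
}

// With a listener attached, the same event is handled normally.
const seen = [];
emitter.on('error', err => seen.push(err.message));
emitter.emit('error', new Error('connection lost'));

console.log(threw, seen);
```

Registering `rabQ.on('error', ...)` before calling `rabQ.start()`, as in the Usage section, avoids the uncaught exception.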

License

CECILL-B