-
Notifications
You must be signed in to change notification settings - Fork 25
/
publisher.js
66 lines (58 loc) · 1.56 KB
/
publisher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'use strict';
/**
*
* @module client/publisher
*/
const isObject = require('lodash/isObject');
const isFunction = require('lodash/isFunction');
const uuid = require('uuid');
// Module API
module.exports = createPublisher;
/**
 * IANA JSON Media Type string.
 * See https://tools.ietf.org/html/rfc4627
 * (RFC 4627 has since been obsoleted; RFC 8259 is the current JSON
 * specification and registers the same media type.)
 *
 * Sent as the `contentType` option of every message published through
 * this module.
 *
 * @type {String}
 */
const JSON_CONTENT_TYPE = 'application/json';
/**
 * Creates a publisher bound to a channel and, optionally, a reply queue.
 *
 * The returned object exposes `publish` and `awaitReply` through its
 * prototype; both close over the channel and the options given here.
 *
 * @param {Object} ch - Channel used to publish and consume. It must provide
 *   `publish(exchange, routingKey, content, options)` and
 *   `consume(queue, onMessage, options)` (presumably an amqplib channel;
 *   confirm against callers).
 * @param {Object} [options]
 * @param {String} [options.replyQueue] - Queue name sent as `replyTo` with
 *   every published message and consumed from by `awaitReply`.
 * @param {Function} [options.replyHandler] - Called with the content of each
 *   matching reply, converted with `toString()`, or with `undefined` when
 *   the reply carries no content. Ignored unless it is a function.
 * @param {String} [options.correlationId=uuid.v4()] - Identifier attached to
 *   every published message and used to filter incoming replies.
 * @returns {Object} Publisher exposing `publish` and `awaitReply`.
 * @throws {TypeError} If `ch` fails the `isObject` check.
 */
function createPublisher(
  ch,
  { replyQueue, replyHandler, correlationId = uuid.v4() } = {}
) {
  if (!isObject(ch)) {
    throw new TypeError(
      'Channel parameter `ch` must be provided (got: [' + typeof ch + '])'
    );
  }

  // Fall back to a no-op so `consumeReply` can invoke the handler
  // unconditionally.
  const handleReply = isFunction(replyHandler)
    ? replyHandler
    : Function.prototype;

  /**
   * Publishes `message` to `exchange` with routing key `rk`.
   *
   * @param {String|Buffer} message - Payload; passed through `Buffer.from`.
   * @param {String} exchange - Target exchange.
   * @param {String} rk - Routing key.
   * @param {Object} [options] - Extra publish options. `replyTo`,
   *   `contentType` and `correlationId` are always overwritten.
   * @returns {*} Whatever `ch.publish` returns.
   */
  function publish(message, exchange, rk, options) {
    // The reply-related fields are merged last so caller-supplied options
    // cannot override them; `options` itself is never mutated.
    const opts = Object.assign({}, options, {
      replyTo: replyQueue,
      contentType: JSON_CONTENT_TYPE,
      correlationId: correlationId
    });
    return ch.publish(exchange, rk, Buffer.from(message), opts);
  }

  /**
   * Consumer callback: forwards replies that belong to this publisher to
   * the reply handler.
   *
   * @param {?Object} message - Delivered message, or `null`/`undefined`
   *   when there is no message to process.
   */
  function consumeReply(message) {
    // amqplib-style channels invoke the consumer callback with `null` when
    // the broker cancels the consumer; there is nothing to handle then, and
    // dereferencing `message.properties` would throw.
    if (!message) {
      return;
    }
    // Only handle the message if it belongs to this client.
    // Consumer must supply the `correlationId` sent with the
    // original message
    if (message.properties.correlationId === correlationId) {
      const content = message.content ? message.content.toString() : undefined;
      handleReply(content);
    }
  }

  /**
   * Starts consuming from the reply queue without acknowledgements
   * (`noAck: true`).
   *
   * @returns {*} Whatever `ch.consume` returns.
   */
  function awaitReply() {
    // Consume from callback queue and await response
    return ch.consume(replyQueue, consumeReply, {
      noAck: true
    });
  }

  const Publisher = {
    publish,
    awaitReply
  };
  return Object.create(Publisher);
}