diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a13683a..a7be1f7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,10 +20,7 @@ jobs: matrix: node_version: - 18 - - 15 - - 14 - - 12 - - 10 + - 20 os: - ubuntu-latest steps: @@ -40,6 +37,7 @@ jobs: - run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fourthQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fithQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/sixthQueue + - run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/seventhQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/unackedQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.random.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/firstQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomBis.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/secondQueue @@ -47,11 +45,12 @@ jobs: - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test4.*.routingKey.test4"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fourthQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test5.*.routingKey.test5"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fithQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test6.*.routingKey.test6"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/sixthQueue + - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test7.plaintext.routingKey.test7"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/seventhQueue - run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomUnacked.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/unackedQueue - run: npm install - run: npm run lint - - run: npm test + - run: npm test --verbose - uses: codecov/codecov-action@v1 if: matrix.os == 'ubuntu-latest' && matrix.node_version == 14 with: diff --git a/index.d.ts b/index.d.ts index ae26129..51d0089 100644 --- a/index.d.ts +++ b/index.d.ts @@ -35,6 +35,7 @@ export interface RabQConfig { reconnectInterval?: number; autoAck?: boolean; autoReconnect?: boolean; + acceptPlainText?: boolean; validators?: { consumer: (exchange: string, queue: string, parsedMesage: Message) => boolean; }; diff --git a/index.js b/index.js index 7342c79..7d075ab 100644 --- a/index.js +++ b/index.js @@ -34,6 +34,7 @@ class RabQ extends EventEmitter { this.password = opts.password || 'guest'; this.socketOptions = opts.socketOptions; this.vhost = opts.vhost || '/'; // Name of virtual host in RabbitMQ to access queues + this.acceptPlainText = opts.acceptPlainText || false; this.exchange = opts.exchange; // Name of exchange who distribute messages to queues through routing key @@ -219,7 +220,8 @@ class RabQ extends EventEmitter { }); try { - ch.publish(this.exchange, routingKey, new Buffer(JSON.stringify(content)), properties, err => { + const stringContent = this.acceptPlainText && typeof content === 'string' ? content : JSON.stringify(content); + ch.publish(this.exchange, routingKey, Buffer.from(stringContent), properties, err => { if (err) { // Store message when error happened this.messagesToSend[messageId] = { diff --git a/lib/set-consumer.js b/lib/set-consumer.js index db94d3a..3a38ab9 100644 --- a/lib/set-consumer.js +++ b/lib/set-consumer.js @@ -10,7 +10,7 @@ module.exports = (conn, ch, currentQueues, rabQ) => { return ch.consume(queue, msg => { try { const parsedMsg = { - content: JSON.parse(msg.content.toString()), + content: rabQ.acceptPlainText ? msg.content.toString() : JSON.parse(msg.content.toString()), rk: msg.fields.routingKey, queue, token: getTokenFromMessage(msg), diff --git a/lib/validate-options.js b/lib/validate-options.js index f86a3c2..f771183 100644 --- a/lib/validate-options.js +++ b/lib/validate-options.js @@ -41,6 +41,9 @@ module.exports = (opts = {}) => { if (typeof opts.autoAck !== 'boolean' && opts.autoAck) { throw new Error(`opts.autoAck should be a boolean. Currently "${typeof opts.autoAck}"`); } + if (typeof opts.acceptPlainText !== 'boolean' && opts.acceptPlainText) { + throw new Error(`opts.acceptPlainText should be a boolean. Currently "${typeof opts.acceptPlainText}"`); + } if (typeof opts.autoReconnect !== 'boolean' && opts.autoReconnect) { throw new Error(`opts.autoReconnect should be a boolean. Currently "${typeof opts.autoReconnect}"`); } diff --git a/package.json b/package.json index 29dc8d7..fd33dd8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rab-q", - "version": "2.0.4", + "version": "2.0.5", "description": "A tiny (opinionated) wrapper over amqplib for RabbitMQ publish/subscribe pattern", "license": "CECILL-B", "repository": "radiofrance/rab-q", @@ -50,5 +50,8 @@ "capitalized-comments": "off", "no-console": "error" } + }, + "ava": { + "concurrency": 1 } } diff --git a/test/set-consumer.js b/test/set-consumer.js index 382b480..6a8c8c4 100644 --- a/test/set-consumer.js +++ b/test/set-consumer.js @@ -8,11 +8,12 @@ import setConsumer from '../lib/set-consumer'; import minimalOptions from './config.json'; +/* eslint-disable */ const fakeLogger = { debug: () => {}, info: () => {}, warn: () => {}, - error: () => {} + error: (msg) => console.error(msg) }; async function makeRabQ(settings) { @@ -50,7 +51,7 @@ test('set consumer with minimal form subscriber', async t => { return Promise.resolve(message.ACK); }); - p.publish('test1.random.routingKey.test1', contentToSend); + p.publish('test1.random.routingKey.test1', contentToSend, {headers: {test: 'toto'}}); return delay(1000) .then(() => { @@ -242,3 +243,23 @@ test('set prePublish', async t => { return delay(1000); }); + +test('receive plain text message', async t => { + t.plan(1); + const contentToSend = 'this is a plain text message'; + + const c = Object.assign({}, minimalOptions); + c.acceptPlainText = true; + c.queues = 'seventhQueue'; + + const p = await makeRabQ(c); + + p.subscribesTo(/test7\.plaintext\.routingKey\.test7/, message => { + t.is(message.content, contentToSend); + return Promise.resolve(message.ACK); + }); + + p.publish('test7.plaintext.routingKey.test7', contentToSend); + + return delay(1000); +});