Skip to content

Commit

Permalink
Merge pull request #22 from radiofrance/prePublish
Browse files Browse the repository at this point in the history
Add prePublish option
  • Loading branch information
zckrs authored Apr 19, 2021
2 parents 1fa6c96 + 233a290 commit dbbe813
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ jobs:
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/thirdQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fourthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/fithQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/sixthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPUT http://127.0.0.1:15672/api/queues/%2f/unackedQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.random.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/firstQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomBis.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/secondQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test3.*.routingKey.test3"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/thirdQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test4.*.routingKey.test4"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fourthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test5.*.routingKey.test5"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/fithQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"test6.*.routingKey.test6"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/sixthQueue
- run: curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"routing_key":"*.randomUnacked.routingKey.*"}' http://127.0.0.1:15672/api/bindings/%2f/e/rf/q/unackedQueue

- run: npm install
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ Parameters: `message` (see below), `subscriberResult` (ACK/NACK/REJECT)

Function run after each message treatment.

##### prePublish

Type: `function`<br>
Default: null
Parameters: `routingKey`, `content`, `properties` (see publish method)
Must return an object with {routingKey, content, properties}

Function run before each publish call

### rabQ.start()

Starts a connection.
Expand Down
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class RabQ extends EventEmitter {

this.beforeHook = opts.beforeHook || (() => {});
this.afterHook = opts.afterHook || (() => {});
this.prePublish = opts.prePublish || null;

_connection.set(this, undefined);
_channel.set(this, undefined);
Expand Down Expand Up @@ -192,6 +193,10 @@ class RabQ extends EventEmitter {
properties.headers['x-query-token'] = uuid.v4();
}

if (this.prePublish) {
({routingKey, content, properties} = this.prePublish(routingKey, content, properties));
}

this.emit('log', {
level: 'info',
uuid: content.uuid,
Expand Down
26 changes: 26 additions & 0 deletions test/set-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ test('set before and after hooks', async t => {
c.afterHook = (msg, result) => {
afterHookMsg = msg.test + result;
};

const p = await makeRabQ(c);

p.subscribesTo(/test5\.random\.routingKey\.test5/, message => {
Expand All @@ -215,3 +216,28 @@ test('set before and after hooks', async t => {
t.is(afterHookMsg, 'beforeACK');
});
});

test('set prePublish', async t => {
t.plan(2);

const contentToSend = {toto: 'tata'};

const c = Object.assign({}, minimalOptions);
c.queues = 'sixthQueue';
c.prePublish = (routingKey, content, properties) => {
t.is(routingKey, 'test6.random.routingKey.test6');
properties.headers.test = 'TEST';
return {routingKey, content, properties};
};

const p = await makeRabQ(c);

p.subscribesTo(/test6\.random\.routingKey\.test6/, message => {
t.is(message.originMsg.properties.headers.test, 'TEST');
return Promise.resolve(message.ACK);
});

p.publish('test6.random.routingKey.test6', contentToSend);

return delay(1000);
});

0 comments on commit dbbe813

Please sign in to comment.