diff --git a/README.md b/README.md index 7683262..f87ada1 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,35 @@ $ npm install --save @team-supercharge/nest-amqp In the subsections you can see how to send and receive messages, how to handle message transfer and how to use DTO classes to message payload transformation and validation. +### Create connection + +To create a connection, you have to import the `QueueModule.forRoot()` module into your application's root module. The `forRoot()` +static method has 2 parameters: the first and required is the connection URI string, and the second is the *optional* connection +options object. The connection options object extends the [Rhea's connection options](https://www.npmjs.com/package/rhea#connectoptions) +and accepts these new properties: +* **throwExceptionOnConnectionError**?: A boolean value. If it's `true` then AMQPModule will throw forward the exception which occurs + during the connection creation. Default value is `false`. + +> Note: the AMQPModule package does not support multiple connection! + +```typescript +import { Module } from '@nestjs/common'; +import { QueueModule } from '@team-supercharge/nest-amqp'; + +@Module({ + imports: [ + QueueModule.forRoot( + 'amqp://user:password@localhost:5672', + { + throwExceptionOnConnectionError: true + } + ), + // ... + ], +}) +export class AppModule {} +``` + ### Send a message You can send messages with the `QueueService`. First you have to inject the service instance in the constructor, then you can use it to diff --git a/src/interface/amqp-connection-options.interface.ts b/src/interface/amqp-connection-options.interface.ts index 7d4e3df..ff90178 100644 --- a/src/interface/amqp-connection-options.interface.ts +++ b/src/interface/amqp-connection-options.interface.ts @@ -5,4 +5,6 @@ import { ConnectionOptions } from 'rhea-promise'; * * @publicApi */ -export type AMQPConnectionOptions = ConnectionOptions +export type AMQPConnectionOptions = ConnectionOptions & { + throwExceptionOnConnectionError?: boolean; +} diff --git a/src/service/amqp/amqp.service.spec.ts b/src/service/amqp/amqp.service.spec.ts index 8e6766c..166cf28 100644 --- a/src/service/amqp/amqp.service.spec.ts +++ b/src/service/amqp/amqp.service.spec.ts @@ -15,13 +15,14 @@ describe('AMQPService', () => { let connection: Connection; let connectionEvents: Array<{ event: ConnectionEvents; callback: (context: any) => any }> = []; let senderEvents: Array<{ event: SenderEvents; callback: (context: any) => any }> = []; + let connectionOpenMock: jest.Mock = jest.fn().mockResolvedValue(null); const receiverEvents: Array<{ event: ReceiverEvents; callback: (context: any) => any }> = []; beforeAll(() => { // mock the Connection constructor (Connection as any).mockImplementation(() => ({ on: (event: ConnectionEvents, callback: (context: any) => any) => connectionEvents.push({ event, callback }), - open: jest.fn().mockResolvedValue(null), + open: connectionOpenMock, close: jest.fn().mockResolvedValue(null), createAwaitableSender: jest.fn().mockResolvedValue({ on: (event: SenderEvents, callback: (context: any) => any) => senderEvents.push({ event, callback }), @@ -68,6 +69,35 @@ describe('AMQPService', () => { expect((connection as any).open).toHaveBeenCalled(); }); + it('should create throw error if connection options is not a valid object', async () => { + await expect( + AMQPService.createConnection(connectionSecureUri, null) + ).rejects.toThrow(/connection options must an object/); + }); + + describe('connection options', () => { + it('should not throw connection error by default', async () => { + connectionOpenMock = jest.fn().mockRejectedValue(new Error('Test')); + + await expect( + AMQPService.createConnection(connectionUri) + ).resolves.toBeInstanceOf(Object); + + connectionOpenMock = jest.fn().mockResolvedValue(null); + }); + + it('should throw connection error by connection options', async () => { + const exception = new Error('Test'); + connectionOpenMock = jest.fn().mockRejectedValue(exception); + + await expect( + AMQPService.createConnection(connectionUri, { throwExceptionOnConnectionError: true }) + ).rejects.toBe(exception); + + connectionOpenMock = jest.fn().mockResolvedValue(null); + }); + }); + it('should listen to connection events', async () => { connectionEvents = []; diff --git a/src/service/amqp/amqp.service.ts b/src/service/amqp/amqp.service.ts index d2bde58..8ba87be 100644 --- a/src/service/amqp/amqp.service.ts +++ b/src/service/amqp/amqp.service.ts @@ -23,7 +23,7 @@ export class AMQPService { /** * Parses the connection URI and connect to the message broker by the given - * informations. + * information. * * NOTE: If the connection closes and there was no error then the service will * attempt to reconnect to the message broker but only once. @@ -33,13 +33,18 @@ export class AMQPService { * ``` * * @param {string} connectionUri The URI which contains the main connection settings. - * @param {object} [connectionOptions] Options for the `rhea-promise` Connection. + * @param {AMQPConnectionOptions} [connectionOptions] Options for the `rhea-promise` Connection. * @return {Connection} The created `rhea-promise` Connection. * @static */ public static async createConnection(connectionUri: string, connectionOptions: AMQPConnectionOptions = {}): Promise { + if (Object.prototype.toString.call(connectionOptions) !== '[object Object]') { + throw new Error('AMQPModule connection options must an object'); + } + logger.info('creating AMQP client'); + const { throwExceptionOnConnectionError, ...rheaConnectionOptions } = connectionOptions; const { protocol, username, password, hostname, port } = new URL(connectionUri); logger.info('initializing client connection to', { @@ -56,7 +61,7 @@ export class AMQPService { transport: protocol === 'amqps:' ? 'ssl' : 'tcp', host: hostname, port: Number.parseInt(port, 10), - ...connectionOptions, + ...rheaConnectionOptions, }); connection.on(ConnectionEvents.connectionOpen, (_: EventContext) => { @@ -94,7 +99,15 @@ export class AMQPService { logger.warn('connection closed by peer', context); }); - await connection.open(); + try { + await connection.open(); + } catch (err) { + logger.error('connection error', err); + + if (throwExceptionOnConnectionError === true) { + throw err; + } + } logger.info('created AMQP connection'); return connection;