Skip to content

Commit

Permalink
feat(connection): add throwExceptionOnConnectionError connection option
Browse files Browse the repository at this point in the history
  • Loading branch information
tahubu committed Oct 16, 2020
1 parent 902e594 commit 7b4ba08
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,35 @@ $ npm install --save @team-supercharge/nest-amqp
In the subsections you can see how to send and receive messages, how to handle message transfer and how to use DTO classes to message
payload transformation and validation.

### Create connection

To create a connection, you have to import the `QueueModule.forRoot()` module into your application's root module. The `forRoot()`
static method has 2 parameters: the first and required is the connection URI string, and the second is the *optional* connection
options object. The connection options object extends the [Rhea's connection options](https://www.npmjs.com/package/rhea#connectoptions)
and accepts these new properties:
* **throwExceptionOnConnectionError**?: A boolean value. If it's `true` then AMQPModule will throw forward the exception which occurs
during the connection creation. Default value is `false`.

> Note: the AMQPModule package does not support multiple connection!
```typescript
import { Module } from '@nestjs/common';
import { QueueModule } from '@team-supercharge/nest-amqp';

@Module({
imports: [
QueueModule.forRoot(
'amqp://user:password@localhost:5672',
{
throwExceptionOnConnectionError: true
}
),
// ...
],
})
export class AppModule {}
```

### Send a message

You can send messages with the `QueueService`. First you have to inject the service instance in the constructor, then you can use it to
Expand Down
4 changes: 3 additions & 1 deletion src/interface/amqp-connection-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ import { ConnectionOptions } from 'rhea-promise';
*
* @publicApi
*/
export type AMQPConnectionOptions = ConnectionOptions
export type AMQPConnectionOptions = ConnectionOptions & {
throwExceptionOnConnectionError?: boolean;
}
32 changes: 31 additions & 1 deletion src/service/amqp/amqp.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ describe('AMQPService', () => {
let connection: Connection;
let connectionEvents: Array<{ event: ConnectionEvents; callback: (context: any) => any }> = [];
let senderEvents: Array<{ event: SenderEvents; callback: (context: any) => any }> = [];
let connectionOpenMock: jest.Mock = jest.fn().mockResolvedValue(null);
const receiverEvents: Array<{ event: ReceiverEvents; callback: (context: any) => any }> = [];

beforeAll(() => {
// mock the Connection constructor
(Connection as any).mockImplementation(() => ({
on: (event: ConnectionEvents, callback: (context: any) => any) => connectionEvents.push({ event, callback }),
open: jest.fn().mockResolvedValue(null),
open: connectionOpenMock,
close: jest.fn().mockResolvedValue(null),
createAwaitableSender: jest.fn().mockResolvedValue({
on: (event: SenderEvents, callback: (context: any) => any) => senderEvents.push({ event, callback }),
Expand Down Expand Up @@ -68,6 +69,35 @@ describe('AMQPService', () => {
expect((connection as any).open).toHaveBeenCalled();
});

it('should create throw error if connection options is not a valid object', async () => {
await expect(
AMQPService.createConnection(connectionSecureUri, null)
).rejects.toThrow(/connection options must an object/);
});

describe('connection options', () => {
it('should not throw connection error by default', async () => {
connectionOpenMock = jest.fn().mockRejectedValue(new Error('Test'));

await expect(
AMQPService.createConnection(connectionUri)
).resolves.toBeInstanceOf(Object);

connectionOpenMock = jest.fn().mockResolvedValue(null);
});

it('should throw connection error by connection options', async () => {
const exception = new Error('Test');
connectionOpenMock = jest.fn().mockRejectedValue(exception);

await expect(
AMQPService.createConnection(connectionUri, { throwExceptionOnConnectionError: true })
).rejects.toBe(exception);

connectionOpenMock = jest.fn().mockResolvedValue(null);
});
});

it('should listen to connection events', async () => {
connectionEvents = [];

Expand Down
21 changes: 17 additions & 4 deletions src/service/amqp/amqp.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class AMQPService {

/**
* Parses the connection URI and connect to the message broker by the given
* informations.
* information.
*
* NOTE: If the connection closes and there was no error then the service will
* attempt to reconnect to the message broker but only once.
Expand All @@ -33,13 +33,18 @@ export class AMQPService {
* ```
*
* @param {string} connectionUri The URI which contains the main connection settings.
* @param {object} [connectionOptions] Options for the `rhea-promise` Connection.
* @param {AMQPConnectionOptions} [connectionOptions] Options for the `rhea-promise` Connection.
* @return {Connection} The created `rhea-promise` Connection.
* @static
*/
public static async createConnection(connectionUri: string, connectionOptions: AMQPConnectionOptions = {}): Promise<Connection> {
if (Object.prototype.toString.call(connectionOptions) !== '[object Object]') {
throw new Error('AMQPModule connection options must an object');
}

logger.info('creating AMQP client');

const { throwExceptionOnConnectionError, ...rheaConnectionOptions } = connectionOptions;
const { protocol, username, password, hostname, port } = new URL(connectionUri);

logger.info('initializing client connection to', {
Expand All @@ -56,7 +61,7 @@ export class AMQPService {
transport: protocol === 'amqps:' ? 'ssl' : 'tcp',
host: hostname,
port: Number.parseInt(port, 10),
...connectionOptions,
...rheaConnectionOptions,
});

connection.on(ConnectionEvents.connectionOpen, (_: EventContext) => {
Expand Down Expand Up @@ -94,7 +99,15 @@ export class AMQPService {
logger.warn('connection closed by peer', context);
});

await connection.open();
try {
await connection.open();
} catch (err) {
logger.error('connection error', err);

if (throwExceptionOnConnectionError === true) {
throw err;
}
}
logger.info('created AMQP connection');

return connection;
Expand Down

0 comments on commit 7b4ba08

Please sign in to comment.