Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
EverettSummer committed Mar 20, 2022
2 parents 6cd7b87 + ed91db1 commit bf8c3da
Showing 1 changed file with 108 additions and 59 deletions.
167 changes: 108 additions & 59 deletions src/services/RabbitMQService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,59 +26,36 @@ import pino from 'pino';
import { capture } from '../utils/sentry';
import { isFatalError } from 'amqplib/lib/connection';

const CHECK_INTERVAL = 5000;
const logger = pino();
const CHECK_INTERVAL = 5000;

interface QueueSetting {
bindingKey: string;
exchangeName: string;
prefetch: boolean;
}

interface Consumer {
queueName: string;
onMessage: (msg: MQMessage) => Promise<boolean>;
consumerTag: string;
}

@injectable()
export class RabbitMQService {
private _connection: Connection;
private _exchanges = new Map<string, string>();
private _channels = new Map<string, ConfirmChannel>();
private _queues = new Map<string, QueueSetting>();
private _consumers = new Map<string, Consumer>();
private _connected: boolean;

constructor(@inject(TYPES.ConfigManager) private _configManager: ConfigManager,
@inject(TYPES.DatabaseService) private _databaseService: DatabaseService) {
}

public publish(exchangeName: string, routingKey: string, message: any): Promise<boolean> {
const channel = this._channels.get(exchangeName);
return new Promise<boolean>((resolve, reject) => {
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message), 'utf-8'),
{},
(err, ok) => {
if (err !== null) {
// TODO: currently not reachable, need to figure out how to test this piece of code.
logger.warn('message nacked');
this.saveMessage(exchangeName, routingKey, message)
.then(() => {
logger.info('message saved, will be resent')
});
reject(err);
} else {
resolve(true);
logger.debug('message acked')
}
});
});
}

public async initPublisher(exchangeName: string, exchangeType: string): Promise<void> {
if (!this._connection || !this._connected) {
await this.connectAsync();
}
const channel = await this._connection.createConfirmChannel();
this._channels.set(exchangeName, channel);
this._exchanges.set(exchangeName, exchangeType);
const channel = await this.addChannel(exchangeName, exchangeType);
await channel.assertExchange(exchangeName, exchangeType);
}

Expand All @@ -91,50 +68,69 @@ export class RabbitMQService {
* @param prefetch, see Fair dispatch in the tutorial (https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html)
*/
public async initConsumer(exchangeName: string, exchangeType: string, queueName: string, bindingKey: string = '', prefetch = false): Promise<void> {
if (!this._connection || !this._connected) {
await this.connectAsync();
}
const channel = await this._connection.createConfirmChannel();
this._channels.set(exchangeName, channel);
const channel = await this.addChannel(exchangeName, exchangeType);
await channel.assertExchange(exchangeName, exchangeType);
const q = await channel.assertQueue(queueName);
if (prefetch) {
await channel.prefetch(1);
}
await channel.bindQueue(q.queue, exchangeName, bindingKey);
this._exchanges.set(exchangeName, exchangeType);
this._queues.set(queueName, {bindingKey, exchangeName, prefetch});
}

public async consume(queueName: string, onMessage: (msg: MQMessage) => Promise<boolean>): Promise<string> {
const exchangeName = this._queues.get(queueName).exchangeName;
public publish(exchangeName: string, routingKey: string, message: any): Promise<boolean> {
const channel = this._channels.get(exchangeName);
const result = await channel.consume(queueName, async (msg) => {
if (msg) {
const mqMsg = JSON.parse(msg.content.toString('utf-8')) as MQMessage;
if (await onMessage(mqMsg)) {
channel.ack(msg);
} else {
channel.nack(msg);
}
} else {
// TODO: Handle the consumer cancel in this service
if (channel) {
try {
return new Promise<boolean>((resolve, reject) => {
channel.publish(
exchangeName,
routingKey,
Buffer.from(JSON.stringify(message), 'utf-8'),
{},
(err, ok) => {
if (err !== null) {
// TODO: currently not reachable, need to figure out how to test this piece of code.
logger.warn('message nacked');
this.saveMessage(exchangeName, routingKey, message)
.then(() => {
logger.info('message saved, will be resent')
});
reject(err);
} else {
resolve(true);
logger.debug('message acked')
}
});
});
} catch (e: any) {
logger.error(e);
capture(e, {stack: e.stack, line: '143', exchangeName, routingKey, message});
return this.saveMessage(exchangeName, routingKey, message)
.then(() => {
return false;
});
}
});
return result.consumerTag;
}
}

public async consume(queueName: string, onMessage: (msg: MQMessage) => Promise<boolean>): Promise<string> {
const consumer = { queueName, onMessage, consumerTag: null };
this._consumers.set(queueName, consumer);
while (consumer.consumerTag == null) {
consumer.consumerTag = await this.setupConsumer(consumer);
}
return consumer.consumerTag;
}

private async connectAsync(): Promise<void> {
this._connection = await connect(this._configManager.amqpServerUrl() || this._configManager.amqpConfig());
this._connection.on('error', (error: any) => {
logger.error({error, event: 'amqp connection error'});
if (this._connected) {
capture(error);
this.reconnect();
}
logger.error(error, {message: 'connection error on amqp', line:60});
capture(error);
});
this._connection.on('close', (error: any) => {
logger.error({error, event: 'amqp connection close'});
logger.error(error || 'closed no error', {message: 'connection closed on amqp', line:67});
if (this._connected && isFatalError(error)) {
capture(error);
this.reconnect();
Expand All @@ -155,7 +151,9 @@ export class RabbitMQService {
}
for (const [queueName, queueSetting] of this._queues.entries()) {
const exchangeType = this._exchanges.get(queueSetting.exchangeName);
await this.initConsumer(queueSetting.exchangeName, exchangeType, queueName, queueSetting.bindingKey, queueSetting.prefetch)
await this.initConsumer(queueSetting.exchangeName, exchangeType, queueName, queueSetting.bindingKey, queueSetting.prefetch);
const consumer = this._consumers.get(queueName);
consumer.consumerTag = await this.setupConsumer(consumer);
}
})
.then(() => {
Expand All @@ -168,6 +166,36 @@ export class RabbitMQService {
}, 5000);
};

private async setupConsumer(consumer: Consumer) {
const exchangeName = this._queues.get(consumer.queueName).exchangeName;
const channel = this._channels.get(exchangeName);
if (channel) {
try {
const result = await channel.consume(consumer.queueName, async (msg) => {
if (msg) {
const mqMsg = JSON.parse(msg.content.toString('utf-8')) as MQMessage;
if (await consumer.onMessage(mqMsg)) {
channel.ack(msg);
} else {
channel.nack(msg);
}
} else {
// TODO: Handle the consumer cancel in this service
consumer.consumerTag = await this.setupConsumer(consumer);
}
});
return result.consumerTag;
} catch (error: any) {
if (error.isOperational && error.message.includes('BasicConsume; 404')){
return null;
}
throw error;
}
} else {
return null;
}
}

private async saveMessage(exchange: string, routingKey: string, content: any): Promise<void> {
const message = new Message();
message.exchange = exchange;
Expand All @@ -185,6 +213,7 @@ export class RabbitMQService {
try {
result = await this.publish(message.exchange, message.routingKey, message.content);
} catch (err) {
capture(err);
result = false;
break;
}
Expand All @@ -195,4 +224,24 @@ export class RabbitMQService {
}
setTimeout(this.resendMessageInFailedQueue.bind(this), CHECK_INTERVAL);
}

private async addChannel(exchangeName: string, exchangeType: string): Promise<ConfirmChannel> {
if (!this._connection || !this._connected) {
await this.connectAsync();
}
const channel = await this._connection.createConfirmChannel();
this._channels.set(exchangeName, channel);
this._exchanges.set(exchangeName, exchangeType);
channel.on('close', () => this.onChannelClose(exchangeName));
channel.on('error', (error) => this.onChannelError(exchangeName, error));
return channel;
}

private onChannelClose(exchangeName): void {
this._channels.set(exchangeName, null);
}
private onChannelError(exchangeName, error: any): void {
this._channels.set(exchangeName, null);
capture(error, {stack: error.stack, line: '215', exchangeName});
}
}

0 comments on commit bf8c3da

Please sign in to comment.