Skip to content
This repository has been archived by the owner on Nov 21, 2020. It is now read-only.

Commit

Permalink
feat(message-queue): used rabbitmq
Browse files Browse the repository at this point in the history
BREAKING CHANGE: No longer using redis as message broker
close #223
  • Loading branch information
batamar committed Jul 31, 2019
1 parent 5a44df4 commit fdb26ab
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 104 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

RABBITMQ_HOST=amqp://localhost

# Email
COMPANY_EMAIL_FROM=noreply@erxes.io
DEFAULT_EMAIL_SERVICE=sendgrid
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@axelspringer/graphql-google-pubsub": "^1.2.1",
"@google-cloud/pubsub": "0.18.0",
"@google-cloud/storage": "^2.5.0",
"amqplib": "0.5.3",
"apollo-server-express": "^2.3.1",
"aws-sdk": "^2.151.0",
"bcryptjs": "^2.4.3",
Expand Down
22 changes: 8 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@ import { handleEngageUnSubscribe } from './data/resolvers/mutations/engageUtils'
import { checkFile, getEnv, readFileRequest, uploadFile } from './data/utils';
import { connect } from './db/connection';
import { debugExternalApi, debugInit } from './debuggers';
import './messageQueue';
import integrationsApiMiddleware from './middlewares/integrationsApiMiddleware';
import userMiddleware from './middlewares/userMiddleware';
import { initSubscribe, unsubscribe } from './pubsub';
import { initRedis } from './redisClient';
import { init } from './startup';

initRedis();

// load environment variables
dotenv.config();

// connect to redis server
initRedis(() => {
initSubscribe();
});

const { NODE_ENV } = process.env;
const MAIN_APP_DOMAIN = getEnv({ name: 'MAIN_APP_DOMAIN', defaultValue: '' });
const WIDGETS_DOMAIN = getEnv({ name: 'WIDGETS_DOMAIN', defaultValue: '' });

Expand Down Expand Up @@ -223,11 +221,9 @@ httpServer.listen(PORT, () => {
process.stdin.resume(); // so the program will not close instantly

// If the Node process ends, close the Mongoose connection
(['SIGINT', 'SIGKILL', 'SIGTERM', 'SIGQUIT'] as NodeJS.Signals[]).forEach(sig => {
try {
if (NODE_ENV === 'production') {
(['SIGINT', 'SIGTERM'] as NodeJS.Signals[]).forEach(sig => {
process.on(sig, () => {
unsubscribe();

// Stops the server from accepting new connections and finishes existing connections.
httpServer.close((error: Error) => {
if (error) {
Expand All @@ -241,7 +237,5 @@ process.stdin.resume(); // so the program will not close instantly
});
});
});
} catch (e) {
console.log(e.message);
}
});
});
}
86 changes: 86 additions & 0 deletions src/messageQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import * as amqplib from 'amqplib';
import * as dotenv from 'dotenv';
import { ActivityLogs, Conversations } from './db/models';
import { debugBase } from './debuggers';
import { graphqlPubsub } from './pubsub';
import { get, set } from './redisClient';

dotenv.config();

const { NODE_ENV, RABBITMQ_HOST = 'amqp://localhost' } = process.env;

interface IMessage {
action: string;
data: {
trigger: string;
type: string;
payload: any;
};
}

const reciveMessage = async ({ action, data }: IMessage) => {
if (NODE_ENV === 'test') {
return;
}

if (action === 'callPublish') {
if (data.trigger === 'conversationMessageInserted') {
const { customerId, conversationId } = data.payload;
const conversation = await Conversations.findOne({ _id: conversationId }, { integrationId: 1 });
const customerLastStatus = await get(`customer_last_status_${customerId}`);

// if customer's last status is left then mark as joined when customer ask
if (conversation && customerLastStatus === 'left') {
set(`customer_last_status_${customerId}`, 'joined');

// customer has joined + time
const conversationMessages = await Conversations.changeCustomerStatus(
'joined',
customerId,
conversation.integrationId,
);

for (const message of conversationMessages) {
graphqlPubsub.publish('conversationMessageInserted', {
conversationMessageInserted: message,
});
}

// notify as connected
graphqlPubsub.publish('customerConnectionChanged', {
customerConnectionChanged: {
_id: customerId,
status: 'connected',
},
});
}
}

graphqlPubsub.publish(data.trigger, { [data.trigger]: data.payload });
}

if (action === 'activityLog') {
ActivityLogs.createLogFromWidget(data.type, data.payload);
}
};

const initConsumer = async () => {
// Consumer
try {
const conn = await amqplib.connect(RABBITMQ_HOST);
const channel = await conn.createChannel();

await channel.assertQueue('widgetNotification');

channel.consume('widgetNotification', async msg => {
if (msg !== null) {
await reciveMessage(JSON.parse(msg.content.toString()));
channel.ack(msg);
}
});
} catch (e) {
debugBase(e.message);
}
};

initConsumer();
91 changes: 4 additions & 87 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,11 @@ import * as fs from 'fs';
import { RedisPubSub } from 'graphql-redis-subscriptions';
import * as Redis from 'ioredis';
import * as path from 'path';
import { ActivityLogs, Conversations } from './db/models';
import { debugBase } from './debuggers';
import { get, redisOptions, set } from './redisClient';
import { redisOptions } from './redisClient';

// load environment variables
dotenv.config();

interface IPubsubMessage {
action: string;
data: {
trigger: string;
type: string;
payload: any;
};
}

interface IGoogleOptions {
projectId: string;
credentials: {
Expand All @@ -31,7 +20,7 @@ const { PUBSUB_TYPE, NODE_ENV, PROCESS_NAME } = process.env;

// Google pubsub message handler
const commonMessageHandler = payload => {
return convertPubSubBuffer(payload.data);
return JSON.parse(payload.data.toString());
};

const configGooglePubsub = (): IGoogleOptions => {
Expand Down Expand Up @@ -69,9 +58,9 @@ const createPubsubInstance = () => {

const GooglePubSub = require('@axelspringer/graphql-google-pubsub').GooglePubSub;

pubsub = new GooglePubSub(googleOptions, undefined, commonMessageHandler);
return new GooglePubSub(googleOptions, undefined, commonMessageHandler);
} else {
pubsub = new RedisPubSub({
return new RedisPubSub({
connectionListener: error => {
if (error) {
console.error(error);
Expand All @@ -81,78 +70,6 @@ const createPubsubInstance = () => {
subscriber: new Redis(redisOptions),
});
}

return pubsub;
};

export const initSubscribe = () => {
setTimeout(async () => {
const isSubscribed = await get('isErxesApiSubscribed');

if (isSubscribed !== 'true') {
debugBase('Subscribing .....');
set('isErxesApiSubscribed', 'true');

graphqlPubsub.subscribe('widgetNotification', message => {
return publishMessage(message);
});
}
}, 1000 * Math.floor(Math.random() * 6) + 1);
};

export const unsubscribe = async () => {
debugBase('Unsubscribing .....');
await set('isErxesApiSubscribed', 'false');
};

const publishMessage = async ({ action, data }: IPubsubMessage) => {
if (NODE_ENV === 'test') {
return;
}

if (action === 'callPublish') {
if (data.trigger === 'conversationMessageInserted') {
const { customerId, conversationId } = data.payload;
const conversation = await Conversations.findOne({ _id: conversationId }, { integrationId: 1 });
const customerLastStatus = await get(`customer_last_status_${customerId}`);

// if customer's last status is left then mark as joined when customer ask
if (conversation && customerLastStatus === 'left') {
set(`customer_last_status_${customerId}`, 'joined');

// customer has joined + time
const conversationMessages = await Conversations.changeCustomerStatus(
'joined',
customerId,
conversation.integrationId,
);

for (const message of conversationMessages) {
graphqlPubsub.publish('conversationMessageInserted', {
conversationMessageInserted: message,
});
}

// notify as connected
graphqlPubsub.publish('customerConnectionChanged', {
customerConnectionChanged: {
_id: customerId,
status: 'connected',
},
});
}
}

graphqlPubsub.publish(data.trigger, { [data.trigger]: data.payload });
}

if (action === 'activityLog') {
ActivityLogs.createLogFromWidget(data.type, data.payload);
}
};

const convertPubSubBuffer = (data: Buffer) => {
return JSON.parse(data.toString());
};

export const graphqlPubsub = createPubsubInstance();
Loading

0 comments on commit fdb26ab

Please sign in to comment.