Skip to content
This repository has been archived by the owner on Nov 21, 2020. It is now read-only.

Commit

Permalink
perf(engages): used streams
Browse files Browse the repository at this point in the history
close #801
  • Loading branch information
batamar committed Jun 26, 2020
1 parent 30da525 commit 606d072
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 39 deletions.
4 changes: 2 additions & 2 deletions src/__tests__/engageMessageMutations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ describe('engage message mutation tests', () => {
await ConversationMessages.deleteMany({});
});

test('findCustomers', async () => {
test('generateCustomerSelector', async () => {
const segment = await segmentFactory({});
const brand = await brandFactory({});
await integrationFactory({ brandId: brand._id });

await engageUtils.findCustomers({ segmentIds: [segment._id], brandIds: [brand._id] });
await engageUtils.generateCustomerSelector({ segmentIds: [segment._id], brandIds: [brand._id] });
});

test('Engage utils send via messenger', async () => {
Expand Down
184 changes: 148 additions & 36 deletions src/data/resolvers/mutations/engageUtils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as Random from 'meteor-random';
import { Transform, Writable } from 'stream';
import {
ConversationMessages,
Conversations,
Expand All @@ -11,14 +13,15 @@ import { CONVERSATION_STATUSES, KIND_CHOICES, METHODS } from '../../../db/models
import { ICustomerDocument } from '../../../db/models/definitions/customers';
import { IEngageMessageDocument } from '../../../db/models/definitions/engages';
import { IUserDocument } from '../../../db/models/definitions/users';
import { debugBase } from '../../../debuggers';
import { sendMessage } from '../../../messageBroker';
import { MESSAGE_KINDS } from '../../constants';
import { fetchBySegments } from '../../modules/segments/queryBuilder';

/**
* Find customers
*/
export const findCustomers = async ({
export const generateCustomerSelector = async ({
customerIds,
segmentIds = [],
tagIds = [],
Expand All @@ -28,7 +31,7 @@ export const findCustomers = async ({
segmentIds?: string[];
tagIds?: string[];
brandIds?: string[];
}): Promise<ICustomerDocument[]> => {
}): Promise<any> => {
// find matched customers
let customerQuery: any = {};

Expand Down Expand Up @@ -66,7 +69,7 @@ export const findCustomers = async ({
customerQuery = { _id: { $in: customerIdsBySegments } };
}

return Customers.find({ $or: [{ doNotDisturb: 'No' }, { doNotDisturb: { $exists: false } }], ...customerQuery });
return { $or: [{ doNotDisturb: 'No' }, { doNotDisturb: { $exists: false } }], ...customerQuery };
};

const sendQueueMessage = args => {
Expand All @@ -86,29 +89,31 @@ export const send = async (engageMessage: IEngageMessageDocument) => {
return;
}

const customers = await findCustomers({ customerIds, segmentIds, tagIds, brandIds });
const customersSelector = await generateCustomerSelector({ customerIds, segmentIds, tagIds, brandIds });

// save matched customers count
await EngageMessages.setCustomersCount(engageMessage._id, 'totalCustomersCount', customers.length);
if (engageMessage.method === METHODS.MESSENGER && engageMessage.kind !== MESSAGE_KINDS.VISITOR_AUTO) {
return sendViaMessenger(engageMessage, customersSelector, user);
}

if (engageMessage.method === METHODS.EMAIL) {
const engageMessageId = engageMessage._id;
return sendViaEmail(engageMessage, customersSelector, user);
}
};

await sendQueueMessage({
action: 'writeLog',
data: {
engageMessageId,
msg: `Run at ${new Date()}`,
},
});
export const sendViaEmail = async (engageMessage: IEngageMessageDocument, customersSelector, user: IUserDocument) => {
const engageMessageId = engageMessage._id;

const customerInfos = customers.map(customer => ({
_id: customer._id,
name: Customers.getCustomerName(customer),
email: customer.primaryEmail,
emailValidationStatus: customer.emailValidationStatus,
}));
await sendQueueMessage({
action: 'writeLog',
data: {
engageMessageId,
msg: `Run at ${new Date()}`,
},
});

const customerInfos: Array<{ _id: string; name: string; email: string; emailValidationStatus: string }> = [];

const onFinishPiping = async () => {
const data = {
email: engageMessage.email,
customers: customerInfos,
Expand All @@ -125,6 +130,9 @@ export const send = async (engageMessage: IEngageMessageDocument) => {
throw new Error('No customers found');
}

// save matched customers count
await EngageMessages.setCustomersCount(engageMessage._id, 'totalCustomersCount', customerInfos.length);

await sendQueueMessage({
action: 'writeLog',
data: {
Expand All @@ -142,21 +150,46 @@ export const send = async (engageMessage: IEngageMessageDocument) => {

await sendQueueMessage({ action: 'sendEngage', data });
}
}
};

const customerTransformerStream = new Transform({
objectMode: true,

transform(customer, _encoding, callback) {
customerInfos.push({
_id: customer._id,
name: Customers.getCustomerName(customer),
email: customer.primaryEmail,
emailValidationStatus: customer.emailValidationStatus,
});

// signal upstream that we are ready to take more data
callback();
},
});

if (engageMessage.method === METHODS.MESSENGER && engageMessage.kind !== MESSAGE_KINDS.VISITOR_AUTO) {
await sendViaMessenger(engageMessage, customers, user);
}
const customerFields = { firstName: 1, lastName: 1, primaryEmail: 1, emailValidationStatus: 1 };
const customersStream = (Customers.find(customersSelector, customerFields) as any).stream();

return new Promise((resolve, reject) => {
const pipe = customersStream.pipe(customerTransformerStream);

pipe.on('finish', async () => {
try {
await onFinishPiping();
} catch (e) {
return reject(e);
}

resolve('done');
});
});
};

/**
* Send via messenger
*/
const sendViaMessenger = async (
message: IEngageMessageDocument,
customers: ICustomerDocument[],
user: IUserDocument,
) => {
const sendViaMessenger = async (message: IEngageMessageDocument, customersSelector, user: IUserDocument) => {
const { fromUserId } = message;

if (!message.messenger) {
Expand All @@ -175,31 +208,110 @@ const sendViaMessenger = async (
throw new Error('Integration not found');
}

for (const customer of customers) {
const bulkSize = 1000;

let iteratorCounter = 0;
let conversationsBulk = Conversations.collection.initializeOrderedBulkOp();
let conversationMessagesBulk = ConversationMessages.collection.initializeOrderedBulkOp();

const customerFields = { firstName: 1, lastName: 1, primaryEmail: 1 };
const customersStream = (Customers.find(customersSelector, customerFields) as any).stream();

const executeBulks = () => {
return new Promise((resolve, reject) => {
/* istanbul ignore next */
conversationsBulk.execute(err => {
if (err) {
if (err.message === 'Invalid Operation, no operations specified') {
debugBase(`Error during execute bulk ${err.message}`);
return resolve('done');
}

return reject(err);
}

conversationMessagesBulk.execute(msgErr => {
if (msgErr) {
return reject(msgErr);
}

conversationsBulk = Conversations.collection.initializeOrderedBulkOp();
conversationMessagesBulk = ConversationMessages.collection.initializeOrderedBulkOp();

resolve('done');
});
});
});
};

const createConversations = async (customer: ICustomerDocument) => {
iteratorCounter++;

// replace keys in content
const replacedContent = EngageMessages.replaceKeys({ content, customer, user });

const now = new Date();
const conversationId = Random.id();

// create conversation
const conversation = await Conversations.createConversation({
conversationsBulk.insert({
_id: conversationId,
status: CONVERSATION_STATUSES.NEW,
createdAt: now,
updatedAt: now,
userId: fromUserId,
customerId: customer._id,
integrationId: integration._id,
content: replacedContent,
status: CONVERSATION_STATUSES.NEW,
messageCount: 1,
});

// create message
await ConversationMessages.createMessage({
conversationMessagesBulk.insert({
engageData: {
engageKind: 'auto',
messageId: message._id,
fromUserId,
...message.messenger.toJSON(),
...(message.messenger ? message.messenger.toJSON() : {}),
},
conversationId: conversation._id,
conversationId,
userId: fromUserId,
customerId: customer._id,
content: replacedContent,
});
}

/* istanbul ignore next */
if (iteratorCounter % bulkSize === 0) {
customersStream.pause();

await executeBulks();

customersStream.resume();
}
};

const streamConversationWriter = new Writable({
objectMode: true,

async write(data, _encoding, callback) {
await createConversations(data);

callback();
},
});

return new Promise(resolve => {
const pipe = customersStream.pipe(streamConversationWriter);

pipe.on('finish', async () => {
// save matched customers count
await EngageMessages.setCustomersCount(message._id, 'totalCustomersCount', iteratorCounter);

if (iteratorCounter % bulkSize !== 0) {
await executeBulks();
}

resolve('done');
});
});
};
2 changes: 1 addition & 1 deletion src/messageBroker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import amqplib from 'amqplib';
import * as amqplib from 'amqplib';
import * as dotenv from 'dotenv';
import * as uuid from 'uuid';
import {
Expand Down

0 comments on commit 606d072

Please sign in to comment.