From ff0de9ad7283e79524d6e4c09e41734540178015 Mon Sep 17 00:00:00 2001 From: BatAmar Battulga Date: Sat, 25 Apr 2020 01:19:14 +0800 Subject: [PATCH] reading file content in api and sending json data to workers #767 --- src/data/constants.ts | 3 +- src/data/resolvers/mutations/integrations.ts | 7 +- src/data/resolvers/queries/integrations.ts | 3 +- src/index.ts | 93 +----------- src/messageBroker.ts | 11 +- src/middlewares/fileMiddleware.ts | 148 +++++++++++++++++++ src/workers/bulkInsert.ts | 117 --------------- src/workers/messageBroker.ts | 4 +- src/workers/utils.ts | 55 +++++-- 9 files changed, 210 insertions(+), 231 deletions(-) create mode 100644 src/middlewares/fileMiddleware.ts delete mode 100644 src/workers/bulkInsert.ts diff --git a/src/data/constants.ts b/src/data/constants.ts index ad65ca1b7..07aff2079 100644 --- a/src/data/constants.ts +++ b/src/data/constants.ts @@ -283,6 +283,7 @@ export const MODULE_NAMES = { export const RABBITMQ_QUEUES = { PUT_LOG: 'putLog', - RPC_API: 'rpc_queue:erxes-api', + RPC_API_TO_INTEGRATIONS: 'rpc_queue:api_to_integrations', + RPC_API_TO_WORKERS: 'rpc_queue:api_to_workers', WORKERS: 'workers', }; diff --git a/src/data/resolvers/mutations/integrations.ts b/src/data/resolvers/mutations/integrations.ts index c6c9678e0..d4282aaf6 100644 --- a/src/data/resolvers/mutations/integrations.ts +++ b/src/data/resolvers/mutations/integrations.ts @@ -3,7 +3,7 @@ import { IIntegration, IMessengerData, IUiOptions } from '../../../db/models/def import { IExternalIntegrationParams } from '../../../db/models/Integrations'; import { debugExternalApi } from '../../../debuggers'; import { sendRPCMessage } from '../../../messageBroker'; -import { MODULE_NAMES } from '../../constants'; +import { MODULE_NAMES, RABBITMQ_QUEUES } from '../../constants'; import { putCreateLog, putDeleteLog, putUpdateLog } from '../../logUtils'; import { checkPermission } from '../../permissions/wrappers'; import { IContext } from '../../types'; @@ -222,7 +222,10 @@ const integrationMutations = { */ async integrationsRemoveAccount(_root, { _id }: { _id: string }) { try { - const { erxesApiIds } = await sendRPCMessage({ action: 'remove-account', data: { _id } }); + const { erxesApiIds } = await sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_INTEGRATIONS, { + action: 'remove-account', + data: { _id }, + }); for (const id of erxesApiIds) { await Integrations.removeIntegration(id); diff --git a/src/data/resolvers/queries/integrations.ts b/src/data/resolvers/queries/integrations.ts index a640f1076..4e0eb9ae6 100644 --- a/src/data/resolvers/queries/integrations.ts +++ b/src/data/resolvers/queries/integrations.ts @@ -3,6 +3,7 @@ import { INTEGRATION_NAMES_MAP, KIND_CHOICES, TAG_TYPES } from '../../../db/mode import { checkPermission, moduleRequireLogin } from '../../permissions/wrappers'; import { sendRPCMessage } from '../../../messageBroker'; +import { RABBITMQ_QUEUES } from '../../constants'; import { IContext } from '../../types'; import { paginate } from '../../utils'; /** @@ -150,7 +151,7 @@ const integrationQueries = { }, async integrationGetLineWebhookUrl(_root, { _id }: { _id: string }) { - return sendRPCMessage({ action: 'line-webhook', data: { _id } }); + return sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_INTEGRATIONS, { action: 'line-webhook', data: { _id } }); }, }; diff --git a/src/index.ts b/src/index.ts index 788944b2e..9e18e3c86 100755 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,6 @@ import * as cookieParser from 'cookie-parser'; import * as cors from 'cors'; import * as dotenv from 'dotenv'; import * as express from 'express'; -import * as formidable from 'formidable'; import * as fs from 'fs'; import { createServer } from 'http'; import * as mongoose from 'mongoose'; @@ -14,7 +13,6 @@ import { buildFile } from './data/modules/fileExporter/exporter'; import insightExports from './data/modules/insights/insightExports'; import { authCookieOptions, - checkFile, deleteFile, frontendEnv, getEnv, @@ -22,12 +20,12 @@ import { handleUnsubscription, readFileRequest, registerOnboardHistory, - uploadFile, } from './data/utils'; import { connect } from './db/connection'; import { debugBase, debugExternalApi, debugInit } from './debuggers'; import { identifyCustomer, trackCustomEvent, trackViewPageEvent, updateCustomerProperty } from './events'; -import { initConsumer, sendRPCMessage } from './messageBroker'; +import { initConsumer } from './messageBroker'; +import { importer, uploader } from './middlewares/fileMiddleware'; import userMiddleware from './middlewares/userMiddleware'; import widgetsMiddleware from './middlewares/widgetsMiddleware'; import { initRedis } from './redisClient'; @@ -211,49 +209,7 @@ app.post('/delete-file', async (req: any, res) => { return res.status(500).send(status); }); -// file upload -app.post('/upload-file', async (req: any, res, next) => { - if (req.query.kind === 'nylas') { - debugExternalApi(`Pipeing request to ${INTEGRATIONS_API_DOMAIN}`); - - return req.pipe( - request - .post(`${INTEGRATIONS_API_DOMAIN}/nylas/upload`) - .on('response', response => { - if (response.statusCode !== 200) { - return next(response.statusMessage); - } - - return response.pipe(res); - }) - .on('error', e => { - debugExternalApi(`Error from pipe ${e.message}`); - next(e); - }), - ); - } - - const form = new formidable.IncomingForm(); - - form.parse(req, async (_error, _fields, response) => { - const file = response.file || response.upload; - - // check file ==== - const status = await checkFile(file, req.headers.source); - - if (status === 'ok') { - try { - const result = await uploadFile(frontendEnv({ name: 'API_URL', req }), file, response.upload ? true : false); - - return res.send(result); - } catch (e) { - return res.status(500).send(filterXSS(e.message)); - } - } - - return res.status(500).send(status); - }); -}); +app.post('/upload-file', uploader); // redirect to integration app.get('/connect-integration', async (req: any, res, _next) => { @@ -267,48 +223,7 @@ app.get('/connect-integration', async (req: any, res, _next) => { }); // file import -app.post('/import-file', async (req: any, res) => { - // require login - if (!req.user) { - return res.end('foribidden'); - } - - try { - const scopeBrandIds = JSON.parse(req.cookies.scopeBrandIds || '[]'); - const form = new formidable.IncomingForm(); - - form.parse(req, async (_err, fields: any, response) => { - let status = ''; - let result; - - try { - status = await checkFile(response.file); - } catch (e) { - return res.json({ status: e.message }); - } - - // if file is not ok then send error - if (status !== 'ok') { - return res.json(status); - } - - try { - result = await sendRPCMessage({ - file: response.file, - type: fields.type, - scopeBrandIds, - user: req.user, - }); - - return res.json(result); - } catch (e) { - return res.json(e.message); - } - }); - } catch (e) { - return res.json({ status: 'error', message: e.message }); - } -}); +app.post('/import-file', importer); // unsubscribe app.get('/unsubscribe', async (req: any, res) => { diff --git a/src/messageBroker.ts b/src/messageBroker.ts index f2c84fff9..9464dee5b 100644 --- a/src/messageBroker.ts +++ b/src/messageBroker.ts @@ -1,7 +1,6 @@ import * as amqplib from 'amqplib'; import * as dotenv from 'dotenv'; import * as uuid from 'uuid'; -import { RABBITMQ_QUEUES } from './data/constants'; import { receiveEmailVerifierNotification, receiveEngagesNotification, @@ -19,8 +18,8 @@ const { NODE_ENV, RABBITMQ_HOST = 'amqp://localhost' } = process.env; let connection; let channel; -export const sendRPCMessage = async (message: any): Promise => { - debugBase(`Sending rpc message to ${JSON.stringify(message)}`); +export const sendRPCMessage = async (queueName: string, message: any): Promise => { + debugBase(`Sending rpc message ${JSON.stringify(message)} to queue ${queueName}`); const response = await new Promise((resolve, reject) => { const correlationId = uuid(); @@ -48,7 +47,7 @@ export const sendRPCMessage = async (message: any): Promise => { { noAck: true }, ); - channel.sendToQueue(RABBITMQ_QUEUES.RPC_API, Buffer.from(JSON.stringify(message)), { + channel.sendToQueue(queueName, Buffer.from(JSON.stringify(message)), { correlationId, replyTo: q.queue, }); @@ -75,9 +74,9 @@ export const initConsumer = async () => { channel = await connection.createChannel(); // listen for rpc queue ========= - await channel.assertQueue('rpc_queue:erxes-integrations'); + await channel.assertQueue('rpc_queue:integrations_to_api'); - channel.consume('rpc_queue:erxes-integrations', async msg => { + channel.consume('rpc_queue:integrations_to_api', async msg => { if (msg !== null) { debugBase(`Received rpc queue message ${msg.content.toString()}`); diff --git a/src/middlewares/fileMiddleware.ts b/src/middlewares/fileMiddleware.ts new file mode 100644 index 000000000..ee1347d1d --- /dev/null +++ b/src/middlewares/fileMiddleware.ts @@ -0,0 +1,148 @@ +import * as formidable from 'formidable'; +import * as fs from 'fs'; +import * as request from 'request'; +import * as xlsxPopulate from 'xlsx-populate'; +import { RABBITMQ_QUEUES } from '../data/constants'; +import { can } from '../data/permissions/utils'; +import { checkFile, frontendEnv, getSubServiceDomain, uploadFile } from '../data/utils'; +import { debugExternalApi } from '../debuggers'; +import { sendRPCMessage } from '../messageBroker'; + +const readXlsFile = async (file): Promise<{ fieldNames: string[]; usedSheets: any[] }> => { + return new Promise(async (resolve, reject) => { + const readStream = fs.createReadStream(file.path); + + // Directory to save file + const downloadDir = `${__dirname}/../private/xlsTemplateOutputs/${file.name}`; + + // Converting pipe into promise + const pipe = stream => + new Promise((resolver, rejecter) => { + stream.on('finish', resolver); + stream.on('error', rejecter); + }); + + // Creating streams + const writeStream = fs.createWriteStream(downloadDir); + const streamObj = await readStream.pipe(writeStream); + + pipe(streamObj) + .then(async () => { + // After finished saving instantly create and load workbook from xls + const workbook = await xlsxPopulate.fromFileAsync(downloadDir); + + // Deleting file after read + fs.unlink(downloadDir, () => { + return true; + }); + + const usedRange = workbook.sheet(0).usedRange(); + + if (!usedRange) { + return reject(new Error('Invalid file')); + } + + const usedSheets = usedRange.value(); + + // Getting columns + const fieldNames = usedSheets[0]; + + // Removing column + usedSheets.shift(); + + return resolve({ fieldNames, usedSheets }); + }) + .catch(e => { + return reject(e); + }); + }); +}; + +export const importer = async (req: any, res, next) => { + if (!(await can('importXlsFile', req.user))) { + return next(new Error('Permission denied!')); + } + + try { + const scopeBrandIds = JSON.parse(req.cookies.scopeBrandIds || '[]'); + const form = new formidable.IncomingForm(); + + form.parse(req, async (_err, fields: any, response) => { + let status = ''; + + try { + status = await checkFile(response.file); + } catch (e) { + return res.json({ status: e.message }); + } + + // if file is not ok then send error + if (status !== 'ok') { + return res.json({ status }); + } + + try { + const { fieldNames, usedSheets } = await readXlsFile(response.file); + + const result = await sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_WORKERS, { + type: fields.type, + fieldNames, + datas: usedSheets, + scopeBrandIds, + user: req.user, + }); + + return res.json(result); + } catch (e) { + return res.json({ status: 'error', message: e.message }); + } + }); + } catch (e) { + return res.json({ status: 'error', message: e.message }); + } +}; + +export const uploader = async (req: any, res, next) => { + const INTEGRATIONS_API_DOMAIN = getSubServiceDomain({ name: 'INTEGRATIONS_API_DOMAIN' }); + + if (req.query.kind === 'nylas') { + debugExternalApi(`Pipeing request to ${INTEGRATIONS_API_DOMAIN}`); + + return req.pipe( + request + .post(`${INTEGRATIONS_API_DOMAIN}/nylas/upload`) + .on('response', response => { + if (response.statusCode !== 200) { + return next(response.statusMessage); + } + + return response.pipe(res); + }) + .on('error', e => { + debugExternalApi(`Error from pipe ${e.message}`); + next(e); + }), + ); + } + + const form = new formidable.IncomingForm(); + + form.parse(req, async (_error, _fields, response) => { + const file = response.file || response.upload; + + // check file ==== + const status = await checkFile(file, req.headers.source); + + if (status === 'ok') { + try { + const result = await uploadFile(frontendEnv({ name: 'API_URL', req }), file, response.upload ? true : false); + + return res.send(result); + } catch (e) { + return res.status(500).send(filterXSS(e.message)); + } + } + + return res.status(500).send(status); + }); +}; diff --git a/src/workers/bulkInsert.ts b/src/workers/bulkInsert.ts deleted file mode 100644 index 4f64da5a3..000000000 --- a/src/workers/bulkInsert.ts +++ /dev/null @@ -1,117 +0,0 @@ -import * as fs from 'fs'; -import * as path from 'path'; -import * as xlsxPopulate from 'xlsx-populate'; -import { checkFieldNames } from '../data/modules/fields/utils'; -import { can } from '../data/permissions/utils'; -import { ImportHistory } from '../db/models'; -import { IUserDocument } from '../db/models/definitions/users'; -import { createWorkers, splitToCore } from './utils'; - -export const intervals: any[] = []; - -/** - * Receives and saves xls file in private/xlsImports folder - * and imports customers to the database - */ -export const importXlsFile = async ( - file: any, - type: string, - { user, scopeBrandIds }: { scopeBrandIds: string[]; user: IUserDocument }, -) => { - return new Promise(async (resolve, reject) => { - if (!(await can('importXlsFile', user))) { - return reject(new Error('Permission denied!')); - } - - const versionNumber = process.version - .toString() - .slice(1) - .split('.')[0]; - - if (Number(versionNumber) < 10) { - return reject(new Error('Please upgrade node version above 10.5.0 support worker_threads!')); - } - - const readStream = fs.createReadStream(file.path); - - // Directory to save file - const downloadDir = `${__dirname}/../private/xlsTemplateOutputs/${file.name}`; - - // Converting pipe into promise - const pipe = stream => - new Promise((resolver, rejecter) => { - stream.on('finish', resolver); - stream.on('error', rejecter); - }); - // Creating streams - const writeStream = fs.createWriteStream(downloadDir); - const streamObj = await readStream.pipe(writeStream); - - pipe(streamObj) - .then(async () => { - // After finished saving instantly create and load workbook from xls - const workbook = await xlsxPopulate.fromFileAsync(downloadDir); - - // Deleting file after read - fs.unlink(downloadDir, () => { - return true; - }); - - const usedRange = workbook.sheet(0).usedRange(); - - if (!usedRange) { - return reject(new Error('Invalid file')); - } - - const usedSheets = usedRange.value(); - - // Getting columns - const fieldNames = usedSheets[0]; - - // Removing column - usedSheets.shift(); - - if (usedSheets.length === 0) { - return reject(new Error('Please import more at least one row of data')); - } - - const properties = await checkFieldNames(type, fieldNames); - - const importHistory = await ImportHistory.create({ - contentType: type, - total: usedSheets.length, - userId: user._id, - date: Date.now(), - }); - - const results: string[] = splitToCore(usedSheets); - - const workerFile = - process.env.NODE_ENV === 'production' - ? `./dist/workers/bulkInsert.worker.js` - : './src/workers/bulkInsert.worker.import.js'; - - const workerPath = path.resolve(workerFile); - - const percentagePerData = Number(((1 / usedSheets.length) * 100).toFixed(3)); - - const workerData = { - scopeBrandIds, - user, - contentType: type, - properties, - importHistoryId: importHistory._id, - percentagePerData, - }; - - await createWorkers(workerPath, workerData, results).catch(e => { - return reject(e); - }); - - return resolve({ id: importHistory.id }); - }) - .catch(e => { - return reject(e); - }); - }); -}; diff --git a/src/workers/messageBroker.ts b/src/workers/messageBroker.ts index 76a6112ef..9e8f8b08a 100644 --- a/src/workers/messageBroker.ts +++ b/src/workers/messageBroker.ts @@ -36,9 +36,9 @@ export const initConsumer = async () => { } }); - await channel.assertQueue(RABBITMQ_QUEUES.RPC_API); + await channel.assertQueue(RABBITMQ_QUEUES.RPC_API_TO_WORKERS); - channel.consume(RABBITMQ_QUEUES.RPC_API, async msg => { + channel.consume(RABBITMQ_QUEUES.RPC_API_TO_WORKERS, async msg => { if (msg !== null) { debugWorkers(`Received rpc queue message ${msg.content.toString()}`); diff --git a/src/workers/utils.ts b/src/workers/utils.ts index ecefebbaa..8071b238e 100644 --- a/src/workers/utils.ts +++ b/src/workers/utils.ts @@ -2,9 +2,13 @@ import * as dotenv from 'dotenv'; import * as mongoose from 'mongoose'; import * as os from 'os'; import * as path from 'path'; +import { checkFieldNames } from '../data/modules/fields/utils'; +import { ImportHistory } from '../db/models'; import ImportHistories from '../db/models/ImportHistory'; -import { debugImport, debugWorkers } from '../debuggers'; -import { importXlsFile } from './bulkInsert'; +import { debugImport } from '../debuggers'; + +const { MONGO_URL = '' } = process.env; +export const connect = () => mongoose.connect(MONGO_URL, { useNewUrlParser: true, useCreateIndex: true }); dotenv.config(); @@ -98,10 +102,6 @@ export const clearIntervals = () => { intervals = []; }; -const { MONGO_URL = '' } = process.env; - -export const connect = () => mongoose.connect(MONGO_URL, { useNewUrlParser: true, useCreateIndex: true }); - // xls file import, cancel, removal export const receiveImportRemove = async (content: any) => { const { contentType, importHistoryId } = content; @@ -131,13 +131,42 @@ export const receiveImportCancel = () => { }; export const receiveImportXls = async (content: any) => { - const { file, type, scopeBrandIds, user } = content; - - try { - return importXlsFile(file, type, { scopeBrandIds, user }); - } catch (e) { - debugWorkers(`Error occured while importing ${e.message}`); + const { type, fieldNames, scopeBrandIds, user, datas } = content; - return { status: 'error', message: e.message }; + if (datas.length === 0) { + throw new Error('Please import more at least one row of data'); } + + const properties = await checkFieldNames(type, fieldNames); + + const importHistory = await ImportHistory.create({ + contentType: type, + total: datas.length, + userId: user._id, + date: Date.now(), + }); + + const results: string[] = splitToCore(datas); + + const workerFile = + process.env.NODE_ENV === 'production' + ? `./dist/workers/bulkInsert.worker.js` + : './src/workers/bulkInsert.worker.import.js'; + + const workerPath = path.resolve(workerFile); + + const percentagePerData = Number(((1 / datas.length) * 100).toFixed(3)); + + const workerData = { + scopeBrandIds, + user, + contentType: type, + properties, + importHistoryId: importHistory._id, + percentagePerData, + }; + + await createWorkers(workerPath, workerData, results); + + return { id: importHistory.id }; };