Skip to content
This repository has been archived by the owner on Nov 21, 2020. It is now read-only.

Commit

Permalink
reading file content in api and sending json data to workers #767
Browse files Browse the repository at this point in the history
  • Loading branch information
batamar committed Apr 24, 2020
1 parent 89ba966 commit ff0de9a
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 231 deletions.
3 changes: 2 additions & 1 deletion src/data/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export const MODULE_NAMES = {

export const RABBITMQ_QUEUES = {
PUT_LOG: 'putLog',
RPC_API: 'rpc_queue:erxes-api',
RPC_API_TO_INTEGRATIONS: 'rpc_queue:api_to_integrations',
RPC_API_TO_WORKERS: 'rpc_queue:api_to_workers',
WORKERS: 'workers',
};
7 changes: 5 additions & 2 deletions src/data/resolvers/mutations/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { IIntegration, IMessengerData, IUiOptions } from '../../../db/models/def
import { IExternalIntegrationParams } from '../../../db/models/Integrations';
import { debugExternalApi } from '../../../debuggers';
import { sendRPCMessage } from '../../../messageBroker';
import { MODULE_NAMES } from '../../constants';
import { MODULE_NAMES, RABBITMQ_QUEUES } from '../../constants';
import { putCreateLog, putDeleteLog, putUpdateLog } from '../../logUtils';
import { checkPermission } from '../../permissions/wrappers';
import { IContext } from '../../types';
Expand Down Expand Up @@ -222,7 +222,10 @@ const integrationMutations = {
*/
async integrationsRemoveAccount(_root, { _id }: { _id: string }) {
try {
const { erxesApiIds } = await sendRPCMessage({ action: 'remove-account', data: { _id } });
const { erxesApiIds } = await sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_INTEGRATIONS, {
action: 'remove-account',
data: { _id },
});

for (const id of erxesApiIds) {
await Integrations.removeIntegration(id);
Expand Down
3 changes: 2 additions & 1 deletion src/data/resolvers/queries/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { INTEGRATION_NAMES_MAP, KIND_CHOICES, TAG_TYPES } from '../../../db/mode
import { checkPermission, moduleRequireLogin } from '../../permissions/wrappers';

import { sendRPCMessage } from '../../../messageBroker';
import { RABBITMQ_QUEUES } from '../../constants';
import { IContext } from '../../types';
import { paginate } from '../../utils';
/**
Expand Down Expand Up @@ -150,7 +151,7 @@ const integrationQueries = {
},

async integrationGetLineWebhookUrl(_root, { _id }: { _id: string }) {
return sendRPCMessage({ action: 'line-webhook', data: { _id } });
return sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_INTEGRATIONS, { action: 'line-webhook', data: { _id } });
},
};

Expand Down
93 changes: 4 additions & 89 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import * as cookieParser from 'cookie-parser';
import * as cors from 'cors';
import * as dotenv from 'dotenv';
import * as express from 'express';
import * as formidable from 'formidable';
import * as fs from 'fs';
import { createServer } from 'http';
import * as mongoose from 'mongoose';
Expand All @@ -14,20 +13,19 @@ import { buildFile } from './data/modules/fileExporter/exporter';
import insightExports from './data/modules/insights/insightExports';
import {
authCookieOptions,
checkFile,
deleteFile,
frontendEnv,
getEnv,
getSubServiceDomain,
handleUnsubscription,
readFileRequest,
registerOnboardHistory,
uploadFile,
} from './data/utils';
import { connect } from './db/connection';
import { debugBase, debugExternalApi, debugInit } from './debuggers';
import { identifyCustomer, trackCustomEvent, trackViewPageEvent, updateCustomerProperty } from './events';
import { initConsumer, sendRPCMessage } from './messageBroker';
import { initConsumer } from './messageBroker';
import { importer, uploader } from './middlewares/fileMiddleware';
import userMiddleware from './middlewares/userMiddleware';
import widgetsMiddleware from './middlewares/widgetsMiddleware';
import { initRedis } from './redisClient';
Expand Down Expand Up @@ -211,49 +209,7 @@ app.post('/delete-file', async (req: any, res) => {
return res.status(500).send(status);
});

// file upload
app.post('/upload-file', async (req: any, res, next) => {
if (req.query.kind === 'nylas') {
debugExternalApi(`Pipeing request to ${INTEGRATIONS_API_DOMAIN}`);

return req.pipe(
request
.post(`${INTEGRATIONS_API_DOMAIN}/nylas/upload`)
.on('response', response => {
if (response.statusCode !== 200) {
return next(response.statusMessage);
}

return response.pipe(res);
})
.on('error', e => {
debugExternalApi(`Error from pipe ${e.message}`);
next(e);
}),
);
}

const form = new formidable.IncomingForm();

form.parse(req, async (_error, _fields, response) => {
const file = response.file || response.upload;

// check file ====
const status = await checkFile(file, req.headers.source);

if (status === 'ok') {
try {
const result = await uploadFile(frontendEnv({ name: 'API_URL', req }), file, response.upload ? true : false);

return res.send(result);
} catch (e) {
return res.status(500).send(filterXSS(e.message));
}
}

return res.status(500).send(status);
});
});
app.post('/upload-file', uploader);

// redirect to integration
app.get('/connect-integration', async (req: any, res, _next) => {
Expand All @@ -267,48 +223,7 @@ app.get('/connect-integration', async (req: any, res, _next) => {
});

// file import
app.post('/import-file', async (req: any, res) => {
// require login
if (!req.user) {
return res.end('foribidden');
}

try {
const scopeBrandIds = JSON.parse(req.cookies.scopeBrandIds || '[]');
const form = new formidable.IncomingForm();

form.parse(req, async (_err, fields: any, response) => {
let status = '';
let result;

try {
status = await checkFile(response.file);
} catch (e) {
return res.json({ status: e.message });
}

// if file is not ok then send error
if (status !== 'ok') {
return res.json(status);
}

try {
result = await sendRPCMessage({
file: response.file,
type: fields.type,
scopeBrandIds,
user: req.user,
});

return res.json(result);
} catch (e) {
return res.json(e.message);
}
});
} catch (e) {
return res.json({ status: 'error', message: e.message });
}
});
app.post('/import-file', importer);

// unsubscribe
app.get('/unsubscribe', async (req: any, res) => {
Expand Down
11 changes: 5 additions & 6 deletions src/messageBroker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as amqplib from 'amqplib';
import * as dotenv from 'dotenv';
import * as uuid from 'uuid';
import { RABBITMQ_QUEUES } from './data/constants';
import {
receiveEmailVerifierNotification,
receiveEngagesNotification,
Expand All @@ -19,8 +18,8 @@ const { NODE_ENV, RABBITMQ_HOST = 'amqp://localhost' } = process.env;
let connection;
let channel;

export const sendRPCMessage = async (message: any): Promise<any> => {
debugBase(`Sending rpc message to ${JSON.stringify(message)}`);
export const sendRPCMessage = async (queueName: string, message: any): Promise<any> => {
debugBase(`Sending rpc message ${JSON.stringify(message)} to queue ${queueName}`);

const response = await new Promise((resolve, reject) => {
const correlationId = uuid();
Expand Down Expand Up @@ -48,7 +47,7 @@ export const sendRPCMessage = async (message: any): Promise<any> => {
{ noAck: true },
);

channel.sendToQueue(RABBITMQ_QUEUES.RPC_API, Buffer.from(JSON.stringify(message)), {
channel.sendToQueue(queueName, Buffer.from(JSON.stringify(message)), {
correlationId,
replyTo: q.queue,
});
Expand All @@ -75,9 +74,9 @@ export const initConsumer = async () => {
channel = await connection.createChannel();

// listen for rpc queue =========
await channel.assertQueue('rpc_queue:erxes-integrations');
await channel.assertQueue('rpc_queue:integrations_to_api');

channel.consume('rpc_queue:erxes-integrations', async msg => {
channel.consume('rpc_queue:integrations_to_api', async msg => {
if (msg !== null) {
debugBase(`Received rpc queue message ${msg.content.toString()}`);

Expand Down
148 changes: 148 additions & 0 deletions src/middlewares/fileMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import * as formidable from 'formidable';
import * as fs from 'fs';
import * as request from 'request';
import * as xlsxPopulate from 'xlsx-populate';
import { RABBITMQ_QUEUES } from '../data/constants';
import { can } from '../data/permissions/utils';
import { checkFile, frontendEnv, getSubServiceDomain, uploadFile } from '../data/utils';
import { debugExternalApi } from '../debuggers';
import { sendRPCMessage } from '../messageBroker';

const readXlsFile = async (file): Promise<{ fieldNames: string[]; usedSheets: any[] }> => {
return new Promise(async (resolve, reject) => {
const readStream = fs.createReadStream(file.path);

// Directory to save file
const downloadDir = `${__dirname}/../private/xlsTemplateOutputs/${file.name}`;

// Converting pipe into promise
const pipe = stream =>
new Promise((resolver, rejecter) => {
stream.on('finish', resolver);
stream.on('error', rejecter);
});

// Creating streams
const writeStream = fs.createWriteStream(downloadDir);
const streamObj = await readStream.pipe(writeStream);

pipe(streamObj)
.then(async () => {
// After finished saving instantly create and load workbook from xls
const workbook = await xlsxPopulate.fromFileAsync(downloadDir);

// Deleting file after read
fs.unlink(downloadDir, () => {
return true;
});

const usedRange = workbook.sheet(0).usedRange();

if (!usedRange) {
return reject(new Error('Invalid file'));
}

const usedSheets = usedRange.value();

// Getting columns
const fieldNames = usedSheets[0];

// Removing column
usedSheets.shift();

return resolve({ fieldNames, usedSheets });
})
.catch(e => {
return reject(e);
});
});
};

export const importer = async (req: any, res, next) => {
if (!(await can('importXlsFile', req.user))) {
return next(new Error('Permission denied!'));
}

try {
const scopeBrandIds = JSON.parse(req.cookies.scopeBrandIds || '[]');
const form = new formidable.IncomingForm();

form.parse(req, async (_err, fields: any, response) => {
let status = '';

try {
status = await checkFile(response.file);
} catch (e) {
return res.json({ status: e.message });
}

// if file is not ok then send error
if (status !== 'ok') {
return res.json({ status });
}

try {
const { fieldNames, usedSheets } = await readXlsFile(response.file);

const result = await sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_WORKERS, {
type: fields.type,
fieldNames,
datas: usedSheets,
scopeBrandIds,
user: req.user,
});

return res.json(result);
} catch (e) {
return res.json({ status: 'error', message: e.message });
}
});
} catch (e) {
return res.json({ status: 'error', message: e.message });
}
};

export const uploader = async (req: any, res, next) => {
const INTEGRATIONS_API_DOMAIN = getSubServiceDomain({ name: 'INTEGRATIONS_API_DOMAIN' });

if (req.query.kind === 'nylas') {
debugExternalApi(`Pipeing request to ${INTEGRATIONS_API_DOMAIN}`);

return req.pipe(
request
.post(`${INTEGRATIONS_API_DOMAIN}/nylas/upload`)
.on('response', response => {
if (response.statusCode !== 200) {
return next(response.statusMessage);
}

return response.pipe(res);
})
.on('error', e => {
debugExternalApi(`Error from pipe ${e.message}`);
next(e);
}),
);
}

const form = new formidable.IncomingForm();

form.parse(req, async (_error, _fields, response) => {
const file = response.file || response.upload;

// check file ====
const status = await checkFile(file, req.headers.source);

if (status === 'ok') {
try {
const result = await uploadFile(frontendEnv({ name: 'API_URL', req }), file, response.upload ? true : false);

return res.send(result);
} catch (e) {
return res.status(500).send(filterXSS(e.message));
}
}

return res.status(500).send(status);
});
};
Loading

0 comments on commit ff0de9a

Please sign in to comment.