perf(import): stream excel file import
munkhorgil committed Jul 2, 2020
1 parent ab5c6ab commit 2cf2f53
Showing 6 changed files with 218 additions and 72 deletions.
1 change: 1 addition & 0 deletions package.json
@@ -94,6 +94,7 @@
"uuid-by-string": "^3.0.2",
"validator": "^9.0.0",
"xlsx-populate": "^1.20.1",
"xlsx-stream-reader": "^1.1.0",
"xss": "^1.0.6"
},
"peerOptionalDependencies": {
Expand Down
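
The new dependency, xlsx-stream-reader, is an event-driven reader: the workbook is parsed row by row as bytes arrive instead of being loaded into memory in full, which is the point of this perf change. A minimal sketch of the pattern the rest of the diff builds on (API names per the library's README; the file path is illustrative):

import * as fs from 'fs';
import * as XlsxStreamReader from 'xlsx-stream-reader';

const workBookReader = XlsxStreamReader();

workBookReader.on('worksheet', workSheetReader => {
  // workSheetReader.id is the 1-based sheet index
  if (workSheetReader.id > 1) {
    return workSheetReader.skip();
  }

  workSheetReader.on('row', row => {
    console.log(row.values); // array of cell values for this row
  });

  workSheetReader.process(); // start emitting 'row' events for this sheet
});

// the reader is a writable stream, so any xlsx byte stream can be piped in
fs.createReadStream('/tmp/example.xlsx').pipe(workBookReader);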
24 changes: 21 additions & 3 deletions src/data/utils.ts
@@ -124,8 +124,11 @@ const createGCS = async () => {
 /*
  * Save binary data to amazon s3
  */
-export const uploadFileAWS = async (file: { name: string; path: string; type: string }): Promise<string> => {
-  const IS_PUBLIC = await getConfig('FILE_SYSTEM_PUBLIC', 'true');
+export const uploadFileAWS = async (
+  file: { name: string; path: string; type: string },
+  forcePrivate: boolean = false,
+): Promise<string> => {
+  const IS_PUBLIC = forcePrivate ? false : await getConfig('FILE_SYSTEM_PUBLIC', 'true');
   const AWS_PREFIX = await getConfig('AWS_PREFIX');
   const AWS_BUCKET = await getConfig('AWS_BUCKET');
 
@@ -164,7 +167,7 @@ export const uploadFileAWS = async (file: { name: string; path: string; type: string }): Promise<string> => {
 /*
  * Delete file from amazon s3
  */
-const deleteFileAWS = async (fileName: string) => {
+export const deleteFileAWS = async (fileName: string) => {
   const AWS_BUCKET = await getConfig('AWS_BUCKET');
 
   const params = { Bucket: AWS_BUCKET, Key: fileName };
@@ -992,3 +995,18 @@ export const chunkArray = (myArray, chunkSize: number) => {
 
   return tempArray;
 };
+
+/**
+ * Create s3 stream for excel file
+ */
+export const s3Stream = async (key: string): Promise<any> => {
+  try {
+    const AWS_BUCKET = await getConfig('AWS_BUCKET');
+
+    const s3 = await createAWS();
+
+    return s3.getObject({ Bucket: AWS_BUCKET, Key: key }).createReadStream();
+  } catch (e) {
+    throw e;
+  }
+};
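
A minimal usage sketch for the new s3Stream helper (the key and local path are illustrative). Since getObject(...).createReadStream() reports failures asynchronously on the stream itself, callers should attach an 'error' handler:

import * as fs from 'fs';
import { s3Stream } from '../data/utils';

const downloadImportFile = async (key: string, localPath: string) => {
  const stream = await s3Stream(key);

  // a missing key or bad credentials surfaces here, not at call time
  stream.on('error', err => console.error(`S3 read failed: ${err.message}`));

  stream.pipe(fs.createWriteStream(localPath));
};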
69 changes: 3 additions & 66 deletions src/middlewares/fileMiddleware.ts
@@ -1,74 +1,12 @@
 import * as formidable from 'formidable';
-import * as fs from 'fs';
 import * as request from 'request';
-import * as _ from 'underscore';
-import * as xlsxPopulate from 'xlsx-populate';
 import { RABBITMQ_QUEUES } from '../data/constants';
 import { can } from '../data/permissions/utils';
-import { checkFile, frontendEnv, getSubServiceDomain, uploadFile } from '../data/utils';
+import { checkFile, frontendEnv, getSubServiceDomain, uploadFile, uploadFileAWS } from '../data/utils';
 import { debugExternalApi } from '../debuggers';
 import { sendRPCMessage } from '../messageBroker';
 
-const readXlsFile = async (file): Promise<{ fieldNames: string[]; usedSheets: any[] }> => {
-  return new Promise(async (resolve, reject) => {
-    const readStream = fs.createReadStream(file.path);
-
-    // Directory to save file
-    const downloadDir = `${__dirname}/../private/xlsTemplateOutputs/${file.name}`;
-
-    // Converting pipe into promise
-    const pipe = stream =>
-      new Promise((resolver, rejecter) => {
-        stream.on('finish', resolver);
-        stream.on('error', rejecter);
-      });
-
-    // Creating streams
-    const writeStream = fs.createWriteStream(downloadDir);
-    const streamObj = await readStream.pipe(writeStream);
-
-    pipe(streamObj)
-      .then(async () => {
-        // After finished saving instantly create and load workbook from xls
-        const workbook = await xlsxPopulate.fromFileAsync(downloadDir);
-
-        // Deleting file after read
-        fs.unlink(downloadDir, () => {
-          return true;
-        });
-
-        const usedRange = workbook.sheet(0).usedRange();
-
-        if (!usedRange) {
-          return reject(new Error('Invalid file'));
-        }
-
-        const usedSheets = usedRange.value();
-        const compactedRows: any[] = [];
-
-        for (const row of usedSheets) {
-          // to prevent empty data entry since xlsPopulate returns empty cells
-          const compactRow = _.compact(row);
-
-          if (compactRow.length > 0) {
-            compactedRows.push(row);
-          }
-        }
-
-        // Getting columns
-        const fieldNames = usedSheets[0];
-
-        // Removing column
-        compactedRows.shift();
-
-        return resolve({ fieldNames, usedSheets: compactedRows });
-      })
-      .catch(e => {
-        return reject(e);
-      });
-  });
-};
 
 export const importer = async (req: any, res, next) => {
   if (!(await can('importXlsFile', req.user))) {
     return next(new Error('Permission denied!'));
@@ -93,13 +31,12 @@ export const importer = async (req: any, res, next) => {
   }
 
   try {
-    const { fieldNames, usedSheets } = await readXlsFile(response.file);
+    const fileName = await uploadFileAWS(response.file, true);
 
     const result = await sendRPCMessage(RABBITMQ_QUEUES.RPC_API_TO_WORKERS, {
       action: 'createImport',
       type: fields.type,
-      fieldNames,
-      datas: usedSheets,
+      fileName,
       scopeBrandIds,
       user: req.user,
     });
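
With this change the importer no longer parses the workbook in the API process: the raw upload goes to S3 as a private object (forcePrivate = true) and only the key travels over RabbitMQ. Roughly, the message shape is now (a sketch inferred from the call above; the interface name is hypothetical):

interface CreateImportMessage {
  action: 'createImport';
  type: string;            // import content type from the form, e.g. 'customer'
  fileName: string;        // S3 key returned by uploadFileAWS(file, true)
  scopeBrandIds: string[];
  user: any;               // req.user, forwarded to the workers
}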
7 changes: 7 additions & 0 deletions src/workers/bulkInsert.worker.ts
@@ -14,6 +14,7 @@ import {
   Tickets,
   Users,
 } from '../db/models';
+import { initRabbitMQ } from '../messageBroker';
 import { graphqlPubsub } from '../pubsub';
 import { connect } from './utils';
 
@@ -34,6 +35,8 @@ connect().then(async () => {
     return;
   }
 
+  await initRabbitMQ();
+
   const { user, scopeBrandIds, result, contentType, properties, importHistoryId, percentagePerData } = workerData;
 
   let percentage = '0';
@@ -198,6 +201,10 @@ connect().then(async () => {
       doc.emailValidationStatus = 'unknown';
     }
 
+    if (contentType === 'customer' && !doc.phoneValidationStatus) {
+      doc.phoneValidationStatus = 'unknown';
+    }
+
     // set board item created user
     if (isBoardItem()) {
       doc.userId = user._id;
69 changes: 68 additions & 1 deletion src/workers/utils.ts
@@ -2,12 +2,15 @@ import * as dotenv from 'dotenv';
 import * as mongoose from 'mongoose';
 import * as os from 'os';
 import * as path from 'path';
+import * as XlsxStreamReader from 'xlsx-stream-reader';
 import { checkFieldNames } from '../data/modules/fields/utils';
+import { deleteFileAWS, s3Stream } from '../data/utils';
 import { ImportHistory } from '../db/models';
 import ImportHistories from '../db/models/ImportHistory';
 import { debugImport } from '../debuggers';
 
 const { MONGO_URL = '' } = process.env;
+
 export const connect = () => mongoose.connect(MONGO_URL, { useNewUrlParser: true, useCreateIndex: true });
 
 dotenv.config();
@@ -130,8 +133,70 @@ export const receiveImportCancel = () => {
   return { status: 'ok' };
 };
 
+const readXlsFile = async (fileName: string): Promise<{ fieldNames: string[]; datas: any[] }> => {
+  return new Promise(async (resolve, reject) => {
+    let rowCount = 0;
+
+    const usedSheets: any[] = [];
+
+    const xlsxReader = XlsxStreamReader();
+
+    try {
+      const stream = await s3Stream(fileName);
+
+      stream.pipe(xlsxReader);
+
+      xlsxReader.on('worksheet', workSheetReader => {
+        if (workSheetReader.id > 1) { // only the first sheet is imported
+          return workSheetReader.skip();
+        }
+
+        workSheetReader.on('row', row => {
+          if (rowCount > 100000) {
+            return reject(new Error('You can only import 100,000 rows at a time'));
+          }
+
+          if (row.values.length > 0) {
+            usedSheets.push(row.values);
+            rowCount++;
+          }
+        });
+
+        workSheetReader.process();
+      });
+
+      xlsxReader.on('end', () => {
+        const compactedRows: any = [];
+
+        for (const row of usedSheets) {
+          if (row.length > 0) {
+            row.shift(); // drop the empty first cell (cells are numbered from 1)
+
+            compactedRows.push(row);
+          }
+        }
+
+        const fieldNames = usedSheets[0];
+
+        // Removing the header row from the data rows
+        compactedRows.shift();
+
+        return resolve({ fieldNames, datas: compactedRows });
+      });
+
+      xlsxReader.on('error', error => {
+        return reject(error);
+      });
+    } catch (e) {
+      reject(e);
+    }
+  });
+};
+
 export const receiveImportXls = async (content: any) => {
-  const { type, fieldNames, scopeBrandIds, user, datas } = content;
+  const { fileName, type, scopeBrandIds, user } = content;
 
+  const { fieldNames, datas } = await readXlsFile(fileName);
+
   if (datas.length === 0) {
     throw new Error('Please import at least one row of data');
@@ -168,5 +233,7 @@ export const receiveImportXls = async (content: any) => {
 
   await createWorkers(workerPath, workerData, results);
 
+  await deleteFileAWS(fileName);
+
   return { id: importHistory.id };
 };
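
One subtlety in readXlsFile: xlsx-stream-reader numbers cells from 1, so each row.values array apparently carries an empty slot at index 0; the row.shift() in the end handler strips it, and the final compactedRows.shift() pops the header row off the data. A simplified, non-mutating equivalent with made-up values:

// raw rows as collected in usedSheets (index 0 is the empty padding slot)
const usedSheets = [
  [undefined, 'firstName', 'lastName'], // header row
  [undefined, 'Alice', 'Smith'], // data row
];

const rows = usedSheets.map(row => row.slice(1));

const fieldNames = rows.shift(); // ['firstName', 'lastName']
const datas = rows;              // [['Alice', 'Smith']]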