diff --git a/api/src/modules/import-data/file.service.ts b/api/src/modules/import-data/file.service.ts
index 1c920bb3c..58b9e3aee 100644
--- a/api/src/modules/import-data/file.service.ts
+++ b/api/src/modules/import-data/file.service.ts
@@ -3,11 +3,53 @@ import { access, unlink } from 'fs/promises';
 import * as XLSX from 'xlsx';
 import { WorkBook } from 'xlsx';
 import { difference } from 'lodash';
+import { Worker } from 'worker_threads';
+import { getWorkerConfig } from 'modules/import-data/utils';
 
 @Injectable()
 export class FileService<T extends Record<string, any[]>> {
   private readonly logger: Logger = new Logger(FileService.name);
 
+  async transformToJsonInWorker(filePath: string, sheetMap: any): Promise<T> {
+    await this.isFilePresentInFs(filePath);
+    const { workerPath, execArgv } = getWorkerConfig('xlsx.worker');
+    this.logger.log(`Starting worker to parse ${filePath}...`);
+    try {
+      const parsedSheet: any = await new Promise((resolve, reject) => {
+        const worker: Worker = new Worker(workerPath, {
+          workerData: { filePath, sheetMap },
+          execArgv,
+        });
+
+        worker.on('message', (data: T) => {
+          // The worker posts an { error } payload when parsing fails in the worker thread
+          if ((data as any)?.error) {
+            reject(new Error((data as any).error));
+            return;
+          }
+          this.logger.log(`Worker finished processing ${filePath}`);
+          resolve(data);
+        });
+
+        worker.on('error', (error: Error) => {
+          this.logger.error(`Worker failed processing ${filePath}: ${error}`);
+          reject(error);
+        });
+
+        worker.on('exit', (code: number) => {
+          if (code !== 0) {
+            this.logger.error(`Worker stopped with exit code ${code}`);
+            reject(new Error(`Worker stopped with exit code ${code}`));
+          }
+        });
+      });
+      return parsedSheet;
+    } catch (error) {
+      this.logger.error(error);
+      throw new Error(`XLSX file could not be parsed: ${error}`);
+    }
+  }
+
   async transformToJson(
     filePath: string,
     sheetsMap: Record<string, string>,
diff --git a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts
index 0fe60fa68..55ad1d453 100644
--- a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts
+++ b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts
@@ -68,89 +68,88 @@ export class SourcingDataImportService {
   async importSourcingData(filePath: string, taskId: string): Promise<void> {
     this.logger.log(`Starting import process`);
-    //await this.fileService.isFilePresentInFs(filePath);
     try {
       const parsedXLSXDataset: SourcingRecordsSheets =
-        await this.fileService.transformToJson(filePath, SHEETS_MAP);
+        await this.fileService.transformToJsonInWorker(filePath, SHEETS_MAP);
 
       const { data: dtoMatchedData, validationErrors } =
         await this.excelValidator.validate(
           parsedXLSXDataset as unknown as SourcingDataSheet,
         );
-      // if (validationErrors.length) {
-      //   throw new ExcelValidationError('Validation Errors', validationErrors);
-      // }
-      //
-      // //TODO: Implement transactional import. Move geocoding to first step
-      //
-      // await this.dbCleaner.cleanDataBeforeImport();
-      //
-      // const materials: Material[] =
-      //   await this.materialService.findAllUnpaginated();
-      // if (!materials.length) {
-      //   throw new ServiceUnavailableException(
-      //     'No Materials found present in the DB. Please check the LandGriffon installation manual',
-      //   );
-      // }
+      if (validationErrors.length) {
+        throw new ExcelValidationError('Validation Errors', validationErrors);
+      }
+
+      //TODO: Implement transactional import. Move geocoding to first step
+
+      await this.dbCleaner.cleanDataBeforeImport();
+
+      const materials: Material[] =
+        await this.materialService.findAllUnpaginated();
+      if (!materials.length) {
+        throw new ServiceUnavailableException(
+          'No Materials found present in the DB. Please check the LandGriffon installation manual',
+        );
+      }
 
       this.logger.log('Activating Indicators...');
       const activeIndicators: Indicator[] =
         await this.indicatorService.activateIndicators(
           dtoMatchedData.indicators,
         );
 
-      // this.logger.log('Activating Materials...');
-      // const activeMaterials: Material[] =
-      //   await this.materialService.activateMaterials(dtoMatchedData.materials);
-      //
-      // await this.tasksService.updateImportTask({
-      //   taskId,
-      //   newLogs: [
-      //     `Activated indicators: ${activeIndicators
-      //       .map((i: Indicator) => i.name)
-      //       .join(', ')}`,
-      //     `Activated materials: ${activeMaterials
-      //       .map((i: Material) => i.hsCodeId)
-      //       .join(', ')}`,
-      //   ],
-      // });
-      //
-      // const businessUnits: BusinessUnit[] =
-      //   await this.businessUnitService.createTree(dtoMatchedData.businessUnits);
-      //
-      // const suppliers: Supplier[] = await this.supplierService.createTree(
-      //   dtoMatchedData.suppliers,
-      // );
-      //
-      // const { geoCodedSourcingData, errors } =
-      //   await this.geoCodingService.geoCodeLocations(
-      //     dtoMatchedData.sourcingData,
-      //   );
-      // if (errors.length) {
-      //   throw new GeoCodingError(
-      //     'Import failed. There are GeoCoding errors present in the file',
-      //     errors,
-      //   );
-      // }
-      // const warnings: string[] = [];
-      // geoCodedSourcingData.forEach((elem: SourcingData) => {
-      //   if (elem.locationWarning) warnings.push(elem.locationWarning);
-      // });
-      // warnings.length > 0 &&
-      //   (await this.tasksService.updateImportTask({
-      //     taskId,
-      //     newLogs: warnings,
-      //   }));
-      //
-      // const sourcingDataWithOrganizationalEntities: SourcingLocation[] =
-      //   await this.relateSourcingDataWithOrganizationalEntities(
-      //     suppliers,
-      //     businessUnits,
-      //     materials,
-      //     geoCodedSourcingData,
-      //   );
-      //
-      // await this.sourcingLocationService.save(
-      //   sourcingDataWithOrganizationalEntities,
-      // );
+      this.logger.log('Activating Materials...');
+      const activeMaterials: Material[] =
+        await this.materialService.activateMaterials(dtoMatchedData.materials);
+
+      await this.tasksService.updateImportTask({
+        taskId,
+        newLogs: [
+          `Activated indicators: ${activeIndicators
+            .map((i: Indicator) => i.name)
+            .join(', ')}`,
+          `Activated materials: ${activeMaterials
+            .map((i: Material) => i.hsCodeId)
+            .join(', ')}`,
+        ],
+      });
+
+      const businessUnits: BusinessUnit[] =
+        await this.businessUnitService.createTree(dtoMatchedData.businessUnits);
+
+      const suppliers: Supplier[] = await this.supplierService.createTree(
+        dtoMatchedData.suppliers,
+      );
+
+      const { geoCodedSourcingData, errors } =
+        await this.geoCodingService.geoCodeLocations(
+          dtoMatchedData.sourcingData,
+        );
+      if (errors.length) {
+        throw new GeoCodingError(
+          'Import failed. There are GeoCoding errors present in the file',
+          errors,
+        );
+      }
+      const warnings: string[] = [];
+      geoCodedSourcingData.forEach((elem: SourcingData) => {
+        if (elem.locationWarning) warnings.push(elem.locationWarning);
+      });
+      warnings.length > 0 &&
+        (await this.tasksService.updateImportTask({
+          taskId,
+          newLogs: warnings,
+        }));
+
+      const sourcingDataWithOrganizationalEntities: SourcingLocation[] =
+        await this.relateSourcingDataWithOrganizationalEntities(
+          suppliers,
+          businessUnits,
+          materials,
+          geoCodedSourcingData,
+        );
+
+      await this.sourcingLocationService.save(
+        sourcingDataWithOrganizationalEntities,
+      );
 
       this.logger.log('Generating Indicator Records...');
diff --git a/api/src/modules/import-data/utils.ts b/api/src/modules/import-data/utils.ts
new file mode 100644
index 000000000..1ce5d8e68
--- /dev/null
+++ b/api/src/modules/import-data/utils.ts
@@ -0,0 +1,21 @@
+import * as path from 'path';
+
+interface WorkerConfig {
+  workerPath: string;
+  execArgv: string[];
+}
+
+// Node workers do not support TypeScript out of the box, so we point to the compiled JS file for the app, but to the TS file (run through ts-node) for the tests.
+
+export const getWorkerConfig = (fileName: string): WorkerConfig => {
+  const isTestEnvironment: boolean = process.env.NODE_ENV === 'test';
+  const workerPath: WorkerConfig['workerPath'] = isTestEnvironment
+    ? path.resolve(__dirname, `./workers/${fileName}.ts`)
+    : path.resolve(__dirname, `./workers/${fileName}.js`);
+
+  const execArgv: WorkerConfig['execArgv'] = isTestEnvironment
+    ? ['-r', 'ts-node/register']
+    : [];
+
+  return { workerPath, execArgv };
+};
diff --git a/api/src/modules/import-data/workers/xlsx.worker.ts b/api/src/modules/import-data/workers/xlsx.worker.ts
new file mode 100644
index 000000000..00fe48ea9
--- /dev/null
+++ b/api/src/modules/import-data/workers/xlsx.worker.ts
@@ -0,0 +1,19 @@
+import { parentPort, workerData } from 'worker_threads';
+import * as XLSX from 'xlsx';
+
+try {
+  const filePath: string = workerData.filePath;
+  const sheetMap: any = workerData.sheetMap;
+  const workBook: XLSX.WorkBook = XLSX.readFile(filePath);
+  const parsedSheets: Record<string, any[]> = {};
+
+  for (const [key, value] of Object.entries(sheetMap)) {
+    parsedSheets[value as string] = XLSX.utils.sheet_to_json(
+      workBook.Sheets[key],
+    );
+  }
+
+  parentPort?.postMessage(parsedSheets);
+} catch (error: any) {
+  parentPort?.postMessage({ error: error.message });
+}
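
Note (illustrative, not part of the patch): the two new files communicate through a small message protocol — the worker posts either the parsed sheets or an { error } payload, and the parent wraps the Worker events in a Promise. The self-contained sketch below shows that parent-side pattern in isolation; the runWorker helper, the worker path, and the sheet map values are hypothetical examples, while the worker_threads calls themselves are real API.

import { Worker } from 'worker_threads';

// Resolve with the worker's result; reject on an { error } payload,
// an 'error' event, or a non-zero exit code.
function runWorker<T>(workerPath: string, workerData: unknown): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const worker: Worker = new Worker(workerPath, { workerData });

    worker.on('message', (data: any) => {
      if (data && data.error) {
        reject(new Error(String(data.error)));
      } else {
        resolve(data as T);
      }
    });

    worker.on('error', reject);

    worker.on('exit', (code: number) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// Example call mirroring how transformToJsonInWorker feeds xlsx.worker.js;
// the file path and sheet map here are placeholders.
runWorker<Record<string, any[]>>('./workers/xlsx.worker.js', {
  filePath: '/tmp/sourcing-data.xlsx',
  sheetMap: { materials: 'materials' },
})
  .then((sheets: Record<string, any[]>) => console.log(Object.keys(sheets)))
  .catch(console.error);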