From c7aa51129bcd81cd97010b6bb933d51edcd5bf2c Mon Sep 17 00:00:00 2001 From: tipusinghaw <126460794+tipusinghaw@users.noreply.github.com> Date: Fri, 5 Jul 2024 14:48:02 +0530 Subject: [PATCH] feat: batch bulk issunace (#826) * feat: batch bulk issuance Signed-off-by: tipusinghaw * stoped queue to remove issuance data Signed-off-by: tipusinghaw * feat: added delay for 1 min Signed-off-by: tipusinghaw * refactor: changed batch config Signed-off-by: tipusinghaw * feat: changed delay time for batch Signed-off-by: tipusinghaw * feat: added batch for retry Signed-off-by: tipusinghaw * refactor: added common constant in libs Signed-off-by: tipusinghaw * refactor: removed duplicate value from schema DTO Signed-off-by: tipusinghaw --------- Signed-off-by: tipusinghaw Signed-off-by: KulkarniShashank --- .../interfaces/issuance.interfaces.ts | 7 + apps/issuance/src/issuance.repository.ts | 10 +- apps/issuance/src/issuance.service.ts | 202 ++++++++++++------ libs/common/src/common.constant.ts | 5 + 4 files changed, 155 insertions(+), 69 deletions(-) diff --git a/apps/issuance/interfaces/issuance.interfaces.ts b/apps/issuance/interfaces/issuance.interfaces.ts index e66aa1f52..29a04ac4e 100644 --- a/apps/issuance/interfaces/issuance.interfaces.ts +++ b/apps/issuance/interfaces/issuance.interfaces.ts @@ -345,3 +345,10 @@ export interface IDeletedFileUploadRecords { deleteFileDetails: Prisma.BatchPayload; deleteFileUploadDetails: Prisma.BatchPayload; } + +export interface BulkPayloadDetails { + clientId: string, + orgId: string, + requestId?: string, + isRetry: boolean +} diff --git a/apps/issuance/src/issuance.repository.ts b/apps/issuance/src/issuance.repository.ts index 0f7a92afd..2dac82790 100644 --- a/apps/issuance/src/issuance.repository.ts +++ b/apps/issuance/src/issuance.repository.ts @@ -354,7 +354,10 @@ export class IssuanceRepository { const errorCount = await this.prisma.file_data.count({ where: { fileUploadId, - isError: true + OR: [ + { isError: true }, + { status: false } + ] } }); @@ -569,7 +572,10 @@ export class IssuanceRepository { return this.prisma.file_data.findMany({ where: { fileUploadId: fileId, - isError: true + OR: [ + { isError: true }, + { status: false } + ] } }); } catch (error) { diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index fa1bc3168..46ee7afb2 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -9,7 +9,7 @@ import { CommonConstants } from '@credebl/common/common.constant'; import { ResponseMessages } from '@credebl/common/response-messages'; import { ClientProxy, RpcException } from '@nestjs/microservices'; import { map } from 'rxjs'; -import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; +import { BulkPayloadDetails, CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; import { IssuanceProcessState, OrgAgentType, PromiseResult, SchemaType, TemplateIdentifier} from '@credebl/enum/enum'; import * as QRCode from 'qrcode'; import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template'; @@ -32,7 +32,7 @@ import { ICredentialOfferResponse, IDeletedIssuanceRecords, IIssuedCredential, I import { OOBIssueCredentialDto } from 'apps/api-gateway/src/issuance/dtos/issuance.dto'; import { RecordType, agent_invitations, organisation, user } from '@prisma/client'; import { createOobJsonldIssuancePayload, validateEmail } from '@credebl/common/cast.helper'; -import { sendEmail } from '@credebl/common/send-grid-helper-file'; +// import { sendEmail } from '@credebl/common/send-grid-helper-file'; import * as pLimit from 'p-limit'; import { UserActivityRepository } from 'libs/user-activity/repositories'; import { validateW3CSchemaAttributes } from '../libs/helpers/attributes.validator'; @@ -800,7 +800,10 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO disposition: 'attachment' } ]; - const isEmailSent = await sendEmail(this.emailData); + + const isEmailSent = true; // change this after testing on local + // const isEmailSent = await sendEmail(this.emailData); + this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}-${this.counter}`); this.counter++; if (!isEmailSent) { @@ -820,6 +823,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO message: `${ResponseMessages.issuance.error.walletError} at position ${iterationNo}`, error: `${errorStack?.error?.message} at position ${iterationNo}` }) ); + throw error; // Check With other issuance flow } else { errors.push( new RpcException({ @@ -828,6 +832,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO error: error?.response?.error }) ); + throw error; // Check With other issuance flow } } } @@ -1163,17 +1168,105 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO return new Promise(resolve => setTimeout(resolve, ms)); } - async issueBulkCredential(requestId: string, orgId: string, clientDetails: IClientDetails, reqPayload: ImportFileDetails): Promise { + /** + * Processes bulk payload in batches and adds jobs to the queue. + * @param bulkPayload + * @param clientDetails + * @param orgId + * @param requestId + */ + + private async processInBatches(bulkPayload, bulkPayloadDetails: BulkPayloadDetails):Promise { + + const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails; + const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); + const batchSize = CommonConstants.ISSUANCE_BATCH_SIZE; // initial 1000 + const uniqueJobId = uuidv4(); + const limit = pLimit(CommonConstants.ISSUANCE_MAX_CONCURRENT_OPERATIONS); + + // Generator function to yield batches + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + function* createBatches(array, size) { + for (let i = 0; i < array.length; i += size) { + yield array.slice(i, i + size); + } + } + + // Helper function to process a batch + const processBatch = async (batch, batchIndex): Promise<[]> => { + const queueJobsArray = batch.map((item) => ({ + data: { + id: item.id, + jobId: uniqueJobId, + cacheId: requestId, + clientId, + referenceId: item.referenceId, + fileUploadId: item.fileUploadId, + schemaLedgerId: item.schemaId, + credentialDefinitionId: item.credDefId, + status: item.status, + credential_data: item.credential_data, + orgId, + credentialType: item.credential_type, + totalJobs: bulkPayload.length, + isRetry, + isLastData: false + } + })); + + this.logger.log(`Processing batch ${batchIndex + 1} with ${batch.length} items.`); + + // Execute the batched jobs with limited concurrency + await Promise.all(queueJobsArray.map(job => limit(() => job))); + + return queueJobsArray; + }; + + let batchIndex = 0; + + for (const batch of createBatches(bulkPayload, batchSize)) { + const resolvedBatchJobs = await processBatch(batch, batchIndex); + + this.logger.log("Adding resolved jobs to the queue:", resolvedBatchJobs); + await this.bulkIssuanceQueue.addBulk(resolvedBatchJobs); + + batchIndex++; + + // Wait for 60 seconds before processing the next batch, if more batches are remaining + if ((batchIndex * batchSize) < bulkPayload.length) { + await delay(CommonConstants.ISSUANCE_BATCH_DELAY); + } + } + } + + /** + * Handles bulk credential issuance. + * @param requestId - The request ID. + * @param orgId - The organization ID. + * @param clientDetails - Client details. + * @param reqPayload - Request payload containing file details. + * @returns A promise resolving to a success message. + */ + async issueBulkCredential( + requestId: string, + orgId: string, + clientDetails: IClientDetails, + reqPayload: ImportFileDetails + ): Promise { if (!requestId) { throw new BadRequestException(ResponseMessages.issuance.error.missingRequestId); } + const fileUpload: FileUpload = { lastChangedDateTime: null, upload_type: '', status: '', orgId: '', - createDateTime: null + createDateTime: null, + name: '', + credentialType: '' }; + let csvFileDetail; try { @@ -1181,7 +1274,8 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO if (!cachedData) { throw new BadRequestException(ResponseMessages.issuance.error.cacheTimeOut); } - //for demo UI + + // For demo UI if (cachedData && clientDetails?.isSelectiveIssuance) { await this.cacheManager.del(requestId); await this.uploadCSVTemplate(reqPayload, requestId); @@ -1204,16 +1298,15 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO fileUpload.createDateTime = new Date(); fileUpload.name = parsedFileDetails.fileName; fileUpload.credentialType = parsedFileDetails.credentialType; - - csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId); + csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId); - const bulkPayloadObject: IBulkPayloadObject = { - parsedData, - parsedFileDetails, - userId: clientDetails.userId, - fileUploadId: csvFileDetail.id - }; + const bulkPayloadObject: IBulkPayloadObject = { + parsedData, + parsedFileDetails, + userId: clientDetails.userId, + fileUploadId: csvFileDetail.id + }; const storeBulkPayload = await this._storeBulkPayloadInBatch(bulkPayloadObject); @@ -1221,41 +1314,30 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO throw new BadRequestException(ResponseMessages.issuance.error.storeBulkData); } - const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id); - if (!bulkpayload) { + // Process in batches + const bulkPayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id); + if (!bulkPayload) { throw new BadRequestException(ResponseMessages.issuance.error.fileData); } - const uniqueJobId = uuidv4(); - const queueJobsArrayPromises = bulkpayload.map(async (item) => ({ - data: { - id: item.id, - jobId: uniqueJobId, - cacheId: requestId, - clientId: clientDetails.clientId, - referenceId: item.referenceId, - fileUploadId: item.fileUploadId, - schemaLedgerId: item.schemaId, - credentialDefinitionId: item.credDefId, - status: item.status, - credential_data: item.credential_data, - orgId, - credentialType: item.credential_type, - totalJobs: bulkpayload.length, - isRetry: false, - isLastData: false - } - })); - const queueJobsArray = await Promise.all(queueJobsArrayPromises); - try { - await this.bulkIssuanceQueue.addBulk(queueJobsArray); - } catch (error) { - this.logger.error(`Error processing issuance data: ${error}`); - } + try { + + const bulkPayloadDetails: BulkPayloadDetails = { + clientId: clientDetails.clientId, + orgId, + requestId, + isRetry: false + }; + + this.processInBatches(bulkPayload, bulkPayloadDetails); + } catch (error) { + this.logger.error(`Error processing issuance data: ${error}`); + } + return ResponseMessages.issuance.success.bulkProcess; } catch (error) { fileUpload.status = FileUploadStatus.interrupted; - this.logger.error(`error in issueBulkCredential : ${error}`); + this.logger.error(`Error in issueBulkCredential: ${error}`); throw new RpcException(error.response); } finally { if (csvFileDetail !== undefined && csvFileDetail.id !== undefined) { @@ -1277,28 +1359,14 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound; throw new BadRequestException(`${errorMessage}`); } - const uniqueJobId = uuidv4(); - const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({ - data: { - id: item.id, - jobId: uniqueJobId, + + try { + const bulkPayloadDetails: BulkPayloadDetails = { clientId, - referenceId: item.referenceId, - fileUploadId: item.fileUploadId, - schemaLedgerId: item.schemaId, - credentialDefinitionId: item.credDefId, - status: item.status, - credential_data: item.credential_data, orgId, - credentialType: item.credential_type, - totalJobs: bulkpayloadRetry.length, - isRetry: true, - isLastData: false - } - })); - const queueJobsArray = await Promise.all(queueJobsArrayPromises); - try { - await this.bulkIssuanceQueue.addBulk(queueJobsArray); + isRetry: true + }; + this.processInBatches(bulkpayloadRetry, bulkPayloadDetails); } catch (error) { this.logger.error(`Error processing issuance data: ${error}`); } @@ -1336,7 +1404,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO fileUploadData.fileRow = JSON.stringify(jobDetails); fileUploadData.isError = false; fileUploadData.createDateTime = new Date(); - fileUploadData.referenceId = jobDetails.credential_data.email_identifier; + fileUploadData.referenceId = jobDetails?.credential_data?.email_identifier; fileUploadData.jobId = jobDetails.id; const {orgId} = jobDetails; @@ -1534,13 +1602,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO const {parsedFileDetails, parsedData, fileUploadId, userId} = bulkPayloadObject; const limit = pLimit(CommonConstants.MAX_CONCURRENT_OPERATIONS); - const startTime = Date.now(); // Start timing the entire process + const startTime = Date.now(); const batches = await this.splitIntoBatches(parsedData, CommonConstants.BATCH_SIZE); this.logger.log("Total number of batches:", batches.length); for (const [index, batch] of batches.entries()) { - const batchStartTime = Date.now(); // Start timing the current batch + const batchStartTime = Date.now(); // Create an array of limited promises for the current batch const saveFileDetailsPromises = batch.map(element => limit(() => { @@ -1567,7 +1635,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO this.logger.log(`Batch ${index + 1} processed in ${(batchEndTime - batchStartTime)} milliseconds.`); } - const endTime = Date.now(); // End timing the entire process + const endTime = Date.now(); this.logger.log(`Total processing time: ${(endTime - startTime)} milliseconds.`); return true; } catch (error) { diff --git a/libs/common/src/common.constant.ts b/libs/common/src/common.constant.ts index b62a21fa9..e00a8cd6b 100644 --- a/libs/common/src/common.constant.ts +++ b/libs/common/src/common.constant.ts @@ -323,6 +323,11 @@ CACHE_TTL_SECONDS = 604800, // Bulk-issuance BATCH_SIZE = 100, MAX_CONCURRENT_OPERATIONS = 50, +ISSUANCE_BATCH_SIZE = 2000, +ISSUANCE_MAX_CONCURRENT_OPERATIONS = 1000, +ISSUANCE_BATCH_DELAY = 60000 //Intially 60000 + + // MICROSERVICES NAMES API_GATEWAY_SERVICE = 'api-gateway', ORGANIZATION_SERVICE = 'organization',