Skip to content

Commit

Permalink
feat: bulk issuance retry (#747)
Browse files Browse the repository at this point in the history
* feat: added implementation for retry

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: changed message for issunace

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

---------

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>
  • Loading branch information
tipusinghaw authored and KulkarniShashank committed Sep 6, 2024
1 parent 0d290a7 commit 8eb9c2f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 40 deletions.
18 changes: 18 additions & 0 deletions apps/issuance/interfaces/issuance.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,22 @@ export interface IJobDetails {
credential_data: CredentialData
orgId: string;
credentialType: string;
}

export interface IQueuePayload{
id: string;
jobId: string;
cacheId?: string;
clientId: string;
referenceId: string;
fileUploadId: string;
schemaLedgerId: string;
credentialDefinitionId: string;
status: string;
credential_data: CredentialData;
orgId: string;
credentialType: string;
totalJobs: number;
isRetry: boolean;
isLastData: boolean;
}
3 changes: 2 additions & 1 deletion apps/issuance/src/issuance.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { OnQueueActive, Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { IssuanceService } from './issuance.service';
import { Logger } from '@nestjs/common';
import { IQueuePayload } from '../interfaces/issuance.interfaces';

@Processor('bulk-issuance')
export class BulkIssuanceProcessor {
Expand All @@ -16,7 +17,7 @@ export class BulkIssuanceProcessor {
}

@Process()
async issueCredential(job: Job<unknown>):Promise<void> {
async issueCredential(job: Job<IQueuePayload>):Promise<void> {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`
);
Expand Down
87 changes: 49 additions & 38 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { CommonConstants } from '@credebl/common/common.constant';
import { ResponseMessages } from '@credebl/common/response-messages';
import { ClientProxy, RpcException } from '@nestjs/microservices';
import { map } from 'rxjs';
import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { OrgAgentType, SchemaType, TemplateIdentifier } from '@credebl/enum/enum';
import * as QRCode from 'qrcode';
import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template';
Expand Down Expand Up @@ -37,6 +37,7 @@ import { sendEmail } from '@credebl/common/send-grid-helper-file';
@Injectable()
export class IssuanceService {
private readonly logger = new Logger('IssueCredentialService');
private processedJobsCounters: Record<string, number> = {};
constructor(
@Inject('NATS_CLIENT') private readonly issuanceServiceProxy: ClientProxy,
private readonly commonService: CommonService,
Expand Down Expand Up @@ -1106,15 +1107,15 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
// Wait for all saveFileDetails operations to complete
await Promise.all(saveFileDetailsPromises);

// Now fetch the file details
const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id);
if (!bulkpayload) {
throw new BadRequestException(ResponseMessages.issuance.error.fileData);
}

const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayload.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
cacheId: requestId,
clientId: clientDetails.clientId,
referenceId: item.referenceId,
Expand All @@ -1124,11 +1125,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type
credentialType: item.credential_type,
totalJobs: bulkpayload.length,
isRetry: false,
isLastData: false
}
}));

// Await all promises to complete

const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
Expand All @@ -1150,55 +1153,63 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
}

async retryBulkCredential(fileId: string, orgId: string, clientId: string): Promise<string> {
let respFile;
let bulkpayloadRetry;

try {

const fileDetails = await this.issuanceRepository.getFileDetailsById(fileId);
if (!fileDetails) {
throw new BadRequestException(ResponseMessages.issuance.error.retry);
}
respFile = await this.issuanceRepository.getFailedCredentials(fileId);

if (0 === respFile.length) {
bulkpayloadRetry = await this.issuanceRepository.getFailedCredentials(fileId);
if (0 === bulkpayloadRetry.length) {
const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound;
throw new BadRequestException(`${errorMessage}`);
}

for (const element of respFile) {
try {
const payload = {
data: element.credential_data,
fileUploadId: element.fileUploadId,
clientId,
credentialDefinitionId: element.credDefId,
schemaLedgerId: element.schemaId,
orgId,
id: element.id,
isRetry: true,
isLastData: respFile.indexOf(element) === respFile.length - 1
};

await this.delay(500); // Wait for 0.5 secends
this.processIssuanceData(payload);
if (0 === respFile.length) {
return FileUploadStatus.completed;
}
} catch (error) {
// Handle errors if needed
this.logger.error(`Error processing issuance data: ${error}`);
const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkpayloadRetry.length,
isRetry: true,
isLastData: false
}
}

return 'Process reinitiated for bulk issuance';
}));
const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}

return ResponseMessages.bulkIssuance.success.reinitiated;
} catch (error) {
throw new RpcException(error.response ? error.response : error);
}
}


// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async processIssuanceData(jobDetails): Promise<void> {
async processIssuanceData(jobDetails: IQueuePayload): Promise<void> {
const {jobId, totalJobs} = jobDetails;
if (!this.processedJobsCounters[jobId]) {
this.processedJobsCounters[jobId] = 0;
}
this.processedJobsCounters[jobId] += 1;
if (this.processedJobsCounters[jobId] === totalJobs) {
jobDetails.isLastData = true;
delete this.processedJobsCounters[jobId];
}

const socket = await io(`${process.env.SOCKET_HOST}`, {
reconnection: true,
reconnectionDelay: 5000,
Expand Down
3 changes: 2 additions & 1 deletion libs/common/src/response-messages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ export const ResponseMessages = {
},
bulkIssuance: {
success: {
create: 'Issuance process successfully'
create: 'Issuance process initiated successfully',
reinitiated: 'Process reinitiated for bulk issuance'
},
error: {
PathNotFound: 'Path to export data not found.',
Expand Down

0 comments on commit 8eb9c2f

Please sign in to comment.