diff --git a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts index 5ce74f819e8..98b2d8c618a 100644 --- a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts @@ -1,11 +1,11 @@ import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; import { JobEntity, JobRepository, JobStatusEnum } from '@novu/dal'; import { + DigestCreationResultEnum, + DigestTypeEnum, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum, - DigestCreationResultEnum, - DigestTypeEnum, } from '@novu/shared'; import { AddDelayJob } from './add-delay-job.usecase'; @@ -19,6 +19,7 @@ import { ConditionsFilter, ConditionsFilterCommand, DetailEnum, + ExecuteOutput, ExecutionLogRoute, ExecutionLogRouteCommand, IChimeraDigestResponse, @@ -29,7 +30,6 @@ import { LogDecorator, requireInject, StandardQueueService, - ExecuteOutput, } from '@novu/application-generic'; export enum BackoffStrategiesEnum { @@ -74,128 +74,179 @@ export class AddJob { Logger.log(`Scheduling New Job ${job._id} of type: ${job.type}`, LOG_CONTEXT); + if (isJobDeferredType(job.type)) { + await this.executeDeferredJob(command); + } else { + await this.executeNoneDeferredJob(command); + } + + await this.executionLogRoute.execute( + ExecutionLogRouteCommand.create({ + ...ExecutionLogRouteCommand.getDetailsFromJob(job), + detail: DetailEnum.STEP_QUEUED, + source: ExecutionDetailsSourceEnum.INTERNAL, + status: ExecutionDetailsStatusEnum.PENDING, + isTest: false, + isRetry: false, + }) + ); + } + + private async executeDeferredJob(command: AddJobCommand): Promise { + const job = command.job; + let digestAmount: number | undefined; let delayAmount: number | undefined = undefined; - let filtered = false; - let filterVariables: IFilterVariables | undefined; - if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) { - const shouldRun = await this.conditionsFilter.filter( - ConditionsFilterCommand.create({ - filters: job.step.filters || [], - environmentId: command.environmentId, - organizationId: command.organizationId, - userId: command.userId, - step: job.step, - job, - }) - ); + const shouldRun = await this.conditionsFilter.filter( + ConditionsFilterCommand.create({ + filters: job.step.filters || [], + environmentId: command.environmentId, + organizationId: command.organizationId, + userId: command.userId, + step: job.step, + job, + }) + ); - filterVariables = shouldRun.variables; - filtered = !shouldRun.passed; - - let digestCreationResult: DigestCreationResultEnum | undefined; - if (job.type === StepTypeEnum.DIGEST) { - const resonateResponse = await this.resonateUsecase.execute< - AddJobCommand & { variables: IFilterVariables }, - ExecuteOutput - >({ - ...command, - variables: filterVariables, - }); - - validateDigest(job); - - digestAmount = this.calculateDelayService.calculateDelay({ - stepMetadata: job.digest, - payload: job.payload, - overrides: job.overrides, - chimeraResponse: this.fallbackToRegularDigest(resonateResponse?.outputs), - }); - - Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT); - - digestCreationResult = await this.mergeOrCreateDigestUsecase.execute( - MergeOrCreateDigestCommand.create({ - job, - filtered, - chimeraData: resonateResponse?.outputs, - }) - ); - - if (digestCreationResult === DigestCreationResultEnum.MERGED) { - Logger.log('Digest was merged, queueing next job', LOG_CONTEXT); - - return; - } - - if (digestCreationResult === DigestCreationResultEnum.SKIPPED) { - const nextJobToSchedule = await this.jobRepository.findOne({ - _environmentId: command.environmentId, - _parentId: job._id, - }); - - if (!nextJobToSchedule) { - return; - } - - await this.execute({ - userId: job._userId, - environmentId: job._environmentId, - organizationId: command.organizationId, - jobId: nextJobToSchedule._id, - job: nextJobToSchedule, - }); - - return; - } - } + const filterVariables = shouldRun.variables; + const filtered = !shouldRun.passed; - if (job.type === StepTypeEnum.DELAY) { - const resonateResponse = await this.resonateUsecase.execute< - AddJobCommand & { variables: IFilterVariables }, - ExecuteOutput - >({ - ...command, - variables: filterVariables, - }); + if (job.type === StepTypeEnum.DIGEST) { + const digestResult = await this.handleDigest(command, filterVariables, job, digestAmount, filtered); - command.chimeraResponse = resonateResponse; - delayAmount = await this.addDelayJob.execute(command); + if (isShouldHaltJobExecution(digestResult.digestCreationResult)) { + return; + } - Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT); + digestAmount = digestResult.digestAmount; + } + + if (job.type === StepTypeEnum.DELAY) { + delayAmount = await this.handleDelay(command, filterVariables, delayAmount); - if (delayAmount === undefined) { - Logger.warn(`Delay Amount does not exist on a delay job ${job._id}`, LOG_CONTEXT); + if (delayAmount === undefined) { + Logger.warn(`Delay Amount does not exist on a delay job ${job._id}`, LOG_CONTEXT); - return; - } + return; } } - if (digestAmount === undefined && delayAmount === undefined) { - Logger.verbose(`Updating status to queued for job ${job._id}`, LOG_CONTEXT); - await this.jobRepository.updateStatus(command.environmentId, job._id, JobStatusEnum.QUEUED); + if ((digestAmount || delayAmount) && filtered) { + Logger.verbose(`Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT); } - await this.executionLogRoute.execute( - ExecutionLogRouteCommand.create({ - ...ExecutionLogRouteCommand.getDetailsFromJob(job), - detail: DetailEnum.STEP_QUEUED, - source: ExecutionDetailsSourceEnum.INTERNAL, - status: ExecutionDetailsStatusEnum.PENDING, - isTest: false, - isRetry: false, + const delay = this.getExecutionDelayAmount(filtered, digestAmount, delayAmount); + + await this.queueJob(job, delay); + } + + private async executeNoneDeferredJob(command: AddJobCommand): Promise { + const job = command.job; + + Logger.verbose(`Updating status to queued for job ${job._id}`, LOG_CONTEXT); + await this.jobRepository.updateStatus(command.environmentId, job._id, JobStatusEnum.QUEUED); + + await this.queueJob(job, 0); + } + + private async handleDelay( + command: AddJobCommand, + filterVariables: IFilterVariables, + delayAmount: number | undefined + ) { + command.chimeraResponse = await this.resonateUsecase.execute< + AddJobCommand & { variables: IFilterVariables }, + ExecuteOutput + >({ + ...command, + variables: filterVariables, + }); + delayAmount = await this.addDelayJob.execute(command); + + Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT); + + return delayAmount; + } + + private async handleDigest( + command: AddJobCommand, + filterVariables: IFilterVariables, + job, + digestAmount: number | undefined, + filtered: boolean + ) { + const resonateResponse = await this.resonateUsecase.execute< + AddJobCommand & { variables: IFilterVariables }, + ExecuteOutput + >({ + ...command, + variables: filterVariables, + }); + + validateDigest(job); + + digestAmount = this.calculateDelayService.calculateDelay({ + stepMetadata: job.digest, + payload: job.payload, + overrides: job.overrides, + chimeraResponse: this.fallbackToRegularDigest(resonateResponse?.outputs), + }); + + Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT); + + const digestCreationResult = await this.mergeOrCreateDigestUsecase.execute( + MergeOrCreateDigestCommand.create({ + job, + filtered, + chimeraData: resonateResponse?.outputs, }) ); - const delay = (filtered ? 0 : digestAmount ?? delayAmount) ?? 0; + if (digestCreationResult === DigestCreationResultEnum.MERGED) { + this.handleDigestMerged(); + } - if ((digestAmount || delayAmount) && filtered) { - Logger.verbose(`Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT); + if (digestCreationResult === DigestCreationResultEnum.SKIPPED) { + await this.handleDigestSkip(command, job); } - await this.queueJob(job, delay); + return { digestAmount, digestCreationResult }; + } + + private handleDigestMerged() { + Logger.log('Digest was merged, queueing next job', LOG_CONTEXT); + + return; + } + + private async handleDigestSkip(command: AddJobCommand, job) { + const nextJobToSchedule = await this.jobRepository.findOne({ + _environmentId: command.environmentId, + _parentId: job._id, + }); + + if (!nextJobToSchedule) { + return; + } + + await this.execute({ + userId: job._userId, + environmentId: job._environmentId, + organizationId: command.organizationId, + jobId: nextJobToSchedule._id, + job: nextJobToSchedule, + }); + + return; + } + + private getExecutionDelayAmount( + filtered: boolean, + digestAmount: number | undefined, + delayAmount: undefined | number + ) { + return (filtered ? 0 : digestAmount ?? delayAmount) ?? 0; } /* @@ -274,3 +325,13 @@ export class AddJob { }); } } + +function isJobDeferredType(jobType: StepTypeEnum | undefined) { + if (!jobType) return false; + + return [StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(jobType); +} + +function isShouldHaltJobExecution(digestCreationResult: DigestCreationResultEnum) { + return [DigestCreationResultEnum.MERGED, DigestCreationResultEnum.SKIPPED].includes(digestCreationResult); +} diff --git a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts b/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts index a6748babafd..983261c80c5 100644 --- a/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts +++ b/libs/application-generic/src/services/calculate-delay/calculate-delay.service.ts @@ -30,10 +30,12 @@ export class CalculateDelayService { overrides: any; chimeraResponse?: IChimeraDigestResponse | IChimeraDelayResponse; }): number { - const digestType = - (chimeraResponse?.type as DigestTypeEnum) ?? stepMetadata.type; if (!stepMetadata) throw new ApiException(`Step metadata not found`); + const digestType = + (chimeraResponse?.type as DigestTypeEnum | DelayTypeEnum) ?? + stepMetadata.type; + if (digestType === DelayTypeEnum.SCHEDULED) { const delayPath = (stepMetadata as IDelayScheduledMetadata).delayPath; if (!delayPath) throw new ApiException(`Delay path not found`); @@ -51,21 +53,22 @@ export class CalculateDelayService { return delay; } - const chimeraUnit = castToDigestUnitEnum(chimeraResponse?.unit); + const userUnit = castToDigestUnitEnum(chimeraResponse?.unit); + const userAmount = chimeraResponse?.amount; if (isRegularDigest(digestType)) { - if (this.checkValidDelayOverride(overrides)) { + if (this.isValidDelayOverride(overrides)) { return this.toMilliseconds( - chimeraResponse?.amount ?? (overrides.delay.amount as number), - chimeraUnit ?? (overrides.delay.unit as DigestUnitEnum) + userAmount ?? (overrides.delay.amount as number), + userUnit ?? (overrides.delay.unit as DigestUnitEnum) ); } const regularDigestMeta = stepMetadata as IDigestRegularMetadata; return this.toMilliseconds( - chimeraResponse?.amount ?? regularDigestMeta.amount, - chimeraUnit ?? regularDigestMeta.unit + userAmount ?? regularDigestMeta.amount, + userUnit ?? regularDigestMeta.unit ); } @@ -105,16 +108,18 @@ export class CalculateDelayService { return delay; } - private checkValidDelayOverride(overrides: any): boolean { + private isValidDelayOverride(overrides: any): boolean { if (!overrides?.delay) { return false; } - const values = Object.values(DigestUnitEnum); - return ( - typeof overrides.delay.amount === 'number' && - values.includes(overrides.delay.unit as unknown as DigestUnitEnum) + const isDelayAmountANumber = typeof overrides.delay.amount === 'number'; + const digestUnits = Object.values(DigestUnitEnum); + const includesValidDelayUnit = digestUnits.includes( + overrides.delay.unit as unknown as DigestUnitEnum ); + + return isDelayAmountANumber && includesValidDelayUnit; } }