Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(worker): add job #5663

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 166 additions & 105 deletions apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common';
import { JobEntity, JobRepository, JobStatusEnum } from '@novu/dal';
import {
DigestCreationResultEnum,
DigestTypeEnum,
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
StepTypeEnum,
DigestCreationResultEnum,
DigestTypeEnum,
} from '@novu/shared';

import { AddDelayJob } from './add-delay-job.usecase';
Expand All @@ -19,6 +19,7 @@ import {
ConditionsFilter,
ConditionsFilterCommand,
DetailEnum,
ExecuteOutput,
ExecutionLogRoute,
ExecutionLogRouteCommand,
IChimeraDigestResponse,
Expand All @@ -29,7 +30,6 @@ import {
LogDecorator,
requireInject,
StandardQueueService,
ExecuteOutput,
} from '@novu/application-generic';

export enum BackoffStrategiesEnum {
Expand Down Expand Up @@ -74,128 +74,179 @@ export class AddJob {

Logger.log(`Scheduling New Job ${job._id} of type: ${job.type}`, LOG_CONTEXT);

if (isJobDeferredType(job.type)) {
await this.executeDeferredJob(command);
} else {
await this.executeNoneDeferredJob(command);
}

await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(job),
detail: DetailEnum.STEP_QUEUED,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.PENDING,
isTest: false,
isRetry: false,
})
);
}

private async executeDeferredJob(command: AddJobCommand): Promise<void> {
const job = command.job;

let digestAmount: number | undefined;
let delayAmount: number | undefined = undefined;

let filtered = false;
let filterVariables: IFilterVariables | undefined;
if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) {
const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
step: job.step,
job,
})
);
const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
step: job.step,
job,
})
);

filterVariables = shouldRun.variables;
filtered = !shouldRun.passed;

let digestCreationResult: DigestCreationResultEnum | undefined;
if (job.type === StepTypeEnum.DIGEST) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});

validateDigest(job);

digestAmount = this.calculateDelayService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
chimeraResponse: this.fallbackToRegularDigest(resonateResponse?.outputs),
});

Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);

digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({
job,
filtered,
chimeraData: resonateResponse?.outputs,
})
);

if (digestCreationResult === DigestCreationResultEnum.MERGED) {
Logger.log('Digest was merged, queueing next job', LOG_CONTEXT);

return;
}

if (digestCreationResult === DigestCreationResultEnum.SKIPPED) {
const nextJobToSchedule = await this.jobRepository.findOne({
_environmentId: command.environmentId,
_parentId: job._id,
});

if (!nextJobToSchedule) {
return;
}

await this.execute({
userId: job._userId,
environmentId: job._environmentId,
organizationId: command.organizationId,
jobId: nextJobToSchedule._id,
job: nextJobToSchedule,
});

return;
}
}
const filterVariables = shouldRun.variables;
const filtered = !shouldRun.passed;

if (job.type === StepTypeEnum.DELAY) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});
if (job.type === StepTypeEnum.DIGEST) {
const digestResult = await this.handleDigest(command, filterVariables, job, digestAmount, filtered);

command.chimeraResponse = resonateResponse;
delayAmount = await this.addDelayJob.execute(command);
if (isShouldHaltJobExecution(digestResult.digestCreationResult)) {
return;
}

Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);
digestAmount = digestResult.digestAmount;
}

if (job.type === StepTypeEnum.DELAY) {
delayAmount = await this.handleDelay(command, filterVariables, delayAmount);

if (delayAmount === undefined) {
Logger.warn(`Delay Amount does not exist on a delay job ${job._id}`, LOG_CONTEXT);
if (delayAmount === undefined) {
Logger.warn(`Delay Amount does not exist on a delay job ${job._id}`, LOG_CONTEXT);

return;
}
return;
}
}

if (digestAmount === undefined && delayAmount === undefined) {
Logger.verbose(`Updating status to queued for job ${job._id}`, LOG_CONTEXT);
await this.jobRepository.updateStatus(command.environmentId, job._id, JobStatusEnum.QUEUED);
if ((digestAmount || delayAmount) && filtered) {
Logger.verbose(`Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT);
}

await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(job),
detail: DetailEnum.STEP_QUEUED,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.PENDING,
isTest: false,
isRetry: false,
const delay = this.getExecutionDelayAmount(filtered, digestAmount, delayAmount);

await this.queueJob(job, delay);
}

private async executeNoneDeferredJob(command: AddJobCommand): Promise<void> {
const job = command.job;

Logger.verbose(`Updating status to queued for job ${job._id}`, LOG_CONTEXT);
await this.jobRepository.updateStatus(command.environmentId, job._id, JobStatusEnum.QUEUED);

await this.queueJob(job, 0);
}

private async handleDelay(
command: AddJobCommand,
filterVariables: IFilterVariables,
delayAmount: number | undefined
) {
command.chimeraResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});
delayAmount = await this.addDelayJob.execute(command);

Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);

return delayAmount;
}

private async handleDigest(
command: AddJobCommand,
filterVariables: IFilterVariables,
job,
digestAmount: number | undefined,
filtered: boolean
) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});

validateDigest(job);

digestAmount = this.calculateDelayService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
chimeraResponse: this.fallbackToRegularDigest(resonateResponse?.outputs),
});

Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);

const digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({
job,
filtered,
chimeraData: resonateResponse?.outputs,
})
);

const delay = (filtered ? 0 : digestAmount ?? delayAmount) ?? 0;
if (digestCreationResult === DigestCreationResultEnum.MERGED) {
this.handleDigestMerged();
}

if ((digestAmount || delayAmount) && filtered) {
Logger.verbose(`Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT);
if (digestCreationResult === DigestCreationResultEnum.SKIPPED) {
await this.handleDigestSkip(command, job);
}

await this.queueJob(job, delay);
return { digestAmount, digestCreationResult };
}

private handleDigestMerged() {
Logger.log('Digest was merged, queueing next job', LOG_CONTEXT);

return;
}

private async handleDigestSkip(command: AddJobCommand, job) {
const nextJobToSchedule = await this.jobRepository.findOne({
_environmentId: command.environmentId,
_parentId: job._id,
});

if (!nextJobToSchedule) {
return;
}

await this.execute({
userId: job._userId,
environmentId: job._environmentId,
organizationId: command.organizationId,
jobId: nextJobToSchedule._id,
job: nextJobToSchedule,
});

return;
}

private getExecutionDelayAmount(
filtered: boolean,
digestAmount: number | undefined,
delayAmount: undefined | number
) {
return (filtered ? 0 : digestAmount ?? delayAmount) ?? 0;
}

/*
Expand Down Expand Up @@ -274,3 +325,13 @@ export class AddJob {
});
}
}

function isJobDeferredType(jobType: StepTypeEnum | undefined) {
if (!jobType) return false;

return [StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(jobType);
}

function isShouldHaltJobExecution(digestCreationResult: DigestCreationResultEnum) {
return [DigestCreationResultEnum.MERGED, DigestCreationResultEnum.SKIPPED].includes(digestCreationResult);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ export class CalculateDelayService {
overrides: any;
chimeraResponse?: IChimeraDigestResponse | IChimeraDelayResponse;
}): number {
const digestType =
(chimeraResponse?.type as DigestTypeEnum) ?? stepMetadata.type;
if (!stepMetadata) throw new ApiException(`Step metadata not found`);

const digestType =
(chimeraResponse?.type as DigestTypeEnum | DelayTypeEnum) ??
stepMetadata.type;

if (digestType === DelayTypeEnum.SCHEDULED) {
const delayPath = (stepMetadata as IDelayScheduledMetadata).delayPath;
if (!delayPath) throw new ApiException(`Delay path not found`);
Expand All @@ -51,21 +53,22 @@ export class CalculateDelayService {
return delay;
}

const chimeraUnit = castToDigestUnitEnum(chimeraResponse?.unit);
const userUnit = castToDigestUnitEnum(chimeraResponse?.unit);
const userAmount = chimeraResponse?.amount;

if (isRegularDigest(digestType)) {
if (this.checkValidDelayOverride(overrides)) {
if (this.isValidDelayOverride(overrides)) {
return this.toMilliseconds(
chimeraResponse?.amount ?? (overrides.delay.amount as number),
chimeraUnit ?? (overrides.delay.unit as DigestUnitEnum)
userAmount ?? (overrides.delay.amount as number),
userUnit ?? (overrides.delay.unit as DigestUnitEnum)
);
}

const regularDigestMeta = stepMetadata as IDigestRegularMetadata;

return this.toMilliseconds(
chimeraResponse?.amount ?? regularDigestMeta.amount,
chimeraUnit ?? regularDigestMeta.unit
userAmount ?? regularDigestMeta.amount,
userUnit ?? regularDigestMeta.unit
);
}

Expand Down Expand Up @@ -105,16 +108,18 @@ export class CalculateDelayService {
return delay;
}

private checkValidDelayOverride(overrides: any): boolean {
private isValidDelayOverride(overrides: any): boolean {
if (!overrides?.delay) {
return false;
}
const values = Object.values(DigestUnitEnum);

return (
typeof overrides.delay.amount === 'number' &&
values.includes(overrides.delay.unit as unknown as DigestUnitEnum)
const isDelayAmountANumber = typeof overrides.delay.amount === 'number';
const digestUnits = Object.values(DigestUnitEnum);
const includesValidDelayUnit = digestUnits.includes(
overrides.delay.unit as unknown as DigestUnitEnum
);

return isDelayAmountANumber && includesValidDelayUnit;
}
}

Expand Down
Loading