Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(echo): add skip support #5619

Merged
merged 37 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
90326d4
feat: add skip support
djabarovgeorge May 21, 2024
7d52c53
Merge branch 'refs/heads/next' into nv-3775-support-skip-functionality
djabarovgeorge May 22, 2024
7ab4eb3
fix: after next merge
djabarovgeorge May 22, 2024
3d151a0
Merge branch 'next' into nv-3775-support-skip-functionality
djabarovgeorge May 27, 2024
31368d8
fix(worker): remove tenant repository in tests
djabarovgeorge May 28, 2024
5ccbd33
Merge branch 'next' into nv-3775-support-skip-functionality
djabarovgeorge May 28, 2024
df7cad4
Merge branch 'refs/heads/next' into nv-3775-support-skip-functionality
djabarovgeorge May 30, 2024
7244cd7
feat: refactor after next merge
djabarovgeorge Jun 1, 2024
3385cbf
Merge branch 'refs/heads/next' into nv-3775-support-skip-functionality
djabarovgeorge Jun 1, 2024
67b4cd3
feat: update after pr review
djabarovgeorge Jun 1, 2024
b481e3e
fix(api): remove faker dep
djabarovgeorge Jun 2, 2024
4440d0d
fix(api): faker import
djabarovgeorge Jun 2, 2024
cedea99
refactor(api,echo-api): rename chimera url to url
djabarovgeorge Jun 2, 2024
8eed227
feat: update submodule hash
djabarovgeorge Jun 2, 2024
9a0bdfb
feat: update submodule hash
djabarovgeorge Jun 2, 2024
b8e60cc
refactor(app-gen,echo-sdk): update after pr comments
djabarovgeorge Jun 2, 2024
5c6d772
Merge branch 'refs/heads/next' into nv-3775-support-skip-functionality
djabarovgeorge Jun 4, 2024
008c5f1
revert(echo): change skip to be function again
djabarovgeorge Jun 4, 2024
9def7a6
Merge branch 'next' into nv-3775-support-skip-functionality
djabarovgeorge Jun 4, 2024
24eed65
feat: next merge
djabarovgeorge Jun 9, 2024
6dedaa5
chore(echo): update submodule hash
djabarovgeorge Jun 9, 2024
e9ea4ec
feat(echo): refactor after pr comments
djabarovgeorge Jun 9, 2024
8f74e11
feat: update hash
djabarovgeorge Jun 10, 2024
ac9e4c0
fix(cli-next): sync request
djabarovgeorge Jun 10, 2024
35b08ca
feat: update hash
djabarovgeorge Jun 10, 2024
4040715
fix(echo): add skip inputs validation compilation
djabarovgeorge Jun 10, 2024
0a84b43
Merge remote-tracking branch 'refs/remotes/origin/next' into nv-3775-…
djabarovgeorge Jun 10, 2024
e25e1d7
refactor(echo): after pr comments
djabarovgeorge Jun 11, 2024
f080283
Merge branch 'next' into nv-3775-support-skip-functionality
djabarovgeorge Jun 11, 2024
726f056
fix: package name
scopsy Jun 12, 2024
7fb9bcf
fix: ee source
scopsy Jun 12, 2024
8d44de1
fix: still show preview for skipped steps
scopsy Jun 12, 2024
01bc40b
tests: add test for preview mode
scopsy Jun 12, 2024
b46bae5
Merge branch 'next' into nv-3775-support-skip-functionality
scopsy Jun 12, 2024
8d15403
Merge branch 'next' into nv-3775-support-skip-functionality
scopsy Jun 12, 2024
0969f9c
fix: skip tests
scopsy Jun 12, 2024
c152548
fix: resolve app
scopsy Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions apps/api/src/app/events/e2e/echo-trigger.e2e-ee.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import axios from 'axios';
import { expect } from 'chai';
import { UserSession, SubscribersService } from '@novu/testing';
import { MessageRepository, SubscriberEntity, NotificationTemplateRepository } from '@novu/dal';
import { StepTypeEnum } from '@novu/shared';
import { MessageRepository, SubscriberEntity, NotificationTemplateRepository, JobRepository } from '@novu/dal';
import { JobStatusEnum, StepTypeEnum } from '@novu/shared';
import { echoServer } from '../../../../e2e/echo.server';

const eventTriggerPath = '/v1/events/trigger';
Expand All @@ -11,6 +11,7 @@ describe('Echo Trigger ', async () => {
let session: UserSession;
const messageRepository = new MessageRepository();
const workflowsRepository = new NotificationTemplateRepository();
const jobRepository = new JobRepository();
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;

Expand Down Expand Up @@ -99,4 +100,98 @@ describe('Echo Trigger ', async () => {
expect(messagesAfter.length).to.be.eq(1);
expect(messagesAfter[0].subject).to.include('This is an email subject TEST');
});

it('should skip step', async () => {
const workflowId = 'hello-world-2';
await echoServer.echo.workflow(
workflowId,
async ({ step, payload }) => {
await step.email(
'send-email',
async (inputs) => {
return {
subject: 'This is an email subject ' + inputs.name,
body: 'Body result ' + payload.name,
};
},
{
inputSchema: {
type: 'object',
properties: {
name: { type: 'string', default: 'TEST' },
},
} as const,
skip: () => true,
}
);
},
{
payloadSchema: {
type: 'object',
properties: {
name: { type: 'string', default: 'default_name' },
},
required: [],
additionalProperties: false,
} as const,
}
);

await syncWorkflow(session);

const workflow = await workflowsRepository.findByTriggerIdentifier(session.environment._id, workflowId);

expect(workflow).to.be.ok;
if (!workflow) throw new Error('Workflow not found');

await triggerEvent(session, workflowId, subscriber);
await session.awaitRunningJobs(workflow._id);

const executedMessage = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
channel: StepTypeEnum.EMAIL,
});

expect(executedMessage.length).to.be.eq(0);

const executedMessage2 = await jobRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
type: StepTypeEnum.EMAIL,
});

expect(executedMessage2.length).to.be.eq(1);
expect(executedMessage2[0].status).to.be.eq(JobStatusEnum.CANCELED);
});
});

async function syncWorkflow(session) {
const resultDiscover = await axios.get(echoServer.serverPath + '/echo?action=discover');

await session.testAgent.post(`/v1/echo/sync`).send({
chimeraUrl: echoServer.serverPath + '/echo',
djabarovgeorge marked this conversation as resolved.
Show resolved Hide resolved
workflows: resultDiscover.data.workflows,
});
}

async function triggerEvent(session, workflowId: string, subscriber) {
await axios.post(
`${session.serverUrl}${eventTriggerPath}`,
{
name: workflowId,
to: {
subscriberId: subscriber.subscriberId,
email: 'test@subscriber.com',
},
payload: {
name: 'test_name',
},
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
}
2 changes: 2 additions & 0 deletions apps/api/src/app/integrations/usecases/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
GetDecryptedIntegrations,
CalculateLimitNovuIntegration,
ConditionsFilter,
NormalizeVariables,
} from '@novu/application-generic';

import { GetWebhookSupportStatus } from './get-webhook-support-status/get-webhook-support-status.usecase';
Expand Down Expand Up @@ -33,4 +34,5 @@ export const USE_CASES = [
CalculateLimitNovuIntegration,
SetIntegrationAsPrimary,
CreateNovuIntegrations,
NormalizeVariables,
];
15 changes: 15 additions & 0 deletions apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed because I separated variable normalize from filter logic

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import {
requireInject,
StandardQueueService,
ExecuteOutput,
NormalizeVariablesCommand,
NormalizeVariables,
} from '@novu/application-generic';

export enum BackoffStrategiesEnum {
Expand All @@ -54,6 +56,7 @@ export class AddJob {
private calculateDelayService: CalculateDelayService,
@Inject(forwardRef(() => ConditionsFilter))
private conditionsFilter: ConditionsFilter,
private normalizeVariablesUsecase: NormalizeVariables,
private moduleRef: ModuleRef
) {
this.resonateUsecase = requireInject('resonate', this.moduleRef);
Expand All @@ -80,6 +83,17 @@ export class AddJob {
let filtered = false;
let filterVariables: IFilterVariables | undefined;
if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) {
const variables = await this.normalizeVariablesUsecase.execute(
NormalizeVariablesCommand.create({
filters: command.job.step.filters || [],
environmentId: command.environmentId,
organizationId: command.organizationId,
userId: command.userId,
step: job.step,
job: job,
})
);

const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
Expand All @@ -88,6 +102,7 @@ export class AddJob {
userId: command.userId,
step: job.step,
job,
variables,
})
);

Expand Down
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bug-fix: we were overring the canceled status (by filter) to complete.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class RunJob {

await this.storageHelperService.getAttachments(job.payload?.attachments);

await this.sendMessage.execute(
const sendMessageResult = await this.sendMessage.execute(
SendMessageCommand.create({
identifier: job.identifier,
payload: job.payload ?? {},
Expand All @@ -92,7 +92,9 @@ export class RunJob {
})
);

await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.COMPLETED);
if (sendMessageResult.status === 'success') {
await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.COMPLETED);
}
} catch (error: any) {
Logger.error({ error }, `Running job ${job._id} has thrown an error`, LOG_CONTEXT);
if (job.step.shouldStopOnFail || this.shouldBackoff(error)) {
Expand Down
Loading
Loading