Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(framework): Explicitly exit workflow evaluation early after evaluating specified stepId #6808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/framework/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@
"cross-fetch": "^4.0.0",
"json-schema-to-ts": "^3.0.0",
"liquidjs": "^10.13.1",
"ora": "^5.4.1",
"sanitize-html": "^2.13.0"
},
"nx": {
Expand Down
37 changes: 37 additions & 0 deletions packages/framework/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,43 @@ describe('Novu Client', () => {
expect(mockFn).toHaveBeenCalledTimes(0);
});

it('should NOT log anything after executing the provided stepId', async () => {
const mockFn = vi.fn();
const spyConsoleLog = vi.spyOn(console, 'log');
const newWorkflow = workflow('test-workflow', async ({ step }) => {
await step.email('active-step-id', async () => ({ body: 'Test Body', subject: 'Subject' }));
await step.email('inactive-step-id', async () => {
mockFn();

return { body: 'Test Body', subject: 'Subject' };
});
});

client.addWorkflows([newWorkflow]);

const event: Event = {
action: PostActionEnum.EXECUTE,
workflowId: 'test-workflow',
stepId: 'active-step-id',
subscriber: {},
state: [],
payload: {},
controls: {},
};

await client.executeWorkflow(event);

// Wait for the conclusion promise to resolve.
await new Promise((resolve) => {
setTimeout(resolve);
});
/*
* Not the most robust test, but ensures that the last log call contains the duration,
* which is the last expected log call.
*/
expect(spyConsoleLog.mock.lastCall).toEqual([expect.stringContaining('duration:')]);
});

it('should evaluate code in steps after a skipped step', async () => {
const mockFn = vi.fn();
const newWorkflow = workflow('test-workflow', async ({ step }) => {
Expand Down
89 changes: 38 additions & 51 deletions packages/framework/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Liquid } from 'liquidjs';
import ora from 'ora';

import { ChannelStepEnum, PostActionEnum } from './constants';
import {
Expand Down Expand Up @@ -257,9 +256,18 @@ export class Client {

private executeStepFactory<T_Outputs extends Record<string, unknown>, T_Result extends Record<string, unknown>>(
event: Event,
setResult: (result: Pick<ExecuteOutput, 'outputs' | 'providers' | 'options'>) => void
setResult: (result: Pick<ExecuteOutput, 'outputs' | 'providers' | 'options'>) => void,
hasResult: () => boolean
): ActionStep<T_Outputs, T_Result> {
return async (stepId, stepResolve, options) => {
if (hasResult()) {
/*
* Exit the execution early if the result has already been set.
* This is to ensure that we don't evaluate code in steps after the provided stepId.
*/
return;
}

const step = this.getStep(event.workflowId, stepId);
const controls = await this.createStepControls(step, event);
const isPreview = event.action === PostActionEnum.PREVIEW;
Expand Down Expand Up @@ -364,6 +372,7 @@ export class Client {
};

let concludeExecution: (value?: unknown) => void;
let hasConcludedExecution = false;
const concludeExecutionPromise = new Promise((resolve) => {
concludeExecution = resolve;
});
Expand All @@ -375,15 +384,20 @@ export class Client {
* `workflow.execute` method. By resolving the `concludeExecutionPromise` when setting the result,
* we can ensure that the `workflow.execute` method is not evaluated after the `stepId` is reached.
*
* This function should only be called once per workflow execution.
*
* @param stepResult The result of the workflow execution.
*/
const setResult = (stepResult: Omit<ExecuteOutput, 'metadata'>): void => {
if (hasConcludedExecution) {
throw new Error('setResult can only be called once per workflow execution');
}
concludeExecution();
hasConcludedExecution = true;

result = stepResult;
};

const hasResult = (): boolean => hasConcludedExecution;

let executionError: Error | undefined;
try {
if (
Expand All @@ -408,14 +422,14 @@ export class Client {
controls: {},
subscriber: event.subscriber,
step: {
email: this.executeStepFactory(validatedEvent, setResult),
sms: this.executeStepFactory(validatedEvent, setResult),
inApp: this.executeStepFactory(validatedEvent, setResult),
digest: this.executeStepFactory(validatedEvent, setResult),
delay: this.executeStepFactory(validatedEvent, setResult),
push: this.executeStepFactory(validatedEvent, setResult),
chat: this.executeStepFactory(validatedEvent, setResult),
custom: this.executeStepFactory(validatedEvent, setResult),
email: this.executeStepFactory(validatedEvent, setResult, hasResult),
sms: this.executeStepFactory(validatedEvent, setResult, hasResult),
inApp: this.executeStepFactory(validatedEvent, setResult, hasResult),
digest: this.executeStepFactory(validatedEvent, setResult, hasResult),
delay: this.executeStepFactory(validatedEvent, setResult, hasResult),
push: this.executeStepFactory(validatedEvent, setResult, hasResult),
chat: this.executeStepFactory(validatedEvent, setResult, hasResult),
custom: this.executeStepFactory(validatedEvent, setResult, hasResult),
},
}),
]);
Expand Down Expand Up @@ -541,7 +555,6 @@ export class Client {
provider: DiscoverProviderOutput,
outputs: Record<string, unknown>
): Promise<WithPassthrough<Record<string, unknown>>> {
const spinner = ora({ indent: 2 }).start(`Executing provider: \`${provider.type}\``);
try {
if (event.stepId === step.stepId) {
const controls = await this.createStepControls(step, event);
Expand All @@ -558,26 +571,21 @@ export class Client {
step.stepId,
provider.type
);
spinner.succeed(`Executed provider: \`${provider.type}\``);
console.log(` ${EMOJI.SUCCESS} Executed provider: \`${provider.type}\``);

return {
...validatedOutput,
_passthrough: result._passthrough,
};
} else {
// No-op. We don't execute providers for hydrated steps
spinner.stopAndPersist({
symbol: EMOJI.HYDRATED,
text: `Hydrated provider: \`${provider.type}\``,
});
console.log(` ${EMOJI.HYDRATED} Hydrated provider: \`${provider.type}\``);

return {};
}
} catch (error) {
spinner.stopAndPersist({
symbol: EMOJI.ERROR,
text: `Failed to execute provider: \`${provider.type}\``,
});
console.log(` ${EMOJI.ERROR} Failed to execute provider: \`${provider.type}\``);

throw new ProviderExecutionFailedError(provider.type, event.action, error);
}
}
Expand All @@ -587,7 +595,6 @@ export class Client {
step: DiscoverStepOutput
): Promise<Pick<ExecuteOutput, 'outputs' | 'providers'>> {
if (event.stepId === step.stepId) {
const spinner = ora({ indent: 1 }).start(`Executing stepId: \`${step.stepId}\``);
try {
const templateControls = await this.createStepControls(step, event);
const controls = await this.compileControls(templateControls, event);
Expand All @@ -603,26 +610,21 @@ export class Client {

const providers = await this.executeProviders(event, step, validatedOutput);

spinner.succeed(`Executed stepId: \`${step.stepId}\``);
console.log(` ${EMOJI.SUCCESS} Executed stepId: \`${step.stepId}\``);

return {
outputs: validatedOutput,
providers,
};
} catch (error) {
spinner.stopAndPersist({
prefixText: '',
symbol: EMOJI.ERROR,
text: `Failed to execute stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.ERROR} Failed to execute stepId: \`${step.stepId}\``);
if (isFrameworkError(error)) {
throw error;
} else {
throw new StepExecutionFailedError(step.stepId, event.action, error);
}
}
} else {
const spinner = ora({ indent: 1 }).start(`Hydrating stepId: \`${step.stepId}\``);
try {
const result = event.state.find((state) => state.stepId === step.stepId);

Expand All @@ -635,10 +637,7 @@ export class Client {
event.workflowId,
step.stepId
);
spinner.stopAndPersist({
symbol: EMOJI.HYDRATED,
text: `Hydrated stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.HYDRATED} Hydrated stepId: \`${step.stepId}\``);

return {
outputs: validatedOutput,
Expand All @@ -648,10 +647,8 @@ export class Client {
throw new ExecutionStateCorruptError(event.workflowId, step.stepId);
}
} catch (error) {
spinner.stopAndPersist({
symbol: EMOJI.ERROR,
text: `Failed to hydrate stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.ERROR} Failed to hydrate stepId: \`${step.stepId}\``);

throw error;
}
}
Expand Down Expand Up @@ -696,7 +693,6 @@ export class Client {
event: Event,
step: DiscoverStepOutput
): Promise<Pick<ExecuteOutput, 'outputs' | 'providers'>> {
const spinner = ora({ indent: 1 }).start(`Previewing stepId: \`${step.stepId}\``);
try {
if (event.stepId === step.stepId) {
const templateControls = await this.createStepControls(step, event);
Expand All @@ -712,10 +708,7 @@ export class Client {
step.stepId
);

spinner.stopAndPersist({
symbol: EMOJI.MOCK,
text: `Mocked stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.MOCK} Mocked stepId: \`${step.stepId}\``);

return {
outputs: validatedOutput,
Expand All @@ -724,21 +717,15 @@ export class Client {
} else {
const mockResult = this.mock(step.results.schema);

spinner.stopAndPersist({
symbol: EMOJI.MOCK,
text: `Mocked stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.MOCK} Mocked stepId: \`${step.stepId}\``);

return {
outputs: mockResult,
providers: await this.executeProviders(event, step, mockResult),
};
}
} catch (error) {
spinner.stopAndPersist({
symbol: EMOJI.ERROR,
text: `Failed to preview stepId: \`${step.stepId}\``,
});
console.log(` ${EMOJI.ERROR} Failed to preview stepId: \`${step.stepId}\``);

if (isFrameworkError(error)) {
throw error;
Expand Down
Loading
Loading