Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign parent workflow ID to subworkflows from generic JSON #3867

Merged
merged 1 commit into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
IWorkflowSettings,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
Expand Down Expand Up @@ -810,6 +811,8 @@ export async function getRunData(
export async function getWorkflowData(
workflowInfo: IExecuteWorkflowInfo,
userId: string,
parentWorkflowId?: string,
parentWorkflowSettings?: IWorkflowSettings,
): Promise<IWorkflowBase> {
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(
Expand Down Expand Up @@ -847,6 +850,14 @@ export async function getWorkflowData(
}
} else {
workflowData = workflowInfo.code;
if (workflowData) {
if (!workflowData.id) {
workflowData.id = parentWorkflowId;
}
if (!workflowData.settings) {
workflowData.settings = parentWorkflowSettings;
}
}
}

return workflowData!;
Expand All @@ -864,41 +875,53 @@ export async function getWorkflowData(
export async function executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
inputData?: INodeExecutionData[],
parentExecutionId?: string,
loadedWorkflowData?: IWorkflowBase,
loadedRunData?: IWorkflowExecutionDataProcess,
options?: {
parentWorkflowId?: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const externalHooks = ExternalHooks();
await externalHooks.init();

const nodeTypes = NodeTypes();

const workflowData =
loadedWorkflowData ?? (await getWorkflowData(workflowInfo, additionalData.userId));
options?.loadedWorkflowData ??
(await getWorkflowData(
workflowInfo,
additionalData.userId,
options?.parentWorkflowId,
options?.parentWorkflowSettings,
));

const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({
id: workflowInfo.id,
id: workflowData.id?.toString(),
name: workflowName,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});

const runData =
loadedRunData ?? (await getRunData(workflowData, additionalData.userId, inputData));
options?.loadedRunData ??
(await getRunData(workflowData, additionalData.userId, options?.inputData));

let executionId;

if (parentExecutionId !== undefined) {
executionId = parentExecutionId;
if (options?.parentExecutionId !== undefined) {
executionId = options?.parentExecutionId;
} else {
executionId =
parentExecutionId !== undefined
? parentExecutionId
options?.parentExecutionId !== undefined
? options?.parentExecutionId
: await ActiveExecutions.getInstance().add(runData);
}

Expand Down Expand Up @@ -946,7 +969,7 @@ export async function executeWorkflow(
runData.executionMode,
runExecutionData,
);
if (parentExecutionId !== undefined) {
if (options?.parentExecutionId !== undefined) {
// Must be changed to become typed
return {
startedAt: new Date(),
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class WorkflowRunner {
active: data.workflowData.active,
nodeTypes,
staticData: data.workflowData.staticData,
settings: data.workflowData.settings,
});
const additionalData = await WorkflowExecuteAdditionalData.getBase(
data.userId,
Expand Down
29 changes: 23 additions & 6 deletions packages/cli/src/WorkflowRunnerProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ITaskData,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowSettings,
LoggerProxy,
Workflow,
WorkflowExecuteMode,
Expand Down Expand Up @@ -183,6 +184,12 @@ export class WorkflowRunnerProcess {
if (Object.keys(node.credentials === undefined ? {} : node.credentials).length > 0) {
shouldInitializaDb = true;
}
if (node.type === 'n8n-nodes-base.executeWorkflow') {
// With UM, child workflows from arbitrary JSON
// Should be persisted by the child process,
// so DB needs to be initialized
shouldInitializaDb = true;
}
});

// This code has been split into 4 ifs just to make it easier to understand
Expand Down Expand Up @@ -271,16 +278,22 @@ export class WorkflowRunnerProcess {
additionalData.executeWorkflow = async (
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
inputData?: INodeExecutionData[] | undefined,
options?: {
parentWorkflowId?: string;
inputData?: INodeExecutionData[];
parentWorkflowSettings?: IWorkflowSettings;
},
): Promise<Array<INodeExecutionData[] | null> | IRun> => {
const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData(
workflowInfo,
userId,
options?.parentWorkflowId,
options?.parentWorkflowSettings,
);
const runData = await WorkflowExecuteAdditionalData.getRunData(
workflowData,
additionalData.userId,
inputData,
options?.inputData,
);
await sendToParentProcess('startExecution', { runData });
const executionId: string = await new Promise((resolve) => {
Expand All @@ -293,10 +306,14 @@ export class WorkflowRunnerProcess {
const executeWorkflowFunctionOutput = (await executeWorkflowFunction(
workflowInfo,
additionalData,
inputData,
executionId,
workflowData,
runData,
{
parentWorkflowId: options?.parentWorkflowId,
inputData: options?.inputData,
parentExecutionId: executionId,
loadedWorkflowData: workflowData,
loadedRunData: runData,
parentWorkflowSettings: options?.parentWorkflowSettings,
},
)) as { workflowExecute: WorkflowExecute; workflow: Workflow } as IWorkflowExecuteProcess;
const { workflowExecute } = executeWorkflowFunctionOutput;
this.childExecutions[executionId] = executeWorkflowFunctionOutput;
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2199,7 +2199,11 @@ export function getExecuteFunctions(
inputData?: INodeExecutionData[],
): Promise<any> {
return additionalData
.executeWorkflow(workflowInfo, additionalData, inputData)
.executeWorkflow(workflowInfo, additionalData, {
parentWorkflowId: workflow.id?.toString(),
inputData,
parentWorkflowSettings: workflow.settings,
})
.then(async (result) =>
BinaryDataManager.getInstance().duplicateBinaryData(
result,
Expand Down
12 changes: 8 additions & 4 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1437,10 +1437,14 @@ export interface IWorkflowExecuteAdditionalData {
executeWorkflow: (
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
inputData?: INodeExecutionData[],
parentExecutionId?: string,
loadedWorkflowData?: IWorkflowBase,
loadedRunData?: any,
options?: {
parentWorkflowId?: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: any;
parentWorkflowSettings?: IWorkflowSettings;
},
) => Promise<any>;
// hooks?: IWorkflowExecuteHooks;
executionId?: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/test/Helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export function getExecuteFunctions(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
): Promise<any> {
return additionalData.executeWorkflow(workflowInfo, additionalData, inputData);
return additionalData.executeWorkflow(workflowInfo, additionalData, { inputData });
},
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
Expand Down