Skip to content

Commit

Permalink
Inject pindata into items flow (#3420)
Browse files Browse the repository at this point in the history
* ⚡ Inject pin data - Second approach

* 🔥 Remove unneeded lint exception
  • Loading branch information
ivov authored and alexgrozav committed Jun 29, 2022
1 parent 07759e0 commit aa34753
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 7 deletions.
2 changes: 2 additions & 0 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
ITaskData,
ITelemetrySettings,
IWorkflowBase as IWorkflowBaseWorkflow,
PinDataPayload,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
Expand Down Expand Up @@ -645,6 +646,7 @@ export interface IWorkflowExecutionDataProcess {
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
pinData?: PinDataPayload;
retryOf?: number | string;
sessionId?: string;
startNodes?: string[];
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,7 @@ class App {
): Promise<IExecutionPushResponse> => {
const { workflowData } = req.body;
const { runData } = req.body;
const { pinData } = req.body;
const { startNodes } = req.body;
const { destinationNode } = req.body;
const executionMode = 'manual';
Expand Down Expand Up @@ -1194,6 +1195,7 @@ class App {
destinationNode,
executionMode,
runData,
pinData,
sessionId,
startNodes,
workflowData,
Expand Down
8 changes: 7 additions & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,12 @@ export class WorkflowRunner {

// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
workflowExecution = workflowExecute.run(
workflow,
undefined,
data.destinationNode,
data.pinData,
);
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
Expand All @@ -320,6 +325,7 @@ export class WorkflowRunner {
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}

Expand Down
8 changes: 7 additions & 1 deletion packages/cli/src/WorkflowRunnerProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,12 @@ export class WorkflowRunnerProcess {

// Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode);
return this.workflowExecute.run(
this.workflow,
undefined,
this.data.destinationNode,
this.data.pinData,
);
}
// Execute only the nodes between start and destination nodes
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
Expand All @@ -356,6 +361,7 @@ export class WorkflowRunnerProcess {
this.data.runData,
this.data.startNodes,
this.data.destinationNode,
this.data.pinData,
);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/requests.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
INodeCredentialTestRequest,
IRunData,
IWorkflowSettings,
PinDataPayload,
} from 'n8n-workflow';

import { User } from './databases/entities/User';
Expand Down Expand Up @@ -71,6 +72,7 @@ export declare namespace WorkflowRequest {
{
workflowData: IWorkflowDb;
runData: IRunData;
pinData: PinDataPayload;
startNodes?: string[];
destinationNode?: string;
}
Expand Down
41 changes: 36 additions & 5 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
LoggerProxy as Logger,
NodeApiError,
NodeOperationError,
PinDataPayload,
Workflow,
WorkflowExecuteMode,
WorkflowOperationError,
Expand Down Expand Up @@ -59,6 +60,7 @@ export class WorkflowExecute {
startData: {},
resultData: {
runData: {},
pinData: {},
},
executionData: {
contextData: {},
Expand All @@ -82,7 +84,12 @@ export class WorkflowExecute {
// PCancelable to a regular Promise and does so not allow canceling
// active executions anymore
// eslint-disable-next-line @typescript-eslint/promise-function-async
run(workflow: Workflow, startNode?: INode, destinationNode?: string): PCancelable<IRun> {
run(
workflow: Workflow,
startNode?: INode,
destinationNode?: string,
pinData?: PinDataPayload,
): PCancelable<IRun> {
// Get the nodes to start workflow execution from
startNode = startNode || workflow.getStartNode(destinationNode);

Expand Down Expand Up @@ -121,6 +128,7 @@ export class WorkflowExecute {
},
resultData: {
runData: {},
pinData,
},
executionData: {
contextData: {},
Expand Down Expand Up @@ -152,6 +160,7 @@ export class WorkflowExecute {
runData: IRunData,
startNodes: string[],
destinationNode: string,
pinData?: PinDataPayload,
// @ts-ignore
): PCancelable<IRun> {
let incomingNodeConnections: INodeConnections | undefined;
Expand Down Expand Up @@ -258,6 +267,7 @@ export class WorkflowExecute {
},
resultData: {
runData,
pinData,
},
executionData: {
contextData: {},
Expand Down Expand Up @@ -927,11 +937,32 @@ export class WorkflowExecute {
this.mode,
);
nodeSuccessData = runNodeData.data;
const { pinData } = this.runExecutionData.resultData;

if (runNodeData.closeFunction) {
// Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
closeFunction = runNodeData.closeFunction();
if (pinData && pinData[executionNode.name] !== undefined) {
const nodePinData = pinData[executionNode.name][runIndex];
nodeSuccessData = [[{ json: nodePinData }]];
} else {
Logger.debug(`Running node "${executionNode.name}" started`, {
node: executionNode.name,
workflowId: workflow.id,
});
const runNodeData = await workflow.runNode(
executionData.node,
executionData.data,
this.runExecutionData,
runIndex,
this.additionalData,
NodeExecuteFunctions,
this.mode,
);
nodeSuccessData = runNodeData.data;

if (runNodeData.closeFunction) {
// Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
closeFunction = runNodeData.closeFunction();
}
}

Logger.debug(`Running node "${executionNode.name}" finished successfully`, {
Expand Down
2 changes: 2 additions & 0 deletions packages/editor-ui/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
ITelemetrySettings,
IWorkflowSettings as IWorkflowSettingsWorkflow,
WorkflowExecuteMode,
PinDataPayload,
} from 'n8n-workflow';

export * from 'n8n-design-system/src/types';
Expand Down Expand Up @@ -211,6 +212,7 @@ export interface IStartRunData {
startNodes?: string[];
destinationNode?: string;
runData?: IRunData;
pinData?: PinDataPayload;
}

export interface IRunDataUi {
Expand Down
9 changes: 9 additions & 0 deletions packages/editor-ui/src/components/mixins/workflowRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
IRunData,
IRunExecutionData,
NodeHelpers,
PinDataPayload,
} from 'n8n-workflow';

import { externalHooks } from '@/components/mixins/externalHooks';
Expand Down Expand Up @@ -151,9 +152,16 @@ export const workflowRun = mixins(

const workflowData = await this.getWorkflowDataToSave();

const pinData = Object.values(workflow.nodes).reduce<PinDataPayload>((acc, node) => {
if (node.pinData) acc[node.name] = [node.pinData];

return acc;
}, {});

const startRunData: IStartRunData = {
workflowData,
runData: newRunData,
pinData,
startNodes,
};
if (nodeName) {
Expand All @@ -174,6 +182,7 @@ export const workflowRun = mixins(
data: {
resultData: {
runData: newRunData || {},
pinData,
startNodes,
workflowData,
},
Expand Down
5 changes: 5 additions & 0 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,10 @@ export interface INode {
pinData?: IDataObject;
}

export interface PinDataPayload {
[nodeName: string]: IDataObject[];
}

export interface INodes {
[key: string]: INode;
}
Expand Down Expand Up @@ -1293,6 +1297,7 @@ export interface IRunExecutionData {
resultData: {
error?: ExecutionError;
runData: IRunData;
pinData?: PinDataPayload;
lastNodeExecuted?: string;
};
executionData?: {
Expand Down

0 comments on commit aa34753

Please sign in to comment.