Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core & function nodes): Update function nodes to work with binary-data-mode 'filesystem'. #3845

Merged
merged 9 commits into from
Sep 11, 2022
Merged
8 changes: 8 additions & 0 deletions packages/core/src/BinaryDataManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,23 @@ export class BinaryDataManager {
): Promise<IBinaryData> {
const retBinaryData = binaryData;

// If a manager handles this binary, return the binary data with it's reference id.
if (this.managers[this.binaryDataMode]) {
return this.managers[this.binaryDataMode]
.storeBinaryData(binaryBuffer, executionId)
.then((filename) => {
// Add data manager reference id.
retBinaryData.id = this.generateBinaryId(filename);

// Prevent preserving data in memory if handled by a data manager.
retBinaryData.data = this.binaryDataMode;

// Short-circuit return to prevent further actions.
return retBinaryData;
});
}

// Else fallback to storing this data in memory.
retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING);
return binaryData;
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase {
mimeType?: string,
): Promise<IBinaryData>;
getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise<Buffer>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise<any>; // tslint:disable-line:no-any
requestWithAuthentication(
this: IAllExecuteFunctions,
Expand Down Expand Up @@ -74,6 +75,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase {
export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
helpers: {
getBinaryDataBuffer(propertyName: string, inputIndex?: number): Promise<Buffer>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
httpRequest(requestOptions: IHttpRequestOptions): Promise<any>; // tslint:disable-line:no-any
prepareBinaryData(
binaryData: Buffer,
Expand Down
33 changes: 32 additions & 1 deletion packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,22 @@ export async function getBinaryDataBuffer(
return BinaryDataManager.getInstance().retrieveBinaryData(binaryData);
}

/**
* Store an incoming IBinaryData & related buffer using the configured binary data manager.
*
* @export
* @param {IBinaryData} data
* @param {Buffer} binaryData
* @returns {Promise<IBinaryData>}
*/
export async function setBinaryDataBuffer(
data: IBinaryData,
binaryData: Buffer,
executionId: string,
): Promise<IBinaryData> {
return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId);
}

/**
* Takes a buffer and converts it into the format n8n uses. It encodes the binary data as
* base64 and adds metadata.
Expand Down Expand Up @@ -874,7 +890,7 @@ export async function prepareBinaryData(
}
}

return BinaryDataManager.getInstance().storeBinaryData(returnData, binaryData, executionId);
return setBinaryDataBuffer(returnData, binaryData, executionId);
}

/**
Expand Down Expand Up @@ -1920,6 +1936,9 @@ export function getExecutePollFunctions(
},
helpers: {
httpRequest,
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
},
async prepareBinaryData(
binaryData: Buffer,
filePath?: string,
Expand Down Expand Up @@ -2091,6 +2110,9 @@ export function getExecuteTriggerFunctions(
additionalCredentialOptions,
);
},
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
},
async prepareBinaryData(
binaryData: Buffer,
filePath?: string,
Expand Down Expand Up @@ -2347,6 +2369,9 @@ export function getExecuteFunctions(
additionalCredentialOptions,
);
},
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
},
async prepareBinaryData(
binaryData: Buffer,
filePath?: string,
Expand Down Expand Up @@ -2589,6 +2614,9 @@ export function getExecuteSingleFunctions(
additionalCredentialOptions,
);
},
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
},
async prepareBinaryData(
binaryData: Buffer,
filePath?: string,
Expand Down Expand Up @@ -3086,6 +3114,9 @@ export function getExecuteWebhookFunctions(
additionalCredentialOptions,
);
},
async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
},
async prepareBinaryData(
binaryData: Buffer,
filePath?: string,
Expand Down
127 changes: 127 additions & 0 deletions packages/core/test/NodeExecuteFunctions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { join } from 'path';
import { tmpdir } from 'os';
import { readFileSync, mkdtempSync } from 'fs';

import { BinaryDataManager, NodeExecuteFunctions } from '../src';
import { IBinaryData, ITaskDataConnections } from 'n8n-workflow';

const temporaryDir = mkdtempSync(join(tmpdir(), 'n8n'));

describe('NodeExecuteFunctions', () => {
describe(`test binary data helper methods`, () => {
// Reset BinaryDataManager for each run. This is a dirty operation, as individual managers are not cleaned.
beforeEach(() => {
//@ts-ignore
BinaryDataManager.instance = undefined;
});

test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'default' mode`, async () => {
// Setup a 'default' binary data manager instance
await BinaryDataManager.init({
mode: 'default',
availableModes: 'default',
localStoragePath: temporaryDir,
binaryDataTTL: 1,
persistedBinaryDataTTL: 1,
});

// Set our binary data buffer
let inputData: Buffer = Buffer.from('This is some binary data', 'utf8');
let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer(
{
mimeType: 'txt',
data: 'This should be overwritten by the actual payload in the response',
},
inputData,
'executionId',
);

// Expect our return object to contain the base64 encoding of the input data, as it should be stored in memory.
expect(setBinaryDataBufferResponse.data).toEqual(inputData.toString('base64'));

// Now, re-fetch our data.
// An ITaskDataConnections object is used to share data between nodes. The top level property, 'main', represents the successful output object from a previous node.
let taskDataConnectionsInput: ITaskDataConnections = {
main: [],
};

// We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data.
taskDataConnectionsInput.main.push([
{
json: {},
binary: {
data: setBinaryDataBufferResponse,
},
},
]);

// Now, lets fetch our data! The item will be item index 0.
let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer(
taskDataConnectionsInput,
0,
'data',
0,
);

expect(getBinaryDataBufferResponse).toEqual(inputData);
});

test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'filesystem' mode`, async () => {
// Setup a 'filesystem' binary data manager instance
await BinaryDataManager.init({
mode: 'filesystem',
availableModes: 'filesystem',
localStoragePath: temporaryDir,
binaryDataTTL: 1,
persistedBinaryDataTTL: 1,
});

// Set our binary data buffer
let inputData: Buffer = Buffer.from('This is some binary data', 'utf8');
let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer(
{
mimeType: 'txt',
data: 'This should be overwritten with the name of the configured data manager',
},
inputData,
'executionId',
);

// Expect our return object to contain the name of the configured data manager.
expect(setBinaryDataBufferResponse.data).toEqual('filesystem');

// Ensure that the input data was successfully persisted to disk.
expect(
readFileSync(
`${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem:', '')}`,
),
).toEqual(inputData);

// Now, re-fetch our data.
// An ITaskDataConnections object is used to share data between nodes. The top level property, 'main', represents the successful output object from a previous node.
let taskDataConnectionsInput: ITaskDataConnections = {
main: [],
};

// We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data.
taskDataConnectionsInput.main.push([
{
json: {},
binary: {
data: setBinaryDataBufferResponse,
},
},
]);

// Now, lets fetch our data! The item will be item index 0.
let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer(
taskDataConnectionsInput,
0,
'data',
0,
);

expect(getBinaryDataBufferResponse).toEqual(inputData);
});
});
});
48 changes: 48 additions & 0 deletions packages/nodes-base/nodes/Function/Function.node.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IExecuteFunctions } from 'n8n-core';
import {
IBinaryKeyData,
IDataObject,
INodeExecutionData,
INodeType,
Expand Down Expand Up @@ -61,6 +62,11 @@ return items;`,
// Copy the items as they may get changed in the functions
items = JSON.parse(JSON.stringify(items));

// Assign item indexes
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
items[itemIndex].index = itemIndex;
}

const cleanupData = (inputData: IDataObject): IDataObject => {
Object.keys(inputData).map((key) => {
if (inputData[key] !== null && typeof inputData[key] === 'object') {
Expand All @@ -84,6 +90,48 @@ return items;`,
items,
// To be able to access data of other items
$item: (index: number) => this.getWorkflowDataProxy(index),
getBinaryDataAsync: async (item: INodeExecutionData): Promise<IBinaryKeyData | undefined> => {
// Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0.
if (item?.binary && item?.index !== undefined && item?.index !== null) {
for (const binaryPropertyName of Object.keys(item.binary)) {
item.binary[binaryPropertyName].data = (
await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName)
)?.toString('base64');
}
}

// Return Data
return item.binary;
},
setBinaryDataAsync: async (item: INodeExecutionData, data: IBinaryKeyData) => {
// Ensure item is provided, else return a friendly error.
if (!item) {
throw new NodeOperationError(
this.getNode(),
'No item was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).',
);
}

// Ensure data is provided, else return a friendly error.
if (!data) {
throw new NodeOperationError(
this.getNode(),
'No data was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).',
);
}

// Set Binary Data
for (const binaryPropertyName of Object.keys(data)) {
const binaryItem = data[binaryPropertyName];
data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer(
binaryItem,
Buffer.from(binaryItem.data, 'base64'),
);
}

// Set Item Reference
item.binary = data;
},
};

// Make it possible to access data via $node, $parameter, ...
Expand Down
52 changes: 49 additions & 3 deletions packages/nodes-base/nodes/FunctionItem/FunctionItem.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,70 @@ return item;`,
};

for (let itemIndex = 0; itemIndex < length; itemIndex++) {
const mode = this.getMode();

try {
item = items[itemIndex];
item.index = itemIndex;

// Copy the items as they may get changed in the functions
item = JSON.parse(JSON.stringify(item));

// Define the global objects for the custom function
const sandbox = {
/** @deprecated for removal - replaced by getBinaryDataAsync() */
getBinaryData: (): IBinaryKeyData | undefined => {
if (mode === 'manual') {
this.sendMessageToUI(
'getBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to getBinaryDataAsync(...) instead.',
);
}
return item.binary;
},
/** @deprecated for removal - replaced by setBinaryDataAsync() */
setBinaryData: async (data: IBinaryKeyData) => {
if (mode === 'manual') {
this.sendMessageToUI(
'setBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to setBinaryDataAsync(...) instead.',
);
}
item.binary = data;
},
getNodeParameter: this.getNodeParameter,
getWorkflowStaticData: this.getWorkflowStaticData,
helpers: this.helpers,
item: item.json,
setBinaryData: (data: IBinaryKeyData) => {
getBinaryDataAsync: async (): Promise<IBinaryKeyData | undefined> => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getBinaryDataAsync: async ()

The Function node requires one to pass in the item here. It isn't necessary in the FunctionItem node, however perhaps we should align the two for better a better user experience when switching between them? Though - to be honest - I quite like it this way if we think about the nodes independently.

// Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0.
if (item?.binary && item?.index !== undefined && item?.index !== null) {
for (const binaryPropertyName of Object.keys(item.binary)) {
item.binary[binaryPropertyName].data = (
await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName)
)?.toString('base64');
}
}
// Retrun Data
return item.binary;
},
setBinaryDataAsync: async (data: IBinaryKeyData) => {
// Ensure data is present
if (!data) {
throw new NodeOperationError(
this.getNode(),
'No data was provided to setBinaryDataAsync (data: IBinaryKeyData).',
);
}

// Set Binary Data
for (const binaryPropertyName of Object.keys(data)) {
const binaryItem = data[binaryPropertyName];
data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer(
binaryItem,
Buffer.from(binaryItem.data, 'base64'),
);
}

// Set Item Reference
item.binary = data;
},
};
Expand All @@ -99,8 +147,6 @@ return item;`,
const dataProxy = this.getWorkflowDataProxy(itemIndex);
Object.assign(sandbox, dataProxy);

const mode = this.getMode();

const options = {
console: mode === 'manual' ? 'redirect' : 'inherit',
sandbox,
Expand Down
Loading