Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

N8N-4269 Postgres Node PairedItem Support #3815

Merged
merged 13 commits into from
Aug 10, 2022
259 changes: 258 additions & 1 deletion packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { IExecuteFunctions } from 'n8n-core';
import { IDataObject, INodeExecutionData, JsonObject } from 'n8n-workflow';
import pgPromise from 'pg-promise';
import pg from 'pg-promise/typescript/pg-subset';
Expand Down Expand Up @@ -68,7 +69,7 @@ export function generateReturning(pgp: pgPromise.IMain<{}, pg.IClient>, returnin
* @param {input[]} input The Node's input data
* @returns Promise<Array<IDataObject>>
*/
export async function pgQuery(
export async function pgQuery(
getNodeParam: Function,
pgp: pgPromise.IMain<{}, pg.IClient>,
db: pgPromise.IDatabase<{}, pg.IClient>,
Expand Down Expand Up @@ -128,6 +129,74 @@ export async function pgQuery(
throw new Error('multiple, independently or transaction are valid options');
}


export async function pgQueryV2(
this: IExecuteFunctions,
pgp: pgPromise.IMain<{}, pg.IClient>,
db: pgPromise.IDatabase<{}, pg.IClient>,
items: INodeExecutionData[],
continueOnFail: boolean,
overrideMode?: string,
): Promise<IDataObject[]> {
const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject;

let valuesArray = [] as string[][];
if (additionalFields.queryParams) {
const propertiesString = additionalFields.queryParams as string;
const properties = propertiesString.split(',').map(column => column.trim());
const paramsItems = getItemsCopy(items, properties);
valuesArray = paramsItems.map((row) => properties.map(col => row[col])) as string[][];
}

type QueryWithValues = {query: string, values?: string[]};
const allQueries = new Array<QueryWithValues>();
for (let i = 0; i < items.length; i++) {
const query = this.getNodeParameter('query', i) as string;
const values = valuesArray[i];
const queryFormat = { query, values };
allQueries.push(queryFormat);
}

const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string;
if (mode === 'multiple') {
(await db.multi(pgp.helpers.concat(allQueries))).map((result, i) => {
return this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray([...result]));
}).flat();
} else if (mode === 'transaction') {
return db.tx(async t => {
const result: INodeExecutionData[] = [];
for (let i = 0; i < allQueries.length; i++) {
try {
const transactionResult = await t.any(allQueries[i].query, allQueries[i].values);
const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult));
result.push(...executionData);
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData);
return result;
}
}
return result;
});
} else if (mode === 'independently') {
return db.task(async t => {
const result: INodeExecutionData[] = [];
for (let i = 0; i < allQueries.length; i++) {
try {
const transactionResult = await t.any(allQueries[i].query, allQueries[i].values);
const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult));
result.push(...executionData);
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData);
}
}
return result;
});
}
throw new Error('multiple, independently or transaction are valid options');
}

/**
* Inserts the given items into the database.
*
Expand Down Expand Up @@ -207,6 +276,88 @@ export async function pgInsert(
throw new Error('multiple, independently or transaction are valid options');
}

/**
* Inserts the given items into the database.
*
* @param {Function} getNodeParam The getter for the Node's parameters
* @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance
* @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection
* @param {INodeExecutionData[]} items The items to be inserted
* @returns Promise<Array<IDataObject>>
*/
export async function pgInsertV2(
this: IExecuteFunctions,
pgp: pgPromise.IMain<{}, pg.IClient>,
db: pgPromise.IDatabase<{}, pg.IClient>,
items: INodeExecutionData[],
continueOnFail: boolean,
overrideMode?: string,
): Promise<IDataObject[]> {
const table = this.getNodeParameter('table', 0) as string;
const schema = this.getNodeParameter('schema', 0) as string;
const columnString = this.getNodeParameter('columns', 0) as string;
const guardedColumns: {[key: string]: string} = {};

const columns = columnString.split(',')
.map(column => column.trim().split(':'))
.map(([name, cast], i) => {
guardedColumns[`column${i}`] = name;
return { name, cast, prop: `column${i}` };
});

const columnNames = columns.map(column => column.name);

const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } });

const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject;
const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string;

const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string);
if (mode === 'multiple') {
const query = pgp.helpers.insert(getItemsCopy(items, columnNames, guardedColumns), cs) + returning;
return (await db.any(query)).map((result, i) => {
return this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray([...result]));
}).flat();
} else if (mode === 'transaction') {
return db.tx(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
const insertResult = await t.one(pgp.helpers.insert(itemCopy, cs) + returning);
result.push(...this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray(insertResult)));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ json: { ...itemCopy }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData);
return result;
}
}
return result;
});
} else if (mode === 'independently') {
return db.task(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
const insertResult = await t.oneOrNone(pgp.helpers.insert(itemCopy, cs) + returning);
if (insertResult !== null) {
result.push(...this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray(insertResult)));
}
} catch (err) {
if (continueOnFail === false) {
throw err;
}
result.push({ json: { ...itemCopy }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData);
}
}
return result;
});
}

throw new Error('multiple, independently or transaction are valid options');
}

/**
* Updates the given items in the database.
*
Expand Down Expand Up @@ -306,3 +457,109 @@ export async function pgUpdate(
}
throw new Error('multiple, independently or transaction are valid options');
}


/**
* Updates the given items in the database.
*
* @param {Function} getNodeParam The getter for the Node's parameters
* @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance
* @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection
* @param {INodeExecutionData[]} items The items to be updated
* @returns Promise<Array<IDataObject>>
*/
export async function pgUpdateV2(
this: IExecuteFunctions,
pgp: pgPromise.IMain<{}, pg.IClient>,
db: pgPromise.IDatabase<{}, pg.IClient>,
items: INodeExecutionData[],
continueOnFail = false,
): Promise<IDataObject[]> {
const table = this.getNodeParameter('table', 0) as string;
const schema = this.getNodeParameter('schema', 0) as string;
const updateKey = this.getNodeParameter('updateKey', 0) as string;
const columnString = this.getNodeParameter('columns', 0) as string;
const guardedColumns: {[key: string]: string} = {};

const columns: Array<{name:string, cast: string, prop:string}> = columnString.split(',')
.map(column => column.trim().split(':'))
.map(([name, cast], i) => {
guardedColumns[`column${i}`] = name;
return { name, cast, prop: `column${i}` };
});

const updateKeys = updateKey.split(',').map((key, i) => {
const [name, cast] = key.trim().split(':');
const targetCol = columns.find((column) => column.name === name);
const updateColumn = { name, cast, prop: targetCol ? targetCol.prop : `updateColumn${i}` };
if (!targetCol) {
guardedColumns[updateColumn.prop] = name;
columns.unshift(updateColumn);
}
else if (!targetCol.cast) {
targetCol.cast = updateColumn.cast || targetCol.cast;
}
return updateColumn;
});

const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject;
const mode = additionalFields.mode ?? 'multiple' as string;

const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } });

// Prepare the data to update and copy it to be returned
const columnNames = columns.map(column => column.name);
const updateItems = getItemsCopy(items, columnNames, guardedColumns);

const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string);
if (mode === 'multiple') {
const query =
pgp.helpers.update(updateItems, cs)
+ ' WHERE ' + updateKeys.map(updateKey => {
const key = pgp.as.name(updateKey.name);
return 'v.' + key + ' = t.' + key;
}).join(' AND ')
+ returning;
const updateResult = await db.any(query);
return updateResult;
} else {
const where = ' WHERE ' +
updateKeys.map(updateKey => pgp.as.name(updateKey.name) +
' = ${' + updateKey.prop + '}').join(' AND ');
if (mode === 'transaction') {
return db.tx(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
const transactionResult = await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning);
const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult));
result.push(...executionData);
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message });
return result;
}
}
return result;
});
} else if (mode === 'independently') {
return db.task(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
const independentResult = await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning);
const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(independentResult));
result.push(...executionData);
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData);
}
}
return result;
});
}
}
throw new Error('multiple, independently or transaction are valid options');
}
18 changes: 7 additions & 11 deletions packages/nodes-base/nodes/Postgres/Postgres.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {

import pgPromise from 'pg-promise';

import { pgInsert, pgQuery, pgUpdate } from './Postgres.node.functions';
import { pgInsert, pgInsertV2, pgQuery, pgQueryV2, pgUpdate } from './Postgres.node.functions';

export class Postgres implements INodeType {
description: INodeTypeDescription = {
Expand Down Expand Up @@ -303,7 +303,7 @@ export class Postgres implements INodeType {

const db = pgp(config);

let returnItems = [];
let returnItems: INodeExecutionData[] = [];

const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0) as string;
Expand All @@ -313,21 +313,17 @@ export class Postgres implements INodeType {
// executeQuery
// ----------------------------------

const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail());

returnItems = this.helpers.returnJsonArray(queryResult);
const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail());
returnItems = queryResult as INodeExecutionData[];
} else if (operation === 'insert') {
// ----------------------------------
// insert
// ----------------------------------

const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail());
const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail());

for (let i = 0; i < insertData.length; i++) {
returnItems.push({
json: insertData[i],
});
}
// returnItems = this.helpers.returnJsonArray(insertData);
returnItems = insertData as INodeExecutionData[];
} else if (operation === 'update') {
// ----------------------------------
// update
Expand Down