diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts index 26007788cd69d..816964a2b94f7 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts @@ -1,3 +1,4 @@ +import { IExecuteFunctions } from 'n8n-core'; import { IDataObject, INodeExecutionData, JsonObject } from 'n8n-workflow'; import pgPromise from 'pg-promise'; import pg from 'pg-promise/typescript/pg-subset'; @@ -68,7 +69,7 @@ export function generateReturning(pgp: pgPromise.IMain<{}, pg.IClient>, returnin * @param {input[]} input The Node's input data * @returns Promise> */ -export async function pgQuery( + export async function pgQuery( getNodeParam: Function, pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, @@ -128,6 +129,74 @@ export async function pgQuery( throw new Error('multiple, independently or transaction are valid options'); } + +export async function pgQueryV2( + this: IExecuteFunctions, + pgp: pgPromise.IMain<{}, pg.IClient>, + db: pgPromise.IDatabase<{}, pg.IClient>, + items: INodeExecutionData[], + continueOnFail: boolean, + overrideMode?: string, +): Promise { + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + + let valuesArray = [] as string[][]; + if (additionalFields.queryParams) { + const propertiesString = additionalFields.queryParams as string; + const properties = propertiesString.split(',').map(column => column.trim()); + const paramsItems = getItemsCopy(items, properties); + valuesArray = paramsItems.map((row) => properties.map(col => row[col])) as string[][]; + } + + type QueryWithValues = {query: string, values?: string[]}; + const allQueries = new Array(); + for (let i = 0; i < items.length; i++) { + const query = this.getNodeParameter('query', i) as string; + const values = valuesArray[i]; + const queryFormat = { query, values }; + allQueries.push(queryFormat); + } + + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; + if (mode === 'multiple') { + (await db.multi(pgp.helpers.concat(allQueries))).map((result, i) => { + return this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray([...result])); + }).flat(); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: INodeExecutionData[] = []; + for (let i = 0; i < allQueries.length; i++) { + try { + const transactionResult = await t.any(allQueries[i].query, allQueries[i].values); + const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult)); + result.push(...executionData); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: INodeExecutionData[] = []; + for (let i = 0; i < allQueries.length; i++) { + try { + const transactionResult = await t.any(allQueries[i].query, allQueries[i].values); + const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult)); + result.push(...executionData); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData); + } + } + return result; + }); + } + throw new Error('multiple, independently or transaction are valid options'); +} + /** * Inserts the given items into the database. * @@ -207,6 +276,88 @@ export async function pgInsert( throw new Error('multiple, independently or transaction are valid options'); } +/** + * Inserts the given items into the database. + * + * @param {Function} getNodeParam The getter for the Node's parameters + * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance + * @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection + * @param {INodeExecutionData[]} items The items to be inserted + * @returns Promise> + */ + export async function pgInsertV2( + this: IExecuteFunctions, + pgp: pgPromise.IMain<{}, pg.IClient>, + db: pgPromise.IDatabase<{}, pg.IClient>, + items: INodeExecutionData[], + continueOnFail: boolean, + overrideMode?: string, +): Promise { + const table = this.getNodeParameter('table', 0) as string; + const schema = this.getNodeParameter('schema', 0) as string; + const columnString = this.getNodeParameter('columns', 0) as string; + const guardedColumns: {[key: string]: string} = {}; + + const columns = columnString.split(',') + .map(column => column.trim().split(':')) + .map(([name, cast], i) => { + guardedColumns[`column${i}`] = name; + return { name, cast, prop: `column${i}` }; + }); + + const columnNames = columns.map(column => column.name); + + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); + + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; + + const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string); + if (mode === 'multiple') { + const query = pgp.helpers.insert(getItemsCopy(items, columnNames, guardedColumns), cs) + returning; + return (await db.any(query)).map((result, i) => { + return this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray([...result])); + }).flat(); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames, guardedColumns); + try { + const insertResult = await t.one(pgp.helpers.insert(itemCopy, cs) + returning); + result.push(...this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray(insertResult))); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ json: { ...itemCopy }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames, guardedColumns); + try { + const insertResult = await t.oneOrNone(pgp.helpers.insert(itemCopy, cs) + returning); + if (insertResult !== null) { + result.push(...this.helpers.constructExecutionMetaData({item: i}, this.helpers.returnJsonArray(insertResult))); + } + } catch (err) { + if (continueOnFail === false) { + throw err; + } + result.push({ json: { ...itemCopy }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData); + } + } + return result; + }); + } + + throw new Error('multiple, independently or transaction are valid options'); +} + /** * Updates the given items in the database. * @@ -306,3 +457,109 @@ export async function pgUpdate( } throw new Error('multiple, independently or transaction are valid options'); } + + +/** + * Updates the given items in the database. + * + * @param {Function} getNodeParam The getter for the Node's parameters + * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance + * @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection + * @param {INodeExecutionData[]} items The items to be updated + * @returns Promise> + */ + export async function pgUpdateV2( + this: IExecuteFunctions, + pgp: pgPromise.IMain<{}, pg.IClient>, + db: pgPromise.IDatabase<{}, pg.IClient>, + items: INodeExecutionData[], + continueOnFail = false, +): Promise { + const table = this.getNodeParameter('table', 0) as string; + const schema = this.getNodeParameter('schema', 0) as string; + const updateKey = this.getNodeParameter('updateKey', 0) as string; + const columnString = this.getNodeParameter('columns', 0) as string; + const guardedColumns: {[key: string]: string} = {}; + + const columns: Array<{name:string, cast: string, prop:string}> = columnString.split(',') + .map(column => column.trim().split(':')) + .map(([name, cast], i) => { + guardedColumns[`column${i}`] = name; + return { name, cast, prop: `column${i}` }; + }); + + const updateKeys = updateKey.split(',').map((key, i) => { + const [name, cast] = key.trim().split(':'); + const targetCol = columns.find((column) => column.name === name); + const updateColumn = { name, cast, prop: targetCol ? targetCol.prop : `updateColumn${i}` }; + if (!targetCol) { + guardedColumns[updateColumn.prop] = name; + columns.unshift(updateColumn); + } + else if (!targetCol.cast) { + targetCol.cast = updateColumn.cast || targetCol.cast; + } + return updateColumn; + }); + + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = additionalFields.mode ?? 'multiple' as string; + + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); + + // Prepare the data to update and copy it to be returned + const columnNames = columns.map(column => column.name); + const updateItems = getItemsCopy(items, columnNames, guardedColumns); + + const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string); + if (mode === 'multiple') { + const query = + pgp.helpers.update(updateItems, cs) + + ' WHERE ' + updateKeys.map(updateKey => { + const key = pgp.as.name(updateKey.name); + return 'v.' + key + ' = t.' + key; + }).join(' AND ') + + returning; + const updateResult = await db.any(query); + return updateResult; + } else { + const where = ' WHERE ' + + updateKeys.map(updateKey => pgp.as.name(updateKey.name) + + ' = ${' + updateKey.prop + '}').join(' AND '); + if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames, guardedColumns); + try { + const transactionResult = await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning); + const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(transactionResult)); + result.push(...executionData); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames, guardedColumns); + try { + const independentResult = await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning); + const executionData = this.helpers.constructExecutionMetaData({ item: i }, this.helpers.returnJsonArray(independentResult)); + result.push(...executionData); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ json: { ...items[i].json }, code: (err as JsonObject).code, message: (err as JsonObject).message, pairedItem: { item: i } } as INodeExecutionData); + } + } + return result; + }); + } + } + throw new Error('multiple, independently or transaction are valid options'); +} diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.ts index cda8198cdc7a8..5f417ab6a1e77 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.ts @@ -9,7 +9,7 @@ import { import pgPromise from 'pg-promise'; -import { pgInsert, pgQuery, pgUpdate } from './Postgres.node.functions'; +import { pgInsert, pgInsertV2, pgQuery, pgQueryV2, pgUpdate } from './Postgres.node.functions'; export class Postgres implements INodeType { description: INodeTypeDescription = { @@ -303,7 +303,7 @@ export class Postgres implements INodeType { const db = pgp(config); - let returnItems = []; + let returnItems: INodeExecutionData[] = []; const items = this.getInputData(); const operation = this.getNodeParameter('operation', 0) as string; @@ -313,21 +313,17 @@ export class Postgres implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - - returnItems = this.helpers.returnJsonArray(queryResult); + const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail()); + returnItems = queryResult as INodeExecutionData[]; } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); + const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail()); - for (let i = 0; i < insertData.length; i++) { - returnItems.push({ - json: insertData[i], - }); - } + // returnItems = this.helpers.returnJsonArray(insertData); + returnItems = insertData as INodeExecutionData[]; } else if (operation === 'update') { // ---------------------------------- // update