diff --git a/packages/cli/src/databases/entities/WorkflowEntity.ts b/packages/cli/src/databases/entities/WorkflowEntity.ts index 94a1183a9df99..d37decc15d40e 100644 --- a/packages/cli/src/databases/entities/WorkflowEntity.ts +++ b/packages/cli/src/databases/entities/WorkflowEntity.ts @@ -22,7 +22,7 @@ import * as config from '../../../config'; import { DatabaseType, IWorkflowDb } from '../..'; import { TagEntity } from './TagEntity'; import { SharedWorkflow } from './SharedWorkflow'; -import { objectRetriever, serializer } from '../utils/transformers'; +import { objectRetriever, sqlite } from '../utils/transformers'; function resolveDataType(dataType: string) { const dbType = config.getEnv('database.type'); @@ -120,7 +120,7 @@ export class WorkflowEntity implements IWorkflowDb { @Column({ type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json', nullable: true, - transformer: serializer, + transformer: sqlite.jsonColumn, }) pinData: IPinData; diff --git a/packages/cli/src/databases/migrations/mysqldb/1630451444017-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/migrations/mysqldb/1630451444017-UpdateWorkflowCredentials.ts index a4f12cb9d84be..901f924b2d285 100644 --- a/packages/cli/src/databases/migrations/mysqldb/1630451444017-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/migrations/mysqldb/1630451444017-UpdateWorkflowCredentials.ts @@ -1,6 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -22,7 +22,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; let credentialsUpdated = false; @@ -65,7 +65,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac WHERE waitTill IS NOT NULL AND finished = 0 `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { waitingExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; @@ -158,7 +158,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac FROM ${tablePrefix}workflow_entity `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; let credentialsUpdated = false; @@ -206,7 +206,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac WHERE waitTill IS NOT NULL AND finished = 0 `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { waitingExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; diff --git a/packages/cli/src/databases/migrations/mysqldb/1658932910559-AddNodeIds.ts b/packages/cli/src/databases/migrations/mysqldb/1658932910559-AddNodeIds.ts index 000d0adb1e636..8d67380aad55d 100644 --- a/packages/cli/src/databases/migrations/mysqldb/1658932910559-AddNodeIds.ts +++ b/packages/cli/src/databases/migrations/mysqldb/1658932910559-AddNodeIds.ts @@ -1,6 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; import { v4 as uuid } from 'uuid'; // add node ids in workflow objects @@ -17,7 +17,7 @@ export class AddNodeIds1658932910559 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { let nodes = workflow.nodes; if (typeof nodes === 'string') { @@ -31,16 +31,15 @@ export class AddNodeIds1658932910559 implements MigrationInterface { } }); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE ${tablePrefix}workflow_entity SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); @@ -56,22 +55,21 @@ export class AddNodeIds1658932910559 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; // @ts-ignore - nodes.forEach((node) => delete node.id ); + nodes.forEach((node) => delete node.id); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE ${tablePrefix}workflow_entity SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); diff --git a/packages/cli/src/databases/migrations/mysqldb/1659895550980-AddJsonKeyPinData.ts b/packages/cli/src/databases/migrations/mysqldb/1659895550980-AddJsonKeyPinData.ts new file mode 100644 index 0000000000000..2faca02f6d17d --- /dev/null +++ b/packages/cli/src/databases/migrations/mysqldb/1659895550980-AddJsonKeyPinData.ts @@ -0,0 +1,46 @@ +import { + logMigrationStart, + logMigrationEnd, + runInBatches, + getTablePrefix, +} from '../../utils/migrationHelpers'; +import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData'; +import type { MigrationInterface, QueryRunner } from 'typeorm'; + +/** + * Convert JSON-type `pinData` column in `workflow_entity` table from + * `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }` + */ +export class AddJsonKeyPinData1659895550980 implements MigrationInterface { + name = 'AddJsonKeyPinData1659895550980'; + + async up(queryRunner: QueryRunner) { + logMigrationStart(this.name); + + const workflowTable = `${getTablePrefix()}workflow_entity`; + + const PINDATA_SELECT_QUERY = ` + SELECT id, pinData + FROM \`${workflowTable}\` + WHERE pinData IS NOT NULL; + `; + + const PINDATA_UPDATE_STATEMENT = ` + UPDATE \`${workflowTable}\` + SET \`pinData\` = :pinData + WHERE id = :id; + `; + + await runInBatches( + queryRunner, + PINDATA_SELECT_QUERY, + addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT), + ); + + logMigrationEnd(this.name); + } + + async down() { + // irreversible migration + } +} diff --git a/packages/cli/src/databases/migrations/mysqldb/index.ts b/packages/cli/src/databases/migrations/mysqldb/index.ts index d4c13c4e6c410..38b05e86000f8 100644 --- a/packages/cli/src/databases/migrations/mysqldb/index.ts +++ b/packages/cli/src/databases/migrations/mysqldb/index.ts @@ -18,6 +18,7 @@ import { CommunityNodes1652254514003 } from './1652254514003-CommunityNodes'; import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn'; import { IntroducePinData1654090101303 } from './1654090101303-IntroducePinData'; import { AddNodeIds1658932910559 } from './1658932910559-AddNodeIds'; +import { AddJsonKeyPinData1659895550980 } from './1659895550980-AddJsonKeyPinData'; export const mysqlMigrations = [ InitialMigration1588157391238, @@ -40,4 +41,5 @@ export const mysqlMigrations = [ AddAPIKeyColumn1652905585850, IntroducePinData1654090101303, AddNodeIds1658932910559, + AddJsonKeyPinData1659895550980, ]; diff --git a/packages/cli/src/databases/migrations/postgresdb/1630419189837-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/migrations/postgresdb/1630419189837-UpdateWorkflowCredentials.ts index 14766960cc404..dd1425426a540 100644 --- a/packages/cli/src/databases/migrations/postgresdb/1630419189837-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/migrations/postgresdb/1630419189837-UpdateWorkflowCredentials.ts @@ -1,6 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -17,7 +17,6 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac await queryRunner.query(`SET search_path TO ${schema};`); - const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM ${tablePrefix}credentials_entity @@ -29,7 +28,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; let credentialsUpdated = false; @@ -72,7 +71,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac WHERE "waitTill" IS NOT NULL AND finished = FALSE `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { waitingExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; @@ -172,7 +171,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac FROM ${tablePrefix}workflow_entity `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; let credentialsUpdated = false; @@ -221,7 +220,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac WHERE "waitTill" IS NOT NULL AND finished = FALSE `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { waitingExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; diff --git a/packages/cli/src/databases/migrations/postgresdb/1658932090381-AddNodeIds.ts b/packages/cli/src/databases/migrations/postgresdb/1658932090381-AddNodeIds.ts index 3571693fa4a43..e6bda6c0010fe 100644 --- a/packages/cli/src/databases/migrations/postgresdb/1658932090381-AddNodeIds.ts +++ b/packages/cli/src/databases/migrations/postgresdb/1658932090381-AddNodeIds.ts @@ -1,6 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; import { v4 as uuid } from 'uuid'; // add node ids in workflow objects @@ -23,7 +23,7 @@ export class AddNodeIds1658932090381 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; // @ts-ignore @@ -33,16 +33,15 @@ export class AddNodeIds1658932090381 implements MigrationInterface { } }); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE ${tablePrefix}workflow_entity SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); @@ -64,22 +63,21 @@ export class AddNodeIds1658932090381 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = workflow.nodes; // @ts-ignore - nodes.forEach((node) => delete node.id ); + nodes.forEach((node) => delete node.id); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE ${tablePrefix}workflow_entity SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); diff --git a/packages/cli/src/databases/migrations/postgresdb/1659902242948-AddJsonKeyPinData.ts b/packages/cli/src/databases/migrations/postgresdb/1659902242948-AddJsonKeyPinData.ts new file mode 100644 index 0000000000000..2c0d1aad30b74 --- /dev/null +++ b/packages/cli/src/databases/migrations/postgresdb/1659902242948-AddJsonKeyPinData.ts @@ -0,0 +1,46 @@ +import { + getTablePrefix, + logMigrationEnd, + logMigrationStart, + runInBatches, +} from '../../utils/migrationHelpers'; +import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData'; +import type { MigrationInterface, QueryRunner } from 'typeorm'; + +/** + * Convert JSON-type `pinData` column in `workflow_entity` table from + * `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }` + */ +export class AddJsonKeyPinData1659902242948 implements MigrationInterface { + name = 'AddJsonKeyPinData1659902242948'; + + async up(queryRunner: QueryRunner) { + logMigrationStart(this.name); + + const workflowTable = `${getTablePrefix()}workflow_entity`; + + const PINDATA_SELECT_QUERY = ` + SELECT id, "pinData" + FROM ${workflowTable} + WHERE "pinData" IS NOT NULL; + `; + + const PINDATA_UPDATE_STATEMENT = ` + UPDATE ${workflowTable} + SET "pinData" = :pinData + WHERE id = :id; + `; + + await runInBatches( + queryRunner, + PINDATA_SELECT_QUERY, + addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT), + ); + + logMigrationEnd(this.name); + } + + async down() { + // irreversible migration + } +} diff --git a/packages/cli/src/databases/migrations/postgresdb/index.ts b/packages/cli/src/databases/migrations/postgresdb/index.ts index 39a56af6692d8..0665b1829c92d 100644 --- a/packages/cli/src/databases/migrations/postgresdb/index.ts +++ b/packages/cli/src/databases/migrations/postgresdb/index.ts @@ -16,6 +16,7 @@ import { CommunityNodes1652254514002 } from './1652254514002-CommunityNodes'; import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn'; import { IntroducePinData1654090467022 } from './1654090467022-IntroducePinData'; import { AddNodeIds1658932090381 } from './1658932090381-AddNodeIds'; +import { AddJsonKeyPinData1659902242948 } from './1659902242948-AddJsonKeyPinData'; export const postgresMigrations = [ InitialMigration1587669153312, @@ -36,4 +37,5 @@ export const postgresMigrations = [ AddAPIKeyColumn1652905585850, IntroducePinData1654090467022, AddNodeIds1658932090381, + AddJsonKeyPinData1659902242948, ]; diff --git a/packages/cli/src/databases/migrations/sqlite/1630330987096-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/migrations/sqlite/1630330987096-UpdateWorkflowCredentials.ts index 8bb8d8efd40f4..5ffad3a9ba109 100644 --- a/packages/cli/src/databases/migrations/sqlite/1630330987096-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/migrations/sqlite/1630330987096-UpdateWorkflowCredentials.ts @@ -1,7 +1,7 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -25,7 +25,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = JSON.parse(workflow.nodes); let credentialsUpdated = false; @@ -68,7 +68,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac WHERE "waitTill" IS NOT NULL AND finished = 0 `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { waitingExecutions.forEach(async (execution) => { const data = JSON.parse(execution.workflowData); let credentialsUpdated = false; @@ -164,7 +164,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { // @ts-ignore workflows.forEach(async (workflow) => { const nodes = JSON.parse(workflow.nodes); @@ -214,7 +214,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac `; // @ts-ignore - await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { + await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => { // @ts-ignore waitingExecutions.forEach(async (execution) => { const data = JSON.parse(execution.workflowData); diff --git a/packages/cli/src/databases/migrations/sqlite/1658930531669-AddNodeIds.ts b/packages/cli/src/databases/migrations/sqlite/1658930531669-AddNodeIds.ts index d772d5c47b453..ea3a993d0fc11 100644 --- a/packages/cli/src/databases/migrations/sqlite/1658930531669-AddNodeIds.ts +++ b/packages/cli/src/databases/migrations/sqlite/1658930531669-AddNodeIds.ts @@ -2,7 +2,7 @@ import { INode } from 'n8n-workflow'; import { MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers'; -import { runChunked } from '../../utils/migrationHelpers'; +import { runInBatches } from '../../utils/migrationHelpers'; import { v4 as uuid } from 'uuid'; // add node ids in workflow objects @@ -21,7 +21,7 @@ export class AddNodeIds1658930531669 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = JSON.parse(workflow.nodes); nodes.forEach((node: INode) => { @@ -30,16 +30,15 @@ export class AddNodeIds1658930531669 implements MigrationInterface { } }); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE "${tablePrefix}workflow_entity" SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); @@ -48,7 +47,6 @@ export class AddNodeIds1658930531669 implements MigrationInterface { logMigrationEnd(this.name); } - public async down(queryRunner: QueryRunner): Promise { const tablePrefix = config.getEnv('database.tablePrefix'); @@ -58,22 +56,21 @@ export class AddNodeIds1658930531669 implements MigrationInterface { `; // @ts-ignore - await runChunked(queryRunner, workflowsQuery, (workflows) => { + await runInBatches(queryRunner, workflowsQuery, (workflows) => { workflows.forEach(async (workflow) => { const nodes = JSON.parse(workflow.nodes); // @ts-ignore - nodes.forEach((node) => delete node.id ); + nodes.forEach((node) => delete node.id); - const [updateQuery, updateParams] = - queryRunner.connection.driver.escapeQueryWithParameters( - ` + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` UPDATE "${tablePrefix}workflow_entity" SET nodes = :nodes WHERE id = '${workflow.id}' `, - { nodes: JSON.stringify(nodes) }, - {}, - ); + { nodes: JSON.stringify(nodes) }, + {}, + ); queryRunner.query(updateQuery, updateParams); }); diff --git a/packages/cli/src/databases/migrations/sqlite/1659888469333-AddJsonKeyPinData.ts b/packages/cli/src/databases/migrations/sqlite/1659888469333-AddJsonKeyPinData.ts new file mode 100644 index 0000000000000..67b8cae704203 --- /dev/null +++ b/packages/cli/src/databases/migrations/sqlite/1659888469333-AddJsonKeyPinData.ts @@ -0,0 +1,93 @@ +import { + logMigrationStart, + logMigrationEnd, + runInBatches, + getTablePrefix, + escapeQuery, +} from '../../utils/migrationHelpers'; +import type { MigrationInterface, QueryRunner } from 'typeorm'; +import { isJsonKeyObject, PinData } from '../../utils/migrations.types'; + +/** + * Convert TEXT-type `pinData` column in `workflow_entity` table from + * `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }` + */ +export class AddJsonKeyPinData1659888469333 implements MigrationInterface { + name = 'AddJsonKeyPinData1659888469333'; + + async up(queryRunner: QueryRunner) { + logMigrationStart(this.name); + + const workflowTable = `${getTablePrefix()}workflow_entity`; + + const PINDATA_SELECT_QUERY = ` + SELECT id, pinData + FROM "${workflowTable}" + WHERE pinData IS NOT NULL; + `; + + const PINDATA_UPDATE_STATEMENT = ` + UPDATE "${workflowTable}" + SET "pinData" = :pinData + WHERE id = :id; + `; + + await runInBatches( + queryRunner, + PINDATA_SELECT_QUERY, + addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT), + ); + + logMigrationEnd(this.name); + } + + async down() { + // irreversible migration + } +} + +export const addJsonKeyToPinDataColumn = + (queryRunner: QueryRunner, updateStatement: string) => + async (fetchedWorkflows: PinData.FetchedWorkflow[]) => { + makeUpdateParams(fetchedWorkflows).forEach((param) => { + const params = { + pinData: param.pinData, + id: param.id, + }; + + const [escapedStatement, escapedParams] = escapeQuery(queryRunner, updateStatement, params); + + queryRunner.query(escapedStatement, escapedParams); + }); + }; + +function makeUpdateParams(fetchedWorkflows: PinData.FetchedWorkflow[]) { + return fetchedWorkflows.reduce( + (updateParams, { id, pinData: rawPinData }) => { + const pinDataPerWorkflow: PinData.Old | PinData.New = + typeof rawPinData === 'string' ? JSON.parse(rawPinData) : rawPinData; + + const newPinDataPerWorkflow = Object.keys(pinDataPerWorkflow).reduce( + (newPinDataPerWorkflow, nodeName) => { + const pinDataPerNode = pinDataPerWorkflow[nodeName]; + + if (pinDataPerNode.every((item) => item.json)) return newPinDataPerWorkflow; + + newPinDataPerWorkflow[nodeName] = pinDataPerNode.map((item) => + isJsonKeyObject(item) ? item : { json: item }, + ); + + return newPinDataPerWorkflow; + }, + {}, + ); + + if (Object.keys(newPinDataPerWorkflow).length > 0) { + updateParams.push({ id, pinData: JSON.stringify(newPinDataPerWorkflow) }); + } + + return updateParams; + }, + [], + ); +} diff --git a/packages/cli/src/databases/migrations/sqlite/index.ts b/packages/cli/src/databases/migrations/sqlite/index.ts index 41efea5c74060..f51da758ff513 100644 --- a/packages/cli/src/databases/migrations/sqlite/index.ts +++ b/packages/cli/src/databases/migrations/sqlite/index.ts @@ -11,10 +11,11 @@ import { AddExecutionEntityIndexes1644421939510 } from './1644421939510-AddExecu import { CreateUserManagement1646992772331 } from './1646992772331-CreateUserManagement'; import { LowerCaseUserEmail1648740597343 } from './1648740597343-LowerCaseUserEmail'; import { AddUserSettings1652367743993 } from './1652367743993-AddUserSettings'; -import { CommunityNodes1652254514001 } from './1652254514001-CommunityNodes' +import { CommunityNodes1652254514001 } from './1652254514001-CommunityNodes'; import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn'; import { IntroducePinData1654089251344 } from './1654089251344-IntroducePinData'; import { AddNodeIds1658930531669 } from './1658930531669-AddNodeIds'; +import { AddJsonKeyPinData1659888469333 } from './1659888469333-AddJsonKeyPinData'; const sqliteMigrations = [ InitialMigration1588102412422, @@ -34,6 +35,7 @@ const sqliteMigrations = [ AddAPIKeyColumn1652905585850, IntroducePinData1654089251344, AddNodeIds1658930531669, + AddJsonKeyPinData1659888469333, ]; export { sqliteMigrations }; diff --git a/packages/cli/src/databases/ormconfig.ts b/packages/cli/src/databases/ormconfig.ts index f0e91d2ff129f..43c0339b52b2a 100644 --- a/packages/cli/src/databases/ormconfig.ts +++ b/packages/cli/src/databases/ormconfig.ts @@ -2,6 +2,9 @@ import path from 'path'; import { UserSettings } from 'n8n-core'; import { entities } from './entities'; +const MIGRATIONS_DIR = path.resolve('src', 'databases', 'migrations'); +const ENTITIES_DIR = path.resolve('src', 'databases', 'entities'); + export default [ { name: 'sqlite', @@ -9,10 +12,10 @@ export default [ logging: true, entities: Object.values(entities), database: path.resolve(UserSettings.getUserN8nFolderPath(), 'database.sqlite'), - migrations: [path.resolve('migrations', 'sqlite', 'index.ts')], + migrations: [path.resolve(MIGRATIONS_DIR, 'sqlite', 'index.ts')], cli: { - entitiesDir: path.resolve('entities'), - migrationsDir: path.resolve('migrations', 'sqlite'), + entitiesDir: ENTITIES_DIR, + migrationsDir: path.resolve(MIGRATIONS_DIR, 'sqlite'), }, }, { @@ -26,10 +29,10 @@ export default [ port: 5432, logging: false, entities: Object.values(entities), - migrations: [path.resolve('migrations', 'postgresdb', 'index.ts')], + migrations: [path.resolve(MIGRATIONS_DIR, 'postgresdb', 'index.ts')], cli: { - entitiesDir: path.resolve('entities'), - migrationsDir: path.resolve('migrations', 'postgresdb'), + entitiesDir: ENTITIES_DIR, + migrationsDir: path.resolve(MIGRATIONS_DIR, 'postgresdb'), }, }, { @@ -42,10 +45,10 @@ export default [ port: 3306, logging: false, entities: Object.values(entities), - migrations: [path.resolve('migrations', 'mysqldb', 'index.ts')], + migrations: [path.resolve(MIGRATIONS_DIR, 'mysqldb', 'index.ts')], cli: { - entitiesDir: path.resolve('entities'), - migrationsDir: path.resolve('migrations', 'mysqldb'), + entitiesDir: ENTITIES_DIR, + migrationsDir: path.resolve(MIGRATIONS_DIR, 'mysqldb'), }, }, { @@ -58,10 +61,10 @@ export default [ port: 3306, logging: false, entities: Object.values(entities), - migrations: [path.resolve('migrations', 'mysqldb', 'index.ts')], + migrations: [path.resolve(MIGRATIONS_DIR, 'mysqldb', 'index.ts')], cli: { - entitiesDir: path.resolve('entities'), - migrationsDir: path.resolve('migrations', 'mysqldb'), + entitiesDir: ENTITIES_DIR, + migrationsDir: path.resolve(MIGRATIONS_DIR, 'mysqldb'), }, }, ]; diff --git a/packages/cli/src/databases/utils/migrationHelpers.ts b/packages/cli/src/databases/utils/migrationHelpers.ts index 821c95ef8881b..f9e6b89dd4adc 100644 --- a/packages/cli/src/databases/utils/migrationHelpers.ts +++ b/packages/cli/src/databases/utils/migrationHelpers.ts @@ -1,7 +1,8 @@ /* eslint-disable no-await-in-loop */ import { readFileSync, rmSync } from 'fs'; import { UserSettings } from 'n8n-core'; -import { QueryRunner } from 'typeorm/query-runner/QueryRunner'; +import type { QueryRunner } from 'typeorm/query-runner/QueryRunner'; +import config from '../../../config'; import { getLogger } from '../../Logger'; const PERSONALIZATION_SURVEY_FILENAME = 'personalizationSurvey.json'; @@ -35,28 +36,36 @@ export function loadSurveyFromDisk(): string | null { } let logFinishTimeout: NodeJS.Timeout; -const disableLogging = process.argv[1].split('/').includes('jest'); -export function logMigrationStart(migrationName: string): void { +export function logMigrationStart( + migrationName: string, + disableLogging = process.env.NODE_ENV === 'test', +): void { if (disableLogging) return; - const logger = getLogger(); + if (!logFinishTimeout) { - logger.warn('Migrations in progress, please do NOT stop the process.'); + getLogger().warn('Migrations in progress, please do NOT stop the process.'); } - logger.debug(`Starting migration ${migrationName}`); + + getLogger().debug(`Starting migration ${migrationName}`); + clearTimeout(logFinishTimeout); } -export function logMigrationEnd(migrationName: string): void { +export function logMigrationEnd( + migrationName: string, + disableLogging = process.env.NODE_ENV === 'test', +): void { if (disableLogging) return; - const logger = getLogger(); - logger.debug(`Finished migration ${migrationName}`); + + getLogger().debug(`Finished migration ${migrationName}`); + logFinishTimeout = setTimeout(() => { - logger.warn('Migrations finished.'); + getLogger().warn('Migrations finished.'); }, 100); } -export function chunkQuery(query: string, limit: number, offset = 0): string { +export function batchQuery(query: string, limit: number, offset = 0): string { return ` ${query} LIMIT ${limit} @@ -64,7 +73,7 @@ export function chunkQuery(query: string, limit: number, offset = 0): string { `; } -export async function runChunked( +export async function runInBatches( queryRunner: QueryRunner, query: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -72,14 +81,42 @@ export async function runChunked( limit = 100, ): Promise { let offset = 0; - let chunkedQuery: string; - let chunkedQueryResults: unknown[]; + let batchedQuery: string; + let batchedQueryResults: unknown[]; + + // eslint-disable-next-line no-param-reassign + if (query.trim().endsWith(';')) query = query.trim().slice(0, -1); do { - chunkedQuery = chunkQuery(query, limit, offset); - chunkedQueryResults = (await queryRunner.query(chunkedQuery)) as unknown[]; + batchedQuery = batchQuery(query, limit, offset); + batchedQueryResults = (await queryRunner.query(batchedQuery)) as unknown[]; // pass a copy to prevent errors from mutation - await operation([...chunkedQueryResults]); + await operation([...batchedQueryResults]); offset += limit; - } while (chunkedQueryResults.length === limit); + } while (batchedQueryResults.length === limit); } + +export const getTablePrefix = () => { + const tablePrefix = config.getEnv('database.tablePrefix'); + + if (config.getEnv('database.type') === 'postgresdb') { + const schema = config.getEnv('database.postgresdb.schema'); + return [schema, tablePrefix].join('.'); + } + + return tablePrefix; +}; + +export const escapeQuery = ( + queryRunner: QueryRunner, + query: string, + params: { [property: string]: unknown }, +): [string, unknown[]] => + queryRunner.connection.driver.escapeQueryWithParameters( + query, + { + pinData: params.pinData, + id: params.id, + }, + {}, + ); diff --git a/packages/cli/src/databases/utils/migrations.types.ts b/packages/cli/src/databases/utils/migrations.types.ts new file mode 100644 index 0000000000000..aaa9f66472f73 --- /dev/null +++ b/packages/cli/src/databases/utils/migrations.types.ts @@ -0,0 +1,22 @@ +import type { IDataObject, INodeExecutionData } from 'n8n-workflow'; + +export namespace PinData { + export type Old = { [nodeName: string]: IDataObject[] }; + + export type New = { [nodeName: string]: INodeExecutionData[] }; + + export type FetchedWorkflow = { id: number; pinData: string | object }; +} + +export function isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } { + return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject); +} + +export function isJsonKeyObject(item: unknown): item is { + json: unknown; + [otherKeys: string]: unknown; +} { + if (!isObjectLiteral(item)) return false; + + return Object.keys(item).includes('json'); +} diff --git a/packages/cli/src/databases/utils/transformers.ts b/packages/cli/src/databases/utils/transformers.ts index 94f706a91d2d4..af6e47ab9b5b2 100644 --- a/packages/cli/src/databases/utils/transformers.ts +++ b/packages/cli/src/databases/utils/transformers.ts @@ -1,4 +1,5 @@ import { ValueTransformer } from 'typeorm'; +import config from '../../../config'; export const idStringifier = { from: (value: number): string | number => (typeof value === 'number' ? value.toString() : value), @@ -20,11 +21,14 @@ export const objectRetriever: ValueTransformer = { }; /** - * Transformer to store object as string and retrieve string as object. + * Transformer for sqlite JSON columns to mimic JSON-as-object behavior + * from Postgres and MySQL. */ -export const serializer: ValueTransformer = { - to: (value: object | string): string => - typeof value === 'object' ? JSON.stringify(value) : value, +const jsonColumn: ValueTransformer = { + to: (value: object): string | object => + config.getEnv('database.type') === 'sqlite' ? JSON.stringify(value) : value, from: (value: string | object): object => typeof value === 'string' ? (JSON.parse(value) as object) : value, }; + +export const sqlite = { jsonColumn }; diff --git a/packages/cli/test/integration/workflows.api.test.ts b/packages/cli/test/integration/workflows.api.test.ts index 8dbf2863b5bf3..6cb291a9dd0ea 100644 --- a/packages/cli/test/integration/workflows.api.test.ts +++ b/packages/cli/test/integration/workflows.api.test.ts @@ -3,8 +3,9 @@ import express from 'express'; import * as utils from './shared/utils'; import * as testDb from './shared/testDb'; import { WorkflowEntity } from '../../src/databases/entities/WorkflowEntity'; + import type { Role } from '../../src/databases/entities/Role'; -import { IPinData } from 'n8n-workflow'; +import type { IPinData } from 'n8n-workflow'; jest.mock('../../src/telemetry'); @@ -46,7 +47,7 @@ test('POST /workflows should store pin data for node in workflow', async () => { const { pinData } = response.body.data as { pinData: IPinData }; - expect(pinData).toMatchObject({ Spotify: [{ myKey: 'myValue' }] }); + expect(pinData).toMatchObject(MOCK_PINDATA); }); test('POST /workflows should set pin data to null if no pin data', async () => { @@ -80,7 +81,7 @@ test('GET /workflows/:id should return pin data', async () => { const { pinData } = workflowRetrievalResponse.body.data as { pinData: IPinData }; - expect(pinData).toMatchObject({ Spotify: [{ myKey: 'myValue' }] }); + expect(pinData).toMatchObject(MOCK_PINDATA); }); function makeWorkflow({ withPinData }: { withPinData: boolean }) { @@ -101,8 +102,10 @@ function makeWorkflow({ withPinData }: { withPinData: boolean }) { ]; if (withPinData) { - workflow.pinData = { Spotify: [{ myKey: 'myValue' }] }; + workflow.pinData = MOCK_PINDATA; } return workflow; } + +const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] }; diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 0c7b5712afe04..b25275f96fb50 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -933,14 +933,9 @@ export class WorkflowExecute { const { pinData } = this.runExecutionData.resultData; if (pinData && !executionNode.disabled && pinData[executionNode.name] !== undefined) { - let nodePinData = pinData[executionNode.name]; + const nodePinData = pinData[executionNode.name]; - if (!Array.isArray(nodePinData)) nodePinData = [nodePinData]; - - const itemsPerRun = nodePinData.map((item, index) => { - return { json: item, pairedItem: { item: index } }; - }); - nodeSuccessData = [itemsPerRun]; // always zeroth runIndex + nodeSuccessData = [nodePinData]; // always zeroth runIndex } else { Logger.debug(`Running node "${executionNode.name}" started`, { node: executionNode.name, diff --git a/packages/editor-ui/src/components/RunData.vue b/packages/editor-ui/src/components/RunData.vue index eca03c04a5e4c..03e7917916ede 100644 --- a/packages/editor-ui/src/components/RunData.vue +++ b/packages/editor-ui/src/components/RunData.vue @@ -381,6 +381,7 @@ import { CodeEditor } from "@/components/forms"; import { dataPinningEventBus } from '../event-bus/data-pinning-event-bus'; import { stringSizeInBytes } from './helpers'; import RunDataTable from './RunDataTable.vue'; +import { isJsonKeyObject } from '@/utils'; // A path that does not exist so that nothing is selected by default const deselectedPlaceholder = '_!^&*'; @@ -619,13 +620,7 @@ export default mixins( let inputData = this.rawInputData; if (this.node && this.pinData) { - inputData = Array.isArray(this.pinData) - ? this.pinData.map((value) => ({ - json: value, - })) - : [{ - json: this.pinData, - }]; + inputData = this.pinData; } const offset = this.pageSize * (this.currentPage - 1); @@ -722,7 +717,10 @@ export default mixins( localStorage.setItem(LOCAL_STORAGE_PIN_DATA_DISCOVERY_CANVAS_FLAG, 'true'); }, enterEditMode({ origin }: EnterEditModeArgs) { - const inputData = this.pinData ? this.pinData : this.convertToJson(this.rawInputData); + const inputData = this.pinData + ? this.clearJsonKey(this.pinData) + : this.convertToJson(this.rawInputData); + const data = inputData.length > 0 ? inputData : TEST_PIN_DATA; @@ -761,25 +759,18 @@ export default mixins( } this.$store.commit('ui/setOutputPanelEditModeEnabled', false); - this.$store.commit('pinData', { node: this.node, data: this.removeJsonKeys(value) }); + this.$store.commit('pinData', { node: this.node, data: this.clearJsonKey(value) }); this.onDataPinningSuccess({ source: 'save-edit' }); this.onExitEditMode({ type: 'save' }); }, - removeJsonKeys(value: string) { - const parsed = JSON.parse(value); - - return Array.isArray(parsed) - ? parsed.map(item => this.isJsonKeyObject(item) ? item.json : item) - : parsed; - }, - isJsonKeyObject(item: unknown): item is { json: unknown } { - if (!this.isObjectLiteral(item)) return false; + clearJsonKey(userInput: string | object) { + const parsedUserInput = typeof userInput === 'string' ? JSON.parse(userInput) : userInput; - const keys = Object.keys(item); + if (!Array.isArray(parsedUserInput)) return parsedUserInput; - return keys.length === 1 && keys[0] === 'json'; + return parsedUserInput.map(item => isJsonKeyObject(item) ? item.json : item); }, onExitEditMode({ type }: { type: 'save' | 'cancel' }) { this.$telemetry.track('User closed ndv edit state', { @@ -1169,7 +1160,7 @@ export default mixins( let selectedValue = this.selectedOutput.value; if (isNotSelected) { if (this.hasPinData) { - selectedValue = this.pinData as object; + selectedValue = this.clearJsonKey(this.pinData as object); } else { selectedValue = this.convertToJson(this.getNodeInputData(this.node, this.runIndex, this.currentOutputIndex)); } diff --git a/packages/editor-ui/src/components/mixins/nodeHelpers.ts b/packages/editor-ui/src/components/mixins/nodeHelpers.ts index 4309b7582abe7..f92c28ab5d194 100644 --- a/packages/editor-ui/src/components/mixins/nodeHelpers.ts +++ b/packages/editor-ui/src/components/mixins/nodeHelpers.ts @@ -34,6 +34,7 @@ import { get } from 'lodash'; import mixins from 'vue-typed-mixins'; import { mapGetters } from 'vuex'; +import { isObjectLiteral } from '@/utils'; export const nodeHelpers = mixins( restApi, @@ -47,14 +48,10 @@ export const nodeHelpers = mixins( return Object.keys(node.parameters).includes('nodeCredentialType'); }, - isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } { - return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject); - }, - isCustomApiCallSelected (nodeValues: INodeParameters): boolean { const { parameters } = nodeValues; - if (!this.isObjectLiteral(parameters)) return false; + if (!isObjectLiteral(parameters)) return false; return ( parameters.resource !== undefined && parameters.resource.includes(CUSTOM_API_CALL_KEY) || diff --git a/packages/editor-ui/src/store.ts b/packages/editor-ui/src/store.ts index 62a1ed9fd102b..440411ccb7d2b 100644 --- a/packages/editor-ui/src/store.ts +++ b/packages/editor-ui/src/store.ts @@ -11,6 +11,7 @@ import { IConnections, IDataObject, INodeConnections, + INodeExecutionData, INodeIssueData, INodeTypeDescription, IPinData, @@ -48,6 +49,7 @@ import {stringSizeInBytes} from "@/components/helpers"; import {dataPinningEventBus} from "@/event-bus/data-pinning-event-bus"; import communityNodes from './modules/communityNodes'; import { isCommunityPackageName } from './components/helpers'; +import { isJsonKeyObject } from './utils'; Vue.use(Vuex); @@ -213,15 +215,21 @@ export const store = new Vuex.Store({ }, // Pin data - pinData(state, payload: { node: INodeUi, data: IPinData[string] }) { + pinData(state, payload: { node: INodeUi, data: INodeExecutionData[] }) { if (!state.workflow.pinData) { Vue.set(state.workflow, 'pinData', {}); } - Vue.set(state.workflow.pinData!, payload.node.name, payload.data); + if (!Array.isArray(payload.data)) { + payload.data = [payload.data]; + } + + const storedPinData = payload.data.map(item => isJsonKeyObject(item) ? item : { json: item }); + + Vue.set(state.workflow.pinData!, payload.node.name, storedPinData); state.stateIsDirty = true; - dataPinningEventBus.$emit('pin-data', { [payload.node.name]: payload.data }); + dataPinningEventBus.$emit('pin-data', { [payload.node.name]: storedPinData }); }, unpinData(state, payload: { node: INodeUi }) { if (!state.workflow.pinData) { diff --git a/packages/editor-ui/src/utils.ts b/packages/editor-ui/src/utils.ts index a3c7487352038..5b8786c8a9936 100644 --- a/packages/editor-ui/src/utils.ts +++ b/packages/editor-ui/src/utils.ts @@ -1 +1,14 @@ export const omit = (keyToOmit: string, { [keyToOmit]: _, ...remainder }) => remainder; + +export function isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } { + return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject); +} + +export function isJsonKeyObject(item: unknown): item is { + json: unknown; + [otherKeys: string]: unknown; +} { + if (!isObjectLiteral(item)) return false; + + return Object.keys(item).includes('json'); +} diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index af84d154c5185..24386dcd3b8b1 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -843,7 +843,7 @@ export interface INode { } export interface IPinData { - [nodeName: string]: IDataObject[]; + [nodeName: string]: INodeExecutionData[]; } export interface INodes {