Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pairedItem for pinned data #3843

Merged
merged 24 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/cli/src/databases/entities/WorkflowEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as config from '../../../config';
import { DatabaseType, IWorkflowDb } from '../..';
import { TagEntity } from './TagEntity';
import { SharedWorkflow } from './SharedWorkflow';
import { objectRetriever, serializer } from '../utils/transformers';
import { objectRetriever, sqlite } from '../utils/transformers';

function resolveDataType(dataType: string) {
const dbType = config.getEnv('database.type');
Expand Down Expand Up @@ -120,7 +120,7 @@ export class WorkflowEntity implements IWorkflowDb {
@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,
transformer: serializer,
transformer: sqlite.jsonColumn,
})
pinData: IPinData;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';

// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
Expand All @@ -22,7 +22,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
Expand Down Expand Up @@ -65,7 +65,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
WHERE waitTill IS NOT NULL AND finished = 0
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
Expand Down Expand Up @@ -158,7 +158,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
FROM ${tablePrefix}workflow_entity
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
Expand Down Expand Up @@ -206,7 +206,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
WHERE waitTill IS NOT NULL AND finished = 0
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
import { v4 as uuid } from 'uuid';

// add node ids in workflow objects
Expand All @@ -17,7 +17,7 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
let nodes = workflow.nodes;
if (typeof nodes === 'string') {
Expand All @@ -31,16 +31,15 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
}
});

const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);

queryRunner.query(updateQuery, updateParams);
});
Expand All @@ -56,22 +55,21 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
nodes.forEach((node) => delete node.id );
nodes.forEach((node) => delete node.id);

const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);

queryRunner.query(updateQuery, updateParams);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {
logMigrationStart,
logMigrationEnd,
runInBatches,
getTablePrefix,
} from '../../utils/migrationHelpers';
import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData';
import type { MigrationInterface, QueryRunner } from 'typeorm';

/**
* Convert JSON-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659895550980 implements MigrationInterface {
name = 'AddJsonKeyPinData1659895550980';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);

const workflowTable = `${getTablePrefix()}workflow_entity`;

const PINDATA_SELECT_QUERY = `
SELECT id, pinData
FROM \`${workflowTable}\`
WHERE pinData IS NOT NULL;
`;

const PINDATA_UPDATE_STATEMENT = `
UPDATE \`${workflowTable}\`
SET \`pinData\` = :pinData
WHERE id = :id;
`;

await runInBatches(
queryRunner,
PINDATA_SELECT_QUERY,
addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT),
);

logMigrationEnd(this.name);
}

async down() {
// irreversible migration
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { CommunityNodes1652254514003 } from './1652254514003-CommunityNodes';
import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn';
import { IntroducePinData1654090101303 } from './1654090101303-IntroducePinData';
import { AddNodeIds1658932910559 } from './1658932910559-AddNodeIds';
import { AddJsonKeyPinData1659895550980 } from './1659895550980-AddJsonKeyPinData';

export const mysqlMigrations = [
InitialMigration1588157391238,
Expand All @@ -40,4 +41,5 @@ export const mysqlMigrations = [
AddAPIKeyColumn1652905585850,
IntroducePinData1654090101303,
AddNodeIds1658932910559,
AddJsonKeyPinData1659895550980,
];
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';

// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
Expand All @@ -17,7 +17,6 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac

await queryRunner.query(`SET search_path TO ${schema};`);


const credentialsEntities = await queryRunner.query(`
SELECT id, name, type
FROM ${tablePrefix}credentials_entity
Expand All @@ -29,7 +28,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
Expand Down Expand Up @@ -72,7 +71,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
WHERE "waitTill" IS NOT NULL AND finished = FALSE
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
Expand Down Expand Up @@ -172,7 +171,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
FROM ${tablePrefix}workflow_entity
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
Expand Down Expand Up @@ -221,7 +220,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
WHERE "waitTill" IS NOT NULL AND finished = FALSE
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
import { v4 as uuid } from 'uuid';

// add node ids in workflow objects
Expand All @@ -23,7 +23,7 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
Expand All @@ -33,16 +33,15 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
}
});

const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);

queryRunner.query(updateQuery, updateParams);
});
Expand All @@ -64,22 +63,21 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
`;

// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
nodes.forEach((node) => delete node.id );
nodes.forEach((node) => delete node.id);

const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);

queryRunner.query(updateQuery, updateParams);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {
getTablePrefix,
logMigrationEnd,
logMigrationStart,
runInBatches,
} from '../../utils/migrationHelpers';
import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData';
import type { MigrationInterface, QueryRunner } from 'typeorm';

/**
* Convert JSON-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659902242948 implements MigrationInterface {
name = 'AddJsonKeyPinData1659902242948';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);

const workflowTable = `${getTablePrefix()}workflow_entity`;

const PINDATA_SELECT_QUERY = `
SELECT id, "pinData"
FROM ${workflowTable}
WHERE "pinData" IS NOT NULL;
`;

const PINDATA_UPDATE_STATEMENT = `
UPDATE ${workflowTable}
SET "pinData" = :pinData
WHERE id = :id;
`;

await runInBatches(
queryRunner,
PINDATA_SELECT_QUERY,
addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT),
);

logMigrationEnd(this.name);
}

async down() {
// irreversible migration
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { CommunityNodes1652254514002 } from './1652254514002-CommunityNodes';
import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn';
import { IntroducePinData1654090467022 } from './1654090467022-IntroducePinData';
import { AddNodeIds1658932090381 } from './1658932090381-AddNodeIds';
import { AddJsonKeyPinData1659902242948 } from './1659902242948-AddJsonKeyPinData';

export const postgresMigrations = [
InitialMigration1587669153312,
Expand All @@ -36,4 +37,5 @@ export const postgresMigrations = [
AddAPIKeyColumn1652905585850,
IntroducePinData1654090467022,
AddNodeIds1658932090381,
AddJsonKeyPinData1659902242948,
];
Loading