Skip to content

Commit

Permalink
Dedupe mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ShireenMissi committed Sep 2, 2024
1 parent 3f7d429 commit b1d9b74
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 280 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/commands/base-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { generateHostInstanceId } from '@/databases/utils/generators';
import { WorkflowHistoryManager } from '@/workflows/workflow-history/workflow-history-manager.ee';
import { ShutdownService } from '@/shutdown/shutdown.service';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import { getProcessedDataManagers } from '@/ProcessedDataManagers';
import { getProcessedDataManagers } from '@/processed-data-managers';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';

export abstract class BaseCommand extends Command {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/databases/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ExecutionData } from './execution-data';
import { WorkflowHistory } from './workflow-history';
import { Project } from './project';
import { ProjectRelation } from './project-relation';
import { ProcessedData } from './ProcessedData';
import { ProcessedData } from './processed-data';
import { InvalidAuthToken } from './invalid-auth-token';

export const entities = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Column, Entity, Index, PrimaryColumn } from '@n8n/typeorm';

import type { IProcessedDataEntries, IProcessedDataLatest } from '@/Interfaces';
import { jsonColumnType, WithTimestamps } from './AbstractEntity';
import type { IProcessedDataEntries, IProcessedDataLatest } from '@/interfaces';
import { jsonColumnType, WithTimestamps } from './abstract-entity';
import { objectRetriever } from '../utils/transformers';

@Entity()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import type { MigrationContext, ReversibleMigration } from '@/databases/types';

export class CreateProcessedDataTable1721319360300 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import type { MigrationContext, ReversibleMigration } from '@/databases/types';

export class CreateProcessedDataTable1721319360300 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import type { MigrationContext, ReversibleMigration } from '@/databases/types';

export class CreateProcessedDataTable1721319360300 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Service } from 'typedi';
import { DataSource, Repository } from '@n8n/typeorm';
import { ProcessedData } from '../entities/ProcessedData';
import { ProcessedData } from '../entities/processed-data';

@Service()
export class ProcessedDataRepository extends Repository<ProcessedData> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
type IProcessedDataManagers,
} from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import { ProcessedDataManagerNativeDatabase } from './NativeDatabase';
import { ProcessedDataManagerNativeDatabase } from './native-database';

const activeInstances: {
[key: string]: IProcessedDataManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import type {
IExternalHooksFileData,
IProcessedDataEntries,
IProcessedDataLatest,
} from '@/Interfaces';
import type { DatabaseType } from '@db/types';
} from '@/interfaces';
import type { DatabaseType } from '@/databases/types';
import { ExternalHooks } from '@/external-hooks';
import { Container } from 'typedi';
import { ProcessedDataRepository } from '@/databases/repositories/processedData.repository';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { NodeTypes } from '@/NodeTypes';
import * as testDb from '../shared/testDb';
import { mockInstance } from '@test/mocking';
import { mockNodeTypesData } from '../../unit/Helpers';
import { getProcessedDataManagers } from '../../../src/ProcessedDataManagers';
import { getProcessedDataManagers } from '../../../src/processed-data-managers';

let workflow: Workflow;

Expand Down
6 changes: 3 additions & 3 deletions packages/nodes-base/nodes/Dedupe/Dedupe.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type {
INodeTypeDescription,
ProcessedDataContext,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';

export class Dedupe implements INodeType {
description: INodeTypeDescription = {
Expand All @@ -21,8 +21,8 @@ export class Dedupe implements INodeType {
name: 'Dedupe',
color: '#0000FF',
},
inputs: ['main'],
outputs: ['main'],
inputs: [NodeConnectionType.Main],
outputs: [NodeConnectionType.Main],
properties: [
{
displayName: 'Mode',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,237 +21,5 @@ export class RemoveDuplicates extends VersionedNodeType {
};

super(nodeVersions, baseDescription);
import get from 'lodash/get';
import isEqual from 'lodash/isEqual';
import lt from 'lodash/lt';
import pick from 'lodash/pick';
import {
NodeOperationError,
NodeConnectionType,
type IExecuteFunctions,
type INodeExecutionData,
type INodeType,
type INodeTypeDescription,
} from 'n8n-workflow';
import { prepareFieldsArray } from '../utils/utils';
import { validateInputData } from './utils';
import { compareItems, flattenKeys } from '@utils/utilities';

export class RemoveDuplicates implements INodeType {
description: INodeTypeDescription = {
displayName: 'Remove Duplicates',
name: 'removeDuplicates',
icon: 'file:removeDuplicates.svg',
group: ['transform'],
subtitle: '',
version: [1, 1.1],
description: 'Delete items with matching field values',
defaults: {
name: 'Remove Duplicates',
},
inputs: [NodeConnectionType.Main],
outputs: [NodeConnectionType.Main],
properties: [
{
displayName: 'Compare',
name: 'compare',
type: 'options',
options: [
{
name: 'All Fields',
value: 'allFields',
},
{
name: 'All Fields Except',
value: 'allFieldsExcept',
},
{
name: 'Selected Fields',
value: 'selectedFields',
},
],
default: 'allFields',
description: 'The fields of the input items to compare to see if they are the same',
},
{
displayName: 'Fields To Exclude',
name: 'fieldsToExclude',
type: 'string',
placeholder: 'e.g. email, name',
requiresDataPath: 'multiple',
description: 'Fields in the input to exclude from the comparison',
default: '',
displayOptions: {
show: {
compare: ['allFieldsExcept'],
},
},
},
{
displayName: 'Fields To Compare',
name: 'fieldsToCompare',
type: 'string',
placeholder: 'e.g. email, name',
requiresDataPath: 'multiple',
description: 'Fields in the input to add to the comparison',
default: '',
displayOptions: {
show: {
compare: ['selectedFields'],
},
},
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Field',
default: {},
displayOptions: {
show: {
compare: ['allFieldsExcept', 'selectedFields'],
},
},
options: [
{
displayName: 'Disable Dot Notation',
name: 'disableDotNotation',
type: 'boolean',
default: false,
description:
'Whether to disallow referencing child fields using `parent.child` in the field name',
},
{
displayName: 'Remove Other Fields',
name: 'removeOtherFields',
type: 'boolean',
default: false,
description:
'Whether to remove any fields that are not being compared. If disabled, will keep the values from the first of the duplicates.',
},
],
},
],
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const compare = this.getNodeParameter('compare', 0) as string;
const disableDotNotation = this.getNodeParameter(
'options.disableDotNotation',
0,
false,
) as boolean;
const removeOtherFields = this.getNodeParameter(
'options.removeOtherFields',
0,
false,
) as boolean;

let keys = disableDotNotation
? Object.keys(items[0].json)
: Object.keys(flattenKeys(items[0].json));

for (const item of items) {
for (const key of disableDotNotation
? Object.keys(item.json)
: Object.keys(flattenKeys(item.json))) {
if (!keys.includes(key)) {
keys.push(key);
}
}
}

if (compare === 'allFieldsExcept') {
const fieldsToExclude = prepareFieldsArray(
this.getNodeParameter('fieldsToExclude', 0, '') as string,
'Fields To Exclude',
);

if (!fieldsToExclude.length) {
throw new NodeOperationError(
this.getNode(),
'No fields specified. Please add a field to exclude from comparison',
);
}
if (!disableDotNotation) {
keys = Object.keys(flattenKeys(items[0].json));
}
keys = keys.filter((key) => !fieldsToExclude.includes(key));
}
if (compare === 'selectedFields') {
const fieldsToCompare = prepareFieldsArray(
this.getNodeParameter('fieldsToCompare', 0, '') as string,
'Fields To Compare',
);
if (!fieldsToCompare.length) {
throw new NodeOperationError(
this.getNode(),
'No fields specified. Please add a field to compare on',
);
}
if (!disableDotNotation) {
keys = Object.keys(flattenKeys(items[0].json));
}
keys = fieldsToCompare.map((key) => key.trim());
}

// This solution is O(nlogn)
// add original index to the items
const newItems = items.map(
(item, index) =>
({
json: { ...item.json, __INDEX: index },
pairedItem: { item: index },
}) as INodeExecutionData,
);
//sort items using the compare keys
newItems.sort((a, b) => {
let result = 0;

for (const key of keys) {
let equal;
if (!disableDotNotation) {
equal = isEqual(get(a.json, key), get(b.json, key));
} else {
equal = isEqual(a.json[key], b.json[key]);
}
if (!equal) {
let lessThan;
if (!disableDotNotation) {
lessThan = lt(get(a.json, key), get(b.json, key));
} else {
lessThan = lt(a.json[key], b.json[key]);
}
result = lessThan ? -1 : 1;
break;
}
}
return result;
});

validateInputData(this.getNode(), newItems, keys, disableDotNotation);

// collect the original indexes of items to be removed
const removedIndexes: number[] = [];
let temp = newItems[0];
for (let index = 1; index < newItems.length; index++) {
if (compareItems(newItems[index], temp, keys, disableDotNotation)) {
removedIndexes.push(newItems[index].json.__INDEX as unknown as number);
} else {
temp = newItems[index];
}
}

let returnData = items.filter((_, index) => !removedIndexes.includes(index));

if (removeOtherFields) {
returnData = returnData.map((item, index) => ({
json: pick(item.json, ...keys),
pairedItem: { item: index },
}));
}

return [returnData];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import get from 'lodash/get';
import isEqual from 'lodash/isEqual';
import lt from 'lodash/lt';
import pick from 'lodash/pick';
import { NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
INodeTypeBaseDescription,
IExecuteFunctions,
Expand All @@ -25,8 +25,8 @@ const versionDescription: INodeTypeDescription = {
defaults: {
name: 'Remove Duplicates',
},
inputs: ['main'],
outputs: ['main'],
inputs: [NodeConnectionType.Main],
outputs: [NodeConnectionType.Main],
properties: [
{
displayName: 'Compare',
Expand Down
Loading

0 comments on commit b1d9b74

Please sign in to comment.