Skip to content

Commit

Permalink
feat(RabbitMQ Trigger Node): Make message acknowledgement and paralle…
Browse files Browse the repository at this point in the history
…l processing configurable (#3385)

* feat(RabbitMQ Trigger Node): Make message acknowledgement and concurrent
processing configurable

* ⚡ Make sure that messages do not get executed multiple times

* 👕 Fix lint issue

* 🐛 Fix issue that for manual executions in "own" mode messages got
know acknowledged

* ⚡ Increment count now that console.log got removed

* ⚡ Improvements

* ⚡ Fix default value

* ⚡ Improve display name
  • Loading branch information
janober authored May 30, 2022
1 parent d7c6833 commit b851289
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 69 deletions.
25 changes: 23 additions & 2 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
IGetExecuteTriggerFunctions,
INode,
INodeExecutionData,
IRun,
IRunExecutionData,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
NodeHelpers,
Expand Down Expand Up @@ -52,6 +53,9 @@ import config from '../config';
import { User } from './databases/entities/User';
import { whereClause } from './WorkflowHelpers';
import { WorkflowEntity } from './databases/entities/WorkflowEntity';
import * as ActiveExecutions from './ActiveExecutions';

const activeExecutions = ActiveExecutions.getInstance();

const WEBHOOK_PROD_UNREGISTERED_HINT = `The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)`;

Expand Down Expand Up @@ -675,14 +679,31 @@ export class ActiveWorkflowRunner {
returnFunctions.emit = (
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun | undefined>,
): void => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.debug(`Received trigger for workflow "${workflow.name}"`);
WorkflowHelpers.saveStaticData(workflow);
// eslint-disable-next-line id-denylist
this.runWorkflow(workflowData, node, data, additionalData, mode, responsePromise).catch(
(error) => console.error(error),
const executePromise = this.runWorkflow(
workflowData,
node,
data,
additionalData,
mode,
responsePromise,
);

if (donePromise) {
executePromise.then((executionId) => {
activeExecutions
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
});
} else {
executePromise.catch(console.error);
}
};
returnFunctions.emitError = async (error: Error): Promise<void> => {
await this.activeWorkflows?.remove(workflowData.id.toString());
Expand Down
43 changes: 41 additions & 2 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ export class WorkflowExecute {

let currentExecutionTry = '';
let lastExecutionTry = '';
let closeFunction: Promise<void> | undefined;

return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false;
Expand Down Expand Up @@ -811,7 +812,7 @@ export class WorkflowExecute {
node: executionNode.name,
workflowId: workflow.id,
});
nodeSuccessData = await workflow.runNode(
const runNodeData = await workflow.runNode(
executionData.node,
executionData.data,
this.runExecutionData,
Expand All @@ -820,6 +821,14 @@ export class WorkflowExecute {
NodeExecuteFunctions,
this.mode,
);
nodeSuccessData = runNodeData.data;

if (runNodeData.closeFunction) {
// Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
closeFunction = runNodeData.closeFunction();
}

Logger.debug(`Running node "${executionNode.name}" finished successfully`, {
node: executionNode.name,
workflowId: workflow.id,
Expand Down Expand Up @@ -1033,9 +1042,10 @@ export class WorkflowExecute {
startedAt,
workflow,
new WorkflowOperationError('Workflow has been canceled or timed out!'),
closeFunction,
);
}
return this.processSuccessExecution(startedAt, workflow, executionError);
return this.processSuccessExecution(startedAt, workflow, executionError, closeFunction);
})
.catch(async (error) => {
const fullRunData = this.getFullRunData(startedAt);
Expand All @@ -1061,6 +1071,20 @@ export class WorkflowExecute {
},
);

if (closeFunction) {
try {
await closeFunction;
} catch (errorClose) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/restrict-template-expressions
`There was a problem deactivating trigger of workflow "${workflow.id}": "${errorClose.message}"`,
{
workflowId: workflow.id,
},
);
}
}

return fullRunData;
});

Expand All @@ -1072,6 +1096,7 @@ export class WorkflowExecute {
startedAt: Date,
workflow: Workflow,
executionError?: ExecutionError,
closeFunction?: Promise<void>,
): Promise<IRun> {
const fullRunData = this.getFullRunData(startedAt);

Expand Down Expand Up @@ -1106,6 +1131,20 @@ export class WorkflowExecute {

await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);

if (closeFunction) {
try {
await closeFunction;
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`There was a problem deactivating trigger of workflow "${workflow.id}": "${error.message}"`,
{
workflowId: workflow.id,
},
);
}
}

return fullRunData;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const rabbitDefaultOptions: Array<INodePropertyOptions | INodeProperties
],
},
{
displayName: 'Auto Delete',
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
Expand Down
67 changes: 61 additions & 6 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import {
ITriggerFunctions,
} from 'n8n-workflow';

const amqplib = require('amqplib');
import * as amqplib from 'amqplib';

export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
declare module 'amqplib' {
interface Channel {
connection: amqplib.Connection;
}
}

export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<amqplib.Channel> {
const credentials = await this.getCredentials('rabbitmq');

const credentialKeys = [
Expand Down Expand Up @@ -44,7 +50,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
reject(error);
});

const channel = await connection.createChannel().catch(console.warn);
const channel = await connection.createChannel().catch(console.warn) as amqplib.Channel;

if (options.arguments && ((options.arguments as IDataObject).argument! as IDataObject[]).length) {
const additionalArguments: IDataObject = {};
Expand All @@ -54,15 +60,14 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
options.arguments = additionalArguments;
}


resolve(channel);
} catch (error) {
reject(error);
}
});
}

export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);

return new Promise(async (resolve, reject) => {
Expand All @@ -75,7 +80,7 @@ export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFun
});
}

export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);

return new Promise(async (resolve, reject) => {
Expand All @@ -87,3 +92,53 @@ export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITrigger
}
});
}

export class MessageTracker {
messages: number[] = [];
isClosing = false;

received(message: amqplib.ConsumeMessage) {
this.messages.push(message.fields.deliveryTag);
}

answered(message: amqplib.ConsumeMessage) {
if (this.messages.length === 0) {
return;
}

const index = this.messages.findIndex(value => value !== message.fields.deliveryTag);
this.messages.splice(index);
}

unansweredMessages() {
return this.messages.length;
}

async closeChannel(channel: amqplib.Channel, consumerTag: string) {
if (this.isClosing) {
return;
}
this.isClosing = true;

// Do not accept any new messages
await channel.cancel(consumerTag);

let count = 0;
let unansweredMessages = this.unansweredMessages();

// Give currently executing messages max. 5 minutes to finish before
// the channel gets closed. If we would not do that, it would not be possible
// to acknowledge messages anymore for which the executions were already running
// when for example a new version of the workflow got saved. That would lead to
// them getting delivered and processed again.
while (unansweredMessages !== 0 && count++ <= 300) {
await new Promise((resolve) => {
setTimeout(resolve, 1000);
});
unansweredMessages = this.unansweredMessages();
}

await channel.close();
await channel.connection.close();
}
}
2 changes: 1 addition & 1 deletion packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ export class RabbitMQ implements INodeType {
],
},
{
displayName: 'Auto Delete',
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
Expand Down
Loading

0 comments on commit b851289

Please sign in to comment.