Skip to content

Commit

Permalink
feat(Redis Node): Add Redis Trigger node and publish operation to reg…
Browse files Browse the repository at this point in the history
…ular node

* add database number select to redis credentials

* add publish to channel to redis node

* add redis trigger

* ⚡ small fixes

* ⚡ small fixes for trigger node

* fix(Strapi Node): Add support for Strapi v4

* 🐛 Fix get all operation for v4

* 🔨 Fix create operation

* 🔨 Fix update operation

* 🔨 Fix delete operation

* 🔨 Fix get operation

* 🔨 Fix Return All

* 👕 Fix nodelinter issues

* ⚡ Add Credential Test

* 🔨 Code improvement

* 👕 Fix lint issue

* Removed extra /api from Get All on v4

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
Co-authored-by: Jonathan Bennetts <jonathan.bennetts@gmail.com>

* refactor(editor): Replace 'Workflows' help menu item with 'Course'

* N8N-3110 Replace Workflows help menu item with course

* N8N-3110 Re-order props in en.json

* N8N-3110 Update URL Link for courses

* 🐛 Fix issue with messages being sent twice

* ⚡ Remove not needed return

Co-authored-by: Michael Kret <michael.k@radency.com>
Co-authored-by: Harshil Agrawal <ghagrawal17@gmail.com>
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
Co-authored-by: Jonathan Bennetts <jonathan.bennetts@gmail.com>
Co-authored-by: Oliver Trajceski <olivertrajceski@yahoo.com>
  • Loading branch information
6 people authored Mar 12, 2022
1 parent 6065f68 commit 5c2deb4
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 2 deletions.
2 changes: 1 addition & 1 deletion packages/nodes-base/credentials/Redis.credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class Redis implements ICredentialType {
default: 6379,
},
{
displayName: 'Database',
displayName: 'Database Number',
name: 'database',
type: 'number',
default: 0,
Expand Down
50 changes: 49 additions & 1 deletion packages/nodes-base/nodes/Redis/Redis.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ export class Redis implements INodeType {
value: 'set',
description: 'Set the value of a key in redis.',
},
{
name: 'Publish',
value: 'publish',
description: 'Publish message to redis channel.',
},
],
default: 'info',
description: 'The operation to perform.',
Expand Down Expand Up @@ -370,6 +375,42 @@ export class Redis implements INodeType {
default: 60,
description: 'Number of seconds before key expiration.',
},
// ----------------------------------
// publish
// ----------------------------------
{
displayName: 'Channel',
name: 'channel',
type: 'string',
displayOptions: {
show: {
operation: [
'publish',
],
},
},
default: '',
required: true,
description: 'Channel name.',
},
{
displayName: 'Data',
name: 'messageData',
type: 'string',
displayOptions: {
show: {
operation: [
'publish',
],
},
},
typeOptions: {
alwaysOpenEditWindow: true,
},
default: '',
required: true,
description: 'Data to publish.',
},
],
};

Expand Down Expand Up @@ -491,6 +532,7 @@ export class Redis implements INodeType {
const redisOptions: redis.ClientOpts = {
host: credentials.host as string,
port: credentials.port as number,
db: credentials.database as number,
};

if (credentials.password) {
Expand All @@ -516,7 +558,7 @@ export class Redis implements INodeType {
resolve(this.prepareOutputData([{ json: convertInfoToObject(result as unknown as string) }]));
client.quit();

} else if (['delete', 'get', 'keys', 'set', 'incr'].includes(operation)) {
} else if (['delete', 'get', 'keys', 'set', 'incr', 'publish'].includes(operation)) {
const items = this.getInputData();
const returnItems: INodeExecutionData[] = [];

Expand Down Expand Up @@ -587,6 +629,12 @@ export class Redis implements INodeType {
await clientExpire(keyIncr, ttl);
}
returnItems.push({json: {[keyIncr]: incrementVal}});
} else if (operation === 'publish'){
const channel = this.getNodeParameter('channel', itemIndex) as string;
const messageData = this.getNodeParameter('messageData', itemIndex) as string;
const clientPublish = util.promisify(client.publish).bind(client);
await clientPublish(channel, messageData);
returnItems.push(items[itemIndex]);
}
}

Expand Down
21 changes: 21 additions & 0 deletions packages/nodes-base/nodes/Redis/RedisTrigger.node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"node": "n8n-nodes-base.redisTrigger",
"nodeVersion": "1.0",
"codexVersion": "1.0",
"categories": [
"Communication",
"Development"
],
"resources": {
"credentialDocumentation": [
{
"url": "https://docs.n8n.io/credentials/redis"
}
],
"primaryDocumentation": [
{
"url": "https://docs.n8n.io/nodes/n8n-nodes-base.redisTrigger/"
}
]
}
}
143 changes: 143 additions & 0 deletions packages/nodes-base/nodes/Redis/RedisTrigger.node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import {
ITriggerFunctions,
} from 'n8n-core';

import {
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
NodeOperationError,
} from 'n8n-workflow';

import * as redis from 'redis';

export class RedisTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'Redis Trigger',
name: 'redisTrigger',
icon: 'file:redis.svg',
group: ['trigger'],
version: 1,
description: 'Subscribe to redis channel',
defaults: {
name: 'Redis Trigger',
},
inputs: [],
outputs: ['main'],
credentials: [
{
name: 'redis',
required: true,
},
],
properties: [
{
displayName: 'Channels',
name: 'channels',
type: 'string',
default: '',
required: true,
description: `Channels to subscribe to, multiple channels be defined with comma. Wildcard character(*) is supported`,
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Try to parse the message to an object',
},
{
displayName: 'Only Message',
name: 'onlyMessage',
type: 'boolean',
default: false,
description: 'Returns only the message property',
},
],
},
],
};

async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {

const credentials = await this.getCredentials('redis');

if (credentials === undefined) {
throw new NodeOperationError(this.getNode(), 'No credentials got returned!');
}

const redisOptions: redis.ClientOpts = {
host: credentials.host as string,
port: credentials.port as number,
db: credentials.database as number,
};

if (credentials.password) {
redisOptions.password = credentials.password as string;
}


const channels = (this.getNodeParameter('channels') as string).split(',');

const options = this.getNodeParameter('options') as IDataObject;

if (!channels) {
throw new NodeOperationError(this.getNode(), 'Channels are mandatory!');
}

const client = redis.createClient(redisOptions);

const self = this;

async function manualTriggerFunction() {
await new Promise((resolve, reject) => {
client.on('connect', () => {
for (const channel of channels) {
client.psubscribe(channel);
}
client.on('pmessage', (pattern: string, channel: string, message: string) => {
if (options.jsonParseBody) {
try {
message = JSON.parse(message);
} catch (error) { }
}

if (options.onlyMessage) {
self.emit([self.helpers.returnJsonArray({message})]);
resolve(true);
return;
}

self.emit([self.helpers.returnJsonArray({channel, message})]);
resolve(true);
});
});

client.on('error', (error) => {
reject(error);
});
});
}

if (this.getMode() === 'trigger') {
manualTriggerFunction();
}

async function closeFunction() {
client.quit();
}

return {
closeFunction,
manualTriggerFunction,
};
}
}
1 change: 1 addition & 0 deletions packages/nodes-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@
"dist/nodes/ReadPdf/ReadPdf.node.js",
"dist/nodes/Reddit/Reddit.node.js",
"dist/nodes/Redis/Redis.node.js",
"dist/nodes/Redis/RedisTrigger.node.js",
"dist/nodes/RenameKeys/RenameKeys.node.js",
"dist/nodes/RespondToWebhook/RespondToWebhook.node.js",
"dist/nodes/Rocketchat/Rocketchat.node.js",
Expand Down

0 comments on commit 5c2deb4

Please sign in to comment.