Skip to content

Commit bc403e2

Browse files
committed
Add ssl changes
1 parent 2499542 commit bc403e2

6 files changed

+482
-35
lines changed

credentials/SSLKafkaApi.credentials.ts

+64-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import type { ICredentialType, INodeProperties } from 'n8n-workflow';
22

3-
export class Kafka implements ICredentialType {
4-
name = 'kafka';
3+
export class SslKafkaApi implements ICredentialType {
4+
name = 'sslKafkaApi';
55

6-
displayName = 'Kafka';
6+
displayName = 'SSL Kafka';
77

8-
documentationUrl = 'kafka';
8+
documentationUrl = 'https://github.com/unglobalcompact/n8n-ssl-kafka-node';
99

1010
properties: INodeProperties[] = [
1111
{
@@ -29,6 +29,66 @@ export class Kafka implements ICredentialType {
2929
type: 'boolean',
3030
default: true,
3131
},
32+
{
33+
displayName: 'SSL CA',
34+
name: 'sslCa',
35+
type: 'string',
36+
typeOptions: {
37+
password: true,
38+
},
39+
displayOptions: {
40+
show: {
41+
ssl: [true],
42+
},
43+
},
44+
default: '',
45+
description: 'Cert chains in PEM format. One cert chain should be provided per private key. Each cert chain should consist of the PEM formatted certificate for a provided private key, followed by the PEM formatted intermediate certificates (if any), in order, and not including the root CA (the root CA must be pre-known to the peer, see ca). When providing multiple cert chains, they do not have to be in the same order as their private keys in key. If the intermediate certificates are not provided, the peer will not be able to validate the certificate, and the handshake will fail.',
46+
},
47+
{
48+
displayName: 'SSL Cert',
49+
name: 'sslCert',
50+
type: 'string',
51+
typeOptions: {
52+
password: true,
53+
},
54+
displayOptions: {
55+
show: {
56+
ssl: [true],
57+
},
58+
},
59+
default: '',
60+
description: 'Cert chains in PEM format. One cert chain should be provided per private key. Each cert chain should consist of the PEM formatted certificate for a provided private key, followed by the PEM formatted intermediate certificates (if any), in order, and not including the root CA (the root CA must be pre-known to the peer, see ca). When providing multiple cert chains, they do not have to be in the same order as their private keys in key. If the intermediate certificates are not provided, the peer will not be able to validate the certificate, and the handshake will fail.',
61+
},
62+
{
63+
displayName: 'SSL Key',
64+
name: 'sslKey',
65+
type: 'string',
66+
typeOptions: {
67+
password: true,
68+
},
69+
displayOptions: {
70+
show: {
71+
ssl: [true],
72+
},
73+
},
74+
default: '',
75+
description: 'Private keys in PEM format. PEM allows the option of private keys being encrypted. Encrypted keys will be decrypted with options.passphrase. Multiple keys using different algorithms can be provided either as an array of unencrypted key strings or buffers, or an array of objects in the form {pem: <string|buffer>[, passphrase: ]}. The object form can only occur in an array. object.passphrase is optional. Encrypted keys will be decrypted with object.passphrase if provided, or options.passphrase if it is not.',
76+
},
77+
{
78+
displayName: 'SSL Key Passphrase',
79+
name: 'sslPassphrase',
80+
type: 'string',
81+
typeOptions: {
82+
password: true,
83+
},
84+
displayOptions: {
85+
show: {
86+
ssl: [true],
87+
},
88+
},
89+
default: '',
90+
description: 'Shared passphrase used for a single private key and/or a PFX.',
91+
},
3292
{
3393
displayName: 'Authentication',
3494
name: 'authentication',

nodes/Kafka/SslKafka.node.ts

+45-17
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import type {
1515
} from 'n8n-workflow';
1616
import { ApplicationError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
1717

18-
import { generatePairedItemData } from '../../utils/utilities';
19-
2018
export class SslKafka implements INodeType {
2119
description: INodeTypeDescription = {
2220
displayName: 'SSL Kafka',
@@ -26,13 +24,13 @@ export class SslKafka implements INodeType {
2624
version: 1,
2725
description: 'Sends messages to a Kafka topic',
2826
defaults: {
29-
name: 'Kafka',
27+
name: 'SSL Kafka',
3028
},
3129
inputs: [NodeConnectionType.Main],
3230
outputs: [NodeConnectionType.Main],
3331
credentials: [
3432
{
35-
name: 'kafka',
33+
name: 'sslKafkaApi',
3634
required: true,
3735
testedBy: 'kafkaConnectionTest',
3836
},
@@ -222,16 +220,33 @@ export class SslKafka implements INodeType {
222220

223221
const ssl = credentials.ssl as boolean;
224222

223+
let useSslConnectionOptions = false as boolean;
224+
let sslConnectionOptions: ConnectionOptions = {}
225+
226+
if (ssl === true && (credentials.sslCa !== '' || credentials.sslCert !== '' || credentials.sslKey !== '')) {
227+
useSslConnectionOptions = true;
228+
if (credentials.sslCa !== '') {
229+
sslConnectionOptions.ca = [credentials.sslCa] as string[];
230+
}
231+
if (credentials.sslCert !== '') {
232+
sslConnectionOptions.cert = credentials.sslCert as string;
233+
}
234+
if (credentials.sslKey !== '') {
235+
sslConnectionOptions.key = credentials.sslKey as string;
236+
}
237+
if (credentials.sslPassphrase !== '') {
238+
sslConnectionOptions.passphrase = credentials.sslPassphrase as string;
239+
}
240+
};
241+
225242
const config: KafkaConfig = {
226243
clientId,
227244
brokers,
228-
ssl,
245+
ssl: useSslConnectionOptions ? sslConnectionOptions : ssl,
229246
};
230247
if (credentials.authentication === true) {
231248
if (!(credentials.username && credentials.password)) {
232-
throw new ApplicationError('Username and password are required for authentication', {
233-
level: 'warning',
234-
});
249+
throw new ApplicationError('Username and password are required for authentication');
235250
}
236251
config.sasl = {
237252
username: credentials.username as string,
@@ -260,7 +275,6 @@ export class SslKafka implements INodeType {
260275

261276
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
262277
const items = this.getInputData();
263-
const itemData = generatePairedItemData(items.length);
264278

265279
const length = items.length;
266280

@@ -292,10 +306,29 @@ export class SslKafka implements INodeType {
292306

293307
const ssl = credentials.ssl as boolean;
294308

309+
let useSslConnectionOptions = false as boolean;
310+
let sslConnectionOptions: ConnectionOptions = {}
311+
312+
if (ssl === true && (credentials.sslCa !== '' || credentials.sslCert !== '' || credentials.sslKey !== '')) {
313+
useSslConnectionOptions = true;
314+
if (credentials.sslCa !== '') {
315+
sslConnectionOptions.ca = [credentials.sslCa] as string[];
316+
}
317+
if (credentials.sslCert !== '') {
318+
sslConnectionOptions.cert = credentials.sslCert as string;
319+
}
320+
if (credentials.sslKey !== '') {
321+
sslConnectionOptions.key = credentials.sslKey as string;
322+
}
323+
if (credentials.sslPassphrase !== '') {
324+
sslConnectionOptions.passphrase = credentials.sslPassphrase as string;
325+
}
326+
};
327+
295328
const config: KafkaConfig = {
296329
clientId,
297330
brokers,
298-
ssl,
331+
ssl: useSslConnectionOptions ? sslConnectionOptions : ssl,
299332
};
300333

301334
if (credentials.authentication === true) {
@@ -400,15 +433,10 @@ export class SslKafka implements INodeType {
400433

401434
await producer.disconnect();
402435

403-
const executionData = this.helpers.constructExecutionMetaData(
404-
this.helpers.returnJsonArray(responseData),
405-
{ itemData },
406-
);
407-
408-
return [executionData];
436+
return [this.helpers.returnJsonArray(responseData)];
409437
} catch (error) {
410438
if (this.continueOnFail()) {
411-
return [[{ json: { error: error.message }, pairedItem: itemData }]];
439+
return [this.helpers.returnJsonArray({ error: error.message })];
412440
} else {
413441
throw error;
414442
}

nodes/Kafka/SslKafkaTrigger.node.ts

+26-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
22
import type { KafkaConfig, SASLOptions } from 'kafkajs';
33
import { Kafka as apacheKafka, logLevel } from 'kafkajs';
4+
import { ConnectionOptions } from 'tls';
45
import type {
56
ITriggerFunctions,
67
IDataObject,
@@ -11,10 +12,10 @@ import type {
1112
} from 'n8n-workflow';
1213
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
1314

14-
export class KafkaTrigger implements INodeType {
15+
export class SslKafkaTrigger implements INodeType {
1516
description: INodeTypeDescription = {
16-
displayName: 'Kafka Trigger',
17-
name: 'kafkaTrigger',
17+
displayName: 'SSL Kafka Trigger',
18+
name: 'sslKafkaTrigger',
1819
icon: { light: 'file:kafka.svg', dark: 'file:kafka.dark.svg' },
1920
group: ['trigger'],
2021
version: [1, 1.1],
@@ -26,7 +27,7 @@ export class KafkaTrigger implements INodeType {
2627
outputs: [NodeConnectionType.Main],
2728
credentials: [
2829
{
29-
name: 'kafka',
30+
name: 'sslKafkaApi',
3031
required: true,
3132
},
3233
],
@@ -181,7 +182,7 @@ export class KafkaTrigger implements INodeType {
181182

182183
const groupId = this.getNodeParameter('groupId') as string;
183184

184-
const credentials = await this.getCredentials('kafka');
185+
const credentials = await this.getCredentials('sslKafkaApi');
185186

186187
const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());
187188

@@ -193,10 +194,29 @@ export class KafkaTrigger implements INodeType {
193194

194195
options.nodeVersion = this.getNode().typeVersion;
195196

197+
let useSslConnectionOptions = false as boolean;
198+
let sslConnectionOptions: ConnectionOptions = {}
199+
200+
if (ssl === true && (credentials.sslCa !== '' || credentials.sslCert !== '' || credentials.sslKey !== '')) {
201+
useSslConnectionOptions = true;
202+
if (credentials.sslCa !== '') {
203+
sslConnectionOptions.ca = [credentials.sslCa] as string[];
204+
}
205+
if (credentials.sslCert !== '') {
206+
sslConnectionOptions.cert = credentials.sslCert as string;
207+
}
208+
if (credentials.sslKey !== '') {
209+
sslConnectionOptions.key = credentials.sslKey as string;
210+
}
211+
if (credentials.sslPassphrase !== '') {
212+
sslConnectionOptions.passphrase = credentials.sslPassphrase as string;
213+
}
214+
};
215+
196216
const config: KafkaConfig = {
197217
clientId,
198218
brokers,
199-
ssl,
219+
ssl: useSslConnectionOptions ? sslConnectionOptions : ssl,
200220
logLevel: logLevel.ERROR,
201221
};
202222

0 commit comments

Comments
 (0)