Skip to content

Commit

Permalink
fix(kafkaTrigger Node): fix kafka trigger not working with default ma…
Browse files Browse the repository at this point in the history
…x requests value
  • Loading branch information
ruanjf authored Sep 5, 2022
1 parent 9314086 commit 71cae90
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
1 change: 1 addition & 0 deletions packages/nodes-base/credentials/Kafka.credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class Kafka implements ICredentialType {
type: 'string',
default: '',
placeholder: 'my-app',
hint: 'Will not affect the connection, but will be used to identify the client in the Kafka server logs. Read more <a href="https://kafka.apache.org/documentation/#design_quotasgroups">here</a>',
},
{
displayName: 'Brokers',
Expand Down
55 changes: 55 additions & 0 deletions packages/nodes-base/nodes/Kafka/Kafka.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { IExecuteFunctions } from 'n8n-core';

import {
ICredentialDataDecryptedObject,
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
INodeCredentialTestResult,
INodeExecutionData,
INodeType,
INodeTypeDescription,
Expand All @@ -35,6 +39,7 @@ export class Kafka implements INodeType {
{
name: 'kafka',
required: true,
testedBy: 'kafkaConnectionTest',
},
],
properties: [
Expand Down Expand Up @@ -185,6 +190,56 @@ export class Kafka implements INodeType {
],
};

methods = {
credentialTest: {
async kafkaConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as ICredentialDataDecryptedObject;
try {
const brokers = ((credentials.brokers as string) || '')
.split(',')
.map((item) => item.trim()) as string[];

const clientId = credentials.clientId as string;

const ssl = credentials.ssl as boolean;

const config: KafkaConfig = {
clientId,
brokers,
ssl,
};
if (credentials.authentication === true) {
if (!(credentials.username && credentials.password)) {
throw Error('Username and password are required for authentication');
}
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
mechanism: credentials.saslMechanism as string,
} as SASLOptions;
}

const kafka = new apacheKafka(config);

await kafka.admin().connect();
await kafka.admin().disconnect();
return {
status: 'OK',
message: 'Authentication successful',
};
} catch (error) {
return {
status: 'Error',
message: error.message,
};
}
},
},
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();

Expand Down
10 changes: 8 additions & 2 deletions packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export class KafkaTrigger implements INodeType {
displayName: 'Max Number of Requests',
name: 'maxInFlightRequests',
type: 'number',
default: 0,
default: 1,
description:
'Max number of requests that may be in progress at any time. If falsey then no limit.',
},
Expand Down Expand Up @@ -202,9 +202,15 @@ export class KafkaTrigger implements INodeType {

const kafka = new apacheKafka(config);

const maxInFlightRequests = (
this.getNodeParameter('options.maxInFlightRequests', null) === 0
? null
: this.getNodeParameter('options.maxInFlightRequests', null)
) as number;

const consumer = kafka.consumer({
groupId,
maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
maxInFlightRequests,
sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});
Expand Down

0 comments on commit 71cae90

Please sign in to comment.