Skip to content

Commit

Permalink
feat(Kafka Trigger Node): Add additional options (#3600)
Browse files Browse the repository at this point in the history
* 🔨 additional options to kafka trigger

* ⚡ option for maxInFlightRequests

* ⚡ Small change

Co-authored-by: ricardo <ricardoespinoza105@gmail.com>
  • Loading branch information
michael-radency and RicardoE105 authored Jul 27, 2022
1 parent 3ebfa45 commit 3496a39
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,36 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to allow sending message to a previously non exisiting topic',
},
{
displayName: 'Auto Commit Threshold',
name: 'autoCommitThreshold',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after resolving a given number of messages',
},
{
displayName: 'Auto Commit Interval',
name: 'autoCommitInterval',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after a given period, for example, five seconds',
hint: 'Value in milliseconds',
},
{
displayName: 'Heartbeat Interval',
name: 'heartbeatInterval',
type: 'number',
default: 3000,
description: 'Heartbeats are used to ensure that the consumer\'s session stays active',
hint: 'The value must be set lower than Session Timeout',
},
{
displayName: 'Max Number of Requests',
name: 'maxInFlightRequests',
type: 'number',
default: 0,
description: 'Max number of requests that may be in progress at any time. If falsey then no limit.',
},
{
displayName: 'Read Messages From Beginning',
name: 'fromBeginning',
Expand Down Expand Up @@ -122,20 +152,21 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to return only the message property',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
},
{
displayName: 'Return Headers',
name: 'returnHeaders',
type: 'boolean',
default: false,
description: 'Whether to return the headers received from Kafka',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
hint: 'Value in milliseconds',
},
],
},
],
Expand Down Expand Up @@ -175,7 +206,12 @@ export class KafkaTrigger implements INodeType {

const kafka = new apacheKafka(config);

const consumer = kafka.consumer({ groupId });
const consumer = kafka.consumer({
groupId,
maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});

await consumer.connect();

Expand All @@ -191,6 +227,8 @@ export class KafkaTrigger implements INodeType {

const startConsumer = async () => {
await consumer.run({
autoCommitInterval: options.autoCommitInterval as number || null,
autoCommitThreshold: options.autoCommitThreshold as number || null,
eachMessage: async ({ topic, message }) => {

let data: IDataObject = {};
Expand Down

0 comments on commit 3496a39

Please sign in to comment.