Skip to content

Commit

Permalink
feat: enable Kafka configuration via the environment or convict (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalinkrustev authored May 24, 2024
1 parent 197400b commit 11cee90
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/kafka/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const insertDots = object => Object.fromEntries(Object.entries(object).map(([key, value]) => [key.replace(/[A-Z]/g, s => '.' + s.toLowerCase()), value]))

module.exports = config => config && ({
...config,
rdkafkaConf: config.rdkafkaConf && insertDots(config.rdkafkaConf),
topicConf: config.topicConf && insertDots(config.topicConf)
})
2 changes: 2 additions & 0 deletions src/kafka/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const Logger = require('@mojaloop/central-services-logger')
const Kafka = require('node-rdkafka')

const Protocol = require('./protocol')
const getConfig = require('./config')

const connectedClients = new Set()
require('async-exit-hook')(callback => Promise.allSettled(
Expand Down Expand Up @@ -205,6 +206,7 @@ exports.ENUMS = ENUMS
class Consumer extends EventEmitter {
constructor (topics = [], config = {}) {
super()
config = getConfig(config)
if (!config.options) {
config.options = {
mode: CONSUMER_MODES.recursive,
Expand Down
2 changes: 2 additions & 0 deletions src/kafka/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const EventEmitter = require('events')
const Logger = require('@mojaloop/central-services-logger')
const Kafka = require('node-rdkafka')
const Protocol = require('./protocol')
const getConfig = require('./config')

const connectedClients = new Set()
require('async-exit-hook')(callback => Promise.allSettled(
Expand Down Expand Up @@ -153,6 +154,7 @@ const
class Producer extends EventEmitter {
constructor (config = {}) {
super()
config = getConfig(config)
if (!config) {
config = {}
}
Expand Down
3 changes: 3 additions & 0 deletions src/util/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const createHandler = async (topicName, config, command) => {
if (config.rdkafkaConf !== undefined && config.rdkafkaConf['enable.auto.commit'] !== undefined) {
autoCommitEnabled = config.rdkafkaConf['enable.auto.commit']
}
if (config.rdkafkaConf !== undefined && config.rdkafkaConf.enableAutoCommit !== undefined) {
autoCommitEnabled = config.rdkafkaConf.enableAutoCommit
}

let connectedTimeStamp = 0
try {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/kafka/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Test('Consumer test', (consumerTests) => {
consumeTimeout: 1000
},
rdkafkaConf: {
'client.id': 'default-client',
clientId: 'default-client',
'group.id': 'kafka-test',
'metadata.broker.list': 'localhost:9092',
'enable.auto.commit': false
Expand Down

0 comments on commit 11cee90

Please sign in to comment.