Skip to content

Commit 962defb

Browse files
committed
Fix consumeMessages test and add TEST_DEBUG env var
1 parent 32681e3 commit 962defb

File tree

3 files changed

+39
-23
lines changed

3 files changed

+39
-23
lines changed

lib/kafkajs/_consumer.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,11 @@ class Consumer {
322322
this.#logger.setLogLevel(logLevel.DEBUG);
323323
}
324324

325+
/* Even if we are in compability mode, setting a 'debug' in the main config must override the logger's level. */
326+
if (Object.hasOwn(this.#userConfig, 'debug')) {
327+
this.#logger.setLogLevel(logLevel.DEBUG);
328+
}
329+
325330
let rdKafkaConfig = Object.assign(compatibleConfig, this.#userConfig);
326331

327332
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,6 @@ describe('Consumer', () => {
642642
async () => {
643643
// Seed the topic with some messages. We don't need a tx producer for this.
644644
await producer.connect();
645-
646645
const partition = 0;
647646
const messages = generateMessages().map(message => ({
648647
...message,
@@ -691,7 +690,7 @@ describe('Consumer', () => {
691690
eachMessage,
692691
})
693692

694-
// Consume pre-produced messages.
693+
// 2. Consume pre-produced messages.
695694

696695
const number = messages.length;
697696
await waitForMessages(messagesConsumed, {
@@ -729,8 +728,8 @@ describe('Consumer', () => {
729728
await consumer.connect();
730729
await consumer.subscribe({ topic: topicName });
731730

732-
messagesConsumed = []
733-
uncommittedOffsetsPerMessage = []
731+
messagesConsumed = [];
732+
uncommittedOffsetsPerMessage = [];
734733

735734
consumer.run({ eachMessage })
736735

@@ -802,19 +801,6 @@ describe('Consumer', () => {
802801
// Consume produced messages.
803802
await waitForMessages(messagesConsumed, { number: messages.length });
804803

805-
// Restart consumer - we cannot stop it, so we recreate it.
806-
await consumer.disconnect();
807-
808-
consumer = createConsumer({
809-
groupId,
810-
maxWaitTimeInMs: 100,
811-
fromBeginning: true,
812-
autoCommit: false,
813-
});
814-
815-
await consumer.connect();
816-
await consumer.subscribe({ topic: topicName });
817-
818804
expect(messagesConsumed[0].value.toString()).toMatch(/value-0/);
819805
expect(messagesConsumed[99].value.toString()).toMatch(/value-99/);
820806
expect(uncommittedOffsetsPerMessage).toHaveLength(messagesConsumed.length);
@@ -833,13 +819,25 @@ describe('Consumer', () => {
833819
});
834820
await txnToAbort.abort()
835821

836-
// Restart consumer
822+
/* Restart consumer - we cannot stop it, so we recreate it. */
837823
messagesConsumed = []
838824
uncommittedOffsetsPerMessage = []
839825

826+
await consumer.disconnect();
827+
828+
consumer = createConsumer({
829+
groupId,
830+
maxWaitTimeInMs: 100,
831+
fromBeginning: true,
832+
autoCommit: false,
833+
});
834+
835+
await consumer.connect();
836+
await consumer.subscribe({ topic: topicName });
837+
840838
consumer.run({
841-
eachMessage
842-
})
839+
eachMessage,
840+
});
843841

844842
await waitForMessages(messagesConsumed, { number: 1 });
845843
expect(messagesConsumed[0].value.toString()).toMatch(/value-0/)

test/promisified/testhelpers.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,37 @@
11
const crypto = require('crypto');
22
const process = require('process');
3+
const { logLevel } = require('../../lib/kafkajs');
34
const { Kafka } = require('../../lib').KafkaJS;
45

56
// TODO: pick this up from a file
67
const clusterInformation = {
78
brokers: process.env.KAFKA_HOST ? process.env.KAFKA_HOST.split(',') : ['localhost:9092'],
89
};
910

11+
const debug = process.env.TEST_DEBUG;
12+
13+
function makeConfig(config) {
14+
const kafkaJs = Object.assign(config, clusterInformation);
15+
const common = {};
16+
if (debug) {
17+
common['debug'] = debug;
18+
}
19+
20+
return Object.assign(common, { kafkaJs });
21+
}
22+
1023
function createConsumer(config) {
11-
const kafka = new Kafka({ kafkaJs: Object.assign(config, clusterInformation) });
24+
const kafka = new Kafka(makeConfig(config));
1225
return kafka.consumer();
1326
}
1427

1528
function createProducer(config) {
16-
const kafka = new Kafka({ kafkaJs: Object.assign(config, clusterInformation) });
29+
const kafka = new Kafka(makeConfig(config));
1730
return kafka.producer();
1831
}
1932

2033
function createAdmin(config) {
21-
const kafka = new Kafka({ kafkaJs: Object.assign(config, clusterInformation) });
34+
const kafka = new Kafka(makeConfig(config));
2235
return kafka.admin();
2336
}
2437

0 commit comments

Comments
 (0)