partitionsConsumedConcurrently > 1 overloads CPU when consumers lag is 0 #195

Open
maksym-opanasenko-ft opened this issue Dec 4, 2024 · 0 comments

maksym-opanasenko-ft commented Dec 4, 2024

Environment Information

  • Reproduced on Mac (Apple silicon) and confirmed in AWS ECS
  • Node Version: v18.20.4
  • NPM Version: 10.7.0
  • Confluent Platform: 7.7.x, 7.5.x, 7.3.x; also confirmed with Apache Kafka 3.3.x
  • confluent-kafka-javascript version: ^0.5.2

Steps to Reproduce

  1. Launch Kafka in Docker or on AWS MSK
  2. Create a topic with many partitions
  3. Instantiate a consumer using the kafkaJs constructor:
	const consumer = kafka.consumer({
		kafkaJS: {
			groupId: 'test-group',
		},
	});
  4. Run the consumer with partitionsConsumedConcurrently > 1
	await consumer.run({
		partitionsConsumedConcurrently: 2, // This causes the CPU load
		eachMessage: async ({ topic, partition, message }) => {
			console.log({
				topic,
				partition,
				offset: message.offset,
				value: message.value.toString(),
			});
		},
	});
  5. Either don't write any messages to the topic, or wait until the consumer has caught up (lag is 0)
  6. Wait at least 30 seconds and observe the CPU usage
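To put a number on the last step, a small sampler can be added to the consumer process. This is only a sketch: cpuPercent and startCpuSampler are hypothetical helper names that are not part of the library, and the 5 second interval is an arbitrary choice. It relies only on Node's built-in process.cpuUsage() and process.hrtime.bigint().

```javascript
// Hypothetical helper (not part of confluent-kafka-javascript):
// converts a CPU time delta into percent of one core over elapsedMs.
function cpuPercent(deltaUsage, elapsedMs) {
	// process.cpuUsage() reports user and system time in microseconds
	const busyMs = (deltaUsage.user + deltaUsage.system) / 1000;
	return (busyMs / elapsedMs) * 100;
}

// Samples CPU usage of the current process every intervalMs and passes a
// formatted line to `report`. Returns a function that stops the sampling.
function startCpuSampler(intervalMs = 5000, report = console.log) {
	let lastUsage = process.cpuUsage();
	let lastTime = process.hrtime.bigint();
	const timer = setInterval(() => {
		const usage = process.cpuUsage();
		const now = process.hrtime.bigint();
		const delta = {
			user: usage.user - lastUsage.user,
			system: usage.system - lastUsage.system,
		};
		const elapsedMs = Number(now - lastTime) / 1e6; // ns -> ms
		report(`CPU: ${cpuPercent(delta, elapsedMs).toFixed(1)}% of one core`);
		lastUsage = usage;
		lastTime = now;
	}, intervalMs);
	timer.unref(); // do not keep the process alive just for sampling
	return () => clearInterval(timer);
}
```

Calling startCpuSampler() just before consumer.run() and comparing the output for partitionsConsumedConcurrently: 1 and 2 should make the difference visible without an external profiler.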

confluent-kafka-javascript Configuration Settings

Here is the full minimal snippet we use to reproduce:

const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const topic = 'TestTopic';
const clientId = 'test-client-id';

const kafka = new Kafka({
	kafkaJS: {
		clientId: clientId,
		brokers: ['localhost:9092'],
		logLevel: 4, // optional, but gives a hint what overuses the CPU
	},
});

const consumerRun = async () => {
	const consumer = kafka.consumer({
		kafkaJS: {
			groupId: 'test-group',
		},
	});

	await consumer.connect();
	console.log('Consumer connected');

	await consumer.subscribe({ topic });
	console.log('Consumer subscribed to topic');

	await consumer.run({
		partitionsConsumedConcurrently: 2, // This causes the CPU load
		eachMessage: async ({ topic, partition, message }) => {
			console.log({
				topic,
				partition,
				offset: message.offset,
				value: message.value.toString(),
			});
		},
	});
};

consumerRun().catch((err) => {
	console.error('Error running producer or consumer:', err);
});

Additional context

When setting logLevel: 4 in the Kafka constructor and using partitionsConsumedConcurrently: 1, the following log message

{
  message: 'Attempting to fetch 1 messages to the message cache',
  name: 'test-client-id#consumer-1',
  fac: 'BINDING',
  timestamp: 1733298033731
}
is printed about once per second. With partitionsConsumedConcurrently: 2, however, the console is flooded with it.
Setting logLevel: 3 helps because less I/O is done, but the code path that emits the message still runs, so CPU usage stays high.
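
The difference in log volume can be quantified by counting entries per second. The sketch below assumes the debug entries have been collected into an array of objects shaped like the one above, with a message string and a millisecond timestamp. How they are collected is left open, and messagesPerSecond is a hypothetical helper, not a library function.

```javascript
// Hypothetical helper: counts matching log entries per wall-clock second.
// `entries` is assumed to be an array of objects with `message` (string)
// and `timestamp` (milliseconds since epoch), like the log entry above.
function messagesPerSecond(entries, needle = 'Attempting to fetch') {
	const counts = new Map();
	for (const { message, timestamp } of entries) {
		if (typeof message !== 'string' || !message.includes(needle)) continue;
		const second = Math.floor(timestamp / 1000); // bucket key in seconds
		counts.set(second, (counts.get(second) || 0) + 1);
	}
	return counts; // Map of second -> number of matching entries
}
```

With partitionsConsumedConcurrently: 1 each bucket would be expected to hold roughly one entry, matching the "about once per second" observation; much larger counts with a value of 2 would confirm the tight fetch loop.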
maksym-opanasenko-ft changed the title from "High CPU usage when utilising a consumer with partitionsConsumedConcurrently > 1 following the kafkaJs constructor" to "partitionsConsumedConcurrently > 1 overloads CPU when consumers lag is 0" on Dec 7, 2024