-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Description
BlockingSingleSubscriber.java accumulates exception in suppressed exception list in the same error object which gets accumulated over time leading to memory leak.
Expected Behavior
New error object should be created and accumulation on old error object should be avoided.
Actual Behavior
Error object is cached internally in the kafka sender which is used as a parent error. Future errors while performing send operations which results in error are accumulated as suppressed exceptions inside the parent error.
Steps to Reproduce
Create a normal kafka sender. Introduce a config exception like below:
delivery.timeout.ms=1200
max.block.ms=1000
request.timeout.ms=2000
linger.ms=10
This will throw an error while creating a kafka sender - org.apache.kafka.common.config.ConfigException: delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms
Next, send some messages using the kafka sender.
kafkaSender.send(Mono.just(SenderRecord.create(
new ProducerRecord<>("topic", "test"),1)))
.doOnError((Throwable throwable) -> {
})
.blockFirst();
Possible Solution
Your Environment
reactor-core:3.4.16
200 messages/sec creates 200 exceptions in the suppressed exception list. This keeps growing resulting in memory leak.
- Reactor version(s) used: 3.4.16
- Other relevant libraries versions (eg.
netty
, ...): reactor-kafka:1.3.11 - JVM version (
java -version
): 1.8 - OS and version (eg
uname -a
): ubuntu20