-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Expected Behavior
The spring-kafka documentation mentions:
Starting with version 2.8, you can override the factory’s batchListener property using the batch property on the @KafkaListener annotation. This, together with the changes to Container Error Handlers allows the same factory to be used for both record and batch listeners.
This should also work when a custom MessageConverter is defined on the factory.
Current Behavior
For batch listeners, the message converter must be wrapped in a BatchMessagingMessageConverter. Otherwise, the message converter will not be used correctly and the wrong type will be supplied to the batch listener.
Context
We encapsulate spring-kafka into a library that preconfigures the factory as a bean in our setup. This library is then used by many services.
Another option would be to create a separate factory for batch listeners. With this, I am afraid that someone might use @KafkaListener(batch="true") as described in the official documentation but without supplying the correct library, which only works partly.
My current workaround looks like this, but I do not like it as it depends on how spring-kafka internally sets up its configuration, so it might break in future updates:
factory.setContainerCustomizer(container -> {
var messageListener = container.getContainerProperties().getMessageListener();
if (messageListener instanceof FilteringBatchMessageListenerAdapter) {
var meessageListenerDelegate =
((FilteringBatchMessageListenerAdapter<?, ?>) messageListener).getDelegate();
if (meessageListenerDelegate instanceof BatchMessagingMessageListenerAdapter) {
((BatchMessagingMessageListenerAdapter<?, ?>) meessageListenerDelegate).setBatchMessageConverter(
new BatchMessagingMessageConverter(messageConverter));
}
}
});See also this StackOverflow question.