Version
I ran into this issue after upgrading a Spring Boot project from 2.4.x to 2.6.3, which brings in Spring Kafka 2.8.2. The issue seems to have existed since Spring Kafka 2.7.0.
Description
When creating a DefaultKafkaConsumerFactory, a configuration map has to be passed to the constructor. This map can contain type mappings for the JsonDeserializer. In the past, it was possible to map a token to an array type:
// de.x.y.MyType[].class.getName() -> "[Lde.x.y.MyType;"
config.put(JsonDeserializer.TYPE_MAPPINGS, "key:[Lde.x.y.MyType;");
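For reference, the name used in such a mapping is the JVM binary name of the array class, which Class.forName can load back. A minimal JDK-only sketch, using String[] as a stand-in for MyType[]; ArrayBinaryNameDemo and its helper methods are illustrative names and not part of Spring Kafka:

```java
final class ArrayBinaryNameDemo {

    /** Returns the binary name of a class, e.g. "[Ljava.lang.String;" for String[]. */
    static String binaryName(Class<?> type) {
        return type.getName();
    }

    /** Loads a class by binary name, wrapping the checked exception. */
    static Class<?> load(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot load " + name, e);
        }
    }

    public static void main(String[] args) {
        String name = binaryName(String[].class);
        System.out.println(name);                         // [Ljava.lang.String;
        System.out.println(load(name) == String[].class); // true
    }
}
```

Note the leading `[`, which marks one array dimension, and the trailing `;`, which terminates the element class name.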
If you configure such an array mapping with the current Spring Kafka 2.8.2, a NullPointerException is thrown:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:732)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:414)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at de.x.X.main(X.java:34)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:758)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:430)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:430)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
... 17 common frames omitted
Caused by: java.lang.NullPointerException: null
at org.springframework.kafka.support.serializer.JsonDeserializer.lambda$addMappingsToTrusted$2(JsonDeserializer.java:534)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:977)
at org.springframework.kafka.support.serializer.JsonDeserializer.addMappingsToTrusted(JsonDeserializer.java:533)
at org.springframework.kafka.support.serializer.JsonDeserializer.createMappings(JsonDeserializer.java:439)
at org.springframework.kafka.support.serializer.JsonDeserializer.configure(JsonDeserializer.java:418)
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.configure(ErrorHandlingDeserializer.java:139)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:716)
... 31 common frames omitted
The source of this exception is here, in the line clazz.getPackage().getName():
private void addMappingsToTrusted(Map<String, Class<?>> mappings) {
    mappings.values().forEach(clazz -> {
        doAddTrustedPackages(clazz.getPackage().getName());
        doAddTrustedPackages(clazz.getPackage().getName() + ".*");
    });
}
In #1764 this code was changed in this commit. Before it looked like this:
private void addMappingsToTrusted(Map<String, Class<?>> mappings) {
    mappings.values().forEach(clazz -> {
        doAddTrustedPackages(clazz.getPackageName());
        doAddTrustedPackages(clazz.getPackageName() + ".*");
    });
}
For ordinary classes, both calls behave the same, but for arrays the behavior differs:
// No problem:
MyType[].class.getPackageName(); // returns the package name of the element type
// NullPointerException:
MyType[].class.getPackage()      // returns null ...
        .getName();              // ... so an NPE is thrown here
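The difference can be reproduced with the JDK alone. The sketch below assumes Java 9 or later (Class.getPackageName() does not exist before that) and uses String[] in place of MyType[]; ArrayPackageDemo is a hypothetical name:

```java
final class ArrayPackageDemo {

    /** True if Class.getPackage() yields a Package object for the given type. */
    static boolean hasPackageObject(Class<?> type) {
        return type.getPackage() != null;
    }

    /** Java 9+ lookup: arrays report the package name of their element type. */
    static String packageName(Class<?> type) {
        return type.getPackageName();
    }

    public static void main(String[] args) {
        System.out.println(hasPackageObject(String.class));   // true
        System.out.println(hasPackageObject(String[].class)); // false, getPackage() is null
        System.out.println(packageName(String[].class));      // java.lang
    }
}
```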
This behavior is also documented in the Javadoc of Class.getPackage():
public Package getPackage()
Gets the package of this class.
If this class represents an array type, a primitive type or void, this method returns null.
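Given that documented behavior, one null-safe way to derive a package name is to unwrap array types down to their element type before calling getPackage(). This is only a sketch of the idea and not the actual Spring Kafka code or fix; TrustedPackageNames and packageNameOf are hypothetical names, and returning an empty string for primitives is an assumption made for illustration:

```java
final class TrustedPackageNames {

    /**
     * Resolves the package name for a class, treating arrays as their
     * element type. Returns "" when no Package object is available
     * (for example for primitive types).
     */
    static String packageNameOf(Class<?> type) {
        Class<?> element = type;
        // Strip every array dimension: MyType[][] -> MyType[] -> MyType
        while (element.isArray()) {
            element = element.getComponentType();
        }
        Package pkg = element.getPackage();
        return pkg != null ? pkg.getName() : "";
    }

    public static void main(String[] args) {
        System.out.println(packageNameOf(String.class));     // java.lang
        System.out.println(packageNameOf(String[][].class)); // java.lang
        System.out.println(packageNameOf(java.util.Map.Entry[].class)); // java.util
    }
}
```

Unlike Class.getPackageName(), this approach also compiles on Java 8, at the cost of handling the null case by hand.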
I would expect that configuring arrays in TYPE_MAPPINGS still works, since it worked before.
Is there maybe another way or a workaround to configure arrays in the type mappings of the JsonDeserializer?
Thank you and regards
m-ibot