Skip to content

Commit fe170ee

Browse files
garyrussellartembilan
authored andcommitted
GH-974 Batch listeners replace KafkaNull with null
Fixes #974 Replace `KafkaNull` elements with `null` when the payload is resolved to a list.
1 parent dbbc958 commit fe170ee

File tree

2 files changed

+117
-0
lines changed

2 files changed

+117
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.beans.factory.config.Scope;
5757
import org.springframework.context.expression.StandardBeanExpressionResolver;
5858
import org.springframework.core.MethodIntrospector;
59+
import org.springframework.core.MethodParameter;
5960
import org.springframework.core.Ordered;
6061
import org.springframework.core.annotation.AnnotatedElementUtils;
6162
import org.springframework.core.annotation.AnnotationUtils;
@@ -73,6 +74,7 @@
7374
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
7475
import org.springframework.kafka.support.KafkaNull;
7576
import org.springframework.kafka.support.TopicPartitionInitialOffset;
77+
import org.springframework.messaging.Message;
7678
import org.springframework.messaging.converter.GenericMessageConverter;
7779
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
7880
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
@@ -832,6 +834,24 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
832834
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
833835
argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {
834836

837+
838+
@Override
839+
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
840+
Object resolved = super.resolveArgument(parameter, message);
841+
/*
842+
* Replace KafkaNull list elements with null.
843+
*/
844+
if (resolved instanceof List) {
845+
List<?> list = ((List<?>) resolved);
846+
for (int i = 0; i < list.size(); i++) {
847+
if (list.get(i) instanceof KafkaNull) {
848+
list.set(i, null);
849+
}
850+
}
851+
}
852+
return resolved;
853+
}
854+
835855
@Override
836856
protected boolean isEmptyPayload(Object payload) {
837857
return payload == null || payload instanceof KafkaNull;
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.kafka.annotation.EnableKafka;
32+
import org.springframework.kafka.annotation.KafkaListener;
33+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
35+
import org.springframework.kafka.core.ConsumerFactory;
36+
import org.springframework.test.annotation.DirtiesContext;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
38+
39+
/**
40+
* @author Gary Russell
41+
* @since 2.2.5
42+
*
43+
*/
44+
@SpringJUnitConfig
45+
@DirtiesContext
46+
public class BatchMessagingMessageListenerAdapterTests {
47+
48+
@SuppressWarnings("unchecked")
49+
@Test
50+
public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Foo foo) {
51+
BatchMessagingMessageListenerAdapter<String, String> adapter =
52+
(BatchMessagingMessageListenerAdapter<String, String>) registry
53+
.getListenerContainer("foo").getContainerProperties().getMessageListener();
54+
adapter.onMessage(Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, null, null)), null, null);
55+
assertThat(foo.value).isNull();
56+
}
57+
58+
public static class Foo {
59+
60+
public String value = "someValue";
61+
62+
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
63+
public void listen(List<String> list) {
64+
list.forEach(s -> {
65+
this.value = s;
66+
});
67+
}
68+
69+
}
70+
71+
@Configuration
72+
@EnableKafka
73+
public static class Config {
74+
75+
@Bean
76+
public Foo foo() {
77+
return new Foo();
78+
}
79+
80+
@SuppressWarnings({ "rawtypes" })
81+
@Bean
82+
public ConsumerFactory consumerFactory() {
83+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
84+
return consumerFactory;
85+
}
86+
87+
@SuppressWarnings({ "rawtypes", "unchecked" })
88+
@Bean
89+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
90+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
91+
factory.setConsumerFactory(consumerFactory());
92+
factory.setBatchListener(true);
93+
return factory;
94+
}
95+
}
96+
97+
}

0 commit comments

Comments
 (0)