Skip to content

Commit 14c2a30

Browse files
garyrussellartembilan
authored andcommitted
GH-926: Always Detect ErrorHandlingDeserializer2
Fixes #926 Previously, the container only detected the presence of the deserializer when it was configured using Kafka properties. The container now detects it (and subclasses) when configured directly into the consumer factory and properly routes exceptions to the error handler. Also add `setKey` to designate the deserializer as a key deserializer. * Polishing - PR Comments. * Polishing
1 parent 132934d commit 14c2a30

File tree

3 files changed

+83
-16
lines changed

3 files changed

+83
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -546,9 +546,20 @@ else if (listener instanceof MessageListener) {
546546
this.logger.info(this);
547547
}
548548
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
549-
this.checkNullKeyForExceptions = checkDeserializer(props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
549+
this.checkNullKeyForExceptions = checkDeserializer(
550+
findDeserializerClass(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
550551
this.checkNullValueForExceptions = checkDeserializer(
551-
props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
552+
findDeserializerClass(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
553+
}
554+
555+
private Object findDeserializerClass(Map<String, Object> props, String config) {
556+
Object configuredDeserializer = KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer();
557+
if (configuredDeserializer == null) {
558+
return props.get(config);
559+
}
560+
else {
561+
return configuredDeserializer.getClass();
562+
}
552563
}
553564

554565
private void subscribeOrAssignTopics(final Consumer<K, V> consumer) {
@@ -576,7 +587,7 @@ private void subscribeOrAssignTopics(final Consumer<K, V> consumer) {
576587

577588
private boolean checkDeserializer(Object deser) {
578589
return deser instanceof Class
579-
? ((Class<?>) deser).equals(ErrorHandlingDeserializer2.class)
590+
? ErrorHandlingDeserializer2.class.isAssignableFrom((Class<?>) deser)
580591
: deser instanceof String
581592
? ((String) deser).equals(ErrorHandlingDeserializer2.class.getName())
582593
: false;

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,31 @@ public void setFailedDeserializationFunction(BiFunction<byte[], Headers, T> fail
9797
this.failedDeserializationFunction = failedDeserializationFunction;
9898
}
9999

100+
public boolean isKey() {
101+
return this.isKey;
102+
}
103+
104+
/**
105+
* Set to true if this deserializer is to be used as a key deserializer when
106+
* configuring outside of Kafka.
107+
* @param isKey true for a key deserializer, false otherwise.
108+
* @since 2.2.3
109+
*/
110+
public void setKey(boolean isKey) {
111+
this.isKey = isKey;
112+
}
113+
114+
/**
115+
* Set to true if this deserializer is to be used as a key deserializer when
116+
* configuring outside of Kafka.
117+
* @param isKey true for a key deserializer, false otherwise.
118+
* @return this
119+
* @since 2.2.3
120+
*/
121+
public ErrorHandlingDeserializer2<T> keyDeserializer(boolean isKey) {
122+
this.isKey = isKey;
123+
return this;
124+
}
100125

101126
@Override
102127
public void configure(Map<String, ?> configs, boolean isKey) {

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import org.apache.kafka.clients.consumer.ConsumerConfig;
2829
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -74,9 +75,9 @@ public void testBadDeserializer() throws Exception {
7475
this.config.template().send(TOPIC, "fail", "bar");
7576
this.config.template().send(TOPIC, "foo", "fail");
7677
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
77-
assertThat(this.config.goodCount).isEqualTo(1);
78-
assertThat(this.config.keyErrorCount).isEqualTo(1);
79-
assertThat(this.config.valueErrorCount).isEqualTo(1);
78+
assertThat(this.config.goodCount.get()).withFailMessage("Counts wrong: %s", this.config).isEqualTo(2);
79+
assertThat(this.config.keyErrorCount.get()).withFailMessage("Counts wrong: %s", this.config).isEqualTo(2);
80+
assertThat(this.config.valueErrorCount.get()).withFailMessage("Counts wrong: %s", this.config).isEqualTo(2);
8081
assertThat(this.config.headers).isNotNull();
8182
}
8283

@@ -113,19 +114,25 @@ public void close() {
113114
@EnableKafka
114115
public static class Config {
115116

116-
private final CountDownLatch latch = new CountDownLatch(3);
117+
private final CountDownLatch latch = new CountDownLatch(6);
117118

118-
private int goodCount;
119+
private final AtomicInteger goodCount = new AtomicInteger();
119120

120-
private int keyErrorCount;
121+
private final AtomicInteger keyErrorCount = new AtomicInteger();
121122

122-
private int valueErrorCount;
123+
private final AtomicInteger valueErrorCount = new AtomicInteger();
123124

124125
private Headers headers;
125126

126127
@KafkaListener(topics = TOPIC)
127-
public void listen(ConsumerRecord<String, String> record) {
128-
this.goodCount++;
128+
public void listen1(ConsumerRecord<String, String> record) {
129+
this.goodCount.incrementAndGet();
130+
this.latch.countDown();
131+
}
132+
133+
@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactoryExplicitDesers")
134+
public void listen2(ConsumerRecord<String, String> record) {
135+
this.goodCount.incrementAndGet();
129136
this.latch.countDown();
130137
}
131138

@@ -136,16 +143,25 @@ public EmbeddedKafkaBroker embeddedKafka() {
136143

137144
@Bean
138145
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
146+
return factory(cf());
147+
}
148+
149+
@Bean
150+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryExplicitDesers() {
151+
return factory(cfWithExplicitDeserializers());
152+
}
153+
154+
private ConcurrentKafkaListenerContainerFactory<String, String> factory(ConsumerFactory<String, String> cf) {
139155
ConcurrentKafkaListenerContainerFactory<String, String> factory =
140156
new ConcurrentKafkaListenerContainerFactory<>();
141-
factory.setConsumerFactory(cf());
157+
factory.setConsumerFactory(cf);
142158
factory.setErrorHandler((t, r) -> {
143159
if (r.value() == null && t instanceof DeserializationException) {
144-
this.valueErrorCount++;
160+
this.valueErrorCount.incrementAndGet();
145161
this.headers = ((DeserializationException) t).getHeaders();
146162
}
147163
else if (r.key() == null && t instanceof DeserializationException) {
148-
this.keyErrorCount++;
164+
this.keyErrorCount.incrementAndGet();
149165
}
150166
this.latch.countDown();
151167
});
@@ -154,7 +170,7 @@ else if (r.key() == null && t instanceof DeserializationException) {
154170

155171
@Bean
156172
public ConsumerFactory<String, String> cf() {
157-
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC, "false", embeddedKafka());
173+
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
158174
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
159175
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
160176
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
@@ -163,6 +179,15 @@ public ConsumerFactory<String, String> cf() {
163179
return new DefaultKafkaConsumerFactory<>(props);
164180
}
165181

182+
@Bean
183+
public ConsumerFactory<String, String> cfWithExplicitDeserializers() {
184+
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka());
185+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
186+
return new DefaultKafkaConsumerFactory<>(props,
187+
new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()).keyDeserializer(true),
188+
new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()));
189+
}
190+
166191
@Bean
167192
public ProducerFactory<String, String> pf() {
168193
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka());
@@ -175,6 +200,12 @@ public KafkaTemplate<String, String> template() {
175200
return new KafkaTemplate<>(pf());
176201
}
177202

203+
@Override
204+
public String toString() {
205+
return "Config [goodCount=" + this.goodCount.get() + ", keyErrorCount=" + this.keyErrorCount.get()
206+
+ ", valueErrorCount=" + this.valueErrorCount.get() + "]";
207+
}
208+
178209
}
179210

180211
public static class FailSometimesDeserializer implements ExtendedDeserializer<String> {

0 commit comments

Comments
 (0)