Commit 6e702f2

garyrussell authored and artembilan committed
GH-987: DeserializationException and dead-letters
Resolves #987

`DeadLetterPublishingRecoverer` now detects `DeserializationException`s and sets the value of the republished record to the original incoming `byte[]`.

- STCEH (`SeekToCurrentErrorHandler`): fix `defaultClassifier` to traverse exception causes
1 parent d31e4d6 commit 6e702f2
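
As background for the change, here is a minimal sketch of how the recoverer is typically wired with `ErrorHandlingDeserializer2` and `SeekToCurrentErrorHandler`; the factory, template, and property-map names are illustrative and not part of this commit. After this change, a record whose value failed deserialization reaches the `.DLT` topic with its original `byte[]` payload rather than a null value.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class DltWiringSketch {

	// Delegate real deserialization to JsonDeserializer; failures surface as headers on
	// the record instead of poison pills that kill the consumer.
	static void configureConsumerProps(Map<String, Object> consumerProps) {
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
		consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
	}

	// Recover failed records by publishing them to <topic>.DLT; with this commit the
	// published value is the original byte[] when a DeserializationException is detected.
	static void configureErrorHandling(ConcurrentKafkaListenerContainerFactory<Object, Object> factory,
			KafkaTemplate<Object, Object> template) {

		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
		factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer));
	}
}
```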

File tree

7 files changed: +222 -58 lines changed


spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 106 additions & 24 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018 the original author or authors.
+ * Copyright 2018-2019 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,9 @@
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 
@@ -34,7 +37,11 @@
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
+import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
 
 /**
  * A {@link BiConsumer} that publishes a failed record to a dead-letter topic.
@@ -47,8 +54,13 @@ public class DeadLetterPublishingRecoverer implements BiConsumer<ConsumerRecord<
 
 	private static final Log logger = LogFactory.getLog(DeadLetterPublishingRecoverer.class); // NOSONAR
 
+	private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
+			DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
+
 	private final KafkaTemplate<Object, Object> template;
 
+	private final Map<Class<?>, KafkaTemplate<?, ?>> templates;
+
 	private final boolean transactional;
 
 	private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;
@@ -60,25 +72,65 @@ public class DeadLetterPublishingRecoverer implements BiConsumer<ConsumerRecord<
 	 * dead-letter topic must have at least as many partitions as the original topic.
 	 * @param template the {@link KafkaTemplate} to use for publishing.
 	 */
-	public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template) {
-		this(template, (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition()));
+	public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template) {
+		this(template, DEFAULT_DESTINATION_RESOLVER);
 	}
 
 	/**
 	 * Create an instance with the provided template and destination resolving function,
 	 * that receives the failed consumer record and the exception and returns a
-	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than 0, no
-	 * partition is set when publishing to the topic.
+	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
+	 * 0, no partition is set when publishing to the topic.
	 * @param template the {@link KafkaTemplate} to use for publishing.
	 * @param destinationResolver the resolving function.
	 */
-	public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template,
+	public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
 			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+		this(Collections.singletonMap(Object.class, template), destinationResolver);
+	}
+
+	/**
+	 * Create an instance with the provided templates and a default destination resolving
+	 * function that returns a TopicPartition based on the original topic (appended with
+	 * ".DLT") from the failed record, and the same partition as the failed record.
+	 * Therefore the dead-letter topic must have at least as many partitions as the
+	 * original topic. The templates map keys are classes and the value the corresponding
+	 * template to use for objects (producer record values) of that type. A
+	 * {@link java.util.LinkedHashMap} is recommended when there is more than one
+	 * template, to ensure the map is traversed in order.
+	 * @param templates the {@link KafkaTemplate}s to use for publishing.
+	 */
+	public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates) {
+		this(templates, DEFAULT_DESTINATION_RESOLVER);
+	}
 
-		Assert.notNull(template, "The template cannot be null");
+	/**
+	 * Create an instance with the provided templates and destination resolving function,
+	 * that receives the failed consumer record and the exception and returns a
+	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
+	 * 0, no partition is set when publishing to the topic. The templates map keys are
+	 * classes and the value the corresponding template to use for objects (producer
+	 * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
+	 * there is more than one template, to ensure the map is traversed in order.
+	 * @param templates the {@link KafkaTemplate}s to use for publishing.
+	 * @param destinationResolver the resolving function.
+	 */
+	@SuppressWarnings("unchecked")
+	public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates,
+			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+		Assert.isTrue(!ObjectUtils.isEmpty(templates), "At least one template is required");
 		Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
-		this.template = template;
-		this.transactional = template.isTransactional();
+		this.template = templates.size() == 1 ? (KafkaTemplate<Object, Object>) templates.values().iterator().next() : null;
+		this.templates = templates;
+		this.transactional = templates.values().iterator().next().isTransactional();
+		Boolean tx = this.transactional;
+		Assert.isTrue(!templates.values()
+				.stream()
+				.map(t -> t.isTransactional())
+				.filter(t -> !t.equals(tx))
+				.findFirst()
+				.isPresent(), "All templates must have the same setting for transactional");
 		this.destinationResolver = destinationResolver;
 	}
 
@@ -87,42 +139,72 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		TopicPartition tp = this.destinationResolver.apply(record, exception);
 		RecordHeaders headers = new RecordHeaders(record.headers().toArray());
 		enhanceHeaders(headers, record, exception);
-		ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers);
-		if (this.transactional && !this.template.inTransaction()) {
-			this.template.executeInTransaction(t -> {
+		DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(record,
+				ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
+		if (deserEx == null) {
+			deserEx = ListenerUtils.getExceptionFromHeader(record,
+					ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
+		}
+		ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
+				deserEx == null ? null : deserEx.getData());
+		KafkaTemplate<Object, Object> kafkaTemplate = findTemplateForValue(outRecord.value());
+		if (this.transactional && !kafkaTemplate.inTransaction()) {
+			kafkaTemplate.executeInTransaction(t -> {
 				publish(outRecord, t);
 				return null;
 			});
 		}
 		else {
-			publish(outRecord, this.template);
+			publish(outRecord, kafkaTemplate);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private KafkaTemplate<Object, Object> findTemplateForValue(Object value) {
+		if (this.template != null) {
+			return this.template;
+		}
+		Optional<Class<?>> key = this.templates.keySet()
+				.stream()
+				.filter((k) -> k.isAssignableFrom(value.getClass()))
+				.findFirst();
+		if (key.isPresent()) {
+			return (KafkaTemplate<Object, Object>) this.templates.get(key.get());
+		}
+		if (logger.isWarnEnabled()) {
+			logger.warn("Failed to find a template for " + value.getClass() + " attemting to use the last entry");
 		}
+		return (KafkaTemplate<Object, Object>) this.templates.values()
+				.stream()
+				.reduce((first, second) -> second)
+				.get();
 	}
 
 	/**
-	 * Subclasses can override this method to customize the producer record to send to the DLQ.
-	 * The default implementation simply copies the key and value from the consumer record
-	 * and adds the headers. The timestamp is not set (the original timestamp is in one of
-	 * the headers).
-	 * IMPORTANT: if the partition in the {@link TopicPartition} is less than 0, it must be set to null
-	 * in the {@link ProducerRecord}.
+	 * Subclasses can override this method to customize the producer record to send to the
+	 * DLQ. The default implementation simply copies the key and value from the consumer
+	 * record and adds the headers. The timestamp is not set (the original timestamp is in
+	 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
+	 * less than 0, it must be set to null in the {@link ProducerRecord}.
 	 * @param record the failed record
-	 * @param topicPartition the {@link TopicPartition} returned by the destination resolver.
+	 * @param topicPartition the {@link TopicPartition} returned by the destination
+	 * resolver.
 	 * @param headers the headers - original record headers plus DLT headers.
+	 * @param value the value to use instead of the consumer record value.
 	 * @return the producer record to send.
 	 * @see KafkaHeaders
 	 */
 	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
-			TopicPartition topicPartition, RecordHeaders headers) {
+			TopicPartition topicPartition, RecordHeaders headers, @Nullable byte[] value) {
 
 		return new ProducerRecord<>(topicPartition.topic(),
 				topicPartition.partition() < 0 ? null : topicPartition.partition(),
-				record.key(), record.value(), headers);
+				record.key(), value == null ? record.value() : value, headers);
 	}
 
-	private void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> template) {
+	private void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
 		try {
-			template.send(outRecord).addCallback(result -> {
+			kafkaTemplate.send(outRecord).addCallback(result -> {
 				if (logger.isDebugEnabled()) {
 					logger.debug("Successful dead-letter publication: " + result);
 				}
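
The new `Map`-based constructors above allow the dead-letter publication to be routed by value type, which is useful now that a failed deserialization republishes the raw `byte[]` while successfully deserialized records keep their typed value. A minimal sketch under that assumption; the template names and their serializers are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class MultiTemplateSketch {

	static DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, byte[]> bytesTemplate,
			KafkaTemplate<String, Object> jsonTemplate) {

		// A LinkedHashMap preserves lookup order: byte[] values (deserialization failures)
		// match the first entry; Object acts as the catch-all for everything else.
		Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates = new LinkedHashMap<>();
		templates.put(byte[].class, bytesTemplate);
		templates.put(Object.class, jsonTemplate);
		return new DeadLetterPublishingRecoverer(templates);
	}
}
```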

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 21 deletions
@@ -16,9 +16,6 @@
 
 package org.springframework.kafka.listener;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,9 +51,6 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.springframework.core.task.SimpleAsyncTaskExecutor;
 import org.springframework.kafka.KafkaException;
@@ -1341,21 +1335,9 @@ private Exception decorateException(RuntimeException e) {
 		}
 
 		public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
-			Header header = record.headers().lastHeader(headerName);
-			if (header != null) {
-				try {
-					DeserializationException ex = (DeserializationException) new ObjectInputStream(
-							new ByteArrayInputStream(header.value())).readObject();
-					Headers headers = new RecordHeaders(Arrays.stream(record.headers().toArray())
-							.filter(h -> !h.key()
-									.startsWith(ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX))
-							.collect(Collectors.toList()));
-					ex.setHeaders(headers);
-					throw ex;
-				}
-				catch (IOException | ClassNotFoundException | ClassCastException e) {
-					this.logger.error("Failed to deserialize a deserialization exception", e);
-				}
+			DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
+			if (exception != null) {
+				throw exception;
 			}
 		}

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 48 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 the original author or authors.
+ * Copyright 2017-2019 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,21 @@
 
 package org.springframework.kafka.listener;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.commons.logging.Log;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
+import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
 
 /**
@@ -55,4 +70,36 @@ else if (listener instanceof GenericMessageListener) {
 		return listenerType;
 	}
 
+	/**
+	 * Extract a {@link DeserializationException} from the supplied header name, if
+	 * present.
+	 * @param record the consumer record.
+	 * @param headerName the header name.
+	 * @param logger the logger for logging errors.
+	 * @return the exception or null.
+	 * @since 2.3
+	 */
+	@Nullable
+	public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
+			String headerName, Log logger) {
+
+		Header header = record.headers().lastHeader(headerName);
+		if (header != null) {
+			try {
+				DeserializationException ex = (DeserializationException) new ObjectInputStream(
+						new ByteArrayInputStream(header.value())).readObject();
+				Headers headers = new RecordHeaders(Arrays.stream(record.headers().toArray())
+						.filter(h -> !h.key()
+								.startsWith(ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX))
+						.collect(Collectors.toList()));
+				ex.setHeaders(headers);
+				return ex;
+			}
+			catch (IOException | ClassNotFoundException | ClassCastException e) {
+				logger.error("Failed to deserialize a deserialization exception", e);
+			}
+		}
+		return null;
+	}
+
 }
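
Since the helper extracted here is public (and `@since 2.3`), custom error handlers or recoverers can reuse it to get at the payload that failed deserialization. A small sketch; the method and logger names are illustrative:

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

public class DeserHeaderSketch {

	private static final Log LOGGER = LogFactory.getLog(DeserHeaderSketch.class);

	// Returns the raw bytes the delegate value deserializer rejected, or null if the
	// record carries no value-deserialization-exception header.
	static byte[] failedValueOrNull(ConsumerRecord<?, ?> record) {
		DeserializationException ex = ListenerUtils.getExceptionFromHeader(record,
				ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
		return ex == null ? null : ex.getData();
	}
}
```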

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 6 additions & 2 deletions
@@ -241,7 +241,9 @@ public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records
 		}
 	}
 
-	private BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
+	private BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
+			Exception thrownException) {
+
 		if (this.classifier.classify(thrownException)) {
 			return this.failureTracker::skip;
 		}
@@ -263,7 +265,9 @@ private static BinaryExceptionClassifier configureDefaultClassifier() {
 		classified.put(MethodArgumentResolutionException.class, false);
 		classified.put(NoSuchMethodException.class, false);
 		classified.put(ClassCastException.class, false);
-		return new ExtendedBinaryExceptionClassifier(classified, true);
+		ExtendedBinaryExceptionClassifier defaultClassifier = new ExtendedBinaryExceptionClassifier(classified, true);
+		defaultClassifier.setTraverseCauses(true);
+		return defaultClassifier;
 	}
 
 	/**
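
The `setTraverseCauses(true)` change matters because the exception that reaches the handler is usually a wrapper (for example a `ListenerExecutionFailedException`); without cause traversal it was classified against the default and retried. A sketch of the effect using spring-retry's `BinaryExceptionClassifier` directly (the handler's own classifier is an internal subclass), under the assumption that the classification semantics match:

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.listener.ListenerExecutionFailedException;

public class TraverseCausesSketch {

	public static void main(String[] args) {
		Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
		classified.put(ClassCastException.class, false); // false = fatal, do not retry

		BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(classified, true);
		Exception wrapped = new ListenerExecutionFailedException("listener failed",
				new ClassCastException("cannot cast"));

		System.out.println(classifier.classify(wrapped)); // true: only the wrapper type is checked

		classifier.setTraverseCauses(true);
		System.out.println(classifier.classify(wrapped)); // false: the fatal cause is now found
	}
}
```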
