Skip to content

Commit

Permalink
Private Header Type for DeserializationExceptions
Browse files Browse the repository at this point in the history
Use a package-private header for deserialization exceptions.

**cherry-pick to 2.9.x**
# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java
  • Loading branch information
garyrussell authored and artembilan committed Aug 8, 2023
1 parent d370d2d commit 25ac793
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 46 deletions.
19 changes: 12 additions & 7 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4655,10 +4655,15 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
Expand All @@ -4668,9 +4673,9 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M
----
====

`ListenerUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.
`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.

When consuming `List<ConsumerRecord<?, ?>`, `ListenerUtils.getExceptionFromHeader()` is used instead:
When consuming `List<ConsumerRecord<?, ?>`, `SerializationUtils.getExceptionFromHeader()` is used instead:

====
[source, java]
Expand All @@ -4680,7 +4685,7 @@ void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
if (consumer != null && this.verifyPartition) {
tp = checkPartition(tp, consumer);
}
DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(record,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
DeserializationException kDeserEx = SerializationUtils.getExceptionFromHeader(record,
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
Headers headers = new RecordHeaders(record.headers().toArray());
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2929,8 +2929,9 @@ private void fixStackTrace(Exception ex, Exception toHandle) {
}
}

public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
DeserializationException exception = SerializationUtils.getExceptionFromHeader(cRecord, headerName,
this.logger);
if (exception != null) {
/*
* Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
Expand Down Expand Up @@ -96,23 +94,15 @@ else if (listener instanceof GenericMessageListener) {
* @param logger the logger for logging errors.
* @return the exception or null.
* @since 2.3
* @deprecated in favor of
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}.
*/
@Deprecated
@Nullable
public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
String headerName, LogAccessor logger) {

Header header = record.headers().lastHeader(headerName);
if (header != null) {
byte[] value = header.value();
DeserializationException exception = byteArrayToDeserializationException(logger, value);
if (exception != null) {
Headers headers = new RecordHeaders(record.headers().toArray());
headers.remove(headerName);
exception.setHeaders(headers);
}
return exception;
}
return null;
return SerializationUtils.getExceptionFromHeader(record, headerName, logger);
}

/**
Expand All @@ -122,7 +112,11 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
* @param value the bytes.
* @return the exception or null if deserialization fails.
* @since 2.8.1
* @deprecated in favor of
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} or
* {@link SerializationUtils#byteArrayToDeserializationException(LogAccessor, org.apache.kafka.common.header.Header)}.
*/
@Deprecated
@Nullable
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -531,7 +530,7 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
* Return a {@link DeserializationException} if either the key or value failed
* deserialization; null otherwise. If you need to determine whether it was the key or
* value, call
* {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
* with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and
* {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
* @param record the record.
Expand All @@ -541,14 +540,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
*/
@Nullable
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
DeserializationException exception = SerializationUtils.getExceptionFromHeader(record,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
+ record.partition() + "@" + record.offset());
return exception;
}
exception = ListenerUtils.getExceptionFromHeader(record,
exception = SerializationUtils.getExceptionFromHeader(record,
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.serializer;

import org.apache.kafka.common.header.internals.RecordHeader;

/**
* A package-protected header used to contain serialized
* {@link DeserializationException}s. Only headers of this type will be examined for
* deserialization.
*
* @author Gary Russell
* @since 2.9.11
*/
class DeserializationExceptionHeader extends RecordHeader {

/**
* Construct an instance with the provided properties.
* @param key the key.
* @param value the value;
*/
DeserializationExceptionHeader(String key, byte[] value) {
super(key, value);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,24 @@

package org.springframework.kafka.support.serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

Expand Down Expand Up @@ -166,10 +174,82 @@ data, isForKeyArg, new RuntimeException("Could not serialize type "
}
}
headers.add(
new RecordHeader(isForKeyArg
new DeserializationExceptionHeader(isForKeyArg
? KEY_DESERIALIZER_EXCEPTION_HEADER
: VALUE_DESERIALIZER_EXCEPTION_HEADER,
stream.toByteArray()));
}

/**
* Extract a {@link DeserializationException} from the supplied header name, if
* present.
* @param record the consumer record.
* @param headerName the header name.
* @param logger the logger for logging errors.
* @return the exception or null.
* @since 2.9.11
*/
@Nullable
public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
String headerName, LogAccessor logger) {

Header header = record.headers().lastHeader(headerName);
if (!(header instanceof DeserializationExceptionHeader)) {
logger.warn(
() -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?",
KafkaUtils.format(record)));
return null;
}
if (header != null) {
byte[] value = header.value();
DeserializationException exception = byteArrayToDeserializationException(logger, header);
if (exception != null) {
Headers headers = new RecordHeaders(record.headers().toArray());
headers.remove(headerName);
exception.setHeaders(headers);
}
return exception;
}
return null;
}

/**
* Convert a byte array containing a serialized {@link DeserializationException} to the
* {@link DeserializationException}.
* @param logger a log accessor to log errors.
* @param header the header.
* @return the exception or null if deserialization fails.
* @since 2.9.11
*/
@Nullable
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, Header header) {

if (!(header instanceof DeserializationExceptionHeader)) {
throw new IllegalStateException("Foreign deserialization exception header ignored; possible attack?");
}
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value())) {

boolean first = true;

@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
if (this.first) {
this.first = false;
Assert.state(desc.getName().equals(DeserializationException.class.getName()),
"Header does not contain a DeserializationException");
}
return super.resolveClass(desc);
}


};
return (DeserializationException) ois.readObject();
}
catch (IOException | ClassNotFoundException | ClassCastException e) {
logger.error(e, "Failed to deserialize a deserialization exception");
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationTestUtils;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -172,8 +173,10 @@ void valueHeaderStripped() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false)));
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
header(false)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
Headers custom = new RecordHeaders();
custom.add(new RecordHeader("foo", "bar".getBytes()));
recoverer.setHeadersFunction((rec, ex) -> custom);
Expand Down Expand Up @@ -202,7 +205,8 @@ void keyHeaderStripped() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
SettableListenableFuture future = new SettableListenableFuture();
future.set(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand All @@ -222,8 +226,8 @@ void keyDeserOnly() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
DeserializationException deserEx = createDeserEx(true);
headers.add(
new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true, deserEx)));
SettableListenableFuture future = new SettableListenableFuture();
future.set(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand All @@ -245,8 +249,10 @@ void headersNotStripped() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setRetainExceptionHeader(true);
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false)));
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
header(false)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
SettableListenableFuture future = new SettableListenableFuture();
future.set(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,8 +128,8 @@ public String deserialize(String topic, Headers headers, byte[] data) {
ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new MyDes());
Headers headers = new RecordHeaders();
ehd.deserialize("foo", headers, new byte[1]);
DeserializationException dex = ListenerUtils.byteArrayToDeserializationException(null,
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER).value());
DeserializationException dex = SerializationUtils.byteArrayToDeserializationException(null,
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
assertThat(dex.getMessage())
.contains("Could not serialize")
.contains("original exception message");
Expand Down
Loading

0 comments on commit 25ac793

Please sign in to comment.