Description
Hi there!
I'm facing a case where I want to handle deserialization errors and have the topic name available in the context.
Following Gary's post about error handling
(https://www.confluent.io/blog/spring-for-apache-kafka-deep-dive-part-1-error-handling-message-conversion-transaction-support),
the class ErrorHandlingDeserializer2 looks like the best fit for processing deserialization errors.
However, when checking its code, I can see that the topic name is available there but is not passed to the failedDeserializationFunction.
After looking into a possible solution, I came up with this snippet:
@FunctionalInterface
public interface FailedDeserializationFunction<R> {

    R apply(byte[] data, Headers headers, String topic);

    default <V> FailedDeserializationFunction<V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (byte[] data, Headers headers, String topic) -> after.apply(apply(data, headers, topic));
    }

}
which would be used much like the old BiFunction:
private FailedDeserializationFunction<T> failedDeserializationFunction;
...
@Override
public T deserialize(String topic, byte[] data) {
    try {
        return this.delegate.deserialize(topic, data);
    }
    catch (Exception e) {
        return this.failedDeserializationFunction != null
                ? this.failedDeserializationFunction.apply(data, null, topic)
                : null;
    }
}
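To make the idea easier to try out, here is a minimal, self-contained sketch of how the pieces could fit together. It is not the spring-kafka implementation: `Headers` is an empty stand-in for Kafka's `org.apache.kafka.common.header.Headers` so the example compiles without Kafka on the classpath, and `TopicAwareErrorHandlingDeserializer` and `TopicAwareDemo` are hypothetical names invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Function;

// Stand-in (assumption) for org.apache.kafka.common.header.Headers.
interface Headers { }

@FunctionalInterface
interface FailedDeserializationFunction<R> {

    R apply(byte[] data, Headers headers, String topic);

    default <V> FailedDeserializationFunction<V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (data, headers, topic) -> after.apply(apply(data, headers, topic));
    }
}

// Hypothetical, simplified wrapper: the delegate is a plain Function here,
// not a Kafka Deserializer.
final class TopicAwareErrorHandlingDeserializer<T> {

    private final Function<byte[], T> delegate;
    private final FailedDeserializationFunction<T> failedDeserializationFunction;

    TopicAwareErrorHandlingDeserializer(Function<byte[], T> delegate,
            FailedDeserializationFunction<T> failedDeserializationFunction) {
        this.delegate = Objects.requireNonNull(delegate);
        this.failedDeserializationFunction = failedDeserializationFunction;
    }

    T deserialize(String topic, byte[] data) {
        try {
            return this.delegate.apply(data);
        }
        catch (Exception e) {
            // The topic is forwarded to the fallback; headers are unknown in this overload.
            return this.failedDeserializationFunction != null
                    ? this.failedDeserializationFunction.apply(data, null, topic)
                    : null;
        }
    }
}

class TopicAwareDemo {

    static String run(String topic, String payload) {
        TopicAwareErrorHandlingDeserializer<String> deserializer =
                new TopicAwareErrorHandlingDeserializer<String>(
                        // Delegate fails with NumberFormatException on non-numeric input.
                        bytes -> String.valueOf(Integer.parseInt(new String(bytes, StandardCharsets.UTF_8))),
                        (data, headers, t) -> "failed on topic " + t);
        return deserializer.deserialize(topic, payload.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(run("orders", "42"));           // prints "42"
        System.out.println(run("orders", "not-a-number")); // prints "failed on topic orders"
    }
}
```

The fallback only sees the topic because `deserialize` forwards it explicitly, which is the one behavioral change the proposal asks for.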
This solution could be applied either as a breaking change (modifying ErrorHandlingDeserializer2 directly)
or by introducing a new ErrorHandlingDeserializer class (number 3, or a better name).
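Whichever route is taken, existing two-argument handlers might be kept working with a small adapter. The sketch below is only an illustration under assumptions: it assumes the current function type is `BiFunction<byte[], Headers, T>`, uses an empty `Headers` stand-in instead of Kafka's `org.apache.kafka.common.header.Headers`, and the `ignoringTopic` factory and `LegacyAdapterDemo` class are hypothetical names, not part of spring-kafka.

```java
import java.util.function.BiFunction;

// Stand-in (assumption) for org.apache.kafka.common.header.Headers.
interface Headers { }

@FunctionalInterface
interface FailedDeserializationFunction<R> {

    R apply(byte[] data, Headers headers, String topic);

    // Hypothetical helper: wraps a legacy two-argument handler and drops the topic.
    static <R> FailedDeserializationFunction<R> ignoringTopic(BiFunction<byte[], Headers, R> legacy) {
        return (data, headers, topic) -> legacy.apply(data, headers);
    }
}

class LegacyAdapterDemo {

    static String run() {
        // A handler written against the old two-argument shape.
        BiFunction<byte[], Headers, String> legacy =
                (data, headers) -> "bad payload of " + data.length + " bytes";
        FailedDeserializationFunction<String> adapted =
                FailedDeserializationFunction.ignoringTopic(legacy);
        return adapted.apply(new byte[] {1, 2, 3}, null, "orders");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "bad payload of 3 bytes"
    }
}
```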
I'm quite new to contributing to OSS. I tried to grasp the contributing rules before posting here, so I hope the format of this issue and the proposal fit what's expected
in the community.
Regards.