Skip to content

Commit

Permalink
GH-2940: Improvements in DefaultKafkaHeaderMapper
Browse files Browse the repository at this point in the history
Fixes: #2940 

Minor improvements and code cleanup in DefaultKafkaHeaderMapper.
  • Loading branch information
NathanQingyangXu authored Dec 15, 2023
1 parent 4b7e53e commit a57c319
Showing 1 changed file with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -31,13 +31,13 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -63,26 +63,23 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

private static final String JAVA_LANG_STRING = "java.lang.String";

private static final Set<String> TRUSTED_ARRAY_TYPES =
new HashSet<>(Arrays.asList(
private static final Set<String> TRUSTED_ARRAY_TYPES = Set.of(
"[B",
"[I",
"[J",
"[F",
"[D",
"[C"
));
);

private static final List<String> DEFAULT_TRUSTED_PACKAGES =
Arrays.asList(
private static final List<String> DEFAULT_TRUSTED_PACKAGES = List.of(
"java.lang",
"java.net",
"java.util",
"org.springframework.util"
);

private static final List<String> DEFAULT_TO_STRING_CLASSES =
Arrays.asList(
private static final List<String> DEFAULT_TO_STRING_CLASSES = List.of(
"org.springframework.util.MimeType",
"org.springframework.http.MediaType"
);
Expand Down Expand Up @@ -142,7 +139,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(String... patterns) {
this(new ObjectMapper(), patterns);
this(JacksonUtils.enhancedObjectMapper(), patterns);
}

/**
Expand Down Expand Up @@ -222,7 +219,7 @@ protected boolean isEncodeStrings() {
}

/**
* Set to true to encode String-valued headers as JSON ("..."), by default just the
* Set to true to encode String-valued headers as JSON string ("..."), by default just the
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
Expand All @@ -234,8 +231,15 @@ public void setEncodeStrings(boolean encodeStrings) {
}

/**
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used
* Add packages to the trusted packages list used
* when constructing objects from JSON.
* By default, the following packages are trusted:
* <ul>
* <li>java.lang</li>
* <li>java.net</li>
* <li>java.util</li>
* <li>org.springframework.util</li>
* </ul>
* If any of the supplied packages is {@code "*"}, all packages are trusted.
* If a class for a non-trusted package is encountered, the header is returned to the
* application with value of type {@link NonTrustedHeaderType}.
Expand Down Expand Up @@ -286,20 +290,19 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
className = JAVA_LANG_STRING;
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(key, className);
}
catch (Exception e) {
logger.debug(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
}
}
}
});
if (jsonHeaders.size() > 0) {
if (!jsonHeaders.isEmpty()) {
try {
target.add(new RecordHeader(JSON_TYPES, headerObjectMapper.writeValueAsBytes(jsonHeaders)));
}
Expand All @@ -321,7 +324,7 @@ else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(head
headers.put(headerName, new String(header.value(), getCharset()));
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (jsonTypes != null && jsonTypes.containsKey(headerName)) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
populateJsonValueHeader(header, requestedType, headers);
}
Expand Down Expand Up @@ -355,8 +358,7 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
}
catch (IOException e) {
logger.error(e, () ->
"Could not decode json type: " + new String(header.value()) + " for key: "
+ header.key());
"Could not decode json type: " + requestedType + " for key: " + header.key());
headers.put(header.key(), header.value());
}
}
Expand Down Expand Up @@ -385,18 +387,16 @@ private Object decodeValue(Header h, Class<?> type) throws IOException, LinkageE
return value;
}

@SuppressWarnings("unchecked")
@Nullable
private Map<String, String> decodeJsonTypes(Headers source) {
Map<String, String> types = null;
Map<String, String> types = Collections.emptyMap();
Header jsonTypes = source.lastHeader(JSON_TYPES);
if (jsonTypes != null) {
ObjectMapper headerObjectMapper = getObjectMapper();
try {
types = headerObjectMapper.readValue(jsonTypes.value(), Map.class);
types = headerObjectMapper.readValue(jsonTypes.value(), new TypeReference<>() { });
}
catch (IOException e) {
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value()));
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value(), StandardCharsets.UTF_8));
}
}
return types;
Expand Down

0 comments on commit a57c319

Please sign in to comment.