-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
minor improvement to DefaultKafkaHeaderMapper #2940
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2017-2022 the original author or authors. | ||
* Copyright 2017-2023 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -20,8 +20,8 @@ | |
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -31,13 +31,13 @@ | |
import org.apache.kafka.common.header.Headers; | ||
import org.apache.kafka.common.header.internals.RecordHeader; | ||
|
||
import org.springframework.lang.Nullable; | ||
import org.springframework.messaging.MessageHeaders; | ||
import org.springframework.util.Assert; | ||
import org.springframework.util.ClassUtils; | ||
import org.springframework.util.MimeType; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.DeserializationContext; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
@@ -63,26 +63,23 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { | |
|
||
private static final String JAVA_LANG_STRING = "java.lang.String"; | ||
|
||
private static final Set<String> TRUSTED_ARRAY_TYPES = | ||
new HashSet<>(Arrays.asList( | ||
private static final Set<String> TRUSTED_ARRAY_TYPES = Set.of( | ||
"[B", | ||
"[I", | ||
"[J", | ||
"[F", | ||
"[D", | ||
"[C" | ||
)); | ||
); | ||
|
||
private static final List<String> DEFAULT_TRUSTED_PACKAGES = | ||
Arrays.asList( | ||
private static final List<String> DEFAULT_TRUSTED_PACKAGES = List.of( | ||
"java.lang", | ||
"java.net", | ||
"java.util", | ||
"org.springframework.util" | ||
); | ||
|
||
private static final List<String> DEFAULT_TO_STRING_CLASSES = | ||
Arrays.asList( | ||
private static final List<String> DEFAULT_TO_STRING_CLASSES = List.of( | ||
"org.springframework.util.MimeType", | ||
"org.springframework.http.MediaType" | ||
); | ||
|
@@ -142,7 +139,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { | |
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) | ||
*/ | ||
public DefaultKafkaHeaderMapper(String... patterns) { | ||
this(new ObjectMapper(), patterns); | ||
this(JacksonUtils.enhancedObjectMapper(), patterns); | ||
} | ||
|
||
/** | ||
|
@@ -222,7 +219,7 @@ protected boolean isEncodeStrings() { | |
} | ||
|
||
/** | ||
* Set to true to encode String-valued headers as JSON ("..."), by default just the | ||
* Set to true to encode String-valued headers as JSON string ("..."), by default just the | ||
* raw String value is converted to a byte array using the configured charset. Set to | ||
* true if a consumer of the outbound record is using Spring for Apache Kafka version | ||
* less than 2.3 | ||
|
@@ -234,8 +231,15 @@ public void setEncodeStrings(boolean encodeStrings) { | |
} | ||
|
||
/** | ||
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used | ||
* Add packages to the trusted packages list used | ||
* when constructing objects from JSON. | ||
* By default, the following packages are trusted: | ||
* <ul> | ||
* <li>java.lang</li> | ||
* <li>java.net</li> | ||
* <li>java.util</li> | ||
* <li>org.springframework.util</li> | ||
* </ul> | ||
* If any of the supplied packages is {@code "*"}, all packages are trusted. | ||
* If a class for a non-trusted package is encountered, the header is returned to the | ||
* application with value of type {@link NonTrustedHeaderType}. | ||
|
@@ -286,20 +290,19 @@ public void fromHeaders(MessageHeaders headers, Headers target) { | |
} | ||
if (!encodeToJson && valueToAdd instanceof String) { | ||
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset()))); | ||
className = JAVA_LANG_STRING; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems the previous logic has ensured the className is assigned JAVA_LANG_STRING if valueToAdd has been String. |
||
} | ||
else { | ||
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd))); | ||
} | ||
jsonHeaders.put(key, className); | ||
} | ||
catch (Exception e) { | ||
logger.debug(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); | ||
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); | ||
} | ||
} | ||
} | ||
}); | ||
if (jsonHeaders.size() > 0) { | ||
if (!jsonHeaders.isEmpty()) { | ||
try { | ||
target.add(new RecordHeader(JSON_TYPES, headerObjectMapper.writeValueAsBytes(jsonHeaders))); | ||
} | ||
|
@@ -321,7 +324,7 @@ else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(head | |
headers.put(headerName, new String(header.value(), getCharset())); | ||
} | ||
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { | ||
if (jsonTypes != null && jsonTypes.containsKey(headerName)) { | ||
if (jsonTypes.containsKey(headerName)) { | ||
String requestedType = jsonTypes.get(headerName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we might well improve perf by using the following pattern:
for we end up one map query, rather than twice ( |
||
populateJsonValueHeader(header, requestedType, headers); | ||
} | ||
|
@@ -355,8 +358,7 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St | |
} | ||
catch (IOException e) { | ||
logger.error(e, () -> | ||
"Could not decode json type: " + new String(header.value()) + " for key: " | ||
+ header.key()); | ||
"Could not decode json type: " + requestedType + " for key: " + header.key()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems the intention was type, not header value |
||
headers.put(header.key(), header.value()); | ||
} | ||
} | ||
|
@@ -385,18 +387,16 @@ private Object decodeValue(Header h, Class<?> type) throws IOException, LinkageE | |
return value; | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Nullable | ||
private Map<String, String> decodeJsonTypes(Headers source) { | ||
Map<String, String> types = null; | ||
Map<String, String> types = Collections.emptyMap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is common good practise to return empty colletion as opposed to null to avoid NPE as per Effective Java. There is exception, but in this scenario it seems perfectly valid |
||
Header jsonTypes = source.lastHeader(JSON_TYPES); | ||
if (jsonTypes != null) { | ||
ObjectMapper headerObjectMapper = getObjectMapper(); | ||
try { | ||
types = headerObjectMapper.readValue(jsonTypes.value(), Map.class); | ||
types = headerObjectMapper.readValue(jsonTypes.value(), new TypeReference<>() { }); | ||
} | ||
catch (IOException e) { | ||
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value())); | ||
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value(), StandardCharsets.UTF_8)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. json serialization result is always of UTF-8 charset |
||
} | ||
} | ||
return types; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we should change above to maintain consistency in the class