3434import org .springframework .messaging .MessageHeaders ;
3535import org .springframework .util .Assert ;
3636import org .springframework .util .ClassUtils ;
37+ import org .springframework .util .MimeType ;
3738
3839import com .fasterxml .jackson .core .JsonProcessingException ;
40+ import com .fasterxml .jackson .databind .DeserializationContext ;
41+ import com .fasterxml .jackson .databind .JsonNode ;
3942import com .fasterxml .jackson .databind .ObjectMapper ;
43+ import com .fasterxml .jackson .databind .deser .std .StdNodeBasedDeserializer ;
44+ import com .fasterxml .jackson .databind .module .SimpleModule ;
45+ import com .fasterxml .jackson .databind .type .TypeFactory ;
4046
4147/**
4248 * Default header mapper for Apache Kafka.
4652 * Header types are added to a special header {@link #JSON_TYPES}.
4753 *
4854 * @author Gary Russell
55+ * @author Artem Bilan
56+ *
4957 * @since 1.3
5058 *
5159 */
@@ -54,7 +62,8 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
5462 private static final List <String > DEFAULT_TRUSTED_PACKAGES =
5563 Arrays .asList (
5664 "java.util" ,
57- "java.lang"
65+ "java.lang" ,
66+ "org.springframework.util"
5867 );
5968
6069 private static final List <String > DEFAULT_TO_STRING_CLASSES =
@@ -136,6 +145,8 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
136145 Assert .notNull (objectMapper , "'objectMapper' must not be null" );
137146 Assert .noNullElements (patterns , "'patterns' must not have null elements" );
138147 this .objectMapper = objectMapper ;
148+ this .objectMapper
149+ .registerModule (new SimpleModule ().addDeserializer (MimeType .class , new MimeTypeJsonDeserializer ()));
139150 }
140151
141152 /**
@@ -233,7 +244,6 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
233244 }
234245 }
235246
236- @ SuppressWarnings ("unchecked" )
237247 @ Override
238248 public void toHeaders (Headers source , final Map <String , Object > headers ) {
239249 final Map <String , String > jsonTypes = decodeJsonTypes (source );
@@ -257,7 +267,8 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
257267 headers .put (h .key (), getObjectMapper ().readValue (h .value (), type ));
258268 }
259269 catch (IOException e ) {
260- logger .error ("Could not decode json type: " + new String (h .value ()) + " for key: " + h .key (),
270+ logger .error ("Could not decode json type: " + new String (h .value ()) + " for key: " + h
271+ .key (),
261272 e );
262273 headers .put (h .key (), h .value ());
263274 }
@@ -310,6 +321,34 @@ protected boolean trusted(String requestedType) {
310321 return true ;
311322 }
312323
324+
325+ /**
326+ * The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.
327+ * It is presented here for backward compatibility when older producers send {@link MimeType}
328+ * headers as serialization version.
329+ */
330+ private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer <MimeType > {
331+
332+ private static final long serialVersionUID = 1L ;
333+
334+ MimeTypeJsonDeserializer () {
335+ super (MimeType .class );
336+ }
337+
338+ @ Override
339+ public MimeType convert (JsonNode root , DeserializationContext ctxt ) throws IOException {
340+ JsonNode type = root .get ("type" );
341+ JsonNode subType = root .get ("subtype" );
342+ JsonNode parameters = root .get ("parameters" );
343+ Map <String , String > params =
344+ DefaultKafkaHeaderMapper .this .objectMapper .readValue (parameters .traverse (),
345+ TypeFactory .defaultInstance ()
346+ .constructMapType (HashMap .class , String .class , String .class ));
347+ return new MimeType (type .asText (), subType .asText (), params );
348+ }
349+
350+ }
351+
313352 /**
314353 * Represents a header that could not be decoded due to an untrusted type.
315354 */
0 commit comments