Skip to content

Commit 88869e6

Browse files
garyrussellartembilan
authored andcommitted
JsonDeserializer Header Removal Polishing
Only remove the headers for the object being deserialized. Previously, it removed all type headers, with explicit reference to the key headers. This happened to work ok because the key was deserialized first. If this ever changes, the key headers would be removed when the value is deserialized and would not be available for key deserialization. * `JsonDeserializer`: add `spring.json.use.type.headers` configuration property
1 parent 43fd291 commit 88869e6

File tree

6 files changed

+55
-12
lines changed

6 files changed

+55
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/DefaultJackson2JavaTypeMapper.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,6 @@ public void removeHeaders(Headers headers) {
188188
headers.remove(getClassIdFieldName());
189189
headers.remove(getContentClassIdFieldName());
190190
headers.remove(getKeyClassIdFieldName());
191-
headers.remove(KEY_DEFAULT_CLASSID_FIELD_NAME);
192-
headers.remove(KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME);
193-
headers.remove(KEY_DEFAULT_KEY_CLASSID_FIELD_NAME);
194191
}
195192
catch (Exception e) { // NOSONAR
196193
// NOSONAR

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,16 @@ public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
9696
public static final String TYPE_MAPPINGS = JsonSerializer.TYPE_MAPPINGS;
9797

9898
/**
99-
* Kafka config property for removing type headers.
99+
* Kafka config property for removing type headers (default true).
100100
*/
101101
public static final String REMOVE_TYPE_INFO_HEADERS = "spring.json.remove.type.headers";
102102

103+
/**
104+
* Kafka config property for using type headers (default true).
105+
* @since 2.2.3
106+
*/
107+
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";
108+
103109
protected final ObjectMapper objectMapper; // NOSONAR
104110

105111
protected Class<T> targetType; // NOSONAR
@@ -235,6 +241,7 @@ public void setRemoveTypeHeaders(boolean removeTypeHeaders) {
235241
@Override
236242
public void configure(Map<String, ?> configs, boolean isKey) {
237243
setUseTypeMapperForKey(isKey);
244+
setUpTypePrecedence(configs);
238245
setupTarget(configs, isKey);
239246
if (configs.containsKey(TRUSTED_PACKAGES)
240247
&& configs.get(TRUSTED_PACKAGES) instanceof String) {
@@ -244,10 +251,20 @@ public void configure(Map<String, ?> configs, boolean isKey) {
244251
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
245252
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
246253
((AbstractJavaTypeMapper) this.typeMapper).setIdClassMapping(
247-
JsonSerializer.createMappings((String) configs.get(JsonSerializer.TYPE_MAPPINGS)));
254+
JsonSerializer.createMappings(configs.get(JsonSerializer.TYPE_MAPPINGS).toString()));
248255
}
249256
if (configs.containsKey(REMOVE_TYPE_INFO_HEADERS)) {
250-
this.removeTypeHeaders = Boolean.parseBoolean((String) configs.get(REMOVE_TYPE_INFO_HEADERS));
257+
this.removeTypeHeaders = Boolean.parseBoolean(configs.get(REMOVE_TYPE_INFO_HEADERS).toString());
258+
}
259+
}
260+
261+
private void setUpTypePrecedence(Map<String, ?> configs) {
262+
if (!this.typeMapperExplicitlySet) {
263+
boolean useTypeHeaders = true;
264+
if (configs.containsKey(USE_TYPE_INFO_HEADERS)) {
265+
useTypeHeaders = Boolean.parseBoolean(configs.get(USE_TYPE_INFO_HEADERS).toString());
266+
}
267+
this.typeMapper.setTypePrecedence(useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
251268
}
252269
}
253270

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,9 +1638,10 @@ public void testJsonSerDeConfiguredType() throws Exception {
16381638
public void testJsonSerDeHeaderSimpleType() throws Exception {
16391639
this.logger.info("Start JSON2");
16401640
Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
1641+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
16411642
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
16421643
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
1643-
DefaultKafkaConsumerFactory<Integer, Foo> cf = new DefaultKafkaConsumerFactory<>(props);
1644+
DefaultKafkaConsumerFactory<Bar, Foo> cf = new DefaultKafkaConsumerFactory<>(props);
16441645
ContainerProperties containerProps = new ContainerProperties(topic2);
16451646

16461647
final CountDownLatch latch = new CountDownLatch(1);
@@ -1651,21 +1652,23 @@ public void testJsonSerDeHeaderSimpleType() throws Exception {
16511652
latch.countDown();
16521653
});
16531654

1654-
KafkaMessageListenerContainer<Integer, Foo> container =
1655+
KafkaMessageListenerContainer<Bar, Foo> container =
16551656
new KafkaMessageListenerContainer<>(cf, containerProps);
16561657
container.setBeanName("testJson2");
16571658
container.start();
16581659

16591660
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
16601661

16611662
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
1663+
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
16621664
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
1663-
ProducerFactory<Integer, Foo> pf = new DefaultKafkaProducerFactory<>(senderProps);
1664-
KafkaTemplate<Integer, Foo> template = new KafkaTemplate<>(pf);
1665+
ProducerFactory<Bar, Foo> pf = new DefaultKafkaProducerFactory<>(senderProps);
1666+
KafkaTemplate<Bar, Foo> template = new KafkaTemplate<>(pf);
16651667
template.setDefaultTopic(topic2);
1666-
template.sendDefault(0, new Foo("bar"));
1668+
template.sendDefault(new Bar("foo"), new Foo("bar"));
16671669
template.flush();
16681670
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
1671+
assertThat(received.get().key()).isInstanceOf(Bar.class);
16691672
assertThat(received.get().value()).isInstanceOf(Foo.class);
16701673
container.stop();
16711674
this.logger.info("Stop JSON2");

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/JsonSerializationTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.fail;
2121

2222
import java.util.Arrays;
23+
import java.util.Collections;
2324
import java.util.HashMap;
2425
import java.util.List;
2526

@@ -32,7 +33,10 @@
3233
import org.junit.Test;
3334

3435
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
36+
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
37+
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
3538
import org.springframework.kafka.support.serializer.testentities.DummyEntity;
39+
import org.springframework.kafka.test.utils.KafkaTestUtils;
3640

3741
import com.fasterxml.jackson.core.JsonParseException;
3842

@@ -41,6 +45,7 @@
4145
* @author Artem Bilan
4246
* @author Yanming Zhou
4347
* @author Torsten Schleede
48+
* @author Gary Russell
4449
*/
4550
public class JsonSerializationTests {
4651

@@ -180,6 +185,24 @@ public void testExtraFieldIgnored() {
180185
deser.close();
181186
}
182187

188+
@Test
189+
public void testDeserTypeHeadersConfig() {
190+
this.jsonReader.configure(Collections.singletonMap(JsonDeserializer.USE_TYPE_INFO_HEADERS, false), false);
191+
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
192+
.isEqualTo(TypePrecedence.INFERRED);
193+
this.jsonReader.configure(Collections.singletonMap(JsonDeserializer.USE_TYPE_INFO_HEADERS, true), false);
194+
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
195+
.isEqualTo(TypePrecedence.TYPE_ID);
196+
this.jsonReader.configure(Collections.singletonMap(JsonDeserializer.USE_TYPE_INFO_HEADERS, false), false);
197+
this.jsonReader.configure(Collections.emptyMap(), false);
198+
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
199+
.isEqualTo(TypePrecedence.TYPE_ID);
200+
this.jsonReader.setTypeMapper(new DefaultJackson2JavaTypeMapper());
201+
this.jsonReader.configure(Collections.singletonMap(JsonDeserializer.USE_TYPE_INFO_HEADERS, true), false);
202+
assertThat(KafkaTestUtils.getPropertyValue(this.jsonReader, "typeMapper.typePrecedence"))
203+
.isEqualTo(TypePrecedence.INFERRED);
204+
}
205+
183206
static class DummyEntityJsonDeserializer extends JsonDeserializer<DummyEntity> {
184207

185208
}

src/reference/asciidoc/kafka.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,7 @@ In addition, the serializer/deserializer can be configured using Kafka propertie
18771877

18781878
- `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`); set to `false` to disable this feature on the `JsonSerializer` (sets the `addTypeInfo` property).
18791879
- `JsonSerializer.TYPE_MAPPINGS` (default `empty`); see below.
1880+
- `JsonDeserializer.USE_TYPE_INFO_HEADERS` (default `true`); set to `false` to ignore headers set by the serializer.
18801881
- `JsonDeserializer.REMOVE_TYPE_INFO_HEADERS` (default `true`); set to `false` to retain headers set by the serializer.
18811882
- `JsonDeserializer.KEY_DEFAULT_TYPE`; fallback type for deserialization of keys if no header information is present.
18821883
- `JsonDeserializer.VALUE_DEFAULT_TYPE`; fallback type for deserialization of values if no header information is present.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ You can now provide type mapping information using producer/consumer properties.
8181

8282
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
8383

84-
The JsonDeserializer will now remove any type information headers by default.
84+
The `JsonDeserializer` will now remove any type information headers by default.
85+
86+
The `JsonDeserializer` can now be configured to ignore type information headers using a kafka property (since 2.2.3).
8587

8688
See <<serdes>> for more information.
8789

0 commit comments

Comments
 (0)