diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java index 6348495caf..b359ae3f95 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -29,6 +30,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.3 * */ @@ -44,7 +46,9 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper().writeValueAsBytes(message.getPayload()); + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsBytes(message.getPayload()); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java index e219b15e9f..65e3ca86df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.Bytes; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.1.7 * */ @@ -46,7 +48,9 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); + return message.getPayload() instanceof KafkaNull + ? null + : Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java index 2e1df2ca9a..a32872c18c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index 3162904935..869b8d2762 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * @author Gary Russell * @author Artem Bilan * @author Dariusz Szablinski + * @author Vladimir Loginov */ public class StringJsonMessageConverter extends JsonMessageConverter { @@ -44,12 +46,12 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper() - .writeValueAsString(message.getPayload()); + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsString(message.getPayload()); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); } } - } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 3201621f23..d7ed4c1996 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -56,6 +57,7 @@ import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BytesJsonMessageConverter; import org.springframework.kafka.support.converter.ConversionException; @@ -75,6 +77,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Vladimir Loginov * * @since 1.3.2 * @@ -131,6 +134,8 @@ public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exceptio assertThat(listener.received.size()).isGreaterThan(0); assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class); assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar"); + assertThatNoException().isThrownBy(() -> this.template.send( + new GenericMessage<>(KafkaNull.INSTANCE, Collections.singletonMap(KafkaHeaders.TOPIC, topic)))); verify(admin, never()).clusterId(); }