diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 30c5aa7130..3d171488cd 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -1509,6 +1509,9 @@ public KafkaListenerContainerFactory batchFactory() { NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation. This, together with the changes to <> allows the same factory to be used for both record and batch listeners. +NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties. +Previously, there was only one property `messageConverter` which applied to both record and batch listeners. + The following example shows how to receive a list of payloads: ==== @@ -4395,7 +4398,7 @@ public KafkaListenerContainerFactory kafkaJsonListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); - factory.setMessageConverter(new JsonMessageConverter()); + factory.setRecordMessageConverter(new JsonMessageConverter()); return factory; } ... @@ -4677,7 +4680,7 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory() { new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); - factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); + factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter())); return factory; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 16320ef034..20c70ba30e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,9 @@ import org.springframework.kafka.requestreply.ReplyingKafkaOperations; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; +import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.util.Assert; /** @@ -82,7 +84,9 @@ public abstract class AbstractKafkaListenerContainerFactory recordFilterStrategy; @@ -154,9 +158,38 @@ public void setPhase(int phase) { /** * Set the message converter to use if dynamic argument type matching is needed. * @param messageConverter the converter. + * @deprecated since 2.9.6 in favor of + * {@link #setBatchMessageConverter(BatchMessageConverter)} and + * {@link #setRecordMessageConverter(RecordMessageConverter)}. */ + @Deprecated public void setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; + if (messageConverter instanceof RecordMessageConverter) { + setRecordMessageConverter((RecordMessageConverter) messageConverter); + } + else { + setBatchMessageConverter((BatchMessageConverter) messageConverter); + } + } + + /** + * Set the message converter to use if dynamic argument type matching is needed for + * record listeners. + * @param recordMessageConverter the converter. + * @since 2.9.6 + */ + public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) { + this.recordMessageConverter = recordMessageConverter; + } + + /** + * Set the message converter to use if dynamic argument type matching is needed for + * batch listeners. + * @param batchMessageConverter the converter. + * @since 2.9.6 + */ + public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) { + this.batchMessageConverter = batchMessageConverter; } /** @@ -391,7 +424,12 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) { configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); } - endpoint.setupListenerContainer(instance, this.messageConverter); + if (Boolean.TRUE.equals(endpoint.getBatchListener())) { + endpoint.setupListenerContainer(instance, this.batchMessageConverter); + } + else { + endpoint.setupListenerContainer(instance, this.recordMessageConverter); + } initializeContainer(instance, endpoint); customizeContainer(instance); return instance; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 9526e3d3c4..509656efec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -282,6 +282,7 @@ public boolean isBatchListener() { * @return the batch listener flag. * @since 2.8 */ + @Override @Nullable public Boolean getBatchListener() { return this.batchListener; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 354311166e..a71ad48510 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -165,4 +165,15 @@ default String getMainListenerId() { return null; } + /** + * Return the current batch listener flag for this endpoint, or null if not explicitly + * set. + * @return the batch listener flag. + * @since 2.9.6 + */ + @Nullable + default Boolean getBatchListener() { + return null; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index c5718263f5..54747fdd6b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -173,7 +173,7 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory(EmbeddedKa new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(embeddedKafka)); factory.setBatchListener(true); - factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); + factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter())); factory.setReplyTemplate(template(embeddedKafka)); DefaultErrorHandler eh = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template)); factory.setCommonErrorHandler(eh); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 704fc99757..5050200cc6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1091,7 +1091,7 @@ public void handleRecord(Exception thrownException, ConsumerRecord record, } }); factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo")); - factory.setMessageConverter(new RecordMessageConverter() { + factory.setRecordMessageConverter(new RecordMessageConverter() { @Override public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, @@ -1137,7 +1137,7 @@ public KafkaListenerContainerFactory kafkaJsonListenerContainerFactory() { DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); typeMapper.addTrustedPackages("*"); converter.setTypeMapper(typeMapper); - factory.setMessageConverter(converter); + factory.setRecordMessageConverter(converter); return factory; } @@ -1154,7 +1154,7 @@ public KafkaListenerContainerFactory kafkaJsonListenerContainerFactory2() { typeMapper.addTrustedPackages("*"); typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID); converter.setTypeMapper(typeMapper); - factory.setMessageConverter(converter); + factory.setRecordMessageConverter(converter); return factory; } @@ -1167,7 +1167,7 @@ public KafkaListenerContainerFactory projectionListenerContainerFactory() { DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); typeMapper.addTrustedPackages("*"); converter.setTypeMapper(typeMapper); - factory.setMessageConverter(new ProjectingMessageConverter(converter)); + factory.setRecordMessageConverter(new ProjectingMessageConverter(converter)); factory.setChangeConsumerThreadName(true); factory.setThreadNameSupplier(container -> "foo." + container.getListenerId()); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java index db0f85db66..398f8f0ded 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -165,7 +165,7 @@ public ConsumerFactory consumerFactory() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter())); + factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter())); factory.setBatchListener(true); return factory; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 3d606f37ad..319221d7c6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -900,7 +900,7 @@ public ConcurrentKafkaListenerContainerFactory simpleMapperFact factory.setReplyTemplate(template()); MessagingMessageConverter messageConverter = new MessagingMessageConverter(); messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper()); - factory.setMessageConverter(messageConverter); + factory.setRecordMessageConverter(messageConverter); factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() { @Override