From a12edfb804cb725a2fe25b5d856964288db010e9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 1 Oct 2024 16:33:29 -0400 Subject: [PATCH 1/7] Upgrade Jackson to 2.18 --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index b026933e8f..cac5050f5f 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ ext { awaitilityVersion = '4.2.2' hamcrestVersion = '2.2' hibernateValidationVersion = '8.0.1.Final' - jacksonBomVersion = '2.17.2' + jacksonBomVersion = '2.18.0' jaywayJsonPathVersion = '2.9.0' junit4Version = '4.13.2' junitJupiterVersion = '5.11.1' From 8da1aa30efb9bee5f7fd2434b0cfe242dd2207fa Mon Sep 17 00:00:00 2001 From: Anders Swanson <91502735+anders-swanson@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:41:43 -0700 Subject: [PATCH 2/7] GH-3483: Allow Overriding `KafkaAdmin#createAdmin()` Fixes: #3483 https://github.com/spring-projects/spring-kafka/issues/3483 - Refactor `KafkaAdmin` to use `Admin` interface - Change `createAdmin()` method to protected visibility - Update the return type to `org.apache.kafka.clients.admin.Admin` - Modify `KafkaAdmin` to use `Admin` interface instead of `AdminClient` class --- .../antora/modules/ROOT/pages/whats-new.adoc | 8 ++++- .../kafka/core/KafkaAdmin.java | 34 ++++++++++++------- .../kafka/core/KafkaAdminTests.java | 4 ++- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index ad8caa2e02..aa37535b38 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -46,7 +46,13 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior. +[[x33-customize-admin-client-in-KafkaAdmin]] +=== Customize Admin client in KafkaAdmin + +When extending `KafkaAdmin`, user applications may override the `createAdmin` method to customize Admin client creation. + [[x33-customize-kafka-streams-implementation]] === Customizing The Implementation of Kafka Streams -When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. \ No newline at end of file +When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index e988eae6a0..9aaa3adc1b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -68,7 +69,7 @@ import org.springframework.util.Assert; /** - * An admin that delegates to an {@link AdminClient} to create topics defined + * An admin that delegates to an {@link Admin} to create topics defined * in the application context. * * @author Gary Russell @@ -76,6 +77,7 @@ * @author Adrian Gygax * @author Sanghyeok An * @author Valentina Armenise + * @author Anders Swanson * * @since 1.3 */ @@ -114,9 +116,9 @@ public class KafkaAdmin extends KafkaResourceFactory private String clusterId; /** - * Create an instance with an {@link AdminClient} based on the supplied + * Create an instance with an {@link Admin} based on the supplied * configuration. - * @param config the configuration for the {@link AdminClient}. + * @param config the configuration for the {@link Admin}. */ public KafkaAdmin(Map config) { this.configs = new HashMap<>(config); @@ -251,7 +253,7 @@ public void afterSingletonsInstantiated() { public final boolean initialize() { Collection newTopics = newTopics(); if (!newTopics.isEmpty()) { - AdminClient adminClient = null; + Admin adminClient = null; try { adminClient = createAdmin(); } @@ -347,7 +349,7 @@ protected Collection newTopics() { @Nullable public String clusterId() { if (this.clusterId == null) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS); if (this.clusterId == null) { this.clusterId = "null"; @@ -365,14 +367,14 @@ public String clusterId() { @Override public void createOrModifyTopics(NewTopic... topics) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { addOrModifyTopicsIfNeeded(client, Arrays.asList(topics)); } } @Override public Map describeTopics(String... topicNames) { - try (AdminClient admin = createAdmin()) { + try (Admin admin = createAdmin()) { Map results = new HashMap<>(); DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames)); try { @@ -389,7 +391,13 @@ public Map describeTopics(String... topicNames) { } } - AdminClient createAdmin() { + /** + * Creates a new {@link Admin} client instance using the {@link AdminClient} class. + * @return the new {@link Admin} client instance. + * @since 3.3.0 + * @see AdminClient#create(Map) + */ + protected Admin createAdmin() { return AdminClient.create(getAdminConfig()); } @@ -409,7 +417,7 @@ protected Map getAdminConfig() { return configs2; } - private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topics) { + private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection topics) { if (!topics.isEmpty()) { Map topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); @@ -439,7 +447,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection> checkTopicsForConfigMismatches( - AdminClient adminClient, Collection topics) { + Admin adminClient, Collection topics) { List configResources = topics.stream() .map(topic -> new ConfigResource(Type.TOPIC, topic.name())) @@ -484,7 +492,7 @@ private Map> checkTopicsForConfigMismatches( } } - private void adjustConfigMismatches(AdminClient adminClient, Collection topics, + private void adjustConfigMismatches(Admin adminClient, Collection topics, Map> mismatchingConfigs) { for (Map.Entry> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) { ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey(); @@ -556,7 +564,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) { return topicsToModify; } - private void addTopics(AdminClient adminClient, List topicsToAdd) { + private void addTopics(Admin adminClient, List topicsToAdd) { CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd); try { topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS); @@ -579,7 +587,7 @@ private void addTopics(AdminClient adminClient, List topicsToAdd) { } } - private void createMissingPartitions(AdminClient adminClient, Map topicsToModify) { + private void createMissingPartitions(Admin adminClient, Map topicsToModify) { CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify); try { partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index 5405c5ea8f..19c904fcc7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -69,6 +70,7 @@ /** * @author Gary Russell * @author Adrian Gygax + * @author Anders Swanson * * @since 1.3 */ @@ -286,7 +288,7 @@ void nullClusterId() { KafkaAdmin admin = new KafkaAdmin(Map.of()) { @Override - AdminClient createAdmin() { + protected Admin createAdmin() { return mock; } From 82f02b476c224026f4cc141fb01d3a6dfa96911c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 5 Oct 2024 10:13:19 +0000 Subject: [PATCH 3/7] Bump log4jVersion from 2.24.0 to 2.24.1 (#3530) Bumps `log4jVersion` from 2.24.0 to 2.24.1. Updates `org.apache.logging.log4j:log4j-slf4j-impl` from 2.24.0 to 2.24.1 Updates `org.apache.logging.log4j:log4j-core` from 2.24.0 to 2.24.1 --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-slf4j-impl dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.apache.logging.log4j:log4j-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index cac5050f5f..492a049b60 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ ext { junitJupiterVersion = '5.11.1' kafkaVersion = '3.8.0' kotlinCoroutinesVersion = '1.8.1' - log4jVersion = '2.24.0' + log4jVersion = '2.24.1' micrometerDocsVersion = '1.0.4' micrometerVersion = '1.14.0-SNAPSHOT' micrometerTracingVersion = '1.4.0-SNAPSHOT' From fd76c7c9df1d270dfe3484617f2324b22b1e47d8 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 5 Oct 2024 10:13:36 +0000 Subject: [PATCH 4/7] Bump the development-dependencies group with 2 updates (#3529) Bumps the development-dependencies group with 2 updates: com.github.spotbugs and [io.spring.develocity.conventions](https://github.com/spring-io/develocity-conventions). Updates `com.github.spotbugs` from 6.0.23 to 6.0.24 Updates `io.spring.develocity.conventions` from 0.0.21 to 0.0.22 - [Release notes](https://github.com/spring-io/develocity-conventions/releases) - [Commits](https://github.com/spring-io/develocity-conventions/compare/v0.0.21...v0.0.22) --- updated-dependencies: - dependency-name: com.github.spotbugs dependency-type: direct:production update-type: version-update:semver-patch dependency-group: development-dependencies - dependency-name: io.spring.develocity.conventions dependency-type: direct:production update-type: version-update:semver-patch dependency-group: development-dependencies ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- settings.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 492a049b60..62dd3add38 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ plugins { id 'org.ajoberstar.grgit' version '5.2.2' id 'io.spring.nohttp' version '0.0.11' id 'io.spring.dependency-management' version '1.1.6' apply false - id 'com.github.spotbugs' version '6.0.23' + id 'com.github.spotbugs' version '6.0.24' id 'io.freefair.aggregate-javadoc' version '8.6' } diff --git a/settings.gradle b/settings.gradle index 07ddd58a7a..a81eefa98d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,7 +6,7 @@ pluginManagement { } plugins { - id 'io.spring.develocity.conventions' version '0.0.21' + id 'io.spring.develocity.conventions' version '0.0.22' } rootProject.name = 'spring-kafka-dist' From 3155c6bbdecdbcc8eaaeae3ff39cf8e3df35d2ba Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 5 Oct 2024 10:14:11 +0000 Subject: [PATCH 5/7] Bump org.junit:junit-bom from 5.11.1 to 5.11.2 (#3531) Bumps [org.junit:junit-bom](https://github.com/junit-team/junit5) from 5.11.1 to 5.11.2. - [Release notes](https://github.com/junit-team/junit5/releases) - [Commits](https://github.com/junit-team/junit5/compare/r5.11.1...r5.11.2) --- updated-dependencies: - dependency-name: org.junit:junit-bom dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 62dd3add38..d3fe068ea7 100644 --- a/build.gradle +++ b/build.gradle @@ -57,7 +57,7 @@ ext { jacksonBomVersion = '2.18.0' jaywayJsonPathVersion = '2.9.0' junit4Version = '4.13.2' - junitJupiterVersion = '5.11.1' + junitJupiterVersion = '5.11.2' kafkaVersion = '3.8.0' kotlinCoroutinesVersion = '1.8.1' log4jVersion = '2.24.1' From 367f40f3d5623c853b2786aec49c05143ea9c46b Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Mon, 7 Oct 2024 22:57:37 +0900 Subject: [PATCH 6/7] Fix typos across multiple documents (#3538) --- .../antora/modules/ROOT/pages/appendix/change-history.adoc | 2 +- .../pages/kafka/receiving-messages/enforced-rebalance.adoc | 4 ++-- .../pages/kafka/receiving-messages/listener-annotation.adoc | 4 ++-- .../pages/kafka/receiving-messages/rebalance-listeners.adoc | 2 +- .../src/main/antora/modules/ROOT/pages/kafka/seek.adoc | 2 +- .../src/main/antora/modules/ROOT/pages/kafka/serdes.adoc | 2 +- .../src/main/antora/modules/ROOT/pages/streams.adoc | 4 ++-- .../src/main/antora/modules/ROOT/pages/tips.adoc | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc index a6292c8035..5388c207df 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc @@ -906,7 +906,7 @@ Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, ` See xref:kafka/serdes.adoc#messaging-message-conversion[Spring Messaging Message Conversion] for more information. The `JsonSerializer`, `JsonDeserializer` and `JsonSerde` now have fluent APIs to make programmatic configuration simpler. -See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more informaion. +See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more information. [[cb-2-2-and-2-3-replyingkafkatemplate]] === ReplyingKafkaTemplate diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc index 30e0ea7548..4bf3118009 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc @@ -29,6 +29,6 @@ public ApplicationRunner runner(KafkaTemplate template, KafkaLis } ---- -As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalnce` API on it. +As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalance` API on it. When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer. -The Kafka consumer will trigger a rebalance as part of the next `poll()` operation. \ No newline at end of file +The Kafka consumer will trigger a rebalance as part of the next `poll()` operation. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc index 587b270a63..2de479ae11 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc @@ -256,7 +256,7 @@ public KafkaListenerContainerFactory batchFactory() { } ---- -NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation. +NOTE: Starting with version 2.8, you can override the factory's `batchListener` property using the `batch` property on the `@KafkaListener` annotation. This, together with the changes to xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers] allows the same factory to be used for both record and batch listeners. NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties. @@ -404,7 +404,7 @@ public class Listener { } ---- -If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token byusing the `beanRef` attribute. +If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token by using the `beanRef` attribute. The following example shows how to do so: [source, java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc index 3b42474b6d..d1db7d38f1 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc @@ -52,7 +52,7 @@ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListe ---- IMPORTANT: Starting with version 2.4, a new method `onPartitionsLost()` has been added (similar to a method with the same name in `ConsumerRebalanceLister`). -The default implementation on `ConsumerRebalanceLister` simply calls `onPartionsRevoked`. +The default implementation on `ConsumerRebalanceLister` simply calls `onPartitionsRevoked`. The default implementation on `ConsumerAwareRebalanceListener` does nothing. When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call `onPartitionsRevoked` from `onPartitionsLost`. If you implement `ConsumerRebalanceListener` you should override the default method. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index ba4edab223..a4803d864d 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -216,7 +216,7 @@ Example: public class MyListener extends AbstractConsumerSeekAware { @KafkaListener(...) - void listn(...) { + void listen(...) { ... } } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index 1781a2b195..9dc92cf19c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -349,7 +349,7 @@ public ProducerFactory producerFactory(Map conf ---- Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, useful when a delegate serializer can serialize sub classes. -In this case, if there are amiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided. +In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided. [[by-topic]] === By Topic diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc index e139c15e25..7675c3ab24 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc @@ -336,7 +336,7 @@ Of course, the `recoverer()` bean can be your own implementation of `ConsumerRec Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams. Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application. Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that. -To learn more about interacive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article]. +To learn more about interactive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article]. The support in Spring for Apache Kafka is centered around an API called `KafkaStreamsInteractiveQueryService` which is a facade around interactive queries APIs in Kafka Streams library. An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name. @@ -376,7 +376,7 @@ Here is the type signature from the API. public T retrieveQueryableStore(String storeName, QueryableStoreType storeType) ---- -When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example. +When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example. === Retrying State Store Retrieval diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc index 8eebb2129b..9f003d0c4e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc @@ -166,7 +166,7 @@ public void sendToKafka(String in) { [[tip-json]] == Customizing the JsonSerializer and JsonDeserializer -The serializer and deserializer support a number of cusomizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information. +The serializer and deserializer support a number of customizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information. The `kafka-clients` code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories. If you wish to configure the (de)serializer using properties, but wish to use, say, a custom `ObjectMapper`, simply create a subclass and pass the custom mapper into the `super` constructor. For example: From 3b645362f03d7d09d3d2f37d78634e6dded29747 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 7 Oct 2024 19:00:49 -0400 Subject: [PATCH 7/7] GH-3526: Configure deserializers against modified configs (#3540) Fixes: https://github.com/spring-projects/spring-kafka/issues/3526 --- .../core/DefaultKafkaConsumerFactory.java | 74 ++++++++----------- .../DefaultKafkaConsumerFactoryTests.java | 3 +- 2 files changed, 33 insertions(+), 44 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 532be3298d..2bb3486444 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map configs, this.configs = new ConcurrentHashMap<>(configs); this.configureDeserializers = configureDeserializers; - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); - } - - private Supplier> keyDeserializerSupplier( - @Nullable Supplier> keyDeserializerSupplier) { - - if (!this.configureDeserializers) { - return keyDeserializerSupplier; - } - return keyDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = keyDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, true); - } - return deserializer; - }; - } - - private Supplier> valueDeserializerSupplier( - @Nullable Supplier> valueDeserializerSupplier) { - - if (!this.configureDeserializers) { - return valueDeserializerSupplier; - } - return valueDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = valueDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, false); - } - return deserializer; - }; + this.keyDeserializerSupplier = keyDeserializerSupplier; + this.valueDeserializerSupplier = valueDeserializerSupplier; } @Override @@ -219,7 +185,7 @@ public void setBeanName(String name) { * @param keyDeserializer the deserializer. */ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { - this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer); + this.keyDeserializerSupplier = () -> keyDeserializer; } /** @@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { * @param valueDeserializer the value deserializer. */ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { - this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer); + this.valueDeserializerSupplier = () -> valueDeserializer; } /** @@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { * @since 2.8 */ public void setKeyDeserializerSupplier(Supplier> keyDeserializerSupplier) { - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); + this.keyDeserializerSupplier = keyDeserializerSupplier; } /** @@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier> keyDeserializer * @since 2.8 */ public void setValueDeserializerSupplier(Supplier> valueDeserializerSupplier) { - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); + this.valueDeserializerSupplier = valueDeserializerSupplier; } /** @@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws this.applicationContext = applicationContext; } + @Nullable + private Deserializer keyDeserializer(Map configs) { + Deserializer deserializer = + this.keyDeserializerSupplier != null + ? this.keyDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, true); + } + return deserializer; + } + + @Nullable + private Deserializer valueDeserializer(Map configs) { + Deserializer deserializer = + this.valueDeserializerSupplier != null + ? this.valueDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, false); + } + return deserializer; + } + protected class ExtendedKafkaConsumer extends KafkaConsumer { private String idForListeners; protected ExtendedKafkaConsumer(Map configProps) { - super(configProps, - DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(), - DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get()); + super(configProps, keyDeserializer(configProps), valueDeserializer(configProps)); if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) { Iterator metricIterator = metrics().keySet().iterator(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index a8463ea3c2..5cdc3cf8ab 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -503,10 +503,11 @@ public void consumerRemoved(String id, Consumer consumer) { void configDeserializer() { Deserializer key = mock(Deserializer.class); Deserializer value = mock(Deserializer.class); - Map config = new HashMap<>(); + Map config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value); Deserializer keyDeserializer = cf.getKeyDeserializer(); assertThat(keyDeserializer).isSameAs(key); + cf.createKafkaConsumer(config); verify(key).configure(config, true); Deserializer valueDeserializer = cf.getValueDeserializer(); assertThat(valueDeserializer).isSameAs(value);