From 8da1aa30efb9bee5f7fd2434b0cfe242dd2207fa Mon Sep 17 00:00:00 2001 From: Anders Swanson <91502735+anders-swanson@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:41:43 -0700 Subject: [PATCH] GH-3483: Allow Overriding `KafkaAdmin#createAdmin()` Fixes: #3483 https://github.com/spring-projects/spring-kafka/issues/3483 - Refactor `KafkaAdmin` to use `Admin` interface - Change `createAdmin()` method to protected visibility - Update the return type to `org.apache.kafka.clients.admin.Admin` - Modify `KafkaAdmin` to use `Admin` interface instead of `AdminClient` class --- .../antora/modules/ROOT/pages/whats-new.adoc | 8 ++++- .../kafka/core/KafkaAdmin.java | 34 ++++++++++++------- .../kafka/core/KafkaAdminTests.java | 4 ++- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index ad8caa2e02..aa37535b38 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -46,7 +46,13 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior. +[[x33-customize-admin-client-in-KafkaAdmin]] +=== Customize Admin client in KafkaAdmin + +When extending `KafkaAdmin`, user applications may override the `createAdmin` method to customize Admin client creation. + [[x33-customize-kafka-streams-implementation]] === Customizing The Implementation of Kafka Streams -When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. \ No newline at end of file +When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index e988eae6a0..9aaa3adc1b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -68,7 +69,7 @@ import org.springframework.util.Assert; /** - * An admin that delegates to an {@link AdminClient} to create topics defined + * An admin that delegates to an {@link Admin} to create topics defined * in the application context. * * @author Gary Russell @@ -76,6 +77,7 @@ * @author Adrian Gygax * @author Sanghyeok An * @author Valentina Armenise + * @author Anders Swanson * * @since 1.3 */ @@ -114,9 +116,9 @@ public class KafkaAdmin extends KafkaResourceFactory private String clusterId; /** - * Create an instance with an {@link AdminClient} based on the supplied + * Create an instance with an {@link Admin} based on the supplied * configuration. - * @param config the configuration for the {@link AdminClient}. + * @param config the configuration for the {@link Admin}. */ public KafkaAdmin(Map config) { this.configs = new HashMap<>(config); @@ -251,7 +253,7 @@ public void afterSingletonsInstantiated() { public final boolean initialize() { Collection newTopics = newTopics(); if (!newTopics.isEmpty()) { - AdminClient adminClient = null; + Admin adminClient = null; try { adminClient = createAdmin(); } @@ -347,7 +349,7 @@ protected Collection newTopics() { @Nullable public String clusterId() { if (this.clusterId == null) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS); if (this.clusterId == null) { this.clusterId = "null"; @@ -365,14 +367,14 @@ public String clusterId() { @Override public void createOrModifyTopics(NewTopic... topics) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { addOrModifyTopicsIfNeeded(client, Arrays.asList(topics)); } } @Override public Map describeTopics(String... topicNames) { - try (AdminClient admin = createAdmin()) { + try (Admin admin = createAdmin()) { Map results = new HashMap<>(); DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames)); try { @@ -389,7 +391,13 @@ public Map describeTopics(String... topicNames) { } } - AdminClient createAdmin() { + /** + * Creates a new {@link Admin} client instance using the {@link AdminClient} class. + * @return the new {@link Admin} client instance. + * @since 3.3.0 + * @see AdminClient#create(Map) + */ + protected Admin createAdmin() { return AdminClient.create(getAdminConfig()); } @@ -409,7 +417,7 @@ protected Map getAdminConfig() { return configs2; } - private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topics) { + private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection topics) { if (!topics.isEmpty()) { Map topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); @@ -439,7 +447,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection> checkTopicsForConfigMismatches( - AdminClient adminClient, Collection topics) { + Admin adminClient, Collection topics) { List configResources = topics.stream() .map(topic -> new ConfigResource(Type.TOPIC, topic.name())) @@ -484,7 +492,7 @@ private Map> checkTopicsForConfigMismatches( } } - private void adjustConfigMismatches(AdminClient adminClient, Collection topics, + private void adjustConfigMismatches(Admin adminClient, Collection topics, Map> mismatchingConfigs) { for (Map.Entry> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) { ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey(); @@ -556,7 +564,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) { return topicsToModify; } - private void addTopics(AdminClient adminClient, List topicsToAdd) { + private void addTopics(Admin adminClient, List topicsToAdd) { CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd); try { topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS); @@ -579,7 +587,7 @@ private void addTopics(AdminClient adminClient, List topicsToAdd) { } } - private void createMissingPartitions(AdminClient adminClient, Map topicsToModify) { + private void createMissingPartitions(Admin adminClient, Map topicsToModify) { CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify); try { partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index 5405c5ea8f..19c904fcc7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -69,6 +70,7 @@ /** * @author Gary Russell * @author Adrian Gygax + * @author Anders Swanson * * @since 1.3 */ @@ -286,7 +288,7 @@ void nullClusterId() { KafkaAdmin admin = new KafkaAdmin(Map.of()) { @Override - AdminClient createAdmin() { + protected Admin createAdmin() { return mock; }