diff --git a/build.gradle b/build.gradle index b026933e8f..d3fe068ea7 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ plugins { id 'org.ajoberstar.grgit' version '5.2.2' id 'io.spring.nohttp' version '0.0.11' id 'io.spring.dependency-management' version '1.1.6' apply false - id 'com.github.spotbugs' version '6.0.23' + id 'com.github.spotbugs' version '6.0.24' id 'io.freefair.aggregate-javadoc' version '8.6' } @@ -54,13 +54,13 @@ ext { awaitilityVersion = '4.2.2' hamcrestVersion = '2.2' hibernateValidationVersion = '8.0.1.Final' - jacksonBomVersion = '2.17.2' + jacksonBomVersion = '2.18.0' jaywayJsonPathVersion = '2.9.0' junit4Version = '4.13.2' - junitJupiterVersion = '5.11.1' + junitJupiterVersion = '5.11.2' kafkaVersion = '3.8.0' kotlinCoroutinesVersion = '1.8.1' - log4jVersion = '2.24.0' + log4jVersion = '2.24.1' micrometerDocsVersion = '1.0.4' micrometerVersion = '1.14.0-SNAPSHOT' micrometerTracingVersion = '1.4.0-SNAPSHOT' diff --git a/settings.gradle b/settings.gradle index 07ddd58a7a..a81eefa98d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,7 +6,7 @@ pluginManagement { } plugins { - id 'io.spring.develocity.conventions' version '0.0.21' + id 'io.spring.develocity.conventions' version '0.0.22' } rootProject.name = 'spring-kafka-dist' diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc index a6292c8035..5388c207df 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc @@ -906,7 +906,7 @@ Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, ` See xref:kafka/serdes.adoc#messaging-message-conversion[Spring Messaging Message Conversion] for more information. 
The `JsonSerializer`, `JsonDeserializer` and `JsonSerde` now have fluent APIs to make programmatic configuration simpler. -See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more informaion. +See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more information. [[cb-2-2-and-2-3-replyingkafkatemplate]] === ReplyingKafkaTemplate diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc index 30e0ea7548..4bf3118009 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc @@ -29,6 +29,6 @@ public ApplicationRunner runner(KafkaTemplate template, KafkaLis } ---- -As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalnce` API on it. +As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calls the `enforceRebalance` API on it. When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer. -The Kafka consumer will trigger a rebalance as part of the next `poll()` operation. \ No newline at end of file +The Kafka consumer will trigger a rebalance as part of the next `poll()` operation. 
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc index 587b270a63..2de479ae11 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc @@ -256,7 +256,7 @@ public KafkaListenerContainerFactory batchFactory() { } ---- -NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation. +NOTE: Starting with version 2.8, you can override the factory's `batchListener` property using the `batch` property on the `@KafkaListener` annotation. This, together with the changes to xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers] allows the same factory to be used for both record and batch listeners. NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties. @@ -404,7 +404,7 @@ public class Listener { } ---- -If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token byusing the `beanRef` attribute. +If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token by using the `beanRef` attribute. 
The following example shows how to do so: [source, java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc index 3b42474b6d..d1db7d38f1 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc @@ -52,7 +52,7 @@ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListe ---- IMPORTANT: Starting with version 2.4, a new method `onPartitionsLost()` has been added (similar to a method with the same name in `ConsumerRebalanceLister`). -The default implementation on `ConsumerRebalanceLister` simply calls `onPartionsRevoked`. +The default implementation on `ConsumerRebalanceListener` simply calls `onPartitionsRevoked`. The default implementation on `ConsumerAwareRebalanceListener` does nothing. When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call `onPartitionsRevoked` from `onPartitionsLost`. If you implement `ConsumerRebalanceListener` you should override the default method. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index ba4edab223..a4803d864d 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -216,7 +216,7 @@ Example: public class MyListener extends AbstractConsumerSeekAware { @KafkaListener(...) - void listn(...) { + void listen(...) { ... 
} } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index 1781a2b195..9dc92cf19c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -349,7 +349,7 @@ public ProducerFactory producerFactory(Map conf ---- Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, useful when a delegate serializer can serialize sub classes. -In this case, if there are amiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided. +In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided. [[by-topic]] === By Topic diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc index e139c15e25..7675c3ab24 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc @@ -336,7 +336,7 @@ Of course, the `recoverer()` bean can be your own implementation of `ConsumerRec Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams. Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application. Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that. -To learn more about interacive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article]. 
+To learn more about interactive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article]. The support in Spring for Apache Kafka is centered around an API called `KafkaStreamsInteractiveQueryService` which is a facade around interactive queries APIs in Kafka Streams library. An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name. @@ -376,7 +376,7 @@ Here is the type signature from the API. public T retrieveQueryableStore(String storeName, QueryableStoreType storeType) ---- -When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example. +When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example. === Retrying State Store Retrieval diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc index 8eebb2129b..9f003d0c4e 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc @@ -166,7 +166,7 @@ public void sendToKafka(String in) { [[tip-json]] == Customizing the JsonSerializer and JsonDeserializer -The serializer and deserializer support a number of cusomizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information. +The serializer and deserializer support a number of customizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information. The `kafka-clients` code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories. If you wish to configure the (de)serializer using properties, but wish to use, say, a custom `ObjectMapper`, simply create a subclass and pass the custom mapper into the `super` constructor. 
For example: diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index a2295b63e1..f9a49d18ce 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -46,11 +46,17 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior. +[[x33-customize-admin-client-in-KafkaAdmin]] +=== Customize Admin client in KafkaAdmin + +When extending `KafkaAdmin`, user applications may override the `createAdmin` method to customize Admin client creation. + [[x33-customize-kafka-streams-implementation]] === Customizing The Implementation of Kafka Streams When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. + [[x33-kafka-headers-for-batch-listeners]] === KafkaHeaders.DELIVERY_ATTEMPT for batch listeners When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 532be3298d..2bb3486444 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map configs, this.configs = new ConcurrentHashMap<>(configs); this.configureDeserializers = configureDeserializers; - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); - } - - private Supplier> keyDeserializerSupplier( - @Nullable Supplier> keyDeserializerSupplier) { - - if (!this.configureDeserializers) { - return keyDeserializerSupplier; - } - return keyDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = keyDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, true); - } - return deserializer; - }; - } - - private Supplier> valueDeserializerSupplier( - @Nullable Supplier> valueDeserializerSupplier) { - - if (!this.configureDeserializers) { - return valueDeserializerSupplier; - } - return valueDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = valueDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, false); - } - return deserializer; - }; + this.keyDeserializerSupplier = keyDeserializerSupplier; + this.valueDeserializerSupplier = valueDeserializerSupplier; } @Override @@ -219,7 +185,7 @@ public void setBeanName(String name) { * @param keyDeserializer the deserializer. 
*/ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { - this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer); + this.keyDeserializerSupplier = () -> keyDeserializer; } /** @@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { * @param valueDeserializer the value deserializer. */ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { - this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer); + this.valueDeserializerSupplier = () -> valueDeserializer; } /** @@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { * @since 2.8 */ public void setKeyDeserializerSupplier(Supplier> keyDeserializerSupplier) { - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); + this.keyDeserializerSupplier = keyDeserializerSupplier; } /** @@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier> keyDeserializer * @since 2.8 */ public void setValueDeserializerSupplier(Supplier> valueDeserializerSupplier) { - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); + this.valueDeserializerSupplier = valueDeserializerSupplier; } /** @@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws this.applicationContext = applicationContext; } + @Nullable + private Deserializer keyDeserializer(Map configs) { + Deserializer deserializer = + this.keyDeserializerSupplier != null + ? this.keyDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, true); + } + return deserializer; + } + + @Nullable + private Deserializer valueDeserializer(Map configs) { + Deserializer deserializer = + this.valueDeserializerSupplier != null + ? 
this.valueDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, false); + } + return deserializer; + } + protected class ExtendedKafkaConsumer extends KafkaConsumer { private String idForListeners; protected ExtendedKafkaConsumer(Map configProps) { - super(configProps, - DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(), - DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get()); + super(configProps, keyDeserializer(configProps), valueDeserializer(configProps)); if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) { Iterator metricIterator = metrics().keySet().iterator(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index e988eae6a0..9aaa3adc1b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -68,7 +69,7 @@ import org.springframework.util.Assert; /** - * An admin that delegates to an {@link AdminClient} to create topics defined + * An admin that delegates to an {@link Admin} to create topics defined * in the application context. 
* * @author Gary Russell @@ -76,6 +77,7 @@ * @author Adrian Gygax * @author Sanghyeok An * @author Valentina Armenise + * @author Anders Swanson * * @since 1.3 */ @@ -114,9 +116,9 @@ public class KafkaAdmin extends KafkaResourceFactory private String clusterId; /** - * Create an instance with an {@link AdminClient} based on the supplied + * Create an instance with an {@link Admin} based on the supplied * configuration. - * @param config the configuration for the {@link AdminClient}. + * @param config the configuration for the {@link Admin}. */ public KafkaAdmin(Map config) { this.configs = new HashMap<>(config); @@ -251,7 +253,7 @@ public void afterSingletonsInstantiated() { public final boolean initialize() { Collection newTopics = newTopics(); if (!newTopics.isEmpty()) { - AdminClient adminClient = null; + Admin adminClient = null; try { adminClient = createAdmin(); } @@ -347,7 +349,7 @@ protected Collection newTopics() { @Nullable public String clusterId() { if (this.clusterId == null) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS); if (this.clusterId == null) { this.clusterId = "null"; @@ -365,14 +367,14 @@ public String clusterId() { @Override public void createOrModifyTopics(NewTopic... topics) { - try (AdminClient client = createAdmin()) { + try (Admin client = createAdmin()) { addOrModifyTopicsIfNeeded(client, Arrays.asList(topics)); } } @Override public Map describeTopics(String... topicNames) { - try (AdminClient admin = createAdmin()) { + try (Admin admin = createAdmin()) { Map results = new HashMap<>(); DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames)); try { @@ -389,7 +391,13 @@ public Map describeTopics(String... topicNames) { } } - AdminClient createAdmin() { + /** + * Creates a new {@link Admin} client instance using the {@link AdminClient} class. 
+ * @return the new {@link Admin} client instance. + * @since 3.3.0 + * @see AdminClient#create(Map) + */ + protected Admin createAdmin() { return AdminClient.create(getAdminConfig()); } @@ -409,7 +417,7 @@ protected Map getAdminConfig() { return configs2; } - private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topics) { + private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection topics) { if (!topics.isEmpty()) { Map topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); @@ -439,7 +447,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection> checkTopicsForConfigMismatches( - AdminClient adminClient, Collection topics) { + Admin adminClient, Collection topics) { List configResources = topics.stream() .map(topic -> new ConfigResource(Type.TOPIC, topic.name())) @@ -484,7 +492,7 @@ private Map> checkTopicsForConfigMismatches( } } - private void adjustConfigMismatches(AdminClient adminClient, Collection topics, + private void adjustConfigMismatches(Admin adminClient, Collection topics, Map> mismatchingConfigs) { for (Map.Entry> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) { ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey(); @@ -556,7 +564,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) { return topicsToModify; } - private void addTopics(AdminClient adminClient, List topicsToAdd) { + private void addTopics(Admin adminClient, List topicsToAdd) { CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd); try { topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS); @@ -579,7 +587,7 @@ private void addTopics(AdminClient adminClient, List topicsToAdd) { } } - private void createMissingPartitions(AdminClient adminClient, Map topicsToModify) { + private void createMissingPartitions(Admin adminClient, Map topicsToModify) { CreatePartitionsResult partitionsResult = 
adminClient.createPartitions(topicsToModify); try { partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index a8463ea3c2..5cdc3cf8ab 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -503,10 +503,11 @@ public void consumerRemoved(String id, Consumer consumer) { void configDeserializer() { Deserializer key = mock(Deserializer.class); Deserializer value = mock(Deserializer.class); - Map config = new HashMap<>(); + Map config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value); Deserializer keyDeserializer = cf.getKeyDeserializer(); assertThat(keyDeserializer).isSameAs(key); + cf.createKafkaConsumer(config); verify(key).configure(config, true); Deserializer valueDeserializer = cf.getValueDeserializer(); assertThat(valueDeserializer).isSameAs(value); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index 5405c5ea8f..19c904fcc7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -69,6 +70,7 @@ /** * @author Gary 
Russell * @author Adrian Gygax + * @author Anders Swanson * * @since 1.3 */ @@ -286,7 +288,7 @@ void nullClusterId() { KafkaAdmin admin = new KafkaAdmin(Map.of()) { @Override - AdminClient createAdmin() { + protected Admin createAdmin() { return mock; }