diff --git a/build.gradle b/build.gradle index 2e0cbf7da..446960edb 100644 --- a/build.gradle +++ b/build.gradle @@ -15,6 +15,7 @@ subprojects { Project subproject -> return } + group "io.micronaut.kafka" apply plugin: "io.micronaut.build.internal.common" diff --git a/gradle.properties b/gradle.properties index e00369daa..1eb2914b3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ -projectVersion=3.3.4-SNAPSHOT +projectVersion=4.0.0-SNAPSHOT micronautDocsVersion=2.0.0.RC1 micronautBuildVersion=1.1.5 -micronautVersion=2.4.2 +micronautVersion=2.5.7 micronautGradlePluginVersion=1.5.0 groovyVersion=3.0.5 spockVersion=2.0-M3-groovy-3.0 @@ -15,7 +15,7 @@ githubSlug=micronaut-projects/micronaut-kafka developers=Graeme Rocher githubBranch=master -githubCoreBranch=2.5.x +githubCoreBranch=3.0.x bomProperty=micronautKafkaVersion bomProperties=kafkaVersion diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index e708b1c02..7454180f2 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 0f80bbf51..05679dc3c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.1.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 4f906e0c8..744e882ed 100755 --- a/gradlew +++ b/gradlew @@ -72,7 +72,7 @@ case "`uname`" in Darwin* ) darwin=true ;; - MINGW* ) + MSYS* | MINGW* ) msys=true ;; NONSTOP* ) diff --git a/kafka-streams/build.gradle b/kafka-streams/build.gradle index 3d84c6e73..3b1625a9e 100644 --- a/kafka-streams/build.gradle +++ b/kafka-streams/build.gradle @@ -4,7 +4,7 @@ dependencies { api project(":kafka") api "io.micronaut:micronaut-messaging:$micronautVersion" api "org.apache.kafka:kafka-streams:${kafkaVersion}" - + implementation 'jakarta.inject:jakarta.inject-api:2.0.0' compileOnly "io.micronaut:micronaut-management" testAnnotationProcessor "io.micronaut:micronaut-inject-java" @@ -15,7 +15,7 @@ dependencies { testImplementation "io.micronaut:micronaut-inject-java" testImplementation "io.micronaut:micronaut-inject" testImplementation "io.micronaut.micrometer:micronaut-micrometer-core" - testImplementation "io.projectreactor:reactor-core:3.4.6" + testImplementation "io.projectreactor:reactor-core:3.4.7" testImplementation "io.micronaut:micronaut-management" testImplementation "org.apache.kafka:kafka-clients:${kafkaVersion}:test" testImplementation "org.apache.kafka:kafka_2.12:${kafkaVersion}" diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java index e62845c37..55ca95855 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java @@ -21,8 +21,8 @@ import io.micronaut.context.env.Environment; import io.micronaut.runtime.ApplicationConfiguration; -import javax.inject.Named; -import javax.inject.Singleton; +import jakarta.inject.Named; +import 
jakarta.inject.Singleton; import java.util.Properties; /** diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java index 0de88ac6f..849eda090 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.io.Closeable; import java.time.Duration; import java.util.Map; diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth.java index e5fa3e83b..e1cbe6ae9 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.ThreadMetadata; import org.reactivestreams.Publisher; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.Collection; import java.util.HashMap; import java.util.List; diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/listeners/BeforeStartKafkaStreamsListenerImp.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/listeners/BeforeStartKafkaStreamsListenerImp.java index 63c899c3f..7797b0ae2 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/listeners/BeforeStartKafkaStreamsListenerImp.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/listeners/BeforeStartKafkaStreamsListenerImp.java @@ -3,7 +3,7 @@ import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart; import io.micronaut.runtime.event.annotation.EventListener; -import javax.inject.Singleton; +import jakarta.inject.Singleton; @Singleton public class BeforeStartKafkaStreamsListenerImp { diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationInteractiveQueryService.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationInteractiveQueryService.java index 01fe3b96c..507119c18 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationInteractiveQueryService.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationInteractiveQueryService.java @@ -4,7 +4,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.Optional; @Singleton diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationStream.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationStream.java index dded83d5f..a06b1ed0b 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationStream.java +++ 
b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/optimization/OptimizationStream.java @@ -9,8 +9,8 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import javax.inject.Named; -import javax.inject.Singleton; +import jakarta.inject.Named; +import jakarta.inject.Singleton; import java.util.Properties; @Factory diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/InteractiveQueryServiceExample.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/InteractiveQueryServiceExample.java index aa74dac77..67444202f 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/InteractiveQueryServiceExample.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/InteractiveQueryServiceExample.java @@ -4,7 +4,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.Optional; /** diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/WordCountStream.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/WordCountStream.java index 8ef64dd85..076db861e 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/WordCountStream.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/wordcount/WordCountStream.java @@ -12,8 +12,8 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import javax.inject.Named; -import javax.inject.Singleton; +import jakarta.inject.Named; +import jakarta.inject.Singleton; import java.util.Arrays; import java.util.Locale; import java.util.Properties; diff --git a/kafka/build.gradle b/kafka/build.gradle index 172ac2c23..86fad1467 100644 --- a/kafka/build.gradle +++ b/kafka/build.gradle @@ -7,7 +7,7 @@ dependencies { compileOnly 'io.zipkin.brave:brave-instrumentation-kafka-clients:5.13.3' api "io.micronaut:micronaut-messaging:$micronautVersion" api "org.apache.kafka:kafka-clients:${kafkaVersion}" - + implementation 'jakarta.inject:jakarta.inject-api:2.0.0' compileOnly "io.micronaut:micronaut-management" compileOnly "io.micronaut.micrometer:micronaut-micrometer-core" compileOnly "org.apache.kafka:kafka_2.12:${kafkaVersion}" @@ -18,7 +18,7 @@ dependencies { testImplementation "io.micronaut.test:micronaut-test-spock" testImplementation "io.micronaut:micronaut-inject-java" testImplementation "io.micronaut.micrometer:micronaut-micrometer-core" - testImplementation "io.projectreactor:reactor-core:3.4.6" + testImplementation "io.projectreactor:reactor-core:3.4.7" testImplementation "io.micronaut:micronaut-management" testImplementation "io.micronaut:micronaut-http-client" testImplementation "io.micronaut:micronaut-inject-groovy" diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/Acknowledgement.java b/kafka/src/main/java/io/micronaut/configuration/kafka/Acknowledgement.java deleted file mode 100644 index c3398b5a6..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/Acknowledgement.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.configuration.kafka; - - -/** - * Defines an interface that can be injected into {@link io.micronaut.configuration.kafka.annotation.KafkaListener} beans so that offsets can be manually committed. - * - * @deprecated Use {@link io.micronaut.messaging.Acknowledgement} instead - * @author graemerocher - * @since 1.0 - */ -@Deprecated -public interface Acknowledgement { - - /** - * Acknowledge the last {@link org.apache.kafka.clients.consumer.ConsumerRecord} synchronously. - */ - void ack(); -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaAcknowledgement.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaAcknowledgement.java index d2695f34b..3f5863bd5 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaAcknowledgement.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaAcknowledgement.java @@ -23,7 +23,7 @@ * @author James Kleeh * @since 1.1.0 */ -public interface KafkaAcknowledgement extends Acknowledgement, io.micronaut.configuration.kafka.Acknowledgement { +public interface KafkaAcknowledgement extends Acknowledgement { /** * Kafka does not support rejection of messages explicitly. diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java deleted file mode 100644 index d3d221f58..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.configuration.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; - -import io.micronaut.core.annotation.NonNull; - -/** - * Interface for {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances to implement - * if they wish to obtain a reference to the underlying {@link org.apache.kafka.clients.consumer.KafkaConsumer}. - * - * @param The key type - * @param The value type - * @author Graeme Rocher - * @since 1.0 - * @deprecated Use {@link ConsumerAware} instead. - */ -@Deprecated -public interface KafkaConsumerAware extends ConsumerAware { - - /** - * Called when the underlying {@link KafkaConsumer} is created. 
- * - * @param consumer The consumer - */ - void setKafkaConsumer(@NonNull KafkaConsumer consumer); - - @Override - default void setKafkaConsumer(@NonNull Consumer consumer) { - if (consumer instanceof KafkaConsumer) { - setKafkaConsumer((KafkaConsumer) consumer); - } - } -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerFactory.java index 9a14d27b3..43ea73bc8 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerFactory.java @@ -22,6 +22,7 @@ import io.micronaut.context.annotation.Prototype; import io.micronaut.context.exceptions.ConfigurationException; import io.micronaut.core.annotation.TypeHint; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.RoundRobinAssignor; @@ -81,7 +82,7 @@ public class KafkaConsumerFactory { * @return The consumer */ @Prototype - public KafkaConsumer createConsumer( + public Consumer createConsumer( @Parameter AbstractKafkaConsumerConfiguration consumerConfiguration) { Optional> keyDeserializer = consumerConfiguration.getKeyDeserializer(); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java index 321091aa9..97c6523aa 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java @@ -15,16 +15,36 @@ */ package io.micronaut.configuration.kafka; +import io.micronaut.configuration.kafka.annotation.KafkaClient; import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration; +import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration; +import io.micronaut.configuration.kafka.serde.SerdeRegistry; +import io.micronaut.context.BeanContext; +import io.micronaut.context.annotation.Bean; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Parameter; -import io.micronaut.context.annotation.Prototype; import io.micronaut.context.exceptions.ConfigurationException; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.type.Argument; +import io.micronaut.core.util.StringUtils; +import io.micronaut.inject.ArgumentInjectionPoint; +import io.micronaut.inject.FieldInjectionPoint; +import io.micronaut.inject.InjectionPoint; +import io.micronaut.inject.qualifiers.Qualifiers; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.PreDestroy; +import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; /** * A factory class for creating Kafka {@link org.apache.kafka.clients.producer.Producer} instances. 
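Note on the recurring import change running through this diff: Micronaut 3 moves the JSR-330 annotations from javax.inject to jakarta.inject, which is why the build files above add the jakarta.inject:jakarta.inject-api:2.0.0 dependency and every source file swaps its @Singleton/@Named/@Inject imports. A minimal sketch of the same migration in downstream code (the bean name here is hypothetical):

```java
// Before (Micronaut 2.x):
// import javax.inject.Singleton;

// After (Micronaut 3.x) -- only the package changes:
import jakarta.inject.Singleton;

@Singleton
public class MyKafkaComponent {
    // bean body unchanged; annotation names and semantics stay the same
}
```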
@@ -33,36 +53,177 @@ * @since 1.0 */ @Factory -public class KafkaProducerFactory { +public class KafkaProducerFactory implements ProducerRegistry { + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class); + private final Map clients = new ConcurrentHashMap<>(); + private final BeanContext beanContext; + private final SerdeRegistry serdeRegistry; + + /** + * Default constructor. + * @param beanContext The bean context + * @param serdeRegistry The serde registry + */ + public KafkaProducerFactory(BeanContext beanContext, SerdeRegistry serdeRegistry) { + this.beanContext = beanContext; + this.serdeRegistry = serdeRegistry; + } /** * Creates a new {@link KafkaProducer} for the given configuration. * - * @param producerConfiguration The producer configuration + * @param injectionPoint The injection point used to create the bean + * @param producerConfiguration An optional producer configuration * @param The key type * @param The value type * @return The consumer */ - @Prototype - public KafkaProducer createProducer(@Parameter AbstractKafkaProducerConfiguration producerConfiguration) { - Optional> keySerializer = producerConfiguration.getKeySerializer(); - Optional> valueSerializer = producerConfiguration.getValueSerializer(); - - Properties config = producerConfiguration.getConfig(); - if (keySerializer.isPresent() && valueSerializer.isPresent()) { - Serializer ks = keySerializer.get(); - Serializer vs = valueSerializer.get(); - return new KafkaProducer<>( - config, - ks, - vs - ); - } else if (keySerializer.isPresent() || valueSerializer.isPresent()) { - throw new ConfigurationException("Both the [keySerializer] and [valueSerializer] must be set when setting either"); + @Bean + public Producer getProducer( + @Nullable InjectionPoint> injectionPoint, + @Nullable @Parameter AbstractKafkaProducerConfiguration producerConfiguration) { + if (injectionPoint == null) { + if (producerConfiguration != null) { + Optional> keySerializer = producerConfiguration.getKeySerializer(); + Optional> valueSerializer = producerConfiguration.getValueSerializer(); + + Properties config = producerConfiguration.getConfig(); + if (keySerializer.isPresent() && valueSerializer.isPresent()) { + Serializer ks = keySerializer.get(); + Serializer vs = valueSerializer.get(); + return new KafkaProducer<>( + config, + ks, + vs + ); + } else if (keySerializer.isPresent() || valueSerializer.isPresent()) { + throw new ConfigurationException("Both the [keySerializer] and [valueSerializer] must be set when setting either"); + } else { + return new KafkaProducer<>( + config + ); + } + } else { + throw new ConfigurationException("No Kafka configuration specified when using direct instantiation"); + } + } + + Argument argument; + if (injectionPoint instanceof FieldInjectionPoint) { + argument = ((FieldInjectionPoint) injectionPoint).asArgument(); + } else if (injectionPoint instanceof ArgumentInjectionPoint) { + argument = ((ArgumentInjectionPoint) injectionPoint).getArgument(); } else { - return new KafkaProducer<>( - config - ); + throw new ConfigurationException("Cannot directly retrieve KafkaProducer instances. 
Use @Inject or constructor injection"); + } + + Argument k = argument.getTypeVariable("K").orElse(null); + Argument v = argument.getTypeVariable("V").orElse(null); + + if (k == null || v == null) { + throw new ConfigurationException("@KafkaClient used on type missing generic argument values for Key and Value: " + injectionPoint); + } + final String id = injectionPoint.getAnnotationMetadata().stringValue(KafkaClient.class).orElse(null); + return getKafkaProducer(id, k, v); + } + + @SuppressWarnings("unchecked") + private T getKafkaProducer(@Nullable String id, Argument keyType, Argument valueType) { + ClientKey key = new ClientKey( + id, + keyType.getType(), + valueType.getType() + ); + + return (T) clients.computeIfAbsent(key, clientKey -> { + Supplier defaultResolver = () -> beanContext.getBean(AbstractKafkaProducerConfiguration.class); + AbstractKafkaProducerConfiguration config; + boolean hasId = StringUtils.isNotEmpty(id); + if (hasId) { + config = beanContext.findBean( + AbstractKafkaProducerConfiguration.class, + Qualifiers.byName(id) + ).orElseGet(defaultResolver); + } else { + config = defaultResolver.get(); + } + + DefaultKafkaProducerConfiguration newConfig = new DefaultKafkaProducerConfiguration(config); + + Properties properties = newConfig.getConfig(); + if (!properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + Serializer keySerializer = serdeRegistry.pickSerializer(keyType); + newConfig.setKeySerializer(keySerializer); + } + + if (!properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + Serializer valueSerializer = serdeRegistry.pickSerializer(valueType); + newConfig.setValueSerializer(valueSerializer); + } + + if (hasId) { + properties.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, id); + } + return beanContext.createBean(Producer.class, newConfig); + }); + } + + /** + * Shuts down any existing clients. + */ + @PreDestroy + protected void stop() { + for (Producer producer : clients.values()) { + try { + producer.close(); + } catch (Exception e) { + LOG.warn("Error shutting down Kafka producer: {}", e.getMessage(), e); + } + } + clients.clear(); + } + + @Override + public Producer getProducer(String id, Argument keyType, Argument valueType) { + return getKafkaProducer(id, keyType, valueType); + } + + + /** + * key for retrieving built producers. 
+ * + * @author Graeme Rocher + * @since 1.0 + */ + private static final class ClientKey { + private final String id; + private final Class keyType; + private final Class valueType; + + ClientKey(String id, Class keyType, Class valueType) { + this.id = id; + this.keyType = keyType; + this.valueType = valueType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientKey clientKey = (ClientKey) o; + return Objects.equals(id, clientKey.id) && + Objects.equals(keyType, clientKey.keyType) && + Objects.equals(valueType, clientKey.valueType); + } + + @Override + public int hashCode() { + + return Objects.hash(id, keyType, valueType); } } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java deleted file mode 100644 index 04bf9bdc2..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.configuration.kafka; - -import io.micronaut.core.type.Argument; -import org.apache.kafka.clients.producer.KafkaProducer; - -import io.micronaut.core.annotation.NonNull; - -/** - * A registry of managed {@link KafkaProducer} instances key by id and type. - * - * @author graemerocher - * @since 1.0 - * @deprecated Use {@link ProducerRegistry} instead - */ -@Deprecated -public interface KafkaProducerRegistry extends ProducerRegistry { - - /** - * Returns a managed Producer. - * - * @param id The id of the producer. - * @param keyType The key type - * @param valueType The value type - * @param The key generic type - * @param The value generic type - * @return The producer - */ - @Override - @NonNull KafkaProducer getProducer(String id, Argument keyType, Argument valueType); -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/admin/AdminClientFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/admin/AdminClientFactory.java index ce39b7b6b..ed2ae3f48 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/admin/AdminClientFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/admin/AdminClientFactory.java @@ -21,7 +21,7 @@ import io.micronaut.context.annotation.Requires; import org.apache.kafka.clients.admin.AdminClient; -import javax.inject.Singleton; +import jakarta.inject.Singleton; /** * Creates the Kakfa {@link org.apache.kafka.clients.admin.AdminClient}. 
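The KafkaProducerFactory rewrite above replaces the old @Prototype createProducer method with an injection-point-aware getProducer: the factory reads the K and V type variables from the injection point, picks serializers from the SerdeRegistry when none are configured, and caches clients per (id, key type, value type) via ClientKey. A usage sketch under those assumptions (class, id, and topic names are hypothetical):

```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Singleton
public class BookSender {

    private final Producer<String, String> producer;

    // The factory resolves this bean from the constructor injection point;
    // the @KafkaClient id is also applied as client.id if none is configured.
    public BookSender(@KafkaClient("book-sender") Producer<String, String> producer) {
        this.producer = producer;
    }

    public void send(String key, String value) {
        producer.send(new ProducerRecord<>("books", key, value));
    }
}
```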
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java index fd30ce1aa..28bf2dd60 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java @@ -16,14 +16,11 @@ package io.micronaut.configuration.kafka.annotation; import io.micronaut.aop.Introduction; -import io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice; import io.micronaut.context.annotation.AliasFor; import io.micronaut.context.annotation.Property; -import io.micronaut.context.annotation.Type; import io.micronaut.retry.annotation.Recoverable; +import jakarta.inject.Singleton; -import javax.inject.Scope; -import javax.inject.Singleton; import java.lang.annotation.Documented; import java.lang.annotation.Retention; @@ -35,13 +32,11 @@ * * @author Graeme Rocher * @since 1.0 - * @see KafkaClientIntroductionAdvice + * @see io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice */ @Documented @Retention(RUNTIME) -@Scope @Introduction -@Type(KafkaClientIntroductionAdvice.class) @Recoverable @Singleton public @interface KafkaClient { @@ -92,7 +87,7 @@ /** * @return The {@code ack} setting for the client, which impacts message delivery durability. * - * @see org.apache.kafka.clients.producer.ProducerConfig#ACKS_DOC + * @see org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG * @see Acknowledge */ int acks() default Acknowledge.DEFAULT; @@ -100,7 +95,7 @@ /** * Constants for the {@code ack} setting for the client, which impacts message delivery durability. * - * @see org.apache.kafka.clients.producer.ProducerConfig#ACKS_DOC + * @see org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG */ @SuppressWarnings("WeakerAccess") class Acknowledge { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaKey.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaKey.java index c9e05d1f9..85886f740 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaKey.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaKey.java @@ -29,5 +29,6 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.PARAMETER}) @Bindable +@Inherited public @interface KafkaKey { } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java index 0985bf064..ba590a370 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java @@ -88,7 +88,7 @@ * for a consumer you can alter this setting. Note that this means that multiple partitions will * be allocated to a single application. * - *

<p>NOTE: When using this setting if your bean is {@link javax.inject.Singleton} then local state will be
+ * <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be
 * shared between invocations from different consumer threads</p>

* * @return The number of threads @@ -96,7 +96,7 @@ int threads() default 1; /** - * The timeout to use for calls to {@link org.apache.kafka.clients.consumer.Consumer#poll(long)}. + * The timeout to use for calls to {@link org.apache.kafka.clients.consumer.Consumer#poll(java.time.Duration)}. * * @return The timeout. Defaults to 100ms */ diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartition.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartition.java new file mode 100644 index 000000000..fa2353431 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartition.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.annotation; + +import io.micronaut.core.bind.annotation.Bindable; + +import java.lang.annotation.*; + +/** + * Parameter level annotation to indicate which parameter is bound to the Kafka Partition. + * It can be used in {@link java.lang.Integer} or {@code int}. + * + *

<p>When used in producers, indicates which partition is to be used. If the provided value is {@code null}
+ * then the configured/default partitioning strategy takes place.</p>
+ *
+ * <p>When used in consumers, it is populated with the partition that the record was received from.</p>
+ *
+ * <p>Note that while using {@link KafkaPartition} in the same method as {@link KafkaPartitionKey}
+ * will not throw an exception, the outcome of doing so is left unspecified.</p>

+ * + * @author André Prata + * @since 3.3.4 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PARAMETER}) +@Bindable +@Inherited +public @interface KafkaPartition { +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartitionKey.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartitionKey.java new file mode 100644 index 000000000..eba2c3497 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaPartitionKey.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.annotation; + +import io.micronaut.core.bind.annotation.Bindable; + +import java.lang.annotation.*; + +/** + * Parameter level annotation for Kafka producers to indicate which parameter to compute the Kafka Partition from. + * + *

<p>The partition is computed by first serializing the object, using an appropriate serializer from
+ * {@link io.micronaut.configuration.kafka.serde.SerdeRegistry}, and then computing the partition
+ * number using the same algorithm as Kafka's own {@code DefaultPartitioner} ({@code murmur2}).</p>
+ *
+ * <p>If the provided value is {@code null} then the configured/default partitioning strategy takes place.</p>
+ *
+ * <p>Note that while using {@link KafkaPartitionKey} in the same method as {@link KafkaPartition}
+ * will not throw an exception, the outcome of doing so is left unspecified.</p>

+ * + * @author André Prata + * @since 3.3.4 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PARAMETER}) +@Bindable +@Inherited +public @interface KafkaPartitionKey { +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTimestamp.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTimestamp.java index cacbb0f46..6c1afe288 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTimestamp.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTimestamp.java @@ -29,5 +29,6 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.PARAMETER}) @Bindable +@Inherited public @interface KafkaTimestamp { } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java index 0ae20fe9e..94d99615b 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java @@ -27,7 +27,7 @@ */ public enum OffsetStrategy { /** - * Automatically commit offsets with the {@link org.apache.kafka.clients.consumer.Consumer#poll(long)} loop. + * Automatically commit offsets with the {@link org.apache.kafka.clients.consumer.Consumer#poll(java.time.Duration)} loop. */ AUTO, /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topic.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topic.java index 611d17a58..cdcbcd754 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topic.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topic.java @@ -16,6 +16,7 @@ package io.micronaut.configuration.kafka.annotation; import io.micronaut.context.annotation.AliasFor; +import io.micronaut.context.annotation.Executable; import io.micronaut.core.bind.annotation.Bindable; import io.micronaut.messaging.annotation.MessageMapping; @@ -32,6 +33,9 @@ @Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE}) @Bindable @Repeatable(Topics.class) +@Executable +@MessageMapping +@Inherited public @interface Topic { /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topics.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topics.java index 2a9163c3e..b38a85601 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topics.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/Topics.java @@ -26,6 +26,7 @@ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) +@Inherited public @interface Topics { /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/ConsumerRecordBinderRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/ConsumerRecordBinderRegistry.java index 2dc681309..025f03625 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/ConsumerRecordBinderRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/ConsumerRecordBinderRegistry.java @@ -20,9 +20,9 @@ import io.micronaut.core.bind.annotation.Bindable; import io.micronaut.core.type.Argument; import io.micronaut.core.util.ArrayUtils; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; -import javax.inject.Singleton; import java.lang.annotation.Annotation; import java.util.HashMap; 
import java.util.Map; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java index 20b5a5dd2..4020f7411 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java @@ -19,10 +19,10 @@ import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.convert.ConversionService; import io.micronaut.messaging.annotation.Header; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; -import javax.inject.Singleton; import java.util.Optional; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeadersBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeadersBinder.java index 86a744e0c..024f2413e 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeadersBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeadersBinder.java @@ -19,9 +19,9 @@ import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.type.Argument; import io.micronaut.messaging.MessageHeaders; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; -import javax.inject.Singleton; import java.util.Optional; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaKeyBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaKeyBinder.java index 1745d459d..3c5f7f54d 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaKeyBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaKeyBinder.java @@ -18,9 +18,9 @@ import io.micronaut.configuration.kafka.annotation.KafkaKey; import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.convert.ConversionService; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; -import javax.inject.Singleton; import java.util.Optional; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java index 0ea5023c1..3d031fc1a 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java @@ -20,12 +20,16 @@ import io.micronaut.core.convert.ConversionService; import io.micronaut.messaging.annotation.Header; import io.micronaut.messaging.annotation.MessageHeader; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; -import javax.inject.Singleton; import java.util.Optional; +/** + * Binds message headers. 
+ * @param The generic type + */ @Singleton public class KafkaMessageHeaderBinder implements AnnotatedConsumerRecordBinder { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessagingBodyBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessagingBodyBinder.java index cf49719ab..37efec76c 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessagingBodyBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessagingBodyBinder.java @@ -18,9 +18,9 @@ import io.micronaut.core.convert.ConversionService; import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.messaging.annotation.Body; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; -import javax.inject.Singleton; import java.util.Optional; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaBodyBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaPartitionBinder.java similarity index 68% rename from kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaBodyBinder.java rename to kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaPartitionBinder.java index 3e114447e..b9649c1a9 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaBodyBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaPartitionBinder.java @@ -15,33 +15,33 @@ */ package io.micronaut.configuration.kafka.bind; +import io.micronaut.configuration.kafka.annotation.KafkaPartition; import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.convert.ConversionService; -import io.micronaut.http.annotation.Body; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; -import javax.inject.Singleton; import java.util.Optional; /** - * The default binder that binds the body of a ConsumerRecord. + * Binder for binding the parameters that is designated the {@link KafkaPartition}. 
* - * @param The target generic type - * @author Graeme Rocher - * @since 1.0 + * @param The target type + * @author André Prata + * @since 3.3.4 */ @Singleton -public class KafkaBodyBinder implements AnnotatedConsumerRecordBinder { +public class KafkaPartitionBinder implements AnnotatedConsumerRecordBinder { @Override - public Class annotationType() { - return Body.class; + public Class annotationType() { + return KafkaPartition.class; } @Override public BindingResult bind(ArgumentConversionContext context, ConsumerRecord source) { - Object value = source.value(); - Optional converted = ConversionService.SHARED.convert(value, context); + Object partition = source.partition(); + Optional converted = ConversionService.SHARED.convert(partition, context); return () -> converted; } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java index 266f09a78..a4192582f 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java @@ -24,11 +24,11 @@ import io.micronaut.core.convert.ConversionService; import io.micronaut.core.type.Argument; import io.reactivex.Flowable; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.reactivestreams.Publisher; -import javax.inject.Singleton; import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java index 51c427182..824095784 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java @@ -38,11 +38,6 @@ public abstract class AbstractKafkaConfiguration implements Toggleable { */ public static final String PREFIX = "kafka"; - /** - * The property to use to enable embedded Kafka. - */ - public static final String EMBEDDED = "kafka.embedded.enabled"; - /** * The topics that should be created. 
*/ diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaConsumerConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaConsumerConfiguration.java index ae08534ba..ffd6f8d0b 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaConsumerConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaConsumerConfiguration.java @@ -18,8 +18,8 @@ import io.micronaut.context.annotation.Primary; import io.micronaut.context.annotation.Prototype; import io.micronaut.context.annotation.Requires; +import jakarta.inject.Inject; -import javax.inject.Inject; import java.util.Properties; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaProducerConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaProducerConfiguration.java index 5112e17b0..8cc2cd5e5 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaProducerConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/DefaultKafkaProducerConfiguration.java @@ -18,8 +18,8 @@ import io.micronaut.context.annotation.Primary; import io.micronaut.context.annotation.Prototype; import io.micronaut.context.annotation.Requires; +import jakarta.inject.Inject; -import javax.inject.Inject; import java.util.Properties; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/convert/KafkaHeaderConverter.java b/kafka/src/main/java/io/micronaut/configuration/kafka/convert/KafkaHeaderConverter.java index 1ff128dc7..28c2a5c64 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/convert/KafkaHeaderConverter.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/convert/KafkaHeaderConverter.java @@ -20,7 +20,7 @@ import io.micronaut.core.convert.TypeConverter; import org.apache.kafka.common.header.Header; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.Optional; /** diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbedded.java b/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbedded.java deleted file mode 100644 index ac329bfa2..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbedded.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.micronaut.configuration.kafka.embedded; - -import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration; -import io.micronaut.context.annotation.Requires; -import io.micronaut.context.env.Environment; -import io.micronaut.context.event.BeanCreatedEvent; -import io.micronaut.context.event.BeanCreatedEventListener; -import io.micronaut.context.exceptions.ConfigurationException; -import io.micronaut.core.io.socket.SocketUtils; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.TestUtils; -import kafka.zk.EmbeddedZookeeper; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.utils.MockTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.PreDestroy; -import javax.inject.Singleton; -import java.io.IOException; -import java.nio.file.Files; -import java.util.List; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -/** - * This class will configure a Kafka server for the test environment if no server is already available. - * - * @author Graeme Rocher - * @since 1.0 - * @deprecated Embedded Kafka is deprecated. For Testing please use Test Containers instead: https://www.testcontainers.org/modules/kafka/ - */ -@Singleton -@Requires(env = {Environment.TEST, Environment.DEVELOPMENT}) -@Requires(classes = {KafkaServer.class, TestUtils.class, org.apache.kafka.test.TestUtils.class}) -@Requires(property = AbstractKafkaConfiguration.EMBEDDED) -@Deprecated -public class KafkaEmbedded implements BeanCreatedEventListener, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaEmbedded.class); - - private EmbeddedZookeeper zkServer; - private KafkaServer kafkaServer; - - private final KafkaEmbeddedConfiguration embeddedConfiguration; - private final AtomicBoolean init = new AtomicBoolean(false); - - /** - * Construct a new instance. - * - * @param embeddedConfiguration The {@link KafkaEmbeddedConfiguration} - */ - public KafkaEmbedded(KafkaEmbeddedConfiguration embeddedConfiguration) { - this.embeddedConfiguration = embeddedConfiguration; - } - - @Override - public synchronized AbstractKafkaConfiguration onCreated(BeanCreatedEvent event) { - - LOG.warn("Embedded Kafka is deprecated. For Testing please use Test Containers instead: https://www.testcontainers.org/modules/kafka/"); - - AbstractKafkaConfiguration config = event.getBean(); - if (kafkaServer != null) { - return config; - } - - String bootstrapServer = config.getConfig().getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); - - String[] hostAndPort = bootstrapServer.split(",")[0].split(":"); - int kafkaPort = -1; - if (hostAndPort.length == 2) { - try { - kafkaPort = Integer.parseInt(hostAndPort[1]); - } catch (NumberFormatException e) { - return config; - } - } - - boolean randomPort = kafkaPort == -1; - if (embeddedConfiguration.isEnabled()) { - int retries = 0; - do { - // only handle localhost - final int targetPort = randomPort ? 
SocketUtils.findAvailableTcpPort() : kafkaPort; - if (kafkaServer == null && - targetPort > -1 && - SocketUtils.isTcpPortAvailable(targetPort) && - init.compareAndSet(false, true)) { - try { - if (zkServer == null) { - initZooKeeper(); - } - - // setup Broker - Properties brokerProps = embeddedConfiguration.getProperties(); - String zkConnect = "127.0.0.1:" + zkServer.port(); - - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.putIfAbsent("broker.id", "0"); - - brokerProps.put("port", targetPort); - brokerProps.putIfAbsent("offsets.topic.replication.factor" , "1"); - - brokerProps.computeIfAbsent("log.dirs", o -> { - try { - return Files.createTempDirectory("kafka-").toAbsolutePath().toString(); - } catch (IOException e) { - throw new ConfigurationException("Error creating log directory for embedded Kafka server: " + e.getMessage(), e); - } - }); - - brokerProps.setProperty( - "listeners", - "PLAINTEXT://127.0.0.1:" + targetPort - ); - KafkaConfig kafkaConfig = new KafkaConfig(brokerProps); - this.kafkaServer = TestUtils.createServer(kafkaConfig, new MockTime()); - final Integer numPartitions = kafkaConfig.numPartitions(); - LOG.info("Started Embedded Kafka on Port: {}", targetPort); - - createTopics(targetPort, numPartitions); - return config; - } catch (Throwable e) { - // check server not already running - if (!e.getMessage().contains("Address already in use")) { - throw new ConfigurationException("Error starting embedded Kafka server: " + e.getMessage(), e); - - } - retries++; - } - } - - } while (retries < 3); - throw new ConfigurationException("Error starting embedded Kafka server. Could not start after attempting port binding several times"); - } else { - if (kafkaPort > -1) { - try { - createTopics(kafkaPort, 1); - } catch (Throwable e) { - throw new ConfigurationException("Error creating Kafka Topics: " + e.getMessage(), e); - } - } - return config; - } - - } - - private void createTopics(int targetPort, Integer numPartitions) throws InterruptedException, java.util.concurrent.ExecutionException { - List topics = embeddedConfiguration.getTopics(); - - LOG.debug("Creating Kafka Topics in Embedded Kafka: {}", topics); - if (!topics.isEmpty()) { - Properties properties = new Properties(); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, ("127.0.0.1:" + targetPort)); - AdminClient adminClient = AdminClient.create(properties); - final CreateTopicsResult result = adminClient.createTopics(topics.stream().map(s -> - new NewTopic(s, numPartitions, (short) 1)).collect(Collectors.toList()) - ); - result.all().get(); - - LOG.info("Created Kafka Topics in Embedded Kafka: {}", topics); - } - } - - @Override - @PreDestroy - public void close() { - new Thread(() -> { - if (kafkaServer != null) { - try { - kafkaServer.shutdown(); - } catch (Exception e) { - LOG.warn("Error shutting down embedded Kafka Server: {}", e.getMessage(), e); - } - } - if (zkServer != null) { - try { - zkServer.shutdown(); - } catch (Exception e) { - LOG.warn("Error shutting down embedded ZooKeeper: {}", e.getMessage(), e); - } - } - }, "embedded-kafka-shutdown-thread").start(); - } - - /** - * Return the configured Kafka server is it was configured. - * - * @return An optional {@link KafkaServer} - */ - public Optional getKafkaServer() { - return Optional.ofNullable(kafkaServer); - } - - /** - * Returns the port Zookeeper is running on if it was created. 
- * @return The Zookeeper port - */ - public Optional getZkPort() { - if (zkServer != null) { - return Optional.of(zkServer.port()); - } else { - return Optional.empty(); - } - } - - private void initZooKeeper() { - zkServer = new EmbeddedZookeeper(); - } -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java deleted file mode 100644 index f3b766f88..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.configuration.kafka.embedded; - -import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration; -import io.micronaut.context.annotation.ConfigurationProperties; -import io.micronaut.core.util.Toggleable; - -import io.micronaut.core.annotation.NonNull; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -/** - * Configuration for the embedded Kafka server. - * - * @author Graeme Rocher - * @since 1.0 - */ -@ConfigurationProperties(KafkaEmbeddedConfiguration.PREFIX) -public class KafkaEmbeddedConfiguration implements Toggleable { - /** - * The prefix used for configuration. - */ - @SuppressWarnings("WeakerAccess") - public static final String PREFIX = AbstractKafkaConfiguration.PREFIX + ".embedded"; - - /** - * The default enable value. - */ - @SuppressWarnings("WeakerAccess") - public static final boolean DEFAULT_ENABLED = false; - - private boolean enabled = DEFAULT_ENABLED; - private List topics = new ArrayList<>(); - private Properties properties = new Properties(); - - @Override - public boolean isEnabled() { - return enabled; - } - - /** - * Sets whether the embedded Kafka server is enabled. Default value ({@value #DEFAULT_ENABLED}). - * - * @param enabled True if it is. - */ - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - /** - * @return The broker properties. - */ - public @NonNull Properties getProperties() { - return properties; - } - - /** - * Sets the broker properties. - * - * @param properties The broker properties. - */ - public void setProperties(Properties properties) { - if (properties != null) { - this.properties = properties; - } - } - - /** - * @return The topics that should be created by the embedded instance - */ - public List getTopics() { - return topics; - } - - /** - * The topics that should be created by the embedded instance. 
- * @param topics The topic names - */ - public void setTopics(List topics) { - if (topics != null) { - this.topics = topics; - } - } -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/package-info.java b/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/package-info.java deleted file mode 100644 index 712431001..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Support classes for embedding Kafka within applications and tests. - * - * @author graemerocher - * @since 1.0 - */ -package io.micronaut.configuration.kafka.embedded; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java index 6dd5ec6f6..6981fd2aa 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java @@ -16,6 +16,7 @@ package io.micronaut.configuration.kafka.exceptions; import io.micronaut.context.annotation.Primary; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; @@ -24,7 +25,6 @@ import org.slf4j.LoggerFactory; import io.micronaut.core.annotation.NonNull; -import javax.inject.Singleton; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/executor/ConsumerExecutorServiceConfig.java b/kafka/src/main/java/io/micronaut/configuration/kafka/executor/ConsumerExecutorServiceConfig.java index 1da621deb..1a98fbb26 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/executor/ConsumerExecutorServiceConfig.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/executor/ConsumerExecutorServiceConfig.java @@ -22,9 +22,8 @@ import io.micronaut.scheduling.executor.ExecutorConfiguration; import io.micronaut.scheduling.executor.ExecutorType; import io.micronaut.scheduling.executor.UserExecutorConfiguration; - -import javax.inject.Named; -import javax.inject.Singleton; +import jakarta.inject.Named; +import jakarta.inject.Singleton; /** * Configures a {@link java.util.concurrent.ScheduledExecutorService} for running {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances. 
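The hunks above show the recurring theme of this change-set: `javax.inject` imports are swapped for their `jakarta.inject` equivalents, with the annotation names and semantics left untouched. As a minimal sketch of what the same migration looks like in downstream code (the `ProductClientConfig` bean and its qualifier name are hypothetical, not part of this diff):

import jakarta.inject.Named;     // was: javax.inject.Named
import jakarta.inject.Singleton; // was: javax.inject.Singleton

// Hypothetical downstream bean: the javax -> jakarta migration is
// import-only, so the class body needs no changes at all.
@Singleton
@Named("productClient")
public class ProductClientConfig {
}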
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java index 3b73c45b9..df39fe476 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java @@ -23,6 +23,7 @@ import io.micronaut.management.health.indicator.HealthIndicator; import io.micronaut.management.health.indicator.HealthResult; import io.reactivex.Flowable; +import jakarta.inject.Singleton; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -32,7 +33,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; -import javax.inject.Singleton; import java.util.Collection; import java.util.Collections; import java.util.Map; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java index eecc64731..2e2daf7c6 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java @@ -16,10 +16,13 @@ package io.micronaut.configuration.kafka.intercept; import io.micronaut.aop.InterceptedMethod; +import io.micronaut.aop.InterceptorBean; import io.micronaut.aop.MethodInterceptor; import io.micronaut.aop.MethodInvocationContext; import io.micronaut.configuration.kafka.annotation.KafkaClient; import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaPartition; +import io.micronaut.configuration.kafka.annotation.KafkaPartitionKey; import io.micronaut.configuration.kafka.annotation.KafkaTimestamp; import io.micronaut.configuration.kafka.annotation.Topic; import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration; @@ -51,6 +54,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -58,7 +62,6 @@ import io.micronaut.core.annotation.Nullable; import javax.annotation.PreDestroy; -import javax.inject.Singleton; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -73,6 +76,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Implementation of the {@link io.micronaut.configuration.kafka.annotation.KafkaClient} advice annotation. 
@@ -81,7 +85,7 @@ * @see io.micronaut.configuration.kafka.annotation.KafkaClient * @since 1.0 */ -@Singleton +@InterceptorBean(KafkaClient.class) public class KafkaClientIntroductionAdvice implements MethodInterceptor, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class); @@ -111,9 +115,11 @@ public KafkaClientIntroductionAdvice( public final Object intercept(MethodInvocationContext context) { if (context.hasAnnotation(KafkaClient.class)) { - AnnotationValue client = context.findAnnotation(KafkaClient.class).orElseThrow(() -> new IllegalStateException("No @KafkaClient annotation present on method: " + context)); + if (!context.hasAnnotation(KafkaClient.class)) { + throw new IllegalStateException("No @KafkaClient annotation present on method: " + context); + } - boolean isBatchSend = client.isTrue("batch"); + boolean isBatchSend = context.isTrue(KafkaClient.class, "batch"); String topic = context.stringValue(Topic.class) .orElse(null); @@ -142,6 +148,7 @@ public final Object intercept(MethodInvocationContext context) { Object key = null; Object value = null; Long timestampArgument = null; + Function partitionSupplier = producer -> null; for (int i = 0; i < arguments.length; i++) { Argument argument = arguments[i]; if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(Body.class)) { @@ -160,6 +167,22 @@ public final Object intercept(MethodInvocationContext context) { if (o instanceof Long) { timestampArgument = (Long) o; } + } else if (argument.isAnnotationPresent(KafkaPartition.class)) { + Object o = parameterValues[i]; + if (o != null && Integer.class.isAssignableFrom(o.getClass())) { + partitionSupplier = __ -> (Integer) o; + } + } else if (argument.isAnnotationPresent(KafkaPartitionKey.class)) { + String finalTopic = topic; + Object partitionKey = parameterValues[i]; + if (partitionKey != null) { + Serializer serializer = serdeRegistry.pickSerializer(argument); + if (serializer == null) { + serializer = new ByteArraySerializer(); + } + byte[] partitionKeyBytes = serializer.serialize(finalTopic, parameterValues[i]); + partitionSupplier = producer -> Utils.toPositive(Utils.murmur2(partitionKeyBytes)) % producer.partitionsFor(finalTopic).size(); + } } else if (argument.isAnnotationPresent(io.micronaut.messaging.annotation.Header.class)) { final AnnotationMetadata annotationMetadata = argument.getAnnotationMetadata(); String argumentName = argument.getName(); @@ -224,7 +247,8 @@ public final Object intercept(MethodInvocationContext context) { Producer kafkaProducer = getProducer(bodyArgument, keyArgument, context); - Long timestamp = client.isTrue("timestamp") ? Long.valueOf(System.currentTimeMillis()) : timestampArgument; + Integer partition = partitionSupplier.apply(kafkaProducer); + Long timestamp = context.isTrue(KafkaClient.class, "timestamp") ? 
Long.valueOf(System.currentTimeMillis()) : timestampArgument; Duration maxBlock = context.getValue(KafkaClient.class, "maxBlock", Duration.class) .orElse(null); @@ -253,6 +277,7 @@ public final Object intercept(MethodInvocationContext context) { kafkaHeaders, reactiveTypeValue, key, + partition, value, timestamp, maxBlock); @@ -291,7 +316,7 @@ public void onComplete() { }); } else { - ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp); + ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp); if (LOG.isTraceEnabled()) { LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); } @@ -328,6 +353,7 @@ public void onComplete() { kafkaHeaders, reactiveTypeValue, key, + partition, value, timestamp, maxBlock); @@ -351,13 +377,14 @@ public void onComplete() { String finalTopic = topic; Argument finalBodyArgument = bodyArgument; Object finalKey = key; + Integer finalPartition = partition; Argument finalReactiveTypeValue = reactiveTypeValue; returnFlowable = bodyEmitter.flatMap(o -> - buildSendFlowable(context, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, o, timestamp, finalReactiveTypeValue) + buildSendFlowable(context, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, finalPartition, o, timestamp, finalReactiveTypeValue) ); } else { - returnFlowable = buildSendFlowable(context, topic, bodyArgument, kafkaProducer, kafkaHeaders, key, value, timestamp, reactiveTypeValue); + returnFlowable = buildSendFlowable(context, topic, bodyArgument, kafkaProducer, kafkaHeaders, key, partition, value, timestamp, reactiveTypeValue); } } return interceptedMethod.handleResult(returnFlowable); @@ -373,6 +400,7 @@ public void onComplete() { kafkaHeaders, returnTypeArgument, key, + partition, value, timestamp, maxBlock @@ -401,7 +429,7 @@ public void onComplete() { List results = new ArrayList(); for (Object o : batchValue) { - ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, o, timestamp); + ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp); if (LOG.isTraceEnabled()) { LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); @@ -425,7 +453,7 @@ public void onComplete() { } }); } - ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp); + ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp); LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record); @@ -484,11 +512,12 @@ private Flowable buildSendFlowable( Producer kafkaProducer, List
kafkaHeaders, Object key, + Integer partition, Object value, Long timestamp, Argument reactiveValueType) { Flowable returnFlowable; - ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp); + ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp); returnFlowable = Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> { if (exception != null) { emitter.onError(wrapException(context, exception)); @@ -515,6 +544,7 @@ private Flowable buildSendFlowable( List
kafkaHeaders, Argument returnType, Object key, + Integer partition, Object value, Long timestamp, Duration maxBlock) { @@ -527,7 +557,7 @@ private Flowable buildSendFlowable( Class finalJavaReturnType = javaReturnType; Flowable sendFlowable = valueFlowable.flatMap(o -> { - ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, o, timestamp); + ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp); LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record); @@ -565,10 +595,10 @@ private MessagingClientException wrapException(MethodInvocationContext kafkaHeaders, Object key, Object value, Long timestamp) { + private ProducerRecord buildProducerRecord(String topic, Integer partition, List
kafkaHeaders, Object key, Object value, Long timestamp) { return new ProducerRecord( topic, - null, + partition, timestamp, key, value, @@ -579,7 +609,7 @@ private ProducerRecord buildProducerRecord(String topic, List
kafkaHeade @SuppressWarnings("unchecked") private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgument, AnnotationMetadata metadata) { Class keyType = keyArgument != null ? keyArgument.getType() : byte[].class; - String clientId = metadata.getValue(KafkaClient.class, String.class).orElse(null); + String clientId = metadata.stringValue(KafkaClient.class).orElse(null); ProducerKey key = new ProducerKey(keyType, bodyArgument.getType(), clientId); return producerMap.computeIfAbsent(key, producerKey -> { String producerId = producerKey.id; @@ -611,7 +641,7 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume String.valueOf(maxBlock.toMillis()) )); - Integer ack = metadata.getValue(KafkaClient.class, "acks", Integer.class).orElse(KafkaClient.Acknowledge.DEFAULT); + Integer ack = metadata.intValue(KafkaClient.class, "acks").orElse(KafkaClient.Acknowledge.DEFAULT); if (ack != KafkaClient.Acknowledge.DEFAULT) { String acksValue = ack == -1 ? "all" : String.valueOf(ack); @@ -660,7 +690,7 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume /** * Key used to cache {@link org.apache.kafka.clients.producer.Producer} instances. */ - private final class ProducerKey { + private static final class ProducerKey { final Class keyType; final Class valueType; final String id; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index 79a87328f..16eb71468 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -15,7 +15,6 @@ */ package io.micronaut.configuration.kafka.processor; -import io.micronaut.configuration.kafka.Acknowledgement; import io.micronaut.configuration.kafka.ConsumerAware; import io.micronaut.configuration.kafka.ConsumerRegistry; import io.micronaut.configuration.kafka.KafkaAcknowledgement; @@ -52,6 +51,7 @@ import io.micronaut.inject.BeanDefinition; import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; +import io.micronaut.messaging.Acknowledgement; import io.micronaut.messaging.annotation.Body; import io.micronaut.messaging.annotation.SendTo; import io.micronaut.messaging.exceptions.MessagingSystemException; @@ -62,6 +62,8 @@ import io.reactivex.Scheduler; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers; +import jakarta.inject.Named; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -83,8 +85,6 @@ import io.micronaut.core.annotation.NonNull; import javax.annotation.PreDestroy; -import javax.inject.Named; -import javax.inject.Singleton; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -112,7 +112,7 @@ @Singleton @Requires(beans = KafkaDefaultConfiguration.class) public class KafkaConsumerProcessor - implements ExecutableMethodProcessor, AutoCloseable, ConsumerRegistry { + implements ExecutableMethodProcessor, AutoCloseable, ConsumerRegistry { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class); @@ -385,7 +385,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod method, f .filter(arg -> 
Consumer.class.isAssignableFrom(arg.getType())) .findFirst(); final Optional ackArg = Arrays.stream(method.getArguments()) - .filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()) || io.micronaut.messaging.Acknowledgement.class.isAssignableFrom(arg.getType())) + .filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType())) .findFirst(); try { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java b/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java deleted file mode 100644 index c3285d8e3..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.configuration.kafka.scope; - -import io.micronaut.configuration.kafka.ProducerRegistry; -import io.micronaut.configuration.kafka.annotation.KafkaClient; -import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration; -import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration; -import io.micronaut.configuration.kafka.serde.SerdeRegistry; -import io.micronaut.context.BeanContext; -import io.micronaut.context.BeanResolutionContext; -import io.micronaut.context.LifeCycle; -import io.micronaut.context.exceptions.DependencyInjectionException; -import io.micronaut.context.scope.CustomScope; -import io.micronaut.core.annotation.AnnotationValue; -import io.micronaut.core.type.Argument; -import io.micronaut.core.util.StringUtils; -import io.micronaut.inject.BeanDefinition; -import io.micronaut.inject.BeanIdentifier; -import io.micronaut.inject.ParametrizedProvider; -import io.micronaut.inject.qualifiers.Qualifiers; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; -import javax.inject.Provider; -import javax.inject.Singleton; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; - -/** - * A scope implementation for injecting {@link Producer} instances. - * - * @author Graeme Rocher - * @since 1.0 - */ -@SuppressWarnings("unused") -@Singleton -public class KafkaClientScope implements CustomScope, LifeCycle, ProducerRegistry { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaClientScope.class); - private final Map clients = new ConcurrentHashMap<>(); - - private final BeanContext beanContext; - private final SerdeRegistry serdeRegistry; - - /** - * Constructs a new client scope. 
- * - * @param beanContext The bean context - * @param serdeRegistry The serde registry - */ - public KafkaClientScope( - BeanContext beanContext, - SerdeRegistry serdeRegistry) { - this.beanContext = beanContext; - this.serdeRegistry = serdeRegistry; - } - - @Override - public boolean isRunning() { - return true; - } - - @Override - public Class annotationType() { - return KafkaClient.class; - } - - @Override - public T get(BeanResolutionContext resolutionContext, BeanDefinition beanDefinition, BeanIdentifier identifier, Provider provider) { - BeanResolutionContext.Segment segment = resolutionContext.getPath().currentSegment().orElseThrow(() -> - new IllegalStateException("@KafkaClient used in invalid location") - ); - Argument argument = segment.getArgument(); - AnnotationValue annotation = argument.findAnnotation(KafkaClient.class) - .orElseThrow(() -> new DependencyInjectionException(resolutionContext, argument, "KafkaClientScope called for injection point that is not annotated with @KafkaClient")); - if (!Producer.class.isAssignableFrom(argument.getType())) { - throw new DependencyInjectionException(resolutionContext, argument, "@KafkaClient used on type that is not a " + Producer.class.getName()); - } - if (!(provider instanceof ParametrizedProvider)) { - throw new DependencyInjectionException(resolutionContext, argument, "KafkaClientScope called with invalid bean provider"); - } - - Optional> k = argument.getTypeVariable("K"); - Optional> v = argument.getTypeVariable("V"); - - if (!k.isPresent() || !v.isPresent()) { - throw new DependencyInjectionException(resolutionContext, argument, "@KafkaClient used on type missing generic argument values for Key and Value"); - - } - - String id = annotation.getValue(String.class).orElse(null); - Argument keyArgument = k.get(); - Argument valueArgument = v.get(); - return getKafkaProducer(id, keyArgument, valueArgument); - } - - @NonNull - @Override - public Producer getProducer(String id, Argument keyType, Argument valueType) { - return getKafkaProducer(id, keyType, valueType); - } - - @SuppressWarnings("unchecked") - private T getKafkaProducer(@Nullable String id, Argument keyType, Argument valueType) { - ClientKey key = new ClientKey( - id, - keyType.getType(), - valueType.getType() - ); - - return (T) clients.computeIfAbsent(key, clientKey -> { - Supplier defaultResolver = () -> beanContext.getBean(AbstractKafkaProducerConfiguration.class); - AbstractKafkaProducerConfiguration config; - boolean hasId = StringUtils.isNotEmpty(id); - if (hasId) { - config = beanContext.findBean( - AbstractKafkaProducerConfiguration.class, - Qualifiers.byName(id) - ).orElseGet(defaultResolver); - } else { - config = defaultResolver.get(); - } - - DefaultKafkaProducerConfiguration newConfig = new DefaultKafkaProducerConfiguration(config); - - Properties properties = newConfig.getConfig(); - if (!properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - Serializer keySerializer = serdeRegistry.pickSerializer(keyType); - newConfig.setKeySerializer(keySerializer); - } - - if (!properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - Serializer valueSerializer = serdeRegistry.pickSerializer(valueType); - newConfig.setValueSerializer(valueSerializer); - } - - if (hasId) { - properties.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, id); - } - return beanContext.createBean(Producer.class, newConfig); - }); - } - - @Override - public KafkaClientScope stop() { - for (Producer producer : clients.values()) { - try { - producer.close(); - } 
catch (Exception e) { - LOG.warn("Error shutting down Kafka producer: {}", e.getMessage(), e); - } - } - clients.clear(); - return this; - } - - @Override - public Optional remove(BeanIdentifier identifier) { - return Optional.empty(); - } - - /** - * key for retrieving built producers. - * - * @author Graeme Rocher - * @since 1.0 - */ - private class ClientKey { - private final String id; - private final Class keyType; - private final Class valueType; - - ClientKey(String id, Class keyType, Class valueType) { - this.id = id; - this.keyType = keyType; - this.valueType = valueType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ClientKey clientKey = (ClientKey) o; - return Objects.equals(id, clientKey.id) && - Objects.equals(keyType, clientKey.keyType) && - Objects.equals(valueType, clientKey.valueType); - } - - @Override - public int hashCode() { - - return Objects.hash(id, keyType, valueType); - } - } -} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/package-info.java b/kafka/src/main/java/io/micronaut/configuration/kafka/scope/package-info.java deleted file mode 100644 index 430964d2a..000000000 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2017-2020 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Contains the scope implementation for {@link io.micronaut.configuration.kafka.annotation.KafkaClient}. 
- * - * @author graemerocher - * @since 1.0 - */ -package io.micronaut.configuration.kafka.scope; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java index a9d45898a..0db4d7743 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.Serdes; import io.micronaut.core.annotation.NonNull; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.*; import java.util.concurrent.ConcurrentHashMap; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java index 7b11ad82c..c5b6ea0ab 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java @@ -18,13 +18,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micronaut.context.annotation.Parameter; import io.micronaut.context.annotation.Prototype; +import io.micronaut.core.annotation.Creator; import io.micronaut.core.serialize.exceptions.SerializationException; import io.micronaut.jackson.serialize.JacksonObjectSerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; -import javax.inject.Inject; import java.util.Map; /** @@ -47,7 +47,7 @@ public class JsonSerde implements Serializer, Deserializer, Serde { * @param objectSerializer The {@link JacksonObjectSerializer} * @param type The target type */ - @Inject + @Creator public JsonSerde(JacksonObjectSerializer objectSerializer, @Parameter Class type) { this.objectSerializer = objectSerializer; this.type = type; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerdeRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerdeRegistry.java index 56ee5011a..6397202de 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerdeRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerdeRegistry.java @@ -17,10 +17,10 @@ import io.micronaut.context.BeanContext; import io.micronaut.core.reflect.ClassUtils; +import jakarta.inject.Singleton; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import javax.inject.Singleton; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaConsumerTracingInstrumentation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaConsumerTracingInstrumentation.java index 6e58a148a..b2bad9e73 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaConsumerTracingInstrumentation.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaConsumerTracingInstrumentation.java @@ -16,15 +16,15 @@ package io.micronaut.configuration.kafka.tracing; import io.micronaut.configuration.kafka.tracing.brave.BraveKafkaConsumerTracingInstrumentation; +import io.micronaut.context.BeanProvider; import io.micronaut.context.annotation.Requires; import io.micronaut.context.event.BeanCreatedEvent; import 
io.micronaut.context.event.BeanCreatedEventListener; import io.opentracing.Tracer; import io.opentracing.contrib.kafka.TracingKafkaConsumer; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.Consumer; -import javax.inject.Provider; -import javax.inject.Singleton; /** * Instruments Kafka consumers with Open Tracing support. @@ -38,13 +38,13 @@ @Requires(classes = TracingKafkaConsumer.class) public class KafkaConsumerTracingInstrumentation implements BeanCreatedEventListener> { - private final Provider tracerProvider; + private final BeanProvider tracerProvider; /** * Default constructor. * @param tracerProvider The tracer provider */ - protected KafkaConsumerTracingInstrumentation(Provider tracerProvider) { + protected KafkaConsumerTracingInstrumentation(BeanProvider tracerProvider) { this.tracerProvider = tracerProvider; } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaProducerTracingInstrumentation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaProducerTracingInstrumentation.java index 90467bda2..0fea06963 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaProducerTracingInstrumentation.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/KafkaProducerTracingInstrumentation.java @@ -16,16 +16,15 @@ package io.micronaut.configuration.kafka.tracing; import io.micronaut.configuration.kafka.tracing.brave.BraveKafkaProducerTracingInstrumentation; +import io.micronaut.context.BeanProvider; import io.micronaut.context.annotation.Requires; import io.micronaut.context.event.BeanCreatedEvent; import io.micronaut.context.event.BeanCreatedEventListener; import io.opentracing.Tracer; import io.opentracing.contrib.kafka.TracingKafkaProducer; +import jakarta.inject.Singleton; import org.apache.kafka.clients.producer.Producer; -import javax.inject.Provider; -import javax.inject.Singleton; - /** * Instruments Kafka producers with Open Tracing support. * @@ -38,19 +37,23 @@ @Requires(classes = TracingKafkaProducer.class) public class KafkaProducerTracingInstrumentation implements BeanCreatedEventListener> { - private final Provider tracerProvider; + private final BeanProvider tracerProvider; /** * Default constructor. 
* @param tracerProvider The tracer provider */ - protected KafkaProducerTracingInstrumentation(Provider tracerProvider) { + protected KafkaProducerTracingInstrumentation(BeanProvider tracerProvider) { this.tracerProvider = tracerProvider; } @Override public Producer onCreated(BeanCreatedEvent> event) { final Producer kafkaProducer = event.getBean(); - return new TracingKafkaProducer<>(kafkaProducer, tracerProvider.get()); + if (kafkaProducer instanceof TracingKafkaProducer) { + return kafkaProducer; + } else { + return new TracingKafkaProducer<>(kafkaProducer, tracerProvider.get()); + } } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaConsumerTracingInstrumentation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaConsumerTracingInstrumentation.java index c5ba563fc..624113d1c 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaConsumerTracingInstrumentation.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaConsumerTracingInstrumentation.java @@ -19,9 +19,9 @@ import io.micronaut.context.annotation.Requires; import io.micronaut.context.event.BeanCreatedEvent; import io.micronaut.context.event.BeanCreatedEventListener; +import jakarta.inject.Singleton; import org.apache.kafka.clients.consumer.Consumer; -import javax.inject.Singleton; /** * Kafka consumer tracing instrumentation using Brave. diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaProducerTracingInstrumentation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaProducerTracingInstrumentation.java index 19c6162f2..e4f9b5b3a 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaProducerTracingInstrumentation.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaProducerTracingInstrumentation.java @@ -19,9 +19,9 @@ import io.micronaut.context.annotation.Requires; import io.micronaut.context.event.BeanCreatedEvent; import io.micronaut.context.event.BeanCreatedEventListener; +import jakarta.inject.Singleton; import org.apache.kafka.clients.producer.Producer; -import javax.inject.Singleton; /** * Kafka producer tracing instrumentation using Brave. @@ -44,6 +44,7 @@ public BraveKafkaProducerTracingInstrumentation(KafkaTracing kafkaTracing) { @Override public Producer onCreated(BeanCreatedEvent> event) { - return kafkaTracing.producer(event.getBean()); + final Producer producer = event.getBean(); + return kafkaTracing.producer(producer); } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java index df5dee3fc..01c2c87de 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java @@ -22,7 +22,7 @@ import io.micronaut.context.annotation.Requires; import io.micronaut.core.annotation.Nullable; -import javax.inject.Singleton; +import jakarta.inject.Singleton; /** * Brave's Kafka tracing factory. 
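The test specs that follow exercise the new `@KafkaPartition`/`@KafkaPartitionKey` support added to `KafkaClientIntroductionAdvice` earlier in this diff. The partition derivation there serializes the `@KafkaPartitionKey` argument, hashes it with murmur2, and reduces it modulo the topic's partition count, matching what Kafka's default partitioner does for record keys. A minimal standalone sketch of that computation (the class name and sample values are illustrative, not part of this diff):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public final class PartitionKeyExample {

    // Mirrors the expression used in the advice:
    // Utils.toPositive(Utils.murmur2(partitionKeyBytes)) % partitionCount
    static int partitionFor(byte[] serializedPartitionKey, int partitionCount) {
        return Utils.toPositive(Utils.murmur2(serializedPartitionKey)) % partitionCount;
    }

    public static void main(String[] args) {
        byte[] key = "par-key1".getBytes(StandardCharsets.UTF_8);
        // With the 3 partitions configured via KAFKA_NUM_PARTITIONS in
        // KafkaPartitionSpec below, a fixed key always maps to the same partition.
        System.out.println(partitionFor(key, 3));
    }
}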
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/AbstractKafkaContainerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/AbstractKafkaContainerSpec.groovy index 1598a94c8..1de93aa55 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/AbstractKafkaContainerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/AbstractKafkaContainerSpec.groovy @@ -7,7 +7,7 @@ import spock.lang.Shared abstract class AbstractKafkaContainerSpec extends AbstractKafkaSpec { - @Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer() + @Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer().withEnv(getEnvVariables()) @Shared @AutoCleanup ApplicationContext context void setupSpec() { @@ -17,4 +17,9 @@ abstract class AbstractKafkaContainerSpec extends AbstractKafkaSpec { ['kafka.bootstrap.servers': kafkaContainer.bootstrapServers] ) } + + protected Map getEnvVariables() { + return [:] + } + } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy index d3162ee20..7d8137fa7 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy @@ -25,7 +25,7 @@ import spock.lang.AutoCleanup import spock.lang.Shared import spock.lang.Stepwise -import javax.annotation.Nullable +import io.micronaut.core.annotation.Nullable import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaPartitionSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaPartitionSpec.groovy new file mode 100644 index 000000000..6e4bfb785 --- /dev/null +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaPartitionSpec.groovy @@ -0,0 +1,182 @@ +package io.micronaut.configuration.kafka.annotation + +import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec +import io.micronaut.context.annotation.Requires + +import java.util.concurrent.ConcurrentHashMap + +import static io.micronaut.configuration.kafka.annotation.KafkaClient.Acknowledge.ALL +import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST +import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS + +class KafkaPartitionSpec extends AbstractKafkaContainerSpec { + + public static final String TOPIC_WORDS = "KafkaPartitionSpec-words" + + protected Map getEnvVariables() { + super.envVariables + ["KAFKA_NUM_PARTITIONS": "3"] + } + + protected Map getConfiguration() { + super.configuration + [(EMBEDDED_TOPICS): [TOPIC_WORDS]] + } + + def "test client without partition"() { + given: + ClientWithoutPartition client = context.getBean(ClientWithoutPartition) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence("key1", "sentence1") + client.sendSentence("key2", "sentence2") + client.sendSentence("key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 2 // "key1" happens to result in this + listener.entries["sentence2"] == 2 // "key2" happens to result in this + 
listener.entries["sentence3"] == 1 // "key3" happens to result in this + } + } + + def "test client with integer partition"() { + given: + ClientWithIntegerPartition client = context.getBean(ClientWithIntegerPartition) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence(1, "key1", "sentence1") + client.sendSentence(2, "key2", "sentence2") + client.sendSentence(2, "key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 1 + listener.entries["sentence2"] == 2 + listener.entries["sentence3"] == 2 + } + } + + def "test client with integer partition null"() { + given: + ClientWithIntegerPartition client = context.getBean(ClientWithIntegerPartition) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence(null, "key1", "sentence1") + client.sendSentence(null, "key2", "sentence2") + client.sendSentence(null, "key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 2 // "key1" happens to result in this + listener.entries["sentence2"] == 2 // "key2" happens to result in this + listener.entries["sentence3"] == 1 // "key3" happens to result in this + } + } + + def "test client with int partition"() { + given: + ClientWithIntPartition client = context.getBean(ClientWithIntPartition) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence(1, "key1", "sentence1") + client.sendSentence(2, "key2", "sentence2") + client.sendSentence(2, "key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 1 + listener.entries["sentence2"] == 2 + listener.entries["sentence3"] == 2 + } + } + + def "test client with partition key"() { + given: + ClientWithPartitionKey client = context.getBean(ClientWithPartitionKey) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence("par-key1", "key1", "sentence1") + client.sendSentence("par-key2", "key2", "sentence2") + client.sendSentence("par-key3", "key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 2 // "par-key1" happens to result in this + listener.entries["sentence2"] == 0 // "par-key2" happens to result in this + listener.entries["sentence3"] == 2 // "par-key3" happens to result in this + } + } + + def "test client with partition key null"() { + given: + ClientWithPartitionKey client = context.getBean(ClientWithPartitionKey) + SentenceListener listener = context.getBean(SentenceListener) + listener.entries.clear() + + when: + client.sendSentence(null, "key1", "sentence1") + client.sendSentence(null, "key2", "sentence2") + client.sendSentence(null, "key3", "sentence3") + + then: + conditions.eventually { + listener.entries.size() == 3 + listener.entries["sentence1"] == 2 // "key1" happens to result in this + listener.entries["sentence2"] == 2 // "key2" happens to result in this + listener.entries["sentence3"] == 1 // "key3" happens to result in this + } + } + + @Requires(property = 'spec.name', value = 'KafkaPartitionSpec') + @KafkaClient(acks = ALL) + static interface ClientWithoutPartition { + @Topic(KafkaPartitionSpec.TOPIC_WORDS) + void sendSentence(@KafkaKey String key, String sentence) + } + + @Requires(property = 'spec.name', value = 
'KafkaPartitionSpec') + @KafkaClient(acks = ALL) + static interface ClientWithIntPartition { + @Topic(KafkaPartitionSpec.TOPIC_WORDS) + void sendSentence(@KafkaPartition int partition, @KafkaKey String key, String sentence) + } + + @Requires(property = 'spec.name', value = 'KafkaPartitionSpec') + @KafkaClient(acks = ALL) + static interface ClientWithIntegerPartition { + @Topic(KafkaPartitionSpec.TOPIC_WORDS) + void sendSentence(@KafkaPartition Integer partition, @KafkaKey String key, String sentence) + } + + @Requires(property = 'spec.name', value = 'KafkaPartitionSpec') + @KafkaClient(acks = ALL) + static interface ClientWithPartitionKey { + @Topic(KafkaPartitionSpec.TOPIC_WORDS) + void sendSentence(@KafkaPartitionKey String partitionKey, @KafkaKey String key, String sentence) + } + + @Requires(property = 'spec.name', value = 'KafkaPartitionSpec') + @KafkaListener(offsetReset = EARLIEST) + static class SentenceListener { + ConcurrentHashMap<String, Integer> entries = new ConcurrentHashMap<>() + + @Topic(KafkaPartitionSpec.TOPIC_WORDS) + void receive(@KafkaPartition int partition, String sentence) { + entries.put(sentence, partition) + } + } +} diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaProducerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaProducerSpec.groovy index 5fa456f16..6100ebf2b 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaProducerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaProducerSpec.groovy @@ -21,7 +21,7 @@ import org.testcontainers.containers.KafkaContainer import spock.lang.AutoCleanup import spock.lang.Shared -import javax.inject.Singleton +import jakarta.inject.Singleton import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicInteger diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy index 37f5d37be..dd2877af3 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy @@ -7,7 +7,7 @@ import io.micronaut.context.annotation.Requires import org.apache.kafka.common.errors.SerializationException import spock.lang.Stepwise -import javax.inject.Inject +import jakarta.inject.Inject import java.util.concurrent.ConcurrentHashMap import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/DocTests.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/DocTests.java index b9fd59489..2787214aa 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/DocTests.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/DocTests.java @@ -7,11 +7,12 @@ import org.junit.BeforeClass; import org.junit.Test; import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; public class DocTests { static ApplicationContext applicationContext; - static KafkaContainer kafkaContainer = new KafkaContainer(); + static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")); @BeforeClass public static void setup() { diff --git
a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/ack/ProductListener.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/ack/ProductListener.java index 6dba31230..f7a6125e4 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/ack/ProductListener.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/ack/ProductListener.java @@ -1,11 +1,11 @@ package io.micronaut.configuration.kafka.docs.consumer.offsets.ack; -import io.micronaut.configuration.kafka.Acknowledgement; import io.micronaut.configuration.kafka.annotation.KafkaListener; import io.micronaut.configuration.kafka.annotation.OffsetReset; import io.micronaut.configuration.kafka.annotation.OffsetStrategy; import io.micronaut.configuration.kafka.annotation.Topic; import io.micronaut.configuration.kafka.docs.consumer.config.Product; +import io.micronaut.messaging.Acknowledgement; class ProductListener { diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSender.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSender.java index 9923fd898..ebe37bf40 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSender.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSender.java @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import javax.inject.Singleton; +import jakarta.inject.Singleton; import java.util.concurrent.Future; // end::imports[] diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSenderTest.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSenderTest.java index 056ab8b50..d01dff7d4 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSenderTest.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/inject/BookSenderTest.java @@ -4,6 +4,8 @@ import io.micronaut.configuration.kafka.docs.consumer.batch.Book; import io.micronaut.context.ApplicationContext; import org.junit.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; import java.util.Collections; import java.util.Map; @@ -13,16 +15,21 @@ public class BookSenderTest { // tag::test[] @Test public void testBookSender() { - Map config = Collections.singletonMap( // <1> - AbstractKafkaConfiguration.EMBEDDED, true - ); + try (KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"))) { + container.start(); + Map config = Collections.singletonMap( // <1> + AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS, + container.getBootstrapServers() + ); - try (ApplicationContext ctx = ApplicationContext.run(config)) { - BookSender bookSender = ctx.getBean(BookSender.class); // <2> - Book book = new Book(); - book.setTitle("The Stand"); - bookSender.send("Stephen King", book); + try (ApplicationContext ctx = ApplicationContext.run(config)) { + BookSender bookSender = ctx.getBean(BookSender.class); // <2> + Book book = new Book(); + book.setTitle("The Stand"); + bookSender.send("Stephen King", book); + } } + } // end::test[] } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/EmbeddedMain.java 
b/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/EmbeddedMain.java deleted file mode 100644 index 212575cfe..000000000 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/EmbeddedMain.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.micronaut.configuration.kafka.embedded; - -import io.micronaut.context.ApplicationContext; - -import java.util.Collections; - -import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED; -import static io.micronaut.context.env.Environment.TEST; - -public class EmbeddedMain { - - public static void main(String...args) { - ApplicationContext applicationContext = ApplicationContext.run( - Collections.singletonMap(EMBEDDED, true), - TEST - ); - KafkaEmbedded embedded = applicationContext.getBean(KafkaEmbedded.class); - } -} diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedSpec.groovy deleted file mode 100644 index fe7c13f6b..000000000 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedSpec.groovy +++ /dev/null @@ -1,101 +0,0 @@ -package io.micronaut.configuration.kafka.embedded - -import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration -import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration -import io.micronaut.context.ApplicationContext -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.consumer.ConsumerConfig -import spock.lang.AutoCleanup -import spock.lang.Specification - -import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED -import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS - -class KafkaEmbeddedSpec extends Specification { - - @AutoCleanup ApplicationContext applicationContext - - void "test run kafka embedded server"() { - given: - run() - - when: - AbstractKafkaConsumerConfiguration config = applicationContext.getBean(AbstractKafkaConsumerConfiguration) - Properties props = config.getConfig() - - then: - props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS - - when: - KafkaEmbedded kafkaEmbedded = applicationContext.getBean(KafkaEmbedded) - - then: - kafkaEmbedded.kafkaServer.isPresent() - kafkaEmbedded.zkPort.isPresent() - } - - void "test run kafka embedded server with multi partition topic"() { - given: - int partitionNumber = 10 - String topicName = "multi-partition-topic" - - run( - (EMBEDDED_TOPICS): topicName, - "kafka.embedded.properties.num.partitions": partitionNumber - ) - - AdminClient adminClient = createAdminClient() - - when: - KafkaEmbedded kafkaEmbedded = applicationContext.getBean(KafkaEmbedded) - - then: - kafkaEmbedded.kafkaServer.isPresent() - kafkaEmbedded.zkPort.isPresent() - - and: - adminClient - .describeTopics([topicName]).values() - .get(topicName).get() - .partitions().size() == partitionNumber - - cleanup: - adminClient.close() - } - - void "test run kafka embedded server with single partition topic"() { - given: - String topicName = "single-partition-topic" - - run((EMBEDDED_TOPICS): topicName) - - AdminClient adminClient = createAdminClient() - - when: - KafkaEmbedded kafkaEmbedded = applicationContext.getBean(KafkaEmbedded) - - then: - kafkaEmbedded.kafkaServer.isPresent() - kafkaEmbedded.zkPort.isPresent() - - and: - adminClient - 
.describeTopics([topicName]).values() - .get(topicName).get() - .partitions().size() == 1 - - cleanup: - adminClient.close() - } - - private static AdminClient createAdminClient() { - AdminClient.create((CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG): AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS) - } - - private void run(Map extraProps = [:]) { - applicationContext = ApplicationContext.run( - [(EMBEDDED): true] + extraProps - ) - } -} diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy index b941feb19..a4299dce2 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy @@ -1,15 +1,16 @@ package io.micronaut.configuration.kafka.health +import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration import io.micronaut.context.ApplicationContext import io.micronaut.core.io.socket.SocketUtils import io.micronaut.management.health.indicator.HealthResult import org.apache.kafka.clients.admin.Config import org.apache.kafka.clients.admin.ConfigEntry import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName import spock.lang.Specification import spock.lang.Unroll -import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.DEFAULT_REPLICATION_PROPERTY import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.REPLICATION_PROPERTY import static io.micronaut.health.HealthStatus.DOWN @@ -60,8 +61,10 @@ class KafkaHealthIndicatorSpec extends Specification { @Unroll void "test kafka health indicator - disabled (#configvalue)"() { given: + KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")) + container.start() ApplicationContext applicationContext = ApplicationContext.run( - (EMBEDDED) : true, + (AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS): container.getBootstrapServers(), "kafka.health.enabled": configvalue ) @@ -73,6 +76,7 @@ class KafkaHealthIndicatorSpec extends Specification { cleanup: applicationContext.close() + container.stop() where: configvalue << [false, "false", "no", ""] diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/AssignToPartitionSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/AssignToPartitionSpec.groovy index 1607f40e4..823e0b2d2 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/AssignToPartitionSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/AssignToPartitionSpec.groovy @@ -10,7 +10,7 @@ import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRebalanceListener import org.apache.kafka.common.TopicPartition -import javax.inject.Singleton +import jakarta.inject.Singleton import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualOffsetCommitSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualOffsetCommitSpec.groovy index 2dec40a79..d1557874f 100644 --- 
a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualOffsetCommitSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/BatchManualOffsetCommitSpec.groovy @@ -9,7 +9,7 @@ import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition -import javax.inject.Singleton +import jakarta.inject.Singleton import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualAckSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualAckSpec.groovy index e47860628..a785359c2 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualAckSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualAckSpec.groovy @@ -1,13 +1,12 @@ package io.micronaut.configuration.kafka.offsets import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec -import io.micronaut.configuration.kafka.Acknowledgement import io.micronaut.configuration.kafka.annotation.KafkaClient import io.micronaut.configuration.kafka.annotation.KafkaListener import io.micronaut.configuration.kafka.annotation.Topic import io.micronaut.context.annotation.Requires - -import javax.inject.Singleton +import io.micronaut.messaging.Acknowledgement +import jakarta.inject.Singleton import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualOffsetCommitSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualOffsetCommitSpec.groovy index cb656b5b5..e5c1feb52 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualOffsetCommitSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/ManualOffsetCommitSpec.groovy @@ -9,7 +9,7 @@ import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition -import javax.inject.Singleton +import jakarta.inject.Singleton import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/PerRecordOffsetCommitSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/PerRecordOffsetCommitSpec.groovy index 96c3b0ef2..71f2cccbc 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/PerRecordOffsetCommitSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/offsets/PerRecordOffsetCommitSpec.groovy @@ -6,7 +6,7 @@ import io.micronaut.configuration.kafka.annotation.KafkaListener import io.micronaut.configuration.kafka.annotation.Topic import io.micronaut.context.annotation.Requires -import javax.inject.Singleton +import jakarta.inject.Singleton import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.SYNC_PER_RECORD diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/scope/KafkaClientScopeSpec.groovy 
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/scope/KafkaClientScopeSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/scope/KafkaClientScopeSpec.groovy
index f22913d72..f74387408 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/scope/KafkaClientScopeSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/scope/KafkaClientScopeSpec.groovy
@@ -6,8 +6,8 @@
 import io.micronaut.context.ApplicationContext
 import io.micronaut.context.annotation.Requires
 import org.apache.kafka.clients.producer.Producer
 
-import javax.inject.Inject
-import javax.inject.Singleton
+import jakarta.inject.Inject
+import jakarta.inject.Singleton
 
 class KafkaClientScopeSpec extends AbstractKafkaSpec {
diff --git a/kafka/src/test/groovy/io/micronaut/test/DisabledClientFallback.java b/kafka/src/test/groovy/io/micronaut/test/DisabledClientFallback.java
index 933a2ff52..8548bee04 100644
--- a/kafka/src/test/groovy/io/micronaut/test/DisabledClientFallback.java
+++ b/kafka/src/test/groovy/io/micronaut/test/DisabledClientFallback.java
@@ -1,11 +1,13 @@
 package io.micronaut.test;
 
+import io.micronaut.configuration.kafka.annotation.KafkaClient;
 import io.micronaut.context.annotation.Replaces;
 import io.micronaut.context.annotation.Requires;
 import io.micronaut.core.util.StringUtils;
 
 @Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE)
 @Replaces(DisabledClient.class)
+@KafkaClient
 public class DisabledClientFallback implements DisabledClient {
     @Override
     public void send(String message) {
diff --git a/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java b/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
index c56918d0a..8f274ea61 100644
--- a/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
+++ b/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
@@ -4,7 +4,7 @@
 import io.micronaut.configuration.kafka.annotation.KafkaListener;
 import io.micronaut.configuration.kafka.annotation.Topic;
 
-import javax.inject.Singleton;
+import jakarta.inject.Singleton;
 
 @Singleton
 public class DisabledConsumer {
diff --git a/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy b/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
index 18d1d538d..f9ccadf5b 100644
--- a/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
@@ -4,7 +4,7 @@
 import io.micronaut.context.ApplicationContext
 import spock.lang.Specification
 
-import javax.inject.Singleton
+import jakarta.inject.Singleton
 
 /**
  * Test that micronaut-kafka can be disabled by setting 'kafka.enabled': 'false'.
diff --git a/kafka/src/test/resources/logback.xml b/kafka/src/test/resources/logback.xml
index a16977786..ceef75ada 100644
--- a/kafka/src/test/resources/logback.xml
+++ b/kafka/src/test/resources/logback.xml
@@ -12,7 +12,7 @@
-
+
-
+
\ No newline at end of file
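The DisabledClientFallback hunk above adds `@KafkaClient` so the fallback still satisfies injection points typed to the client interface when Kafka is switched off. Below is a self-contained sketch of that pattern; `MessageClient` and the topic name are hypothetical stand-ins for `DisabledClient`:

[source,java]
----
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;

// Hypothetical client interface; micronaut-kafka generates the
// implementation at compile time.
@KafkaClient
interface MessageClient {
    @Topic("messages")
    void send(String message);
}

// Activated whenever kafka.enabled is set to anything other than "true";
// it replaces the generated client with a no-op bean.
@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE)
@Replaces(MessageClient.class)
@KafkaClient
class MessageClientFallback implements MessageClient {

    @Override
    public void send(String message) {
        // intentionally empty: Kafka is disabled
    }
}
----

Callers keep injecting `MessageClient` and never need to know whether the real producer or the no-op bean is behind it.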
diff --git a/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc b/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc
index b2dc5fa2e..7e2d3de3d 100644
--- a/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc
+++ b/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc
@@ -23,7 +23,7 @@ To configure different properties for each client, you should set a `@KafkaClient`
 @KafkaClient("product-client")
 ----
 
-This serves 2 purposes. Firstly it sets the value of the `client.id` setting used to build the `KafkaProducer`. Secondly, it allows you to apply per producer configuration in `application.yml`:
+This serves two purposes. First, it sets the value of the `client.id` setting used to build the `Producer`. Second, it allows you to apply per-producer configuration in `application.yml`:
 
 .Applying Default Configuration
 [source,yaml]
diff --git a/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc b/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc
index 09f075769..9e29c49f3 100644
--- a/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc
+++ b/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc
@@ -10,8 +10,8 @@
 include::{testskafka}/producer/inject/BookSender.java[tags=imports, indent=0]
 include::{testskafka}/producer/inject/BookSender.java[tags=clazz, indent=0]
 ----
 
-<1> The `KafkaProducer` is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments.
-<2> The `KafkaProducer` is used to send records
+<1> The `Producer` is dependency-injected into the constructor. If not specified in configuration, the key and value serializers are inferred from the generic type arguments.
+<2> The `Producer` is used to send records.
 
 Note that there is no need to call the `close()` method to shut down the `KafkaProducer`, it is fully managed by Micronaut and will be shutdown when the application shuts down.
diff --git a/src/main/docs/guide/kafkaStreams.adoc b/src/main/docs/guide/kafkaStreams.adoc
index 9cd1276b5..fbd7bd386 100644
--- a/src/main/docs/guide/kafkaStreams.adoc
+++ b/src/main/docs/guide/kafkaStreams.adoc
@@ -107,7 +107,7 @@ kafka:
 
 The above configuration sets the `num.stream.threads` setting of the Kafka `StreamsConfig` to `1` for the `default` stream, and the same setting to `10` for a stream named `my-stream`.
 
-You can then inject an `api:configuration.kafka.streams.ConfiguredStreamBuilder[]` specfically for the above configuration using `javax.inject.Named`:
+You can then inject an `api:configuration.kafka.streams.ConfiguredStreamBuilder[]` specifically for the above configuration using `jakarta.inject.Named`:
 
 [source,java]
 ----
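The kafkaStreams.adoc hunk above only swaps the package name in prose, but the injection it describes is worth seeing end to end. A sketch under the configuration that guide shows, where a `my-stream` block exists under `kafka.streams` (the factory, bean, and topic names are assumptions):

[source,java]
----
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.kstream.KStream;

@Factory
public class MyStreamFactory {

    // @Named selects the builder backed by the kafka.streams.my-stream
    // configuration block; ConfiguredStreamBuilder extends StreamsBuilder.
    @Singleton
    @Named("my-stream")
    KStream<String, String> myStream(@Named("my-stream") ConfiguredStreamBuilder builder) {
        return builder.stream("my-stream-input");
    }
}
----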
diff --git a/tests/tasks-sasl-plaintext/build.gradle b/tests/tasks-sasl-plaintext/build.gradle
index 38c0a7680..1e6bd58f3 100644
--- a/tests/tasks-sasl-plaintext/build.gradle
+++ b/tests/tasks-sasl-plaintext/build.gradle
@@ -20,6 +20,7 @@ repositories {
 
 dependencies {
     implementation project(":kafka")
+    implementation 'jakarta.inject:jakarta.inject-api:2.0.0'
     testImplementation "io.micronaut:micronaut-http-client"
     testImplementation platform("org.testcontainers:testcontainers-bom:$testContainersVersion")
     testImplementation "org.testcontainers:kafka"
diff --git a/tests/tasks-sasl-plaintext/src/main/java/example/TasksPublisher.java b/tests/tasks-sasl-plaintext/src/main/java/example/TasksPublisher.java
index 7e9c08251..98bbb05ff 100644
--- a/tests/tasks-sasl-plaintext/src/main/java/example/TasksPublisher.java
+++ b/tests/tasks-sasl-plaintext/src/main/java/example/TasksPublisher.java
@@ -4,7 +4,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Singleton;
+import jakarta.inject.Singleton;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @Singleton
diff --git a/tests/tasks-sasl-plaintext/src/test/groovy/example/TasksSpec.groovy b/tests/tasks-sasl-plaintext/src/test/groovy/example/TasksSpec.groovy
index 0f1b3c645..ff32aebd5 100644
--- a/tests/tasks-sasl-plaintext/src/test/groovy/example/TasksSpec.groovy
+++ b/tests/tasks-sasl-plaintext/src/test/groovy/example/TasksSpec.groovy
@@ -12,7 +12,7 @@
 import spock.lang.Shared
 import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
-import javax.inject.Inject
+import jakarta.inject.Inject
 
 @MicronautTest
 class TasksSpec extends Specification implements TestPropertyProvider {
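TasksSpec feeds the broker address into the application context through `TestPropertyProvider`. The same mechanism in the JUnit 5 flavour of micronaut-test might look as follows; `TestPropertyProvider` needs the per-class test lifecycle so `getProperties()` runs before the context starts, and the class name and image tag here are hypothetical:

[source,java]
----
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TasksTest implements TestPropertyProvider {

    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"));

    // Called before the application context starts, so the container's
    // address is available as an ordinary configuration property.
    @Override
    public Map<String, String> getProperties() {
        kafka.start();
        return Map.of("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }
}
----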