From 05c98184dda669e9d367e397a3c62c607fdb2cfb Mon Sep 17 00:00:00 2001 From: graemerocher Date: Tue, 4 May 2021 12:24:01 +0200 Subject: [PATCH 1/2] Fix handling of non-deprecated annotations --- .../kafka/bind/KafkaHeaderBinder.java | 1 + .../kafka/bind/KafkaMessageHeaderBinder.java | 42 +++++++++++++++++++ .../annotation/KafkaBatchListenerSpec.groovy | 6 +-- .../kafka/annotation/KafkaListenerSpec.groovy | 18 ++++---- .../docs/producer/headers/ProductClient.java | 4 +- .../health/KafkaConsumerMetricsSpec.groovy | 4 +- .../health/KafkaProducerMetricsSpec.groovy | 4 +- 7 files changed, 61 insertions(+), 18 deletions(-) create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java index b145b15fd..20b5a5dd2 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaHeaderBinder.java @@ -33,6 +33,7 @@ * @since 1.0 */ @Singleton +@Deprecated public class KafkaHeaderBinder implements AnnotatedConsumerRecordBinder { @Override diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java new file mode 100644 index 000000000..94ea4d584 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java @@ -0,0 +1,42 @@ +package io.micronaut.configuration.kafka.bind; + +import io.micronaut.core.annotation.AnnotationMetadata; +import io.micronaut.core.convert.ArgumentConversionContext; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.messaging.annotation.MessageHeader; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; + +import javax.inject.Singleton; +import java.util.Optional; + +@Singleton +public class KafkaMessageHeaderBinder implements AnnotatedConsumerRecordBinder { + + @Override + public Class annotationType() { + return MessageHeader.class; + } + + @Override + public BindingResult bind(ArgumentConversionContext context, ConsumerRecord source) { + Headers headers = source.headers(); + AnnotationMetadata annotationMetadata = context.getAnnotationMetadata(); + + String name = annotationMetadata.stringValue(MessageHeader.class, "name") + .orElseGet(() -> annotationMetadata.stringValue(MessageHeader.class) + .orElse(context.getArgument().getName())); + Iterable value = headers.headers(name); + + if (value.iterator().hasNext()) { + Optional converted = ConversionService.SHARED.convert(value, context); + return () -> converted; + } else if (context.getArgument().getType() == Optional.class) { + //noinspection unchecked + return () -> (Optional) Optional.of(Optional.empty()); + } else { + //noinspection unchecked + return BindingResult.EMPTY; + } + } +} diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy index 093c789c3..2bc37bcd9 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy @@ -4,7 +4,7 @@ import groovy.transform.EqualsAndHashCode import groovy.transform.ToString import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec import io.micronaut.context.annotation.Requires -import io.micronaut.messaging.annotation.Header +import io.micronaut.messaging.annotation.MessageHeader import io.micronaut.messaging.annotation.SendTo import io.reactivex.Flowable import reactor.core.publisher.Flux @@ -212,7 +212,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { void sendBooks(List books) @Topic(KafkaBatchListenerSpec.BOOKS_HEADERS_TOPIC) - @Header(name = "X-Foo", value = "Bar") + @MessageHeader(name = "X-Foo", value = "Bar") void sendBooksAndHeaders(List books) @Topic(KafkaBatchListenerSpec.BOOKS_FORWARD_LIST_TOPIC) @@ -250,7 +250,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } @Topic(KafkaBatchListenerSpec.BOOKS_HEADERS_TOPIC) - void receiveList(List books, @Header("X-Foo") List foos) { + void receiveList(List books, @MessageHeader("X-Foo") List foos) { this.books.addAll books this.headers = foos } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy index 6beeb2652..d3162ee20 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaListenerSpec.groovy @@ -13,8 +13,8 @@ import io.micronaut.context.annotation.Requires import io.micronaut.http.client.DefaultHttpClientConfiguration import io.micronaut.http.client.RxHttpClient import io.micronaut.messaging.MessageHeaders -import io.micronaut.messaging.annotation.Body -import io.micronaut.messaging.annotation.Header +import io.micronaut.messaging.annotation.MessageBody +import io.micronaut.messaging.annotation.MessageHeader import io.reactivex.Single import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.Producer @@ -200,7 +200,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec { @KafkaClient static interface MyClient { @Topic("words") - void sendSentence(@KafkaKey String key, String sentence, @Header String topic) + void sendSentence(@KafkaKey String key, String sentence, @MessageHeader String topic) @Topic("words") RecordMetadata sendGetRecordMetadata(@KafkaKey String key, String sentence) @@ -209,7 +209,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec { Single sendReactive(@KafkaKey String key, Book book) @Topic("books") - void sendBook(@KafkaKey String key, @Nullable @Body Book book) + void sendBook(@KafkaKey String key, @Nullable @MessageBody Book book) } @Requires(property = 'spec.name', value = 'KafkaListenerSpec') @@ -219,7 +219,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec { String lastTopic @Topic("words") - void countWord(String sentence, @Header String topic) { + void countWord(String sentence, @MessageHeader String topic) { wordCount += sentence.split(/\s/).size() lastTopic = topic } @@ -257,7 +257,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec { String body @Topic("words") - void countWord(@Body String body) { + void countWord(@MessageBody String body) { this.body = body } } @@ -270,9 +270,9 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec { String topic @Topic("words") - void countWord(@Body String sentence, - @Header Optional topic, - @Header Optional missing) { + void countWord(@MessageBody String sentence, + @MessageHeader Optional topic, + @MessageHeader Optional missing) { missingHeader = !missing.isPresent() this.sentence = sentence this.topic = topic.get() diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/headers/ProductClient.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/headers/ProductClient.java index ed96f98a0..b9513c38f 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/headers/ProductClient.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/producer/headers/ProductClient.java @@ -2,12 +2,12 @@ // tag::imports[] import io.micronaut.configuration.kafka.annotation.KafkaClient; -import io.micronaut.messaging.annotation.Header; +import io.micronaut.messaging.annotation.MessageHeader; // end::imports[] // tag::clazz[] @KafkaClient(id="product-client") -@Header(name = "X-Token", value = "${my.application.token}") +@MessageHeader(name = "X-Token", value = "${my.application.token}") public interface ProductClient { // end::clazz[] } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaConsumerMetricsSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaConsumerMetricsSpec.groovy index 05edfefa8..9211abc51 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaConsumerMetricsSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaConsumerMetricsSpec.groovy @@ -8,7 +8,7 @@ import io.micronaut.configuration.metrics.management.endpoint.MetricsEndpoint import io.micronaut.context.annotation.Requires import io.micronaut.http.client.DefaultHttpClientConfiguration import io.micronaut.http.client.RxHttpClient -import io.micronaut.messaging.annotation.Header +import io.micronaut.messaging.annotation.MessageHeader import spock.lang.AutoCleanup import spock.lang.Shared @@ -63,7 +63,7 @@ class KafkaConsumerMetricsSpec extends AbstractEmbeddedServerSpec { String lastTopic @Topic("words-metrics") - void countWord(String sentence, @Header String topic) { + void countWord(String sentence, @MessageHeader String topic) { wordCount += sentence.split(/\s/).size() lastTopic = topic } diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaProducerMetricsSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaProducerMetricsSpec.groovy index cea0da699..a7d72e198 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaProducerMetricsSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaProducerMetricsSpec.groovy @@ -10,7 +10,7 @@ import io.micronaut.configuration.metrics.management.endpoint.MetricsEndpoint import io.micronaut.context.annotation.Requires import io.micronaut.http.client.DefaultHttpClientConfiguration import io.micronaut.http.client.RxHttpClient -import io.micronaut.messaging.annotation.Header +import io.micronaut.messaging.annotation.MessageHeader import io.reactivex.Single import org.apache.kafka.clients.producer.RecordMetadata import spock.lang.AutoCleanup @@ -63,7 +63,7 @@ class KafkaProducerMetricsSpec extends AbstractEmbeddedServerSpec { @KafkaClient static interface MyClientMetrics { @Topic("words-metrics") - void sendSentence(@KafkaKey String key, String sentence, @Header String topic) + void sendSentence(@KafkaKey String key, String sentence, @MessageHeader String topic) @Topic("words-metrics-two") RecordMetadata sendGetRecordMetadata(@KafkaKey String key, String sentence) From c5605777baff32eaddb914b9b40ad222dfe864d4 Mon Sep 17 00:00:00 2001 From: graemerocher Date: Tue, 4 May 2021 12:55:06 +0200 Subject: [PATCH 2/2] Cleanup / move to Gradle 7 --- build.gradle | 6 +++-- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../streams/ConfiguredStreamBuilder.java | 4 ++-- .../configuration/kafka/ConsumerAware.java | 5 ++--- .../configuration/kafka/ConsumerRegistry.java | 22 +++++++++---------- .../kafka/KafkaConsumerAware.java | 6 ++--- .../kafka/KafkaProducerRegistry.java | 4 ++-- .../configuration/kafka/ProducerRegistry.java | 4 ++-- .../kafka/bind/KafkaMessageHeaderBinder.java | 21 ++++++++++++++++-- .../config/AbstractKafkaConfiguration.java | 4 ++-- .../AbstractKafkaConsumerConfiguration.java | 2 +- .../AbstractKafkaProducerConfiguration.java | 2 +- .../embedded/KafkaEmbeddedConfiguration.java | 4 ++-- .../DefaultKafkaListenerExceptionHandler.java | 8 +++---- .../KafkaClientIntroductionAdvice.java | 2 +- .../processor/KafkaConsumerProcessor.java | 22 +++++++++---------- .../kafka/scope/KafkaClientScope.java | 6 ++--- .../kafka/serde/CompositeSerdeRegistry.java | 4 ++-- .../brave/BraveKafkaTracingFactory.java | 2 +- .../offsets/rebalance/ProductListener.java | 4 ++-- 21 files changed, 77 insertions(+), 59 deletions(-) diff --git a/build.gradle b/build.gradle index ce1e6edfd..3691b4716 100644 --- a/build.gradle +++ b/build.gradle @@ -1,12 +1,14 @@ buildscript { repositories { - maven { url "https://repo.grails.org/grails/core" } + mavenCentral() + gradlePluginPortal() } dependencies { - classpath "io.micronaut.build.internal:micronaut-gradle-plugins:3.0.3" + classpath "io.micronaut.build.internal:micronaut-gradle-plugins:4.0.0.RC5" } } + subprojects { Project subproject -> group "io.micronaut.kafka" diff --git a/gradle.properties b/gradle.properties index 706427edf..f2ed9454c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ projectVersion=3.3.1-SNAPSHOT -micronautDocsVersion=1.0.25 +micronautDocsVersion=2.0.0.RC1 micronautBuildVersion=1.1.5 micronautVersion=2.4.2 groovyVersion=3.0.5 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 442d9132e..f371643ee 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java index 908586301..2964501ce 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java @@ -17,7 +17,7 @@ import org.apache.kafka.streams.StreamsBuilder; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import java.util.Properties; /** @@ -44,7 +44,7 @@ public ConfiguredStreamBuilder(Properties configuration) { * * @return The configuration */ - public @Nonnull Properties getConfiguration() { + public @NonNull Properties getConfiguration() { return configuration; } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerAware.java b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerAware.java index 65cb7c203..dde38b327 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerAware.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerAware.java @@ -16,8 +16,7 @@ package io.micronaut.configuration.kafka; import org.apache.kafka.clients.consumer.Consumer; - -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; /** * Interface for {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances to implement @@ -39,5 +38,5 @@ public interface ConsumerAware { * * @param consumer The consumer */ - void setKafkaConsumer(@Nonnull Consumer consumer); + void setKafkaConsumer(@NonNull Consumer consumer); } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerRegistry.java index bcbced86a..5a8cc32ce 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerRegistry.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import java.util.Set; /** @@ -39,8 +39,8 @@ public interface ConsumerRegistry { * @return The consumer * @throws IllegalArgumentException If no consumer exists for the given ID */ - @Nonnull - Consumer getConsumer(@Nonnull String id); + @NonNull + Consumer getConsumer(@NonNull String id); /** * Returns a managed Consumer's subscriptions. @@ -49,8 +49,8 @@ public interface ConsumerRegistry { * @return The consumer subscription * @throws IllegalArgumentException If no consumer exists for the given ID */ - @Nonnull - Set getConsumerSubscription(@Nonnull String id); + @NonNull + Set getConsumerSubscription(@NonNull String id); /** * Returns a managed Consumer's assignment info. @@ -59,15 +59,15 @@ public interface ConsumerRegistry { * @return The consumer assignment * @throws IllegalArgumentException If no consumer exists for the given ID */ - @Nonnull - Set getConsumerAssignment(@Nonnull String id); + @NonNull + Set getConsumerAssignment(@NonNull String id); /** * The IDs of the available consumers. * * @return The consumers */ - @Nonnull Set getConsumerIds(); + @NonNull Set getConsumerIds(); /** * Is the consumer with the given ID paused. @@ -75,7 +75,7 @@ public interface ConsumerRegistry { * @param id True it is paused * @return True if it is paused */ - boolean isPaused(@Nonnull String id); + boolean isPaused(@NonNull String id); /** * Pause the consumer for the given ID. Note that this method will request that the consumer is paused, however @@ -84,7 +84,7 @@ public interface ConsumerRegistry { * * @param id The id of the consumer */ - void pause(@Nonnull String id); + void pause(@NonNull String id); /** @@ -94,5 +94,5 @@ public interface ConsumerRegistry { * * @param id The id of the consumer */ - void resume(@Nonnull String id); + void resume(@NonNull String id); } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java index 36a57f7ae..d3d221f58 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaConsumerAware.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; /** * Interface for {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances to implement @@ -38,10 +38,10 @@ public interface KafkaConsumerAware extends ConsumerAware { * * @param consumer The consumer */ - void setKafkaConsumer(@Nonnull KafkaConsumer consumer); + void setKafkaConsumer(@NonNull KafkaConsumer consumer); @Override - default void setKafkaConsumer(@Nonnull Consumer consumer) { + default void setKafkaConsumer(@NonNull Consumer consumer) { if (consumer instanceof KafkaConsumer) { setKafkaConsumer((KafkaConsumer) consumer); } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java index 0f3f50fbb..04bf9bdc2 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerRegistry.java @@ -18,7 +18,7 @@ import io.micronaut.core.type.Argument; import org.apache.kafka.clients.producer.KafkaProducer; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; /** * A registry of managed {@link KafkaProducer} instances key by id and type. @@ -41,5 +41,5 @@ public interface KafkaProducerRegistry extends ProducerRegistry { * @return The producer */ @Override - @Nonnull KafkaProducer getProducer(String id, Argument keyType, Argument valueType); + @NonNull KafkaProducer getProducer(String id, Argument keyType, Argument valueType); } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerRegistry.java index 30429462c..8144aa32a 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerRegistry.java @@ -18,7 +18,7 @@ import io.micronaut.core.type.Argument; import org.apache.kafka.clients.producer.Producer; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; /** * A registry of managed {@link Producer} instances key by id and type. @@ -37,6 +37,6 @@ public interface ProducerRegistry { * @param The value generic type * @return The producer */ - @Nonnull + @NonNull Producer getProducer(String id, Argument keyType, Argument valueType); } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java index 94ea4d584..0ea5023c1 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/KafkaMessageHeaderBinder.java @@ -1,8 +1,24 @@ +/* + * Copyright 2017-2021 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.micronaut.configuration.kafka.bind; import io.micronaut.core.annotation.AnnotationMetadata; import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.convert.ConversionService; +import io.micronaut.messaging.annotation.Header; import io.micronaut.messaging.annotation.MessageHeader; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; @@ -23,8 +39,9 @@ public BindingResult bind(ArgumentConversionContext context, ConsumerRecor Headers headers = source.headers(); AnnotationMetadata annotationMetadata = context.getAnnotationMetadata(); - String name = annotationMetadata.stringValue(MessageHeader.class, "name") - .orElseGet(() -> annotationMetadata.stringValue(MessageHeader.class) + // use deprecated versions as that is what is stored in metadata + String name = annotationMetadata.stringValue(Header.class, "name") + .orElseGet(() -> annotationMetadata.stringValue(Header.class) .orElse(context.getArgument().getName())); Iterable value = headers.headers(name); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java index 3baf98b5b..51c427182 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java @@ -17,7 +17,7 @@ import io.micronaut.core.util.Toggleable; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import java.util.Properties; /** @@ -71,7 +71,7 @@ protected AbstractKafkaConfiguration(Properties config) { /** * @return The Kafka configuration */ - public @Nonnull Properties getConfig() { + public @NonNull Properties getConfig() { if (config != null) { return config; } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConsumerConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConsumerConfiguration.java index fcd538b47..9cb98d089 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConsumerConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConsumerConfiguration.java @@ -17,7 +17,7 @@ import org.apache.kafka.common.serialization.Deserializer; -import javax.annotation.Nullable; +import io.micronaut.core.annotation.Nullable; import java.util.Optional; import java.util.Properties; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaProducerConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaProducerConfiguration.java index 5ea82a47a..1620cff72 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaProducerConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaProducerConfiguration.java @@ -17,7 +17,7 @@ import org.apache.kafka.common.serialization.Serializer; -import javax.annotation.Nullable; +import io.micronaut.core.annotation.Nullable; import java.util.Optional; import java.util.Properties; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java index 2a4c2c4c1..f3b766f88 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/embedded/KafkaEmbeddedConfiguration.java @@ -19,7 +19,7 @@ import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.core.util.Toggleable; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -65,7 +65,7 @@ public void setEnabled(boolean enabled) { /** * @return The broker properties. */ - public @Nonnull Properties getProperties() { + public @NonNull Properties getProperties() { return properties; } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java index 06fade123..6dd5ec6f6 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java @@ -23,7 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import javax.inject.Singleton; import java.util.Optional; import java.util.regex.Matcher; @@ -82,9 +82,9 @@ public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializ * @param kafkaConsumer The kafka consumer */ protected void seekPastDeserializationError( - @Nonnull SerializationException cause, - @Nonnull Object consumerBean, - @Nonnull Consumer kafkaConsumer) { + @NonNull SerializationException cause, + @NonNull Object consumerBean, + @NonNull Consumer kafkaConsumer) { try { final String message = cause.getMessage(); final Matcher matcher = SERIALIZATION_EXCEPTION_MESSAGE_PATTERN.matcher(message); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java index b7a2e3e1a..eecc64731 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java @@ -56,7 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; +import io.micronaut.core.annotation.Nullable; import javax.annotation.PreDestroy; import javax.inject.Singleton; import java.nio.charset.StandardCharsets; diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index f6f091846..79a87328f 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -81,7 +81,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import javax.annotation.PreDestroy; import javax.inject.Named; import javax.inject.Singleton; @@ -182,9 +182,9 @@ public KafkaConsumerProcessor( }); } - @Nonnull + @NonNull @Override - public Consumer getConsumer(@Nonnull String id) { + public Consumer getConsumer(@NonNull String id) { ArgumentUtils.requireNonNull("id", id); final Consumer consumer = consumers.get(id); if (consumer == null) { @@ -193,9 +193,9 @@ public Consumer getConsumer(@Nonnull String id) { return consumer; } - @Nonnull + @NonNull @Override - public Set getConsumerSubscription(@Nonnull final String id) { + public Set getConsumerSubscription(@NonNull final String id) { ArgumentUtils.requireNonNull("id", id); final Set subscriptions = consumerSubscriptions.get(id); if (subscriptions == null || subscriptions.isEmpty()) { @@ -204,9 +204,9 @@ public Set getConsumerSubscription(@Nonnull final String id) { return subscriptions; } - @Nonnull + @NonNull @Override - public Set getConsumerAssignment(@Nonnull final String id) { + public Set getConsumerAssignment(@NonNull final String id) { ArgumentUtils.requireNonNull("id", id); final Set assignment = consumerAssignments.get(id); if (assignment == null || assignment.isEmpty()) { @@ -215,14 +215,14 @@ public Set getConsumerAssignment(@Nonnull final String id) { return assignment; } - @Nonnull + @NonNull @Override public Set getConsumerIds() { return Collections.unmodifiableSet(consumers.keySet()); } @Override - public boolean isPaused(@Nonnull String id) { + public boolean isPaused(@NonNull String id) { if (StringUtils.isNotEmpty(id) && consumers.containsKey(id)) { return paused.contains(id) && pausedConsumers.containsKey(id); } @@ -230,7 +230,7 @@ public boolean isPaused(@Nonnull String id) { } @Override - public void pause(@Nonnull String id) { + public void pause(@NonNull String id) { if (StringUtils.isNotEmpty(id) && consumers.containsKey(id)) { paused.add(id); } else { @@ -239,7 +239,7 @@ public void pause(@Nonnull String id) { } @Override - public void resume(@Nonnull String id) { + public void resume(@NonNull String id) { if (StringUtils.isNotEmpty(id) && consumers.containsKey(id)) { paused.remove(id); } else { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java b/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java index 55a28f648..c3285d8e3 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/scope/KafkaClientScope.java @@ -38,8 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; import javax.inject.Provider; import javax.inject.Singleton; import java.util.Map; @@ -117,7 +117,7 @@ public T get(BeanResolutionContext resolutionContext, BeanDefinition bean return getKafkaProducer(id, keyArgument, valueArgument); } - @Nonnull + @NonNull @Override public Producer getProducer(String id, Argument keyType, Argument valueType) { return getKafkaProducer(id, keyType, valueType); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java index 0869f8a7a..a9d45898a 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/serde/CompositeSerdeRegistry.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import javax.inject.Singleton; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -54,7 +54,7 @@ public CompositeSerdeRegistry(SerdeRegistry... registries) { @SuppressWarnings("unchecked") @Override - @Nonnull + @NonNull public Serde getSerde(Class type) { Serde serde = serdeMap.get(type); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java index c48a96b9c..df5dee3fc 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/tracing/brave/BraveKafkaTracingFactory.java @@ -21,7 +21,7 @@ import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Requires; -import javax.annotation.Nullable; +import io.micronaut.core.annotation.Nullable; import javax.inject.Singleton; /** diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/rebalance/ProductListener.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/rebalance/ProductListener.java index 021b1f9a9..57a91fed4 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/rebalance/ProductListener.java +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/offsets/rebalance/ProductListener.java @@ -9,7 +9,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; -import javax.annotation.Nonnull; +import io.micronaut.core.annotation.NonNull; import java.util.Collection; // end::imports[] @@ -20,7 +20,7 @@ public class ProductListener implements ConsumerRebalanceListener, ConsumerAware private Consumer consumer; @Override - public void setKafkaConsumer(@Nonnull Consumer consumer) { // <1> + public void setKafkaConsumer(@NonNull Consumer consumer) { // <1> this.consumer = consumer; }