Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of non-deprecated annotations #364

Merged
merged 2 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
buildscript {
repositories {
maven { url "https://repo.grails.org/grails/core" }
mavenCentral()
gradlePluginPortal()
}
dependencies {
classpath "io.micronaut.build.internal:micronaut-gradle-plugins:3.0.3"
classpath "io.micronaut.build.internal:micronaut-gradle-plugins:4.0.0.RC5"
}
}


subprojects { Project subproject ->
group "io.micronaut.kafka"

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
projectVersion=3.3.1-SNAPSHOT
micronautDocsVersion=1.0.25
micronautDocsVersion=2.0.0.RC1
micronautBuildVersion=1.1.5
micronautVersion=2.4.2
groovyVersion=3.0.5
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.apache.kafka.streams.StreamsBuilder;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;
import java.util.Properties;

/**
Expand All @@ -44,7 +44,7 @@ public ConfiguredStreamBuilder(Properties configuration) {
*
* @return The configuration
*/
public @Nonnull Properties getConfiguration() {
public @NonNull Properties getConfiguration() {
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
package io.micronaut.configuration.kafka;

import org.apache.kafka.clients.consumer.Consumer;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;

/**
* Interface for {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances to implement
Expand All @@ -39,5 +38,5 @@ public interface ConsumerAware<K, V> {
*
* @param consumer The consumer
*/
void setKafkaConsumer(@Nonnull Consumer<K, V> consumer);
void setKafkaConsumer(@NonNull Consumer<K, V> consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;
import java.util.Set;

/**
Expand All @@ -39,8 +39,8 @@ public interface ConsumerRegistry {
* @return The consumer
* @throws IllegalArgumentException If no consumer exists for the given ID
*/
@Nonnull
<K, V> Consumer<K, V> getConsumer(@Nonnull String id);
@NonNull
<K, V> Consumer<K, V> getConsumer(@NonNull String id);

/**
* Returns a managed Consumer's subscriptions.
Expand All @@ -49,8 +49,8 @@ public interface ConsumerRegistry {
* @return The consumer subscription
* @throws IllegalArgumentException If no consumer exists for the given ID
*/
@Nonnull
Set<String> getConsumerSubscription(@Nonnull String id);
@NonNull
Set<String> getConsumerSubscription(@NonNull String id);

/**
* Returns a managed Consumer's assignment info.
Expand All @@ -59,23 +59,23 @@ public interface ConsumerRegistry {
* @return The consumer assignment
* @throws IllegalArgumentException If no consumer exists for the given ID
*/
@Nonnull
Set<TopicPartition> getConsumerAssignment(@Nonnull String id);
@NonNull
Set<TopicPartition> getConsumerAssignment(@NonNull String id);

/**
* The IDs of the available consumers.
*
* @return The consumers
*/
@Nonnull Set<String> getConsumerIds();
@NonNull Set<String> getConsumerIds();

/**
* Is the consumer with the given ID paused.
*
* @param id True it is paused
* @return True if it is paused
*/
boolean isPaused(@Nonnull String id);
boolean isPaused(@NonNull String id);

/**
* Pause the consumer for the given ID. Note that this method will request that the consumer is paused, however
Expand All @@ -84,7 +84,7 @@ public interface ConsumerRegistry {
*
* @param id The id of the consumer
*/
void pause(@Nonnull String id);
void pause(@NonNull String id);


/**
Expand All @@ -94,5 +94,5 @@ public interface ConsumerRegistry {
*
* @param id The id of the consumer
*/
void resume(@Nonnull String id);
void resume(@NonNull String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;

/**
* Interface for {@link io.micronaut.configuration.kafka.annotation.KafkaListener} instances to implement
Expand All @@ -38,10 +38,10 @@ public interface KafkaConsumerAware<K, V> extends ConsumerAware<K, V> {
*
* @param consumer The consumer
*/
void setKafkaConsumer(@Nonnull KafkaConsumer<K, V> consumer);
void setKafkaConsumer(@NonNull KafkaConsumer<K, V> consumer);

@Override
default void setKafkaConsumer(@Nonnull Consumer<K, V> consumer) {
default void setKafkaConsumer(@NonNull Consumer<K, V> consumer) {
if (consumer instanceof KafkaConsumer) {
setKafkaConsumer((KafkaConsumer<K, V>) consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.micronaut.core.type.Argument;
import org.apache.kafka.clients.producer.KafkaProducer;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;

/**
* A registry of managed {@link KafkaProducer} instances key by id and type.
Expand All @@ -41,5 +41,5 @@ public interface KafkaProducerRegistry extends ProducerRegistry {
* @return The producer
*/
@Override
@Nonnull <K, V> KafkaProducer<K, V> getProducer(String id, Argument<K> keyType, Argument<V> valueType);
@NonNull <K, V> KafkaProducer<K, V> getProducer(String id, Argument<K> keyType, Argument<V> valueType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.micronaut.core.type.Argument;
import org.apache.kafka.clients.producer.Producer;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;

/**
* A registry of managed {@link Producer} instances key by id and type.
Expand All @@ -37,6 +37,6 @@ public interface ProducerRegistry {
* @param <V> The value generic type
* @return The producer
*/
@Nonnull
@NonNull
<K, V> Producer<K, V> getProducer(String id, Argument<K> keyType, Argument<V> valueType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* @since 1.0
*/
@Singleton
@Deprecated
public class KafkaHeaderBinder<T> implements AnnotatedConsumerRecordBinder<Header, T> {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2017-2021 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.bind;

import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.messaging.annotation.Header;
import io.micronaut.messaging.annotation.MessageHeader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;

import javax.inject.Singleton;
import java.util.Optional;

@Singleton
public class KafkaMessageHeaderBinder<T> implements AnnotatedConsumerRecordBinder<MessageHeader, T> {

@Override
public Class<MessageHeader> annotationType() {
return MessageHeader.class;
}

@Override
public BindingResult<T> bind(ArgumentConversionContext<T> context, ConsumerRecord<?, ?> source) {
Headers headers = source.headers();
AnnotationMetadata annotationMetadata = context.getAnnotationMetadata();

// use deprecated versions as that is what is stored in metadata
String name = annotationMetadata.stringValue(Header.class, "name")
.orElseGet(() -> annotationMetadata.stringValue(Header.class)
.orElse(context.getArgument().getName()));
Iterable<org.apache.kafka.common.header.Header> value = headers.headers(name);

if (value.iterator().hasNext()) {
Optional<T> converted = ConversionService.SHARED.convert(value, context);
return () -> converted;
} else if (context.getArgument().getType() == Optional.class) {
//noinspection unchecked
return () -> (Optional<T>) Optional.of(Optional.empty());
} else {
//noinspection unchecked
return BindingResult.EMPTY;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.micronaut.core.util.Toggleable;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;
import java.util.Properties;

/**
Expand Down Expand Up @@ -71,7 +71,7 @@ protected AbstractKafkaConfiguration(Properties config) {
/**
* @return The Kafka configuration
*/
public @Nonnull Properties getConfig() {
public @NonNull Properties getConfig() {
if (config != null) {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.apache.kafka.common.serialization.Deserializer;

import javax.annotation.Nullable;
import io.micronaut.core.annotation.Nullable;
import java.util.Optional;
import java.util.Properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.apache.kafka.common.serialization.Serializer;

import javax.annotation.Nullable;
import io.micronaut.core.annotation.Nullable;
import java.util.Optional;
import java.util.Properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.util.Toggleable;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void setEnabled(boolean enabled) {
/**
* @return The broker properties.
*/
public @Nonnull Properties getProperties() {
public @NonNull Properties getProperties() {
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import io.micronaut.core.annotation.NonNull;
import javax.inject.Singleton;
import java.util.Optional;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -82,9 +82,9 @@ public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializ
* @param kafkaConsumer The kafka consumer
*/
protected void seekPastDeserializationError(
@Nonnull SerializationException cause,
@Nonnull Object consumerBean,
@Nonnull Consumer<?, ?> kafkaConsumer) {
@NonNull SerializationException cause,
@NonNull Object consumerBean,
@NonNull Consumer<?, ?> kafkaConsumer) {
try {
final String message = cause.getMessage();
final Matcher matcher = SERIALIZATION_EXCEPTION_MESSAGE_PATTERN.matcher(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import io.micronaut.core.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
import java.nio.charset.StandardCharsets;
Expand Down
Loading