diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerInfo.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerInfo.java
index e6b932552..829e9a00a 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerInfo.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerInfo.java
@@ -81,12 +81,12 @@ final class ConsumerInfo {
         this.groupId = groupId;
         this.shouldRedeliver = kafkaListener.isTrue("redelivery");
         this.offsetStrategy = offsetStrategy;
-        final Optional<AnnotationValue<ErrorStrategy>> errorStrategy = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class);
-        this.errorStrategy = errorStrategy.map(a -> a.getRequiredValue(ErrorStrategyValue.class)).orElse(ErrorStrategyValue.NONE);
-        this.retryDelay = errorStrategy.flatMap(a -> a.get("retryDelay", Duration.class)).filter(d -> !d.isZero() && !d.isNegative()).orElse(null);
-        this.retryCount = errorStrategy.map(a -> a.intValue("retryCount").orElse(ErrorStrategy.DEFAULT_RETRY_COUNT)).orElse(0);
-        this.shouldHandleAllExceptions = errorStrategy.flatMap(a -> a.booleanValue("handleAllExceptions")).orElse(ErrorStrategy.DEFAULT_HANDLE_ALL_EXCEPTIONS);
-        this.exceptionTypes = Arrays.stream((Class[]) errorStrategy.map(a -> a.classValues("exceptionTypes")).orElse(ReflectionUtils.EMPTY_CLASS_ARRAY)).toList();
+        final Optional<AnnotationValue<ErrorStrategy>> errorStrategyAnnotation = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class);
+        this.errorStrategy = errorStrategyAnnotation.map(a -> a.getRequiredValue(ErrorStrategyValue.class)).orElse(ErrorStrategyValue.NONE);
+        this.retryDelay = errorStrategyAnnotation.flatMap(a -> a.get("retryDelay", Duration.class)).filter(d -> !d.isZero() && !d.isNegative()).orElse(null);
+        this.retryCount = errorStrategyAnnotation.map(a -> a.intValue("retryCount").orElse(ErrorStrategy.DEFAULT_RETRY_COUNT)).orElse(0);
+        this.shouldHandleAllExceptions = errorStrategyAnnotation.flatMap(a -> a.booleanValue("handleAllExceptions")).orElse(ErrorStrategy.DEFAULT_HANDLE_ALL_EXCEPTIONS);
+        this.exceptionTypes = Arrays.stream((Class[]) errorStrategyAnnotation.map(a -> a.classValues("exceptionTypes")).orElse(ReflectionUtils.EMPTY_CLASS_ARRAY)).toList();
         this.producerClientId = kafkaListener.stringValue("producerClientId").orElse(null);
         this.producerTransactionalId = kafkaListener.stringValue("producerTransactionalId").filter(StringUtils::isNotEmpty).orElse(null);
         this.isTransactional = producerTransactionalId != null;
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
index 27a49c062..b80b7ab74 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java
@@ -28,6 +28,7 @@
 import io.micronaut.core.async.publisher.Publishers;
 import io.micronaut.core.type.Argument;
 import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -40,6 +41,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;

 import java.nio.charset.StandardCharsets;
@@ -54,14 +56,15 @@
 @Internal
 final class ConsumerState {

+    @SuppressWarnings("java:S3416") // Using this logger because ConsumerState used to be an internal class
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);

     private final KafkaConsumerProcessor kafkaConsumerProcessor;
-    private final ConsumerInfo info;
-    private final Consumer<?, ?> kafkaConsumer;
+    final ConsumerInfo info;
+    final Consumer<?, ?> kafkaConsumer;
     private final Object consumerBean;
-    private final Set<String> subscriptions;
-    private Set<TopicPartition> assignments;
+    final Set<String> subscriptions;
+    Set<TopicPartition> assignments;
     private Set<TopicPartition> pausedTopicPartitions;
     private Set<TopicPartition> pauseRequests;
     @Nullable
@@ -119,26 +122,10 @@
     synchronized boolean isPaused(@NonNull Collection<TopicPartition> topicPartitions) {
         return pauseRequests.containsAll(topicPartitions) && pausedTopicPartitions.containsAll(topicPartitions);
     }

-    String getClientId() {
-        return info.clientId;
-    }
-
-    Consumer<?, ?> getKafkaConsumer() {
-        return kafkaConsumer;
-    }
-
     boolean isPolling() {
         return closedState == ConsumerCloseState.POLLING;
     }

-    Set<String> getSubscriptions() {
-        return subscriptions;
-    }
-
-    Set<TopicPartition> getAssignments() {
-        return assignments;
-    }
-
     void threadPollLoop() {
         try (kafkaConsumer) {
             //noinspection InfiniteLoopStatement
@@ -327,7 +314,6 @@ private void processIndividually(final ConsumerRecords consumerRecords) {
             }

             if (info.trackPartitions) {
-                assert currentOffsets != null;
                 final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                 final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
                 currentOffsets.put(topicPartition, offsetAndMetadata);
@@ -390,99 +376,106 @@ private void handleResultFlux(
     }

     private Publisher<RecordMetadata> sendToDestination(Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
-        if (!info.sendToTopics.isEmpty()) {
-            final Object key = consumerRecord.key();
-            if (value != null) {
-                final Producer kafkaProducer;
-                if (info.shouldSendOffsetsToTransaction) {
-                    kafkaProducer = kafkaConsumerProcessor.getTransactionalProducer(
-                        info.producerClientId,
-                        info.producerTransactionalId,
-                        byte[].class,
-                        Object.class
-                    );
+        if (value == null || info.sendToTopics.isEmpty()) {
+            return Flux.empty();
+        }
+        final Object key = consumerRecord.key();
+        final Producer kafkaProducer;
+        if (info.shouldSendOffsetsToTransaction) {
+            kafkaProducer = kafkaConsumerProcessor.getTransactionalProducer(
+                info.producerClientId,
+                info.producerTransactionalId,
+                byte[].class,
+                Object.class
+            );
+        } else {
+            kafkaProducer = kafkaConsumerProcessor.getProducer(
+                Optional.ofNullable(info.producerClientId).orElse(info.groupId),
+                (Class) (key != null ? key.getClass() : byte[].class),
+                value.getClass()
+            );
+        }
+        return Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords));
+    }
+
+    private void sendToDestination(FluxSink emitter, Producer kafkaProducer, Object key, Object value, ConsumerRecord consumerRecord, ConsumerRecords consumerRecords) {
+        try {
+            if (info.shouldSendOffsetsToTransaction) {
+                beginTransaction(kafkaProducer);
+            }
+            sendToDestination(kafkaProducer, new FluxCallback(emitter), key, value, consumerRecord);
+            if (info.shouldSendOffsetsToTransaction) {
+                endTransaction(kafkaProducer, consumerRecords);
+            }
+            emitter.complete();
+        } catch (Exception e) {
+            if (info.shouldSendOffsetsToTransaction) {
+                abortTransaction(kafkaProducer, e);
+            }
+            emitter.error(e);
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void sendToDestination(Producer kafkaProducer, Callback callback, Object key, Object value, ConsumerRecord consumerRecord) {
+        for (String destinationTopic : info.sendToTopics) {
+            if (info.returnsManyKafkaMessages) {
+                final Iterable<KafkaMessage> messages = (Iterable<KafkaMessage>) value;
+                for (KafkaMessage message : messages) {
+                    final ProducerRecord producerRecord = createFromMessage(destinationTopic, message);
+                    kafkaProducer.send(producerRecord, callback);
+                }
+            } else {
+                final ProducerRecord producerRecord;
+                if (info.returnsOneKafkaMessage) {
+                    producerRecord = createFromMessage(destinationTopic, (KafkaMessage) value);
                 } else {
-                    kafkaProducer = kafkaConsumerProcessor.getProducer(
-                        Optional.ofNullable(info.producerClientId).orElse(info.groupId),
-                        (Class) (key != null ? key.getClass() : byte[].class),
-                        value.getClass()
-                    );
+                    producerRecord = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers());
                 }
-                return Flux.create(emitter -> {
-                    try {
-                        if (info.shouldSendOffsetsToTransaction) {
-                            try {
-                                LOG.trace("Beginning transaction for producer: {}", info.producerTransactionalId);
-                                kafkaProducer.beginTransaction();
-                            } catch (ProducerFencedException e) {
-                                kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
-                            }
-                        }
-                        for (String destinationTopic : info.sendToTopics) {
-                            if (info.returnsManyKafkaMessages) {
-                                Iterable<KafkaMessage> messages = (Iterable<KafkaMessage>) value;
-                                for (KafkaMessage message : messages) {
-                                    ProducerRecord record = createFromMessage(destinationTopic, message);
-                                    kafkaProducer.send(record, (metadata, exception) -> {
-                                        if (exception != null) {
-                                            emitter.error(exception);
-                                        } else {
-                                            emitter.next(metadata);
-                                        }
-                                    });
-                                }
-                            } else {
-                                ProducerRecord record;
-                                if (info.returnsOneKafkaMessage) {
-                                    record = createFromMessage(destinationTopic, (KafkaMessage) value);
-                                } else {
-                                    record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers());
-                                }
-                                LOG.trace("Sending record: {} for producer: {} {}", record, kafkaProducer, info.producerTransactionalId);
-                                kafkaProducer.send(record, (metadata, exception) -> {
-                                    if (exception != null) {
-                                        emitter.error(exception);
-                                    } else {
-                                        emitter.next(metadata);
-                                    }
-                                });
-                            }
-                        }
-                        if (info.shouldSendOffsetsToTransaction) {
-                            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
-                            for (TopicPartition partition : consumerRecords.partitions()) {
-                                List<? extends ConsumerRecord<?, ?>> partitionedRecords = consumerRecords.records(partition);
-                                long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
-                                offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
-                            }
-                            try {
-                                final String producerTransactionalId = info.producerTransactionalId;
-                                LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, producerTransactionalId, info.groupId);
-                                kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(info.groupId));
-                                LOG.trace("Committing transaction for producer: {}", producerTransactionalId);
-                                kafkaProducer.commitTransaction();
-                                LOG.trace("Committed transaction for producer: {}", producerTransactionalId);
-                            } catch (ProducerFencedException e) {
-                                kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
-                            }
-                        }
-                        emitter.complete();
-                    } catch (Exception e) {
-                        if (info.shouldSendOffsetsToTransaction) {
-                            try {
-                                LOG.trace("Aborting transaction for producer: {} because of error: {}", info.producerTransactionalId, e.getMessage());
-                                kafkaProducer.abortTransaction();
-                            } catch (ProducerFencedException ex) {
-                                kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, ex);
-                            }
-                        }
-                        emitter.error(e);
-                    }
-                });
+                LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, info.producerTransactionalId);
+                kafkaProducer.send(producerRecord, callback);
             }
-            return Flux.empty();
         }
-        return Flux.empty();
+    }
+
+    private void beginTransaction(Producer kafkaProducer) {
+        try {
+            LOG.trace("Beginning transaction for producer: {}", info.producerTransactionalId);
+            kafkaProducer.beginTransaction();
+        } catch (ProducerFencedException e) {
+            kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
+        }
+    }
+
+    private void endTransaction(Producer kafkaProducer, ConsumerRecords consumerRecords) {
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition partition : consumerRecords.partitions()) {
+            List<? extends ConsumerRecord<?, ?>> partitionedRecords = consumerRecords.records(partition);
+            long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
+            offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
+        }
+        sendOffsetsToTransaction(kafkaProducer, offsetsToCommit);
+    }
+
+    private void abortTransaction(Producer kafkaProducer, Exception e) {
+        try {
+            LOG.trace("Aborting transaction for producer: {} because of error: {}", info.producerTransactionalId, e.getMessage());
+            kafkaProducer.abortTransaction();
+        } catch (ProducerFencedException ex) {
+            kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, ex);
+        }
+    }
+
+    private void sendOffsetsToTransaction(Producer kafkaProducer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+        try {
+            LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, info.producerTransactionalId, info.groupId);
+            kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(info.groupId));
+            LOG.trace("Committing transaction for producer: {}", info.producerTransactionalId);
+            kafkaProducer.commitTransaction();
+            LOG.trace("Committed transaction for producer: {}", info.producerTransactionalId);
+        } catch (ProducerFencedException e) {
+            kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
+        }
     }

     private Publisher<RecordMetadata> handleSendToError(Throwable error, ConsumerRecord<?, ?> consumerRecord) {
@@ -496,6 +489,7 @@ private Publisher handleSendToError(Throwable error, ConsumerRec
             .doOnError(ex -> handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), ex, consumerRecord));
     }

+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private Mono<RecordMetadata> redeliver(ConsumerRecord<?, ?> consumerRecord) {
         final Object key = consumerRecord.key();
         final Object value = consumerRecord.value();
@@ -515,13 +509,7 @@ private Mono redeliver(ConsumerRecord consumerRecord) {
         final ProducerRecord producerRecord = new ProducerRecord(consumerRecord.topic(), consumerRecord.partition(), key, value, consumerRecord.headers());

         LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, info.producerTransactionalId);
-        return Mono.create(emitter -> kafkaProducer.send(producerRecord, (metadata, exception) -> {
-            if (exception != null) {
-                emitter.error(exception);
-            } else {
-                emitter.success(metadata);
-            }
-        }));
+        return Mono.create(emitter -> kafkaProducer.send(producerRecord, new MonoCallback(emitter)));
     }

     private boolean resolveWithErrorStrategy(ConsumerRecord consumerRecord, Throwable e) {
@@ -614,6 +602,7 @@ private OffsetCommitCallback resolveCommitCallback() {
         };
     }

+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private static ProducerRecord createFromMessage(String topic, KafkaMessage message) {
         return new ProducerRecord(
             Optional.ofNullable(message.getTopic()).orElse(topic),
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/FluxCallback.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/FluxCallback.java
new file mode 100644
index 000000000..2596e499c
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/FluxCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.processor;
+
+import io.micronaut.core.annotation.Internal;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import reactor.core.publisher.FluxSink;
+
+@Internal
+final class FluxCallback implements Callback {
+
+    private final FluxSink emitter;
+
+    FluxCallback(FluxSink emitter) {
+        this.emitter = emitter;
+    }
+
+    @Override
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+        if (exception != null) {
+            emitter.error(exception);
+        } else {
+            emitter.next(metadata);
+        }
+    }
+}
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
index e95ad7c9e..2c92a09fa 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
@@ -93,7 +93,6 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -115,6 +114,7 @@ class KafkaConsumerProcessor
     private final ExecutorService executorService;
     private final ApplicationConfiguration applicationConfiguration;
     private final BeanContext beanContext;
+    @SuppressWarnings("rawtypes")
     private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
     private final Map consumers = new ConcurrentHashMap<>();
@@ -146,6 +146,7 @@ class KafkaConsumerProcessor
      * @param startedEventPublisher The KafkaConsumerStartedPollingEvent publisher
      * @param subscribedEventPublisher The KafkaConsumerSubscribedEvent publisher
      */
+    @SuppressWarnings("rawtypes")
     KafkaConsumerProcessor(
             @Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService,
             ApplicationConfiguration applicationConfiguration,
@@ -200,9 +201,11 @@ private ConsumerState getConsumerState(@NonNull String id) {
     @NonNull
     @Override
+    @SuppressWarnings("unchecked")
     public Consumer getConsumer(@NonNull String id) {
         ArgumentUtils.requireNonNull("id", id);
-        final Consumer consumer = getConsumerState(id).getKafkaConsumer();
+        @SuppressWarnings("rawtypes")
+        final Consumer consumer = getConsumerState(id).kafkaConsumer;
         if (consumer == null) {
             throw new IllegalArgumentException("No consumer found for ID: " + id);
         }
@@ -213,7 +216,7 @@ public Consumer getConsumer(@NonNull String id) {
     @Override
     public Set getConsumerSubscription(@NonNull final String id) {
         ArgumentUtils.requireNonNull("id", id);
-        final Set subscriptions = getConsumerState(id).getSubscriptions();
+        final Set subscriptions = getConsumerState(id).subscriptions;
         if (subscriptions == null || subscriptions.isEmpty()) {
             throw new IllegalArgumentException("No consumer subscription found for ID: " + id);
         }
@@ -224,7 +227,7 @@ public Set getConsumerSubscription(@NonNull final String id) {
     @Override
     public Set getConsumerAssignment(@NonNull final String id) {
         ArgumentUtils.requireNonNull("id", id);
-        final Set assignment = getConsumerState(id).getAssignments();
+        final Set assignment = getConsumerState(id).assignments;
         if (assignment == null || assignment.isEmpty()) {
             throw new IllegalArgumentException("No consumer assignment found for ID: " + id);
         }
@@ -239,7 +242,7 @@ public Set getConsumerIds() {
     @Override
     public boolean isPaused(@NonNull String id) {
-        return isPaused(id, getConsumerState(id).getAssignments());
+        return isPaused(id, getConsumerState(id).assignments);
     }

     @Override
@@ -301,7 +304,7 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met
     @PreDestroy
     public void close() {
         for (ConsumerState consumerState : consumers.values()) {
-            consumerState.getKafkaConsumer().wakeup();
+            consumerState.kafkaConsumer.wakeup();
         }
         for (ConsumerState consumerState : consumers.values()) {
             if (consumerState.isPolling()) {
@@ -311,7 +314,7 @@ public void close() {
                     if (LOG.isTraceEnabled()) {
                         final Instant now = Instant.now();
                         if (now.isAfter(silentTime)) {
-                            LOG.trace("Consumer {} is not closed yet (waiting {})", consumerState.getClientId(), Duration.between(start, now));
+                            LOG.trace("Consumer {} is not closed yet (waiting {})", consumerState.info.clientId, Duration.between(start, now));
                             // Inhibit TRACE messages for a while to avoid polluting the logs
                             silentTime = now.plusSeconds(5);
                         }
@@ -319,7 +322,7 @@ public void close() {
                 } while (consumerState.isPolling());
             }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Consumer {} is closed", consumerState.getClientId());
+                LOG.debug("Consumer {} is closed", consumerState.info.clientId);
             }
         }
         consumers.clear();
@@ -344,8 +347,8 @@ void handleException(Object consumerBean, KafkaListenerException kafkaListenerEx
         }
     }

-    ScheduledFuture scheduleTask(Duration delay, Runnable command) {
-        return taskScheduler.schedule(delay, command);
+    void scheduleTask(Duration delay, Runnable command) {
+        taskScheduler.schedule(delay, command);
     }

     Producer getProducer(String id, Class keyType, Class valueType) {
@@ -357,12 +360,13 @@ Producer getTransactionalProducer(@Nullable String clientId, @Nulla
     }

     void handleProducerFencedException(Producer producer, ProducerFencedException e) {
-        LOG.error("Failed accessing the producer: " + producer, e);
+        LOG.error("Failed accessing the producer: {}", producer, e);
         transactionalProducerRegistry.close(producer);
     }

+    @SuppressWarnings("unchecked")
     <T> Flux<T> convertPublisher(T result) {
-        return Flux.from(Publishers.convertPublisher(beanContext.getConversionService(), result, Publisher.class));
+        return Flux.from((Publisher) Publishers.convertPublisher(beanContext.getConversionService(), result, Publisher.class));
     }

     BoundExecutable bind(ExecutableMethod method, Map<Argument<?>, Object> boundArguments, ConsumerRecord consumerRecord) {
@@ -375,6 +379,7 @@ BoundExecutable bindAsBatch(ExecutableMethod method, Map
                                         consumerAnnotation,
                                         final DefaultKafkaConsumerConfiguration consumerConfiguration,
                                         final String clientId,
@@ -418,20 +423,17 @@ private void debugDeserializationConfiguration(final ExecutableMethod meth
             return;
         }
         final String logMethod = logMethod(method);
-        final Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
-        if (keyDeserializer.isPresent()) {
-            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), logMethod);
-        } else {
-            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), logMethod);
-        }
-        final Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
-        if (valueDeserializer.isPresent()) {
-            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), logMethod);
-        } else {
-            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), logMethod);
-        }
+        final String keyDeserializerClass = consumerConfiguration.getKeyDeserializer()
+            .map(Object::toString)
+            .orElseGet(() -> properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        final String valueDeserializerClass = consumerConfiguration.getValueDeserializer()
+            .map(Object::toString)
+            .orElseGet(() -> properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+        LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializerClass, logMethod);
+        LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializerClass, logMethod);
     }

+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private void submitConsumerThreads(final ExecutableMethod method,
                                        final String clientId,
                                        final String groupId,
@@ -530,6 +532,7 @@ private static Argument findBodyArgument(ExecutableMethod method) {
             .orElse(null));
     }

+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private void configureDeserializers(final ExecutableMethod method, final DefaultKafkaConsumerConfiguration consumerConfiguration) {
         final Properties properties = consumerConfiguration.getConfig();
         // figure out the Key deserializer
@@ -539,7 +542,7 @@ private void configureDeserializers(final ExecutableMethod method, final D
         final Argument bodyArgument = batch && tempBodyArg != null
             ? getComponentType(tempBodyArg)
             : tempBodyArg;

-        if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getKeyDeserializer().isPresent()) {
+        if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && consumerConfiguration.getKeyDeserializer().isEmpty()) {
             final Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
                 .filter(arg -> arg.isAnnotationPresent(KafkaKey.class))
                 .findFirst();
@@ -561,7 +564,7 @@
         }

         // figure out the Value deserializer
-        if (!properties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getValueDeserializer().isPresent()) {
+        if (!properties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) && consumerConfiguration.getValueDeserializer().isEmpty()) {
             if (bodyArgument == null) {
                 //noinspection SingleStatementInBlock
                 consumerConfiguration.setValueDeserializer(new StringDeserializer());
@@ -581,7 +584,7 @@ private void configureDeserializers(final ExecutableMethod method, final D
         debugDeserializationConfiguration(method, consumerConfiguration, properties);
     }

-    private static Argument getComponentType(final Argument argument) {
+    private static Argument<?> getComponentType(final Argument<?> argument) {
         final Class argumentType = argument.getType();
         return argumentType.isArray()
             ? Argument.of(argumentType.getComponentType())
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/MonoCallback.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/MonoCallback.java
new file mode 100644
index 000000000..f1b5b6bdd
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/MonoCallback.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.processor;
+
+import io.micronaut.core.annotation.Internal;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.MonoSink;
+
+@Internal
+final class MonoCallback implements Callback {
+
+    private final MonoSink emitter;
+
+    MonoCallback(MonoSink emitter) {
+        this.emitter = emitter;
+    }
+
+    @Override
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+        if (exception != null) {
+            emitter.error(exception);
+        } else {
+            emitter.success(metadata);
+        }
+    }
+}
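
Reviewer note, not part of the patch: the new FluxCallback and MonoCallback classes adapt Kafka's asynchronous Producer#send completion callback to Reactor sinks, replacing the inline lambdas that were previously duplicated in ConsumerState. A minimal, self-contained sketch of the same bridging is below; the class and method names are hypothetical and only illustrate the pattern, they do not exist in the patch.

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import reactor.core.publisher.Mono;

    // Hypothetical helper, equivalent in spirit to MonoCallback + Mono.create in the patch.
    final class ReactiveSendSketch {

        private ReactiveSendSketch() {
        }

        // Completes the Mono with the RecordMetadata on success,
        // or signals the send exception on failure.
        static <K, V> Mono<RecordMetadata> sendAsMono(Producer<K, V> producer, ProducerRecord<K, V> producerRecord) {
            return Mono.create(sink -> producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    sink.error(exception);
                } else {
                    sink.success(metadata);
                }
            }));
        }
    }

Extracting the callback into a named class, as the patch does, avoids repeating this lambda for every send and keeps the Flux variant (one metadata event per forwarded record) and the Mono variant (a single redelivery) symmetrical.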
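
Reviewer note, not part of the patch: beginTransaction, endTransaction and abortTransaction in ConsumerState decompose the standard Kafka consume-transform-produce transaction flow. A minimal sketch of that flow is below, under the assumption of a transactional producer; the names are hypothetical, and the ProducerFencedException branch mirrors handleProducerFencedException in the patch, which closes a fenced producer instead of aborting.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.ProducerFencedException;

    // Hypothetical helper showing the begin / send / send-offsets / commit / abort sequence.
    final class TransactionalForwardSketch {

        private TransactionalForwardSketch() {
        }

        static <K, V> void forwardInTransaction(Producer<K, V> producer,
                                                ProducerRecord<K, V> producerRecord,
                                                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                                                String consumerGroupId) {
            producer.beginTransaction();
            try {
                producer.send(producerRecord);
                // Commit the consumed offsets in the same transaction as the forwarded record.
                producer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(consumerGroupId));
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // A fenced producer must not abort; it has to be closed and recreated.
                producer.close();
                throw e;
            } catch (RuntimeException e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }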