From a84f36e4db7b6d4ad2d2928ae30034e2fa2847f5 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 12 Jun 2020 14:54:15 -0700 Subject: [PATCH 1/3] add kafka transaction --- .../kafka/annotation/KafkaTransaction.java | 4 ++ .../processor/KafkaConsumerProcessor.java | 38 ++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java new file mode 100644 index 000000000..11b1aa13c --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java @@ -0,0 +1,4 @@ +package io.micronaut.configuration.kafka.annotation; + +public @interface KafkaTransaction { +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index 60aa0430f..c4524def4 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -56,7 +56,11 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -73,6 +77,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -619,6 +624,7 @@ private void handleResultFlowable( Flowable recordMetadataProducer = resultFlowable.subscribeOn(executorScheduler) .flatMap((Function>) o -> { String[] destinationTopics = method.stringValues(SendTo.class); + boolean hasTransaction = method.hasAnnotation(KafkaTransaction.class); if (ArrayUtils.isNotEmpty(destinationTopics)) { Object key = consumerRecord.key(); Object value = o; @@ -630,6 +636,37 @@ private void handleResultFlowable( Argument.of((Class) (key != null ? key.getClass() : byte[].class)), Argument.of(value.getClass()) ); + //docs: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html + if(hasTransaction) { + return Flowable.create(emitter -> { + try { + kafkaProducer.beginTransaction(); + for (String destinationTopic : destinationTopics) { + ProducerRecord record = new ProducerRecord( + destinationTopic, + null, + key, + value, + consumerRecord.headers() + ); + kafkaProducer.send(record, (metadata, exception) -> { + if (exception != null) { + emitter.onError(exception); + } else { + emitter.onNext(metadata); + } + }); + + } + kafkaProducer.commitTransaction(); + } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { + //TODO: workd out handler + } catch (KafkaException e){ + kafkaProducer.abortTransaction(); + } + emitter.onComplete(); + }, BackpressureStrategy.ERROR); + } return Flowable.create(emitter -> { for (String destinationTopic : destinationTopics) { @@ -640,7 +677,6 @@ private void handleResultFlowable( value, consumerRecord.headers() ); - kafkaProducer.send(record, (metadata, exception) -> { if (exception != null) { emitter.onError(exception); From 1fdae62332eb51e5bc3ecca2183e6a58ae01ff80 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 13 Jun 2020 10:03:05 -0700 Subject: [PATCH 2/3] add support for transaction --- .../kafka/annotation/KafkaTransaction.java | 7 +++++++ .../processor/KafkaConsumerProcessor.java | 21 ++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java index 11b1aa13c..42e551632 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java @@ -1,4 +1,11 @@ package io.micronaut.configuration.kafka.annotation; + +/** + * Parameter level annotation to indicate when send to is all or None. + * + * @author Michael Pollind + * @since 1.2 + */ public @interface KafkaTransaction { } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index c4524def4..54849af79 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -621,6 +621,7 @@ private void handleResultFlowable( ConsumerRecord consumerRecord, Flowable resultFlowable, boolean isBlocking) { + AtomicBoolean firstError = new AtomicBoolean(false); Flowable recordMetadataProducer = resultFlowable.subscribeOn(executorScheduler) .flatMap((Function>) o -> { String[] destinationTopics = method.stringValues(SendTo.class); @@ -660,8 +661,21 @@ private void handleResultFlowable( } kafkaProducer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { - //TODO: workd out handler - } catch (KafkaException e){ + handleException(consumerBean, new KafkaListenerException( + "Unhandled exception [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + e.getMessage(), + e, + consumerBean, + kafkaConsumer, + consumerRecord + )); + } catch (KafkaException e) { + handleException(consumerBean, new KafkaListenerException( + "Error transaction aborted [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + e.getMessage(), + e, + consumerBean, + kafkaConsumer, + consumerRecord + )); kafkaProducer.abortTransaction(); } emitter.onComplete(); @@ -677,6 +691,7 @@ private void handleResultFlowable( value, consumerRecord.headers() ); + kafkaProducer.send(record, (metadata, exception) -> { if (exception != null) { emitter.onError(exception); @@ -703,7 +718,7 @@ private void handleResultFlowable( consumerRecord )); - if (kafkaListener.isTrue("redelivery")) { + if (kafkaListener.isTrue("redelivery") && !firstError.getAndSet(true)) { if (LOG.isDebugEnabled()) { LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord); } From ba1213d9d763e963f179d08ed3e517dae696774f Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 13 Jun 2020 13:07:29 -0700 Subject: [PATCH 3/3] fix code format --- .../kafka/annotation/KafkaTransaction.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java index 42e551632..5413d37e5 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaTransaction.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.micronaut.configuration.kafka.annotation;